first batch of license fixes (boring)
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_sqlite.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software: you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published
7  * by the Free Software Foundation, either version 3 of the License,
8  * or (at your option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Affero General Public License for more details.
14  */
15
16 /**
17  * @file psycstore/plugin_psycstore_sqlite.c
18  * @brief sqlite-based psycstore backend
19  * @author Gabor X Toth
20  * @author Christian Grothoff
21  */
22
23 /*
24  * FIXME: SQLite3 only supports signed 64-bit integers natively,
25  *        thus it can only store 63 bits of the uint64_t's.
26  */
27
28 #include "platform.h"
29 #include "gnunet_psycstore_plugin.h"
30 #include "gnunet_psycstore_service.h"
31 #include "gnunet_multicast_service.h"
32 #include "gnunet_crypto_lib.h"
33 #include "gnunet_psyc_util_lib.h"
34 #include "psycstore.h"
35 #include <sqlite3.h>
36
37 /**
38  * After how many ms "busy" should a DB operation fail for good?  A
39  * low value makes sure that we are more responsive to requests
40  * (especially PUTs).  A high value guarantees a higher success rate
41  * (SELECTs in iterate can take several seconds despite LIMIT=1).
42  *
43  * The default value of 1s should ensure that users do not experience
44  * huge latencies while at the same time allowing operations to
45  * succeed with reasonable probability.
46  */
47 #define BUSY_TIMEOUT_MS 1000
48
49 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
50
51 /**
52  * Log an error message at log-level 'level' that indicates
53  * a failure of the command 'cmd' on file 'filename'
54  * with the message given by strerror(errno).
55  */
56 #define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0)
57
58 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
59
60 enum Transactions {
61   TRANSACTION_NONE = 0,
62   TRANSACTION_STATE_MODIFY,
63   TRANSACTION_STATE_SYNC,
64 };
65
66 /**
67  * Context for all functions in this plugin.
68  */
69 struct Plugin
70 {
71
72   const struct GNUNET_CONFIGURATION_Handle *cfg;
73
74   /**
75    * Database filename.
76    */
77   char *fn;
78
79   /**
80    * Native SQLite database handle.
81    */
82   sqlite3 *dbh;
83
84   /**
85    * Current transaction.
86    */
87   enum Transactions transaction;
88
89   sqlite3_stmt *transaction_begin;
90
91   sqlite3_stmt *transaction_commit;
92
93   sqlite3_stmt *transaction_rollback;
94
95   /**
96    * Precompiled SQL for channel_key_store()
97    */
98   sqlite3_stmt *insert_channel_key;
99
100   /**
101    * Precompiled SQL for slave_key_store()
102    */
103   sqlite3_stmt *insert_slave_key;
104
105
106   /**
107    * Precompiled SQL for membership_store()
108    */
109   sqlite3_stmt *insert_membership;
110
111   /**
112    * Precompiled SQL for membership_test()
113    */
114   sqlite3_stmt *select_membership;
115
116
117   /**
118    * Precompiled SQL for fragment_store()
119    */
120   sqlite3_stmt *insert_fragment;
121
122   /**
123    * Precompiled SQL for message_add_flags()
124    */
125   sqlite3_stmt *update_message_flags;
126
127   /**
128    * Precompiled SQL for fragment_get()
129    */
130   sqlite3_stmt *select_fragments;
131
132   /**
133    * Precompiled SQL for fragment_get()
134    */
135   sqlite3_stmt *select_latest_fragments;
136
137   /**
138    * Precompiled SQL for message_get()
139    */
140   sqlite3_stmt *select_messages;
141
142   /**
143    * Precompiled SQL for message_get()
144    */
145   sqlite3_stmt *select_latest_messages;
146
147   /**
148    * Precompiled SQL for message_get_fragment()
149    */
150   sqlite3_stmt *select_message_fragment;
151
152   /**
153    * Precompiled SQL for counters_get_message()
154    */
155   sqlite3_stmt *select_counters_message;
156
157   /**
158    * Precompiled SQL for counters_get_state()
159    */
160   sqlite3_stmt *select_counters_state;
161
162   /**
163    * Precompiled SQL for state_modify_end()
164    */
165   sqlite3_stmt *update_state_hash_message_id;
166
167   /**
168    * Precompiled SQL for state_sync_end()
169    */
170   sqlite3_stmt *update_max_state_message_id;
171
172   /**
173    * Precompiled SQL for state_modify_op()
174    */
175   sqlite3_stmt *insert_state_current;
176
177   /**
178    * Precompiled SQL for state_modify_end()
179    */
180   sqlite3_stmt *delete_state_empty;
181
182   /**
183    * Precompiled SQL for state_set_signed()
184    */
185   sqlite3_stmt *update_state_signed;
186
187   /**
188    * Precompiled SQL for state_sync()
189    */
190   sqlite3_stmt *insert_state_sync;
191
192   /**
193    * Precompiled SQL for state_sync()
194    */
195   sqlite3_stmt *delete_state;
196
197   /**
198    * Precompiled SQL for state_sync()
199    */
200   sqlite3_stmt *insert_state_from_sync;
201
202   /**
203    * Precompiled SQL for state_sync()
204    */
205   sqlite3_stmt *delete_state_sync;
206
207   /**
208    * Precompiled SQL for state_get_signed()
209    */
210   sqlite3_stmt *select_state_signed;
211
212   /**
213    * Precompiled SQL for state_get()
214    */
215   sqlite3_stmt *select_state_one;
216
217   /**
218    * Precompiled SQL for state_get_prefix()
219    */
220   sqlite3_stmt *select_state_prefix;
221
222 };
223
224 #if DEBUG_PSYCSTORE
225
226 static void
227 sql_trace (void *cls, const char *sql)
228 {
229   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQL query:\n%s\n", sql);
230 }
231
232 #endif
233
234 /**
235  * @brief Prepare a SQL statement
236  *
237  * @param dbh handle to the database
238  * @param sql SQL statement, UTF-8 encoded
239  * @param stmt set to the prepared statement
240  * @return 0 on success
241  */
242 static int
243 sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt)
244 {
245   char *tail;
246   int result;
247
248   result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
249                                (const char **) &tail);
250   LOG (GNUNET_ERROR_TYPE_DEBUG,
251        "Prepared `%s' / %p: %d\n", sql, *stmt, result);
252   if (result != SQLITE_OK)
253     LOG (GNUNET_ERROR_TYPE_ERROR,
254          _("Error preparing SQL query: %s\n  %s\n"),
255          sqlite3_errmsg (dbh), sql);
256   return result;
257 }
258
259
260 /**
261  * @brief Prepare a SQL statement
262  *
263  * @param dbh handle to the database
264  * @param sql SQL statement, UTF-8 encoded
265  * @return 0 on success
266  */
267 static int
268 sql_exec (sqlite3 *dbh, const char *sql)
269 {
270   int result;
271
272   result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
273   LOG (GNUNET_ERROR_TYPE_DEBUG,
274        "Executed `%s' / %d\n", sql, result);
275   if (result != SQLITE_OK)
276     LOG (GNUNET_ERROR_TYPE_ERROR,
277          _("Error executing SQL query: %s\n  %s\n"),
278          sqlite3_errmsg (dbh), sql);
279   return result;
280 }
281
282
283 /**
284  * Initialize the database connections and associated
285  * data structures (create tables and indices
286  * as needed as well).
287  *
288  * @param plugin the plugin context (state for this module)
289  * @return GNUNET_OK on success
290  */
291 static int
292 database_setup (struct Plugin *plugin)
293 {
294   char *filename;
295
296   if (GNUNET_OK !=
297       GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-sqlite",
298                                                "FILENAME", &filename))
299   {
300     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
301                                "psycstore-sqlite", "FILENAME");
302     return GNUNET_SYSERR;
303   }
304   if (GNUNET_OK != GNUNET_DISK_file_test (filename))
305   {
306     if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
307     {
308       GNUNET_break (0);
309       GNUNET_free (filename);
310       return GNUNET_SYSERR;
311     }
312   }
313   /* filename should be UTF-8-encoded. If it isn't, it's a bug */
314   plugin->fn = filename;
315
316   /* Open database and precompile statements */
317   if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh))
318   {
319     LOG (GNUNET_ERROR_TYPE_ERROR,
320          _("Unable to initialize SQLite: %s.\n"),
321          sqlite3_errmsg (plugin->dbh));
322     return GNUNET_SYSERR;
323   }
324
325 #if DEBUG_PSYCSTORE
326   sqlite3_trace (plugin->dbh, &sql_trace, NULL);
327 #endif
328
329   sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
330   sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
331   sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
332   sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
333   sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
334 #if ! DEBUG_PSYCSTORE
335   sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
336 #endif
337   sql_exec (plugin->dbh, "PRAGMA page_size=4096");
338
339   sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
340
341   /* Create tables */
342
343   sql_exec (plugin->dbh,
344             "CREATE TABLE IF NOT EXISTS channels (\n"
345             "  id INTEGER PRIMARY KEY,\n"
346             "  pub_key BLOB(32) UNIQUE,\n"
347             "  max_state_message_id INTEGER,\n" // last applied state message ID
348             "  state_hash_message_id INTEGER\n" // last message ID with a state hash
349             ");");
350
351   sql_exec (plugin->dbh,
352             "CREATE TABLE IF NOT EXISTS slaves (\n"
353             "  id INTEGER PRIMARY KEY,\n"
354             "  pub_key BLOB(32) UNIQUE\n"
355             ");");
356
357   sql_exec (plugin->dbh,
358             "CREATE TABLE IF NOT EXISTS membership (\n"
359             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
360             "  slave_id INTEGER NOT NULL REFERENCES slaves(id),\n"
361             "  did_join INTEGER NOT NULL,\n"
362             "  announced_at INTEGER NOT NULL,\n"
363             "  effective_since INTEGER NOT NULL,\n"
364             "  group_generation INTEGER NOT NULL\n"
365             ");");
366   sql_exec (plugin->dbh,
367             "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
368             "ON membership (channel_id, slave_id);");
369
370   /** @todo messages table: add method_name column */
371   sql_exec (plugin->dbh,
372             "CREATE TABLE IF NOT EXISTS messages (\n"
373             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
374             "  hop_counter INTEGER NOT NULL,\n"
375             "  signature BLOB,\n"
376             "  purpose BLOB,\n"
377             "  fragment_id INTEGER NOT NULL,\n"
378             "  fragment_offset INTEGER NOT NULL,\n"
379             "  message_id INTEGER NOT NULL,\n"
380             "  group_generation INTEGER NOT NULL,\n"
381             "  multicast_flags INTEGER NOT NULL,\n"
382             "  psycstore_flags INTEGER NOT NULL,\n"
383             "  data BLOB,\n"
384             "  PRIMARY KEY (channel_id, fragment_id),\n"
385             "  UNIQUE (channel_id, message_id, fragment_offset)\n"
386             ");");
387
388   sql_exec (plugin->dbh,
389             "CREATE TABLE IF NOT EXISTS state (\n"
390             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
391             "  name TEXT NOT NULL,\n"
392             "  value_current BLOB,\n"
393             "  value_signed BLOB,\n"
394             "  PRIMARY KEY (channel_id, name)\n"
395             ");");
396
397   sql_exec (plugin->dbh,
398             "CREATE TABLE IF NOT EXISTS state_sync (\n"
399             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
400             "  name TEXT NOT NULL,\n"
401             "  value BLOB,\n"
402             "  PRIMARY KEY (channel_id, name)\n"
403             ");");
404
405   /* Prepare statements */
406
407   sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin);
408
409   sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit);
410
411   sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback);
412
413   sql_prepare (plugin->dbh,
414                "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
415                &plugin->insert_channel_key);
416
417   sql_prepare (plugin->dbh,
418                "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);",
419                &plugin->insert_slave_key);
420
421   sql_prepare (plugin->dbh,
422                "INSERT INTO membership\n"
423                " (channel_id, slave_id, did_join, announced_at,\n"
424                "  effective_since, group_generation)\n"
425                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
426                "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
427                "        ?, ?, ?, ?);",
428                &plugin->insert_membership);
429
430   sql_prepare (plugin->dbh,
431                "SELECT did_join FROM membership\n"
432                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
433                "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
434                "      AND effective_since <= ? AND did_join = 1\n"
435                "ORDER BY announced_at DESC LIMIT 1;",
436                &plugin->select_membership);
437
438   sql_prepare (plugin->dbh,
439                "INSERT OR IGNORE INTO messages\n"
440                " (channel_id, hop_counter, signature, purpose,\n"
441                "  fragment_id, fragment_offset, message_id,\n"
442                "  group_generation, multicast_flags, psycstore_flags, data)\n"
443                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
444                "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
445                &plugin->insert_fragment);
446
447   sql_prepare (plugin->dbh,
448                "UPDATE messages\n"
449                "SET psycstore_flags = psycstore_flags | ?\n"
450                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
451                "      AND message_id = ? AND fragment_offset = 0;",
452                &plugin->update_message_flags);
453
454   sql_prepare (plugin->dbh,
455                "SELECT hop_counter, signature, purpose, fragment_id,\n"
456                "       fragment_offset, message_id, group_generation,\n"
457                "       multicast_flags, psycstore_flags, data\n"
458                "FROM messages\n"
459                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
460                "      AND ? <= fragment_id AND fragment_id <= ?;",
461                &plugin->select_fragments);
462
463   /** @todo select_messages: add method_prefix filter */
464   sql_prepare (plugin->dbh,
465                "SELECT hop_counter, signature, purpose, fragment_id,\n"
466                "       fragment_offset, message_id, group_generation,\n"
467                "       multicast_flags, psycstore_flags, data\n"
468                "FROM messages\n"
469                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
470                "      AND ? <= message_id AND message_id <= ?"
471                "LIMIT ?;",
472                &plugin->select_messages);
473
474   sql_prepare (plugin->dbh,
475                "SELECT * FROM\n"
476                "(SELECT hop_counter, signature, purpose, fragment_id,\n"
477                "        fragment_offset, message_id, group_generation,\n"
478                "        multicast_flags, psycstore_flags, data\n"
479                " FROM messages\n"
480                " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
481                " ORDER BY fragment_id DESC\n"
482                " LIMIT ?)\n"
483                "ORDER BY fragment_id;",
484                &plugin->select_latest_fragments);
485
486   /** @todo select_latest_messages: add method_prefix filter */
487   sql_prepare (plugin->dbh,
488                "SELECT hop_counter, signature, purpose, fragment_id,\n"
489                "       fragment_offset, message_id, group_generation,\n"
490                "        multicast_flags, psycstore_flags, data\n"
491                "FROM messages\n"
492                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
493                "      AND message_id IN\n"
494                "      (SELECT message_id\n"
495                "       FROM messages\n"
496                "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
497                "       GROUP BY message_id\n"
498                "       ORDER BY message_id\n"
499                "       DESC LIMIT ?)\n"
500                "ORDER BY fragment_id;",
501                &plugin->select_latest_messages);
502
503   sql_prepare (plugin->dbh,
504                "SELECT hop_counter, signature, purpose, fragment_id,\n"
505                "       fragment_offset, message_id, group_generation,\n"
506                "       multicast_flags, psycstore_flags, data\n"
507                "FROM messages\n"
508                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
509                "      AND message_id = ? AND fragment_offset = ?;",
510                &plugin->select_message_fragment);
511
512   sql_prepare (plugin->dbh,
513                "SELECT fragment_id, message_id, group_generation\n"
514                "FROM messages\n"
515                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
516                "ORDER BY fragment_id DESC LIMIT 1;",
517                &plugin->select_counters_message);
518
519   sql_prepare (plugin->dbh,
520                "SELECT max_state_message_id\n"
521                "FROM channels\n"
522                "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
523                &plugin->select_counters_state);
524
525   sql_prepare (plugin->dbh,
526                "UPDATE channels\n"
527                "SET max_state_message_id = ?\n"
528                "WHERE pub_key = ?;",
529                &plugin->update_max_state_message_id);
530
531   sql_prepare (plugin->dbh,
532                "UPDATE channels\n"
533                "SET state_hash_message_id = ?\n"
534                "WHERE pub_key = ?;",
535                &plugin->update_state_hash_message_id);
536
537   sql_prepare (plugin->dbh,
538                "INSERT OR REPLACE INTO state\n"
539                "  (channel_id, name, value_current, value_signed)\n"
540                "SELECT new.channel_id, new.name,\n"
541                "       new.value_current, old.value_signed\n"
542                "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
543                "             AS channel_id,\n"
544                "             ? AS name, ? AS value_current) AS new\n"
545                "LEFT JOIN (SELECT channel_id, name, value_signed\n"
546                "           FROM state) AS old\n"
547                "ON new.channel_id = old.channel_id AND new.name = old.name;",
548                &plugin->insert_state_current);
549
550   sql_prepare (plugin->dbh,
551                "DELETE FROM state\n"
552                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
553                "      AND (value_current IS NULL OR length(value_current) = 0)\n"
554                "      AND (value_signed IS NULL OR length(value_signed) = 0);",
555                &plugin->delete_state_empty);
556
557   sql_prepare (plugin->dbh,
558                "UPDATE state\n"
559                "SET value_signed = value_current\n"
560                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
561                &plugin->update_state_signed);
562
563   sql_prepare (plugin->dbh,
564                "DELETE FROM state\n"
565                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
566                &plugin->delete_state);
567
568   sql_prepare (plugin->dbh,
569                "INSERT INTO state_sync (channel_id, name, value)\n"
570                "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
571                &plugin->insert_state_sync);
572
573   sql_prepare (plugin->dbh,
574                "INSERT INTO state\n"
575                " (channel_id, name, value_current, value_signed)\n"
576                "SELECT channel_id, name, value, value\n"
577                "FROM state_sync\n"
578                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
579                &plugin->insert_state_from_sync);
580
581   sql_prepare (plugin->dbh,
582                "DELETE FROM state_sync\n"
583                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
584                &plugin->delete_state_sync);
585
586   sql_prepare (plugin->dbh,
587                "SELECT value_current\n"
588                "FROM state\n"
589                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
590                "      AND name = ?;",
591                &plugin->select_state_one);
592
593   sql_prepare (plugin->dbh,
594                "SELECT name, value_current\n"
595                "FROM state\n"
596                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
597                "      AND (name = ? OR substr(name, 1, ?) = ?);",
598                &plugin->select_state_prefix);
599
600   sql_prepare (plugin->dbh,
601                "SELECT name, value_signed\n"
602                "FROM state\n"
603                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
604                "      AND value_signed IS NOT NULL;",
605                &plugin->select_state_signed);
606
607   return GNUNET_OK;
608 }
609
610
611 /**
612  * Shutdown database connection and associate data
613  * structures.
614  * @param plugin the plugin context (state for this module)
615  */
616 static void
617 database_shutdown (struct Plugin *plugin)
618 {
619   int result;
620   sqlite3_stmt *stmt;
621   while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
622   {
623     result = sqlite3_finalize (stmt);
624     if (SQLITE_OK != result)
625       LOG (GNUNET_ERROR_TYPE_WARNING,
626            "Failed to close statement %p: %d\n", stmt, result);
627   }
628   if (SQLITE_OK != sqlite3_close (plugin->dbh))
629     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
630
631   GNUNET_free_non_null (plugin->fn);
632 }
633
634 /**
635  * Execute a prepared statement with a @a channel_key argument.
636  *
637  * @param plugin Plugin handle.
638  * @param stmt Statement to execute.
639  * @param channel_key Public key of the channel.
640  *
641  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
642  */
643 static int
644 exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt,
645               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
646 {
647   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
648                                       sizeof (*channel_key), SQLITE_STATIC))
649   {
650     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
651                 "sqlite3_bind");
652   }
653   else if (SQLITE_DONE != sqlite3_step (stmt))
654   {
655     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
656                 "sqlite3_step");
657   }
658
659   if (SQLITE_OK != sqlite3_reset (stmt))
660   {
661     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
662                 "sqlite3_reset");
663     return GNUNET_SYSERR;
664   }
665
666   return GNUNET_OK;
667 }
668
669 /**
670  * Begin a transaction.
671  */
672 static int
673 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
674 {
675   sqlite3_stmt *stmt = plugin->transaction_begin;
676
677   if (SQLITE_DONE != sqlite3_step (stmt))
678   {
679     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
680                 "sqlite3_step");
681   }
682   if (SQLITE_OK != sqlite3_reset (stmt))
683   {
684     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
685                 "sqlite3_reset");
686     return GNUNET_SYSERR;
687   }
688
689   plugin->transaction = transaction;
690   return GNUNET_OK;
691 }
692
693
694 /**
695  * Commit current transaction.
696  */
697 static int
698 transaction_commit (struct Plugin *plugin)
699 {
700   sqlite3_stmt *stmt = plugin->transaction_commit;
701
702   if (SQLITE_DONE != sqlite3_step (stmt))
703   {
704     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
705                 "sqlite3_step");
706   }
707   if (SQLITE_OK != sqlite3_reset (stmt))
708   {
709     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
710                 "sqlite3_reset");
711     return GNUNET_SYSERR;
712   }
713
714   plugin->transaction = TRANSACTION_NONE;
715   return GNUNET_OK;
716 }
717
718
719 /**
720  * Roll back current transaction.
721  */
722 static int
723 transaction_rollback (struct Plugin *plugin)
724 {
725   sqlite3_stmt *stmt = plugin->transaction_rollback;
726
727   if (SQLITE_DONE != sqlite3_step (stmt))
728   {
729     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
730                 "sqlite3_step");
731   }
732   if (SQLITE_OK != sqlite3_reset (stmt))
733   {
734     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
735                 "sqlite3_reset");
736     return GNUNET_SYSERR;
737   }
738   plugin->transaction = TRANSACTION_NONE;
739   return GNUNET_OK;
740 }
741
742
743 static int
744 channel_key_store (struct Plugin *plugin,
745                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
746 {
747   sqlite3_stmt *stmt = plugin->insert_channel_key;
748
749   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
750                                       sizeof (*channel_key), SQLITE_STATIC))
751   {
752     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
753                 "sqlite3_bind");
754   }
755   else if (SQLITE_DONE != sqlite3_step (stmt))
756   {
757     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
758                 "sqlite3_step");
759   }
760
761   if (SQLITE_OK != sqlite3_reset (stmt))
762   {
763     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
764                 "sqlite3_reset");
765     return GNUNET_SYSERR;
766   }
767
768   return GNUNET_OK;
769 }
770
771
772 static int
773 slave_key_store (struct Plugin *plugin,
774                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
775 {
776   sqlite3_stmt *stmt = plugin->insert_slave_key;
777
778   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, slave_key,
779                                       sizeof (*slave_key), SQLITE_STATIC))
780   {
781     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
782                 "sqlite3_bind");
783   }
784   else if (SQLITE_DONE != sqlite3_step (stmt))
785   {
786     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
787                 "sqlite3_step");
788   }
789
790
791   if (SQLITE_OK != sqlite3_reset (stmt))
792   {
793     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
794                 "sqlite3_reset");
795     return GNUNET_SYSERR;
796   }
797
798   return GNUNET_OK;
799 }
800
801
802 /**
803  * Store join/leave events for a PSYC channel in order to be able to answer
804  * membership test queries later.
805  *
806  * @see GNUNET_PSYCSTORE_membership_store()
807  *
808  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
809  */
810 static int
811 sqlite_membership_store (void *cls,
812                          const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
813                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
814                          int did_join,
815                          uint64_t announced_at,
816                          uint64_t effective_since,
817                          uint64_t group_generation)
818 {
819   struct Plugin *plugin = cls;
820   sqlite3_stmt *stmt = plugin->insert_membership;
821
822   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
823
824   if (announced_at > INT64_MAX ||
825       effective_since > INT64_MAX ||
826       group_generation > INT64_MAX)
827   {
828     GNUNET_break (0);
829     return GNUNET_SYSERR;
830   }
831
832   if (GNUNET_OK != channel_key_store (plugin, channel_key)
833       || GNUNET_OK != slave_key_store (plugin, slave_key))
834     return GNUNET_SYSERR;
835
836   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
837                                       sizeof (*channel_key), SQLITE_STATIC)
838       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
839                                          sizeof (*slave_key), SQLITE_STATIC)
840       || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join)
841       || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at)
842       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since)
843       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
844   {
845     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
846                 "sqlite3_bind");
847   }
848   else if (SQLITE_DONE != sqlite3_step (stmt))
849   {
850     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
851                 "sqlite3_step");
852   }
853
854   if (SQLITE_OK != sqlite3_reset (stmt))
855   {
856     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
857                 "sqlite3_reset");
858     return GNUNET_SYSERR;
859   }
860
861   return GNUNET_OK;
862 }
863
864 /**
865  * Test if a member was admitted to the channel at the given message ID.
866  *
867  * @see GNUNET_PSYCSTORE_membership_test()
868  *
869  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
870  *         #GNUNET_SYSERR if there was en error.
871  */
872 static int
873 membership_test (void *cls,
874                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
875                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
876                  uint64_t message_id)
877 {
878   struct Plugin *plugin = cls;
879   sqlite3_stmt *stmt = plugin->select_membership;
880   int ret = GNUNET_SYSERR;
881
882   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
883                                       sizeof (*channel_key), SQLITE_STATIC)
884       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
885                                          sizeof (*slave_key), SQLITE_STATIC)
886       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
887   {
888     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
889                 "sqlite3_bind");
890   }
891   else
892   {
893     switch (sqlite3_step (stmt))
894     {
895     case SQLITE_DONE:
896       ret = GNUNET_NO;
897       break;
898     case SQLITE_ROW:
899       ret = GNUNET_YES;
900     }
901   }
902
903   if (SQLITE_OK != sqlite3_reset (stmt))
904   {
905     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
906                 "sqlite3_reset");
907   }
908
909   return ret;
910 }
911
912 /**
913  * Store a message fragment sent to a channel.
914  *
915  * @see GNUNET_PSYCSTORE_fragment_store()
916  *
917  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
918  */
919 static int
920 fragment_store (void *cls,
921                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
922                 const struct GNUNET_MULTICAST_MessageHeader *msg,
923                 uint32_t psycstore_flags)
924 {
925   struct Plugin *plugin = cls;
926   sqlite3_stmt *stmt = plugin->insert_fragment;
927
928   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
929
930   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
931   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
932   uint64_t message_id = GNUNET_ntohll (msg->message_id);
933   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
934
935   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
936       message_id > INT64_MAX || group_generation > INT64_MAX)
937   {
938     LOG (GNUNET_ERROR_TYPE_ERROR,
939          "Tried to store fragment with a field > INT64_MAX: "
940          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
941          message_id, group_generation);
942     GNUNET_break (0);
943     return GNUNET_SYSERR;
944   }
945
946   if (GNUNET_OK != channel_key_store (plugin, channel_key))
947     return GNUNET_SYSERR;
948
949   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
950                                       sizeof (*channel_key), SQLITE_STATIC)
951       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) )
952       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
953                                          sizeof (msg->signature), SQLITE_STATIC)
954       || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
955                                          sizeof (msg->purpose), SQLITE_STATIC)
956       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id)
957       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset)
958       || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id)
959       || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation)
960       || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags))
961       || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags)
962       || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
963                                          ntohs (msg->header.size)
964                                          - sizeof (*msg), SQLITE_STATIC))
965   {
966     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
967                 "sqlite3_bind");
968   }
969   else if (SQLITE_DONE != sqlite3_step (stmt))
970   {
971     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
972                 "sqlite3_step");
973   }
974
975   if (SQLITE_OK != sqlite3_reset (stmt))
976   {
977     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
978                 "sqlite3_reset");
979     return GNUNET_SYSERR;
980   }
981
982   return GNUNET_OK;
983 }
984
985 /**
986  * Set additional flags for a given message.
987  *
988  * They are OR'd with any existing flags set.
989  *
990  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
991  */
992 static int
993 message_add_flags (void *cls,
994                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
995                    uint64_t message_id,
996                    uint32_t psycstore_flags)
997 {
998   struct Plugin *plugin = cls;
999   sqlite3_stmt *stmt = plugin->update_message_flags;
1000   int ret = GNUNET_SYSERR;
1001
1002   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags)
1003       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1004                                          sizeof (*channel_key), SQLITE_STATIC)
1005       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
1006   {
1007     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1008                 "sqlite3_bind");
1009   }
1010   else
1011   {
1012     switch (sqlite3_step (stmt))
1013     {
1014     case SQLITE_DONE:
1015       ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
1016       break;
1017     default:
1018       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1019                   "sqlite3_step");
1020     }
1021   }
1022
1023   if (SQLITE_OK != sqlite3_reset (stmt))
1024   {
1025     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1026                 "sqlite3_reset");
1027     return GNUNET_SYSERR;
1028   }
1029
1030   return ret;
1031 }
1032
1033 static int
1034 fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1035               void *cb_cls)
1036 {
1037   int data_size = sqlite3_column_bytes (stmt, 9);
1038   struct GNUNET_MULTICAST_MessageHeader *msg
1039     = GNUNET_malloc (sizeof (*msg) + data_size);
1040
1041   msg->header.size = htons (sizeof (*msg) + data_size);
1042   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1043   msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
1044   GNUNET_memcpy (&msg->signature,
1045           sqlite3_column_blob (stmt, 1),
1046           sqlite3_column_bytes (stmt, 1));
1047   GNUNET_memcpy (&msg->purpose,
1048           sqlite3_column_blob (stmt, 2),
1049           sqlite3_column_bytes (stmt, 2));
1050   msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
1051   msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
1052   msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
1053   msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
1054   msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
1055   GNUNET_memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
1056
1057   return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1058 }
1059
1060
1061 static int
1062 fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1063                  uint64_t *returned_fragments,
1064                  GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1065 {
1066   int ret = GNUNET_SYSERR;
1067   int sql_ret;
1068
1069   do
1070   {
1071     sql_ret = sqlite3_step (stmt);
1072     switch (sql_ret)
1073     {
1074     case SQLITE_DONE:
1075       if (ret != GNUNET_OK)
1076         ret = GNUNET_NO;
1077       break;
1078     case SQLITE_ROW:
1079       ret = fragment_row (stmt, cb, cb_cls);
1080       (*returned_fragments)++;
1081       if (ret != GNUNET_YES)
1082         sql_ret = SQLITE_DONE;
1083       break;
1084     default:
1085       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1086                   "sqlite3_step");
1087     }
1088   }
1089   while (sql_ret == SQLITE_ROW);
1090
1091   return ret;
1092 }
1093
1094 /**
1095  * Retrieve a message fragment range by fragment ID.
1096  *
1097  * @see GNUNET_PSYCSTORE_fragment_get()
1098  *
1099  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1100  */
1101 static int
1102 fragment_get (void *cls,
1103               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1104               uint64_t first_fragment_id,
1105               uint64_t last_fragment_id,
1106               uint64_t *returned_fragments,
1107               GNUNET_PSYCSTORE_FragmentCallback cb,
1108               void *cb_cls)
1109 {
1110   struct Plugin *plugin = cls;
1111   sqlite3_stmt *stmt = plugin->select_fragments;
1112   int ret = GNUNET_SYSERR;
1113   *returned_fragments = 0;
1114
1115   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1116                                       sizeof (*channel_key),
1117                                       SQLITE_STATIC)
1118       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id)
1119       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id))
1120   {
1121     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1122                 "sqlite3_bind");
1123   }
1124   else
1125   {
1126     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1127   }
1128
1129   if (SQLITE_OK != sqlite3_reset (stmt))
1130   {
1131     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1132                 "sqlite3_reset");
1133   }
1134
1135   return ret;
1136 }
1137
1138
1139 /**
1140  * Retrieve a message fragment range by fragment ID.
1141  *
1142  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1143  *
1144  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1145  */
1146 static int
1147 fragment_get_latest (void *cls,
1148                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1149                      uint64_t fragment_limit,
1150                      uint64_t *returned_fragments,
1151                      GNUNET_PSYCSTORE_FragmentCallback cb,
1152                      void *cb_cls)
1153 {
1154   struct Plugin *plugin = cls;
1155   sqlite3_stmt *stmt = plugin->select_latest_fragments;
1156   int ret = GNUNET_SYSERR;
1157   *returned_fragments = 0;
1158
1159   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1160                                       sizeof (*channel_key),
1161                                       SQLITE_STATIC)
1162       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit))
1163   {
1164     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1165                 "sqlite3_bind");
1166   }
1167   else
1168   {
1169     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1170   }
1171
1172   if (SQLITE_OK != sqlite3_reset (stmt))
1173   {
1174     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1175                 "sqlite3_reset");
1176   }
1177
1178   return ret;
1179 }
1180
1181
1182 /**
1183  * Retrieve all fragments of a message ID range.
1184  *
1185  * @see GNUNET_PSYCSTORE_message_get()
1186  *
1187  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1188  */
1189 static int
1190 message_get (void *cls,
1191              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1192              uint64_t first_message_id,
1193              uint64_t last_message_id,
1194              uint64_t fragment_limit,
1195              uint64_t *returned_fragments,
1196              GNUNET_PSYCSTORE_FragmentCallback cb,
1197              void *cb_cls)
1198 {
1199   struct Plugin *plugin = cls;
1200   sqlite3_stmt *stmt = plugin->select_messages;
1201   int ret = GNUNET_SYSERR;
1202   *returned_fragments = 0;
1203
1204   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1205                                       sizeof (*channel_key),
1206                                       SQLITE_STATIC)
1207       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id)
1208       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id)
1209       || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
1210                                           (0 != fragment_limit)
1211                                           ? fragment_limit
1212                                           : INT64_MAX))
1213   {
1214     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1215                 "sqlite3_bind");
1216   }
1217   else
1218   {
1219     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1220   }
1221
1222   if (SQLITE_OK != sqlite3_reset (stmt))
1223   {
1224     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1225                 "sqlite3_reset");
1226   }
1227
1228   return ret;
1229 }
1230
1231
1232 /**
1233  * Retrieve all fragments of the latest messages.
1234  *
1235  * @see GNUNET_PSYCSTORE_message_get_latest()
1236  *
1237  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1238  */
1239 static int
1240 message_get_latest (void *cls,
1241                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1242                     uint64_t message_limit,
1243                     uint64_t *returned_fragments,
1244                     GNUNET_PSYCSTORE_FragmentCallback cb,
1245                     void *cb_cls)
1246 {
1247   struct Plugin *plugin = cls;
1248   sqlite3_stmt *stmt = plugin->select_latest_messages;
1249   int ret = GNUNET_SYSERR;
1250   *returned_fragments = 0;
1251
1252   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1253                                       sizeof (*channel_key),
1254                                       SQLITE_STATIC)
1255       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1256                                          sizeof (*channel_key),
1257                                          SQLITE_STATIC)
1258       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit))
1259   {
1260     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1261                 "sqlite3_bind");
1262   }
1263   else
1264   {
1265     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1266   }
1267
1268   if (SQLITE_OK != sqlite3_reset (stmt))
1269   {
1270     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1271                 "sqlite3_reset");
1272   }
1273
1274   return ret;
1275 }
1276
1277
1278 /**
1279  * Retrieve a fragment of message specified by its message ID and fragment
1280  * offset.
1281  *
1282  * @see GNUNET_PSYCSTORE_message_get_fragment()
1283  *
1284  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1285  */
1286 static int
1287 message_get_fragment (void *cls,
1288                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1289                       uint64_t message_id,
1290                       uint64_t fragment_offset,
1291                       GNUNET_PSYCSTORE_FragmentCallback cb,
1292                       void *cb_cls)
1293 {
1294   struct Plugin *plugin = cls;
1295   sqlite3_stmt *stmt = plugin->select_message_fragment;
1296   int ret = GNUNET_SYSERR;
1297
1298   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1299                                       sizeof (*channel_key),
1300                                       SQLITE_STATIC)
1301       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)
1302       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
1303   {
1304     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1305                 "sqlite3_bind");
1306   }
1307   else
1308   {
1309     switch (sqlite3_step (stmt))
1310     {
1311     case SQLITE_DONE:
1312       ret = GNUNET_NO;
1313       break;
1314     case SQLITE_ROW:
1315       ret = fragment_row (stmt, cb, cb_cls);
1316       break;
1317     default:
1318       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1319                   "sqlite3_step");
1320     }
1321   }
1322
1323   if (SQLITE_OK != sqlite3_reset (stmt))
1324   {
1325     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1326                 "sqlite3_reset");
1327   }
1328
1329   return ret;
1330 }
1331
1332 /**
1333  * Retrieve the max. values of message counters for a channel.
1334  *
1335  * @see GNUNET_PSYCSTORE_counters_get()
1336  *
1337  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1338  */
1339 static int
1340 counters_message_get (void *cls,
1341                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1342                       uint64_t *max_fragment_id,
1343                       uint64_t *max_message_id,
1344                       uint64_t *max_group_generation)
1345 {
1346   struct Plugin *plugin = cls;
1347   sqlite3_stmt *stmt = plugin->select_counters_message;
1348   int ret = GNUNET_SYSERR;
1349
1350   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1351                                       sizeof (*channel_key),
1352                                       SQLITE_STATIC))
1353   {
1354     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1355                 "sqlite3_bind");
1356   }
1357   else
1358   {
1359     switch (sqlite3_step (stmt))
1360     {
1361     case SQLITE_DONE:
1362       ret = GNUNET_NO;
1363       break;
1364     case SQLITE_ROW:
1365       *max_fragment_id = sqlite3_column_int64 (stmt, 0);
1366       *max_message_id = sqlite3_column_int64 (stmt, 1);
1367       *max_group_generation = sqlite3_column_int64 (stmt, 2);
1368       ret = GNUNET_OK;
1369       break;
1370     default:
1371       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1372                   "sqlite3_step");
1373     }
1374   }
1375
1376   if (SQLITE_OK != sqlite3_reset (stmt))
1377   {
1378     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1379                 "sqlite3_reset");
1380   }
1381
1382   return ret;
1383 }
1384
1385 /**
1386  * Retrieve the max. values of state counters for a channel.
1387  *
1388  * @see GNUNET_PSYCSTORE_counters_get()
1389  *
1390  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1391  */
1392 static int
1393 counters_state_get (void *cls,
1394                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1395                     uint64_t *max_state_message_id)
1396 {
1397   struct Plugin *plugin = cls;
1398   sqlite3_stmt *stmt = plugin->select_counters_state;
1399   int ret = GNUNET_SYSERR;
1400
1401   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1402                                       sizeof (*channel_key),
1403                                       SQLITE_STATIC))
1404   {
1405     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1406                 "sqlite3_bind");
1407   }
1408   else
1409   {
1410     switch (sqlite3_step (stmt))
1411     {
1412     case SQLITE_DONE:
1413       ret = GNUNET_NO;
1414       break;
1415     case SQLITE_ROW:
1416       *max_state_message_id = sqlite3_column_int64 (stmt, 0);
1417       ret = GNUNET_OK;
1418       break;
1419     default:
1420       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1421                   "sqlite3_step");
1422     }
1423   }
1424
1425   if (SQLITE_OK != sqlite3_reset (stmt))
1426   {
1427     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1428                 "sqlite3_reset");
1429   }
1430
1431   return ret;
1432 }
1433
1434
1435 /**
1436  * Assign a value to a state variable.
1437  *
1438  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1439  */
1440 static int
1441 state_assign (struct Plugin *plugin, sqlite3_stmt *stmt,
1442               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1443               const char *name, const void *value, size_t value_size)
1444 {
1445   int ret = GNUNET_SYSERR;
1446
1447   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1448                                       sizeof (*channel_key), SQLITE_STATIC)
1449       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)
1450       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
1451                                          SQLITE_STATIC))
1452   {
1453     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1454                 "sqlite3_bind");
1455   }
1456   else
1457   {
1458     switch (sqlite3_step (stmt))
1459     {
1460     case SQLITE_DONE:
1461       ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO;
1462       break;
1463     default:
1464       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1465                   "sqlite3_step");
1466     }
1467   }
1468
1469   if (SQLITE_OK != sqlite3_reset (stmt))
1470   {
1471     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1472                 "sqlite3_reset");
1473     return GNUNET_SYSERR;
1474   }
1475
1476   return ret;
1477 }
1478
1479
1480 static int
1481 update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt,
1482                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1483                    uint64_t message_id)
1484 {
1485   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id)
1486       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1487                                          sizeof (*channel_key), SQLITE_STATIC))
1488   {
1489     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1490                 "sqlite3_bind");
1491   }
1492   else if (SQLITE_DONE != sqlite3_step (stmt))
1493   {
1494     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1495                 "sqlite3_step");
1496   }
1497   if (SQLITE_OK != sqlite3_reset (stmt))
1498   {
1499     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1500                 "sqlite3_reset");
1501     return GNUNET_SYSERR;
1502   }
1503   return GNUNET_OK;
1504 }
1505
1506
1507 /**
1508  * Begin modifying current state.
1509  */
1510 static int
1511 state_modify_begin (void *cls,
1512                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1513                     uint64_t message_id, uint64_t state_delta)
1514 {
1515   struct Plugin *plugin = cls;
1516
1517   if (state_delta > 0)
1518   {
1519     /**
1520      * We can only apply state modifiers in the current message if modifiers in
1521      * the previous stateful message (message_id - state_delta) were already
1522      * applied.
1523      */
1524
1525     uint64_t max_state_message_id = 0;
1526     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1527     switch (ret)
1528     {
1529     case GNUNET_OK:
1530     case GNUNET_NO: // no state yet
1531       ret = GNUNET_OK;
1532       break;
1533     default:
1534       return ret;
1535     }
1536
1537     if (max_state_message_id < message_id - state_delta)
1538       return GNUNET_NO; /* some stateful messages not yet applied */
1539     else if (message_id - state_delta < max_state_message_id)
1540       return GNUNET_NO; /* changes already applied */
1541   }
1542
1543   if (TRANSACTION_NONE != plugin->transaction)
1544   {
1545     /** @todo FIXME: wait for other transaction to finish  */
1546     return GNUNET_SYSERR;
1547   }
1548   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1549 }
1550
1551
1552 /**
1553  * Set the current value of state variable.
1554  *
1555  * @see GNUNET_PSYCSTORE_state_modify()
1556  *
1557  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1558  */
1559 static int
1560 state_modify_op (void *cls,
1561                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1562                  enum GNUNET_PSYC_Operator op,
1563                  const char *name, const void *value, size_t value_size)
1564 {
1565   struct Plugin *plugin = cls;
1566   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1567
1568   switch (op)
1569   {
1570   case GNUNET_PSYC_OP_ASSIGN:
1571     return state_assign (plugin, plugin->insert_state_current, channel_key,
1572                          name, value, value_size);
1573
1574   default: /** @todo implement more state operations */
1575     GNUNET_break (0);
1576     return GNUNET_SYSERR;
1577   }
1578 }
1579
1580
1581 /**
1582  * End modifying current state.
1583  */
1584 static int
1585 state_modify_end (void *cls,
1586                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1587                   uint64_t message_id)
1588 {
1589   struct Plugin *plugin = cls;
1590   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1591
1592   return
1593     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1594     && GNUNET_OK == update_message_id (plugin,
1595                                        plugin->update_max_state_message_id,
1596                                        channel_key, message_id)
1597     && GNUNET_OK == transaction_commit (plugin)
1598     ? GNUNET_OK : GNUNET_SYSERR;
1599 }
1600
1601
1602 /**
1603  * Begin state synchronization.
1604  */
1605 static int
1606 state_sync_begin (void *cls,
1607                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1608 {
1609   struct Plugin *plugin = cls;
1610   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1611 }
1612
1613
1614 /**
1615  * Assign current value of a state variable.
1616  *
1617  * @see GNUNET_PSYCSTORE_state_modify()
1618  *
1619  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1620  */
1621 static int
1622 state_sync_assign (void *cls,
1623                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1624                 const char *name, const void *value, size_t value_size)
1625 {
1626   struct Plugin *plugin = cls;
1627   return state_assign (cls, plugin->insert_state_sync, channel_key,
1628                        name, value, value_size);
1629 }
1630
1631
1632 /**
1633  * End modifying current state.
1634  */
1635 static int
1636 state_sync_end (void *cls,
1637                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1638                 uint64_t max_state_message_id,
1639                 uint64_t state_hash_message_id)
1640 {
1641   struct Plugin *plugin = cls;
1642   int ret = GNUNET_SYSERR;
1643
1644   if (TRANSACTION_NONE != plugin->transaction)
1645   {
1646     /** @todo FIXME: wait for other transaction to finish  */
1647     return GNUNET_SYSERR;
1648   }
1649
1650   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1651     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1652     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1653                                   channel_key)
1654     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1655                                   channel_key)
1656     && GNUNET_OK == update_message_id (plugin,
1657                                        plugin->update_state_hash_message_id,
1658                                        channel_key, state_hash_message_id)
1659     && GNUNET_OK == update_message_id (plugin,
1660                                        plugin->update_max_state_message_id,
1661                                        channel_key, max_state_message_id)
1662     && GNUNET_OK == transaction_commit (plugin)
1663     ? ret = GNUNET_OK
1664     : transaction_rollback (plugin);
1665   return ret;
1666 }
1667
1668
1669 /**
1670  * Delete the whole state.
1671  *
1672  * @see GNUNET_PSYCSTORE_state_reset()
1673  *
1674  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1675  */
1676 static int
1677 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1678 {
1679   struct Plugin *plugin = cls;
1680   return exec_channel (plugin, plugin->delete_state, channel_key);
1681 }
1682
1683
1684 /**
1685  * Update signed values of state variables in the state store.
1686  *
1687  * @see GNUNET_PSYCSTORE_state_hash_update()
1688  *
1689  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1690  */
1691 static int
1692 state_update_signed (void *cls,
1693                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1694 {
1695   struct Plugin *plugin = cls;
1696   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1697 }
1698
1699
1700 /**
1701  * Retrieve a state variable by name.
1702  *
1703  * @see GNUNET_PSYCSTORE_state_get()
1704  *
1705  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1706  */
1707 static int
1708 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1709            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1710 {
1711   struct Plugin *plugin = cls;
1712   int ret = GNUNET_SYSERR;
1713
1714   sqlite3_stmt *stmt = plugin->select_state_one;
1715
1716   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1717                                       sizeof (*channel_key),
1718                                       SQLITE_STATIC)
1719       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
1720   {
1721     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1722                 "sqlite3_bind");
1723   }
1724   else
1725   {
1726     switch (sqlite3_step (stmt))
1727     {
1728     case SQLITE_DONE:
1729       ret = GNUNET_NO;
1730       break;
1731     case SQLITE_ROW:
1732       ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1733                 sqlite3_column_bytes (stmt, 0));
1734       break;
1735     default:
1736       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1737                   "sqlite3_step");
1738     }
1739   }
1740
1741   if (SQLITE_OK != sqlite3_reset (stmt))
1742   {
1743     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1744                 "sqlite3_reset");
1745   }
1746
1747   return ret;
1748 }
1749
1750
1751 /**
1752  * Retrieve all state variables for a channel with the given prefix.
1753  *
1754  * @see GNUNET_PSYCSTORE_state_get_prefix()
1755  *
1756  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1757  */
1758 static int
1759 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1760                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1761                   void *cb_cls)
1762 {
1763   struct Plugin *plugin = cls;
1764   int ret = GNUNET_SYSERR;
1765   sqlite3_stmt *stmt = plugin->select_state_prefix;
1766   size_t name_len = strlen (name);
1767
1768   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1769                                       sizeof (*channel_key), SQLITE_STATIC)
1770       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
1771       || SQLITE_OK != sqlite3_bind_int (stmt, 3, name_len)
1772       || SQLITE_OK != sqlite3_bind_text (stmt, 4, name, name_len, SQLITE_STATIC))
1773   {
1774     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1775                 "sqlite3_bind");
1776   }
1777   else
1778   {
1779     int sql_ret;
1780     do
1781     {
1782       sql_ret = sqlite3_step (stmt);
1783       switch (sql_ret)
1784       {
1785       case SQLITE_DONE:
1786         if (ret != GNUNET_OK)
1787           ret = GNUNET_NO;
1788         break;
1789       case SQLITE_ROW:
1790         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1791                   sqlite3_column_blob (stmt, 1),
1792                   sqlite3_column_bytes (stmt, 1));
1793         if (ret != GNUNET_YES)
1794           sql_ret = SQLITE_DONE;
1795         break;
1796       default:
1797         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1798                     "sqlite3_step");
1799       }
1800     }
1801     while (sql_ret == SQLITE_ROW);
1802   }
1803   if (SQLITE_OK != sqlite3_reset (stmt))
1804   {
1805     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1806                 "sqlite3_reset");
1807   }
1808   return ret;
1809 }
1810
1811
1812 /**
1813  * Retrieve all signed state variables for a channel.
1814  *
1815  * @see GNUNET_PSYCSTORE_state_get_signed()
1816  *
1817  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1818  */
1819 static int
1820 state_get_signed (void *cls,
1821                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1822                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1823 {
1824   struct Plugin *plugin = cls;
1825   int ret = GNUNET_SYSERR;
1826
1827   sqlite3_stmt *stmt = plugin->select_state_signed;
1828
1829   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1830                                       sizeof (*channel_key), SQLITE_STATIC))
1831   {
1832     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1833                 "sqlite3_bind");
1834   }
1835   else
1836   {
1837     int sql_ret;
1838     do
1839     {
1840       sql_ret = sqlite3_step (stmt);
1841       switch (sql_ret)
1842       {
1843       case SQLITE_DONE:
1844         if (ret != GNUNET_OK)
1845           ret = GNUNET_NO;
1846         break;
1847       case SQLITE_ROW:
1848         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1849                   sqlite3_column_blob (stmt, 1),
1850                   sqlite3_column_bytes (stmt, 1));
1851         if (ret != GNUNET_YES)
1852           sql_ret = SQLITE_DONE;
1853         break;
1854       default:
1855         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1856                     "sqlite3_step");
1857       }
1858     }
1859     while (sql_ret == SQLITE_ROW);
1860   }
1861
1862   if (SQLITE_OK != sqlite3_reset (stmt))
1863   {
1864     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1865                 "sqlite3_reset");
1866   }
1867
1868   return ret;
1869 }
1870
1871
1872 /**
1873  * Entry point for the plugin.
1874  *
1875  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1876  * @return NULL on error, otherwise the plugin context
1877  */
1878 void *
1879 libgnunet_plugin_psycstore_sqlite_init (void *cls)
1880 {
1881   static struct Plugin plugin;
1882   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1883   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1884
1885   if (NULL != plugin.cfg)
1886     return NULL;                /* can only initialize once! */
1887   memset (&plugin, 0, sizeof (struct Plugin));
1888   plugin.cfg = cfg;
1889   if (GNUNET_OK != database_setup (&plugin))
1890   {
1891     database_shutdown (&plugin);
1892     return NULL;
1893   }
1894   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1895   api->cls = &plugin;
1896   api->membership_store = &sqlite_membership_store;
1897   api->membership_test = &membership_test;
1898   api->fragment_store = &fragment_store;
1899   api->message_add_flags = &message_add_flags;
1900   api->fragment_get = &fragment_get;
1901   api->fragment_get_latest = &fragment_get_latest;
1902   api->message_get = &message_get;
1903   api->message_get_latest = &message_get_latest;
1904   api->message_get_fragment = &message_get_fragment;
1905   api->counters_message_get = &counters_message_get;
1906   api->counters_state_get = &counters_state_get;
1907   api->state_modify_begin = &state_modify_begin;
1908   api->state_modify_op = &state_modify_op;
1909   api->state_modify_end = &state_modify_end;
1910   api->state_sync_begin = &state_sync_begin;
1911   api->state_sync_assign = &state_sync_assign;
1912   api->state_sync_end = &state_sync_end;
1913   api->state_reset = &state_reset;
1914   api->state_update_signed = &state_update_signed;
1915   api->state_get = &state_get;
1916   api->state_get_prefix = &state_get_prefix;
1917   api->state_get_signed = &state_get_signed;
1918
1919   LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
1920   return api;
1921 }
1922
1923
1924 /**
1925  * Exit point from the plugin.
1926  *
1927  * @param cls The plugin context (as returned by "init")
1928  * @return Always NULL
1929  */
1930 void *
1931 libgnunet_plugin_psycstore_sqlite_done (void *cls)
1932 {
1933   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1934   struct Plugin *plugin = api->cls;
1935
1936   database_shutdown (plugin);
1937   plugin->cfg = NULL;
1938   GNUNET_free (api);
1939   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
1940   return NULL;
1941 }
1942
1943 /* end of plugin_psycstore_sqlite.c */