Exponential backoff for find finger trail task
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_sqlite.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_sqlite.c
23  * @brief sqlite-based psycstore backend
24  * @author Gabor X Toth
25  * @author Christian Grothoff
26  */
27
28 /*
29  * FIXME: SQLite3 only supports signed 64-bit integers natively,
30  *        thus it can only store 63 bits of the uint64_t's.
31  */
32
33 #include "platform.h"
34 #include "gnunet_psycstore_plugin.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_multicast_service.h"
37 #include "gnunet_crypto_lib.h"
38 #include "psycstore.h"
39 #include <sqlite3.h>
40
41 /**
42  * After how many ms "busy" should a DB operation fail for good?  A
43  * low value makes sure that we are more responsive to requests
44  * (especially PUTs).  A high value guarantees a higher success rate
45  * (SELECTs in iterate can take several seconds despite LIMIT=1).
46  *
47  * The default value of 1s should ensure that users do not experience
48  * huge latencies while at the same time allowing operations to
49  * succeed with reasonable probability.
50  */
51 #define BUSY_TIMEOUT_MS 1000
52
53 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
54
55 /**
56  * Log an error message at log-level 'level' that indicates
57  * a failure of the command 'cmd' on file 'filename'
58  * with the message given by strerror(errno).
59  */
60 #define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0)
61
62 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
63
64 enum Transactions {
65   TRANSACTION_NONE = 0,
66   TRANSACTION_STATE_MODIFY
67 };
68
69 /**
70  * Context for all functions in this plugin.
71  */
72 struct Plugin
73 {
74
75   const struct GNUNET_CONFIGURATION_Handle *cfg;
76
77   /**
78    * Database filename.
79    */
80   char *fn;
81
82   /**
83    * Native SQLite database handle.
84    */
85   sqlite3 *dbh;
86
87   /**
88    * Current transaction.
89    */
90   enum Transactions transaction;
91
92   sqlite3_stmt *transaction_begin;
93
94   sqlite3_stmt *transaction_commit;
95
96   sqlite3_stmt *transaction_rollback;
97
98   /**
99    * Precompiled SQL for channel_key_store()
100    */
101   sqlite3_stmt *insert_channel_key;
102
103   /**
104    * Precompiled SQL for slave_key_store()
105    */
106   sqlite3_stmt *insert_slave_key;
107
108
109   /**
110    * Precompiled SQL for membership_store()
111    */
112   sqlite3_stmt *insert_membership;
113
114   /**
115    * Precompiled SQL for membership_test()
116    */
117   sqlite3_stmt *select_membership;
118
119
120   /**
121    * Precompiled SQL for fragment_store()
122    */
123   sqlite3_stmt *insert_fragment;
124
125   /**
126    * Precompiled SQL for message_add_flags()
127    */
128   sqlite3_stmt *update_message_flags;
129
130   /**
131    * Precompiled SQL for fragment_get()
132    */
133   sqlite3_stmt *select_fragments;
134
135   /**
136    * Precompiled SQL for fragment_get()
137    */
138   sqlite3_stmt *select_latest_fragments;
139
140   /**
141    * Precompiled SQL for message_get()
142    */
143   sqlite3_stmt *select_messages;
144
145   /**
146    * Precompiled SQL for message_get()
147    */
148   sqlite3_stmt *select_latest_messages;
149
150   /**
151    * Precompiled SQL for message_get_fragment()
152    */
153   sqlite3_stmt *select_message_fragment;
154
155   /**
156    * Precompiled SQL for counters_get_message()
157    */
158   sqlite3_stmt *select_counters_message;
159
160   /**
161    * Precompiled SQL for counters_get_state()
162    */
163   sqlite3_stmt *select_counters_state;
164
165   /**
166    * Precompiled SQL for state_modify_end()
167    */
168   sqlite3_stmt *update_state_hash_message_id;
169
170   /**
171    * Precompiled SQL for state_sync_end()
172    */
173   sqlite3_stmt *update_max_state_message_id;
174
175
176   /**
177    * Precompiled SQL for message_modify_begin()
178    */
179   sqlite3_stmt *select_message_state_delta;
180
181   /**
182    * Precompiled SQL for state_modify_set()
183    */
184   sqlite3_stmt *insert_state_current;
185
186   /**
187    * Precompiled SQL for state_modify_end()
188    */
189   sqlite3_stmt *delete_state_empty;
190
191   /**
192    * Precompiled SQL for state_set_signed()
193    */
194   sqlite3_stmt *update_state_signed;
195
196   /**
197    * Precompiled SQL for state_sync()
198    */
199   sqlite3_stmt *insert_state_sync;
200
201   /**
202    * Precompiled SQL for state_sync()
203    */
204   sqlite3_stmt *delete_state;
205
206   /**
207    * Precompiled SQL for state_sync()
208    */
209   sqlite3_stmt *insert_state_from_sync;
210
211   /**
212    * Precompiled SQL for state_sync()
213    */
214   sqlite3_stmt *delete_state_sync;
215
216   /**
217    * Precompiled SQL for state_get_signed()
218    */
219   sqlite3_stmt *select_state_signed;
220
221   /**
222    * Precompiled SQL for state_get()
223    */
224   sqlite3_stmt *select_state_one;
225
226   /**
227    * Precompiled SQL for state_get_prefix()
228    */
229   sqlite3_stmt *select_state_prefix;
230
231 };
232
233 #if DEBUG_PSYCSTORE
234
235 static void
236 sql_trace (void *cls, const char *sql)
237 {
238   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQL query:\n%s\n", sql);
239 }
240
241 #endif
242
243 /**
244  * @brief Prepare a SQL statement
245  *
246  * @param dbh handle to the database
247  * @param sql SQL statement, UTF-8 encoded
248  * @param stmt set to the prepared statement
249  * @return 0 on success
250  */
251 static int
252 sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt)
253 {
254   char *tail;
255   int result;
256
257   result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
258                                (const char **) &tail);
259   LOG (GNUNET_ERROR_TYPE_DEBUG,
260        "Prepared `%s' / %p: %d\n", sql, *stmt, result);
261   if (result != SQLITE_OK)
262     LOG (GNUNET_ERROR_TYPE_ERROR,
263          _("Error preparing SQL query: %s\n  %s\n"),
264          sqlite3_errmsg (dbh), sql);
265   return result;
266 }
267
268
269 /**
270  * @brief Prepare a SQL statement
271  *
272  * @param dbh handle to the database
273  * @param sql SQL statement, UTF-8 encoded
274  * @return 0 on success
275  */
276 static int
277 sql_exec (sqlite3 *dbh, const char *sql)
278 {
279   int result;
280
281   result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
282   LOG (GNUNET_ERROR_TYPE_DEBUG,
283        "Executed `%s' / %d\n", sql, result);
284   if (result != SQLITE_OK)
285     LOG (GNUNET_ERROR_TYPE_ERROR,
286          _("Error executing SQL query: %s\n  %s\n"),
287          sqlite3_errmsg (dbh), sql);
288   return result;
289 }
290
291
292 /**
293  * Initialize the database connections and associated
294  * data structures (create tables and indices
295  * as needed as well).
296  *
297  * @param plugin the plugin context (state for this module)
298  * @return GNUNET_OK on success
299  */
300 static int
301 database_setup (struct Plugin *plugin)
302 {
303   char *filename;
304
305   if (GNUNET_OK !=
306       GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-sqlite",
307                                                "FILENAME", &filename))
308   {
309     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
310                                "psycstore-sqlite", "FILENAME");
311     return GNUNET_SYSERR;
312   }
313   if (GNUNET_OK != GNUNET_DISK_file_test (filename))
314   {
315     if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
316     {
317       GNUNET_break (0);
318       GNUNET_free (filename);
319       return GNUNET_SYSERR;
320     }
321   }
322   /* filename should be UTF-8-encoded. If it isn't, it's a bug */
323   plugin->fn = filename;
324
325   /* Open database and precompile statements */
326   if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh))
327   {
328     LOG (GNUNET_ERROR_TYPE_ERROR,
329          _("Unable to initialize SQLite: %s.\n"),
330          sqlite3_errmsg (plugin->dbh));
331     return GNUNET_SYSERR;
332   }
333
334 #if DEBUG_PSYCSTORE
335   sqlite3_trace (plugin->dbh, &sql_trace, NULL);
336 #endif
337
338   sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
339   sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
340   sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
341   sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
342   sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
343 #if ! DEBUG_PSYCSTORE
344   sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
345 #endif
346   sql_exec (plugin->dbh, "PRAGMA count_changes=OFF");
347   sql_exec (plugin->dbh, "PRAGMA page_size=4096");
348
349   sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
350
351   /* Create tables */
352
353   sql_exec (plugin->dbh,
354             "CREATE TABLE IF NOT EXISTS channels (\n"
355             "  id INTEGER PRIMARY KEY,\n"
356             "  pub_key BLOB UNIQUE,\n"
357             "  max_state_message_id INTEGER,\n"
358             "  state_hash_message_id INTEGER\n"
359             ");");
360
361   sql_exec (plugin->dbh,
362             "CREATE TABLE IF NOT EXISTS slaves (\n"
363             "  id INTEGER PRIMARY KEY,\n"
364             "  pub_key BLOB UNIQUE\n"
365             ");");
366
367   sql_exec (plugin->dbh,
368             "CREATE TABLE IF NOT EXISTS membership (\n"
369             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
370             "  slave_id INTEGER NOT NULL REFERENCES slaves(id),\n"
371             "  did_join INTEGER NOT NULL,\n"
372             "  announced_at INTEGER NOT NULL,\n"
373             "  effective_since INTEGER NOT NULL,\n"
374             "  group_generation INTEGER NOT NULL\n"
375             ");");
376   sql_exec (plugin->dbh,
377             "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
378             "ON membership (channel_id, slave_id);");
379
380   sql_exec (plugin->dbh,
381             "CREATE TABLE IF NOT EXISTS messages (\n"
382             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
383             "  hop_counter INTEGER NOT NULL,\n"
384             "  signature BLOB,\n"
385             "  purpose BLOB,\n"
386             "  fragment_id INTEGER NOT NULL,\n"
387             "  fragment_offset INTEGER NOT NULL,\n"
388             "  message_id INTEGER NOT NULL,\n"
389             "  group_generation INTEGER NOT NULL,\n"
390             "  multicast_flags INTEGER NOT NULL,\n"
391             "  psycstore_flags INTEGER NOT NULL,\n"
392             "  data BLOB,\n"
393             "  PRIMARY KEY (channel_id, fragment_id),\n"
394             "  UNIQUE (channel_id, message_id, fragment_offset)\n"
395             ");");
396
397   sql_exec (plugin->dbh,
398             "CREATE TABLE IF NOT EXISTS state (\n"
399             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
400             "  name TEXT NOT NULL,\n"
401             "  value_current BLOB,\n"
402             "  value_signed BLOB,\n"
403             "  PRIMARY KEY (channel_id, name)\n"
404             ");");
405
406   sql_exec (plugin->dbh,
407             "CREATE TABLE IF NOT EXISTS state_sync (\n"
408             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
409             "  name TEXT NOT NULL,\n"
410             "  value BLOB,\n"
411             "  PRIMARY KEY (channel_id, name)\n"
412             ");");
413
414   /* Prepare statements */
415
416   sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin);
417
418   sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit);
419
420   sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback);
421
422   sql_prepare (plugin->dbh,
423                "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
424                &plugin->insert_channel_key);
425
426   sql_prepare (plugin->dbh,
427                "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);",
428                &plugin->insert_slave_key);
429
430   sql_prepare (plugin->dbh,
431                "INSERT INTO membership\n"
432                " (channel_id, slave_id, did_join, announced_at,\n"
433                "  effective_since, group_generation)\n"
434                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
435                "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
436                "        ?, ?, ?, ?);",
437                &plugin->insert_membership);
438
439   sql_prepare (plugin->dbh,
440                "SELECT did_join FROM membership\n"
441                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
442                "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
443                "      AND effective_since <= ? AND did_join = 1\n"
444                "ORDER BY announced_at DESC LIMIT 1;",
445                &plugin->select_membership);
446
447   sql_prepare (plugin->dbh,
448                "INSERT OR IGNORE INTO messages\n"
449                " (channel_id, hop_counter, signature, purpose,\n"
450                "  fragment_id, fragment_offset, message_id,\n"
451                "  group_generation, multicast_flags, psycstore_flags, data)\n"
452                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
453                "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
454                &plugin->insert_fragment);
455
456   sql_prepare (plugin->dbh,
457                "UPDATE messages\n"
458                "SET psycstore_flags = psycstore_flags | ?\n"
459                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
460                "      AND message_id = ? AND fragment_offset = 0;",
461                &plugin->update_message_flags);
462
463   sql_prepare (plugin->dbh,
464                "SELECT hop_counter, signature, purpose, fragment_id,\n"
465                "       fragment_offset, message_id, group_generation,\n"
466                "       multicast_flags, psycstore_flags, data\n"
467                "FROM messages\n"
468                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
469                "      AND ? <= fragment_id AND fragment_id <= ?;",
470                &plugin->select_fragments);
471
472   sql_prepare (plugin->dbh,
473                "SELECT hop_counter, signature, purpose, fragment_id,\n"
474                "       fragment_offset, message_id, group_generation,\n"
475                "       multicast_flags, psycstore_flags, data\n"
476                "FROM messages\n"
477                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
478                "      AND ? <= message_id AND message_id <= ?;",
479                &plugin->select_messages);
480
481   sql_prepare (plugin->dbh,
482                "SELECT * FROM\n"
483                "(SELECT hop_counter, signature, purpose, fragment_id,\n"
484                "        fragment_offset, message_id, group_generation,\n"
485                "        multicast_flags, psycstore_flags, data\n"
486                " FROM messages\n"
487                " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
488                " ORDER BY fragment_id DESC\n"
489                " LIMIT ?)\n"
490                "ORDER BY fragment_id;",
491                &plugin->select_latest_fragments);
492
493   sql_prepare (plugin->dbh,
494                "SELECT hop_counter, signature, purpose, fragment_id,\n"
495                "       fragment_offset, message_id, group_generation,\n"
496                "        multicast_flags, psycstore_flags, data\n"
497                "FROM messages\n"
498                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
499                "      AND message_id IN\n"
500                "      (SELECT message_id\n"
501                "       FROM messages\n"
502                "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
503                "       ORDER BY message_id\n"
504                "       DESC LIMIT ?)\n"
505                "ORDER BY fragment_id;",
506                &plugin->select_latest_messages);
507
508   sql_prepare (plugin->dbh,
509                "SELECT hop_counter, signature, purpose, fragment_id,\n"
510                "       fragment_offset, message_id, group_generation,\n"
511                "       multicast_flags, psycstore_flags, data\n"
512                "FROM messages\n"
513                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
514                "      AND message_id = ? AND fragment_offset = ?;",
515                &plugin->select_message_fragment);
516
517   sql_prepare (plugin->dbh,
518                "SELECT fragment_id, message_id, group_generation\n"
519                "FROM messages\n"
520                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
521                "ORDER BY fragment_id DESC LIMIT 1;",
522                &plugin->select_counters_message);
523
524   sql_prepare (plugin->dbh,
525                "SELECT max_state_message_id\n"
526                "FROM channels\n"
527                "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
528                &plugin->select_counters_state);
529
530   sql_prepare (plugin->dbh,
531                "UPDATE channels\n"
532                "SET max_state_message_id = ?\n"
533                "WHERE pub_key = ?;",
534                &plugin->update_max_state_message_id);
535
536   sql_prepare (plugin->dbh,
537                "UPDATE channels\n"
538                "SET state_hash_message_id = ?\n"
539                "WHERE pub_key = ?;",
540                &plugin->update_state_hash_message_id);
541
542   sql_prepare (plugin->dbh,
543                "SELECT 1\n"
544                "FROM channels AS c\n"
545                "LEFT JOIN messages AS m\n"
546                "ON c.id = m.channel_id\n"
547                "WHERE c.pub_key = ?\n"
548                "      AND ((? < c.state_hash_message_id AND c.state_hash_message_id < ?)\n"
549                "           OR (m.message_id = ? AND m.psycstore_flags & ?))\n"
550                "LIMIT 1;",
551                &plugin->select_message_state_delta);
552
553   sql_prepare (plugin->dbh,
554                "INSERT OR REPLACE INTO state\n"
555                "  (channel_id, name, value_current, value_signed)\n"
556                "SELECT new.channel_id, new.name,\n"
557                "       new.value_current, old.value_signed\n"
558                "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
559                "             AS channel_id,\n"
560                "             ? AS name, ? AS value_current) AS new\n"
561                "LEFT JOIN (SELECT channel_id, name, value_signed\n"
562                "           FROM state) AS old\n"
563                "ON new.channel_id = old.channel_id AND new.name = old.name;",
564                &plugin->insert_state_current);
565
566   sql_prepare (plugin->dbh,
567                "DELETE FROM state\n"
568                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
569                "      AND (value_current IS NULL OR length(value_current) = 0)\n"
570                "      AND (value_signed IS NULL OR length(value_signed) = 0);",
571                &plugin->delete_state_empty);
572
573   sql_prepare (plugin->dbh,
574                "UPDATE state\n"
575                "SET value_signed = value_current\n"
576                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
577                &plugin->update_state_signed);
578
579   sql_prepare (plugin->dbh,
580                "DELETE FROM state\n"
581                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
582                &plugin->delete_state);
583
584   sql_prepare (plugin->dbh,
585                "INSERT INTO state_sync (channel_id, name, value)\n"
586                "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
587                &plugin->insert_state_sync);
588
589   sql_prepare (plugin->dbh,
590                "INSERT INTO state\n"
591                " (channel_id, name, value_current, value_signed)\n"
592                "SELECT channel_id, name, value, value\n"
593                "FROM state_sync\n"
594                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
595                &plugin->insert_state_from_sync);
596
597   sql_prepare (plugin->dbh,
598                "DELETE FROM state_sync\n"
599                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
600                &plugin->delete_state_sync);
601
602   sql_prepare (plugin->dbh,
603                "SELECT value_current\n"
604                "FROM state\n"
605                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
606                "      AND name = ?;",
607                &plugin->select_state_one);
608
609   sql_prepare (plugin->dbh,
610                "SELECT name, value_current\n"
611                "FROM state\n"
612                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
613                "      AND (name = ? OR name LIKE ?);",
614                &plugin->select_state_prefix);
615
616   sql_prepare (plugin->dbh,
617                "SELECT name, value_signed\n"
618                "FROM state\n"
619                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
620                "      AND value_signed IS NOT NULL;",
621                &plugin->select_state_signed);
622
623   return GNUNET_OK;
624 }
625
626
627 /**
628  * Shutdown database connection and associate data
629  * structures.
630  * @param plugin the plugin context (state for this module)
631  */
632 static void
633 database_shutdown (struct Plugin *plugin)
634 {
635   int result;
636   sqlite3_stmt *stmt;
637   while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
638   {
639     result = sqlite3_finalize (stmt);
640     if (SQLITE_OK != result)
641       LOG (GNUNET_ERROR_TYPE_WARNING,
642            "Failed to close statement %p: %d\n", stmt, result);
643   }
644   if (SQLITE_OK != sqlite3_close (plugin->dbh))
645     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
646
647   GNUNET_free_non_null (plugin->fn);
648 }
649
650 /**
651  * Execute a prepared statement with a @a channel_key argument.
652  *
653  * @param plugin Plugin handle.
654  * @param stmt Statement to execute.
655  * @param channel_key Public key of the channel.
656  *
657  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
658  */
659 static int
660 exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt,
661               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
662 {
663   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
664                                       sizeof (*channel_key), SQLITE_STATIC))
665   {
666     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
667                 "sqlite3_bind");
668   }
669   else if (SQLITE_DONE != sqlite3_step (stmt))
670   {
671     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
672                 "sqlite3_step");
673   }
674
675   if (SQLITE_OK != sqlite3_reset (stmt))
676   {
677     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
678                 "sqlite3_reset");
679     return GNUNET_SYSERR;
680   }
681
682   return GNUNET_OK;
683 }
684
685 /**
686  * Begin a transaction.
687  */
688 static int
689 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
690 {
691   sqlite3_stmt *stmt = plugin->transaction_begin;
692
693   if (SQLITE_DONE != sqlite3_step (stmt))
694   {
695     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
696                 "sqlite3_step");
697   }
698   if (SQLITE_OK != sqlite3_reset (stmt))
699   {
700     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
701                 "sqlite3_reset");
702     return GNUNET_SYSERR;
703   }
704
705   plugin->transaction = transaction;
706   return GNUNET_OK;
707 }
708
709
710 /**
711  * Commit current transaction.
712  */
713 static int
714 transaction_commit (struct Plugin *plugin)
715 {
716   sqlite3_stmt *stmt = plugin->transaction_commit;
717
718   if (SQLITE_DONE != sqlite3_step (stmt))
719   {
720     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
721                 "sqlite3_step");
722   }
723   if (SQLITE_OK != sqlite3_reset (stmt))
724   {
725     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
726                 "sqlite3_reset");
727     return GNUNET_SYSERR;
728   }
729
730   plugin->transaction = TRANSACTION_NONE;
731   return GNUNET_OK;
732 }
733
734
735 /**
736  * Roll back current transaction.
737  */
738 static int
739 transaction_rollback (struct Plugin *plugin)
740 {
741   sqlite3_stmt *stmt = plugin->transaction_rollback;
742
743   if (SQLITE_DONE != sqlite3_step (stmt))
744   {
745     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
746                 "sqlite3_step");
747   }
748   if (SQLITE_OK != sqlite3_reset (stmt))
749   {
750     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
751                 "sqlite3_reset");
752     return GNUNET_SYSERR;
753   }
754   plugin->transaction = TRANSACTION_NONE;
755   return GNUNET_OK;
756 }
757
758
759 static int
760 channel_key_store (struct Plugin *plugin,
761                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
762 {
763   sqlite3_stmt *stmt = plugin->insert_channel_key;
764
765   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
766                                       sizeof (*channel_key), SQLITE_STATIC))
767   {
768     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
769                 "sqlite3_bind");
770   }
771   else if (SQLITE_DONE != sqlite3_step (stmt))
772   {
773     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
774                 "sqlite3_step");
775   }
776
777   if (SQLITE_OK != sqlite3_reset (stmt))
778   {
779     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
780                 "sqlite3_reset");
781     return GNUNET_SYSERR;
782   }
783
784   return GNUNET_OK;
785 }
786
787
788 static int
789 slave_key_store (struct Plugin *plugin,
790                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
791 {
792   sqlite3_stmt *stmt = plugin->insert_slave_key;
793
794   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, slave_key,
795                                       sizeof (*slave_key), SQLITE_STATIC))
796   {
797     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
798                 "sqlite3_bind");
799   }
800   else if (SQLITE_DONE != sqlite3_step (stmt))
801   {
802     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
803                 "sqlite3_step");
804   }
805
806
807   if (SQLITE_OK != sqlite3_reset (stmt))
808   {
809     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
810                 "sqlite3_reset");
811     return GNUNET_SYSERR;
812   }
813
814   return GNUNET_OK;
815 }
816
817
818 /**
819  * Store join/leave events for a PSYC channel in order to be able to answer
820  * membership test queries later.
821  *
822  * @see GNUNET_PSYCSTORE_membership_store()
823  *
824  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
825  */
826 static int
827 membership_store (void *cls,
828                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
829                   const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
830                   int did_join,
831                   uint64_t announced_at,
832                   uint64_t effective_since,
833                   uint64_t group_generation)
834 {
835   struct Plugin *plugin = cls;
836   sqlite3_stmt *stmt = plugin->insert_membership;
837
838   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
839
840   if (announced_at > INT64_MAX ||
841       effective_since > INT64_MAX ||
842       group_generation > INT64_MAX)
843   {
844     GNUNET_break (0);
845     return GNUNET_SYSERR;
846   }
847
848   if (GNUNET_OK != channel_key_store (plugin, channel_key)
849       || GNUNET_OK != slave_key_store (plugin, slave_key))
850     return GNUNET_SYSERR;
851
852   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
853                                       sizeof (*channel_key), SQLITE_STATIC)
854       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
855                                          sizeof (*slave_key), SQLITE_STATIC)
856       || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join)
857       || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at)
858       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since)
859       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
860   {
861     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
862                 "sqlite3_bind");
863   }
864   else if (SQLITE_DONE != sqlite3_step (stmt))
865   {
866     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
867                 "sqlite3_step");
868   }
869
870   if (SQLITE_OK != sqlite3_reset (stmt))
871   {
872     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
873                 "sqlite3_reset");
874     return GNUNET_SYSERR;
875   }
876
877   return GNUNET_OK;
878 }
879
880 /**
881  * Test if a member was admitted to the channel at the given message ID.
882  *
883  * @see GNUNET_PSYCSTORE_membership_test()
884  *
885  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
886  *         #GNUNET_SYSERR if there was en error.
887  */
888 static int
889 membership_test (void *cls,
890                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
891                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
892                  uint64_t message_id)
893 {
894   struct Plugin *plugin = cls;
895   sqlite3_stmt *stmt = plugin->select_membership;
896   int ret = GNUNET_SYSERR;
897
898   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
899                                       sizeof (*channel_key), SQLITE_STATIC)
900       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
901                                          sizeof (*slave_key), SQLITE_STATIC)
902       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
903   {
904     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
905                 "sqlite3_bind");
906   }
907   else
908   {
909     switch (sqlite3_step (stmt))
910     {
911     case SQLITE_DONE:
912       ret = GNUNET_NO;
913       break;
914     case SQLITE_ROW:
915       ret = GNUNET_YES;
916     }
917   }
918
919   if (SQLITE_OK != sqlite3_reset (stmt))
920   {
921     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
922                 "sqlite3_reset");
923   }
924
925   return ret;
926 }
927
928 /**
929  * Store a message fragment sent to a channel.
930  *
931  * @see GNUNET_PSYCSTORE_fragment_store()
932  *
933  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
934  */
935 static int
936 fragment_store (void *cls,
937                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
938                 const struct GNUNET_MULTICAST_MessageHeader *msg,
939                 uint32_t psycstore_flags)
940 {
941   struct Plugin *plugin = cls;
942   sqlite3_stmt *stmt = plugin->insert_fragment;
943
944   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
945
946   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
947   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
948   uint64_t message_id = GNUNET_ntohll (msg->message_id);
949   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
950
951   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
952       message_id > INT64_MAX || group_generation > INT64_MAX)
953   {
954     LOG (GNUNET_ERROR_TYPE_ERROR,
955          "Tried to store fragment with a field > INT64_MAX: "
956          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
957          message_id, group_generation);
958     GNUNET_break (0);
959     return GNUNET_SYSERR;
960   }
961
962   if (GNUNET_OK != channel_key_store (plugin, channel_key))
963     return GNUNET_SYSERR;
964
965   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
966                                       sizeof (*channel_key), SQLITE_STATIC)
967       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) )
968       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
969                                          sizeof (msg->signature), SQLITE_STATIC)
970       || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
971                                          sizeof (msg->purpose), SQLITE_STATIC)
972       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id)
973       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset)
974       || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id)
975       || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation)
976       || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags))
977       || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags)
978       || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
979                                          ntohs (msg->header.size)
980                                          - sizeof (*msg), SQLITE_STATIC))
981   {
982     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
983                 "sqlite3_bind");
984   }
985   else if (SQLITE_DONE != sqlite3_step (stmt))
986   {
987     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
988                 "sqlite3_step");
989   }
990
991   if (SQLITE_OK != sqlite3_reset (stmt))
992   {
993     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
994                 "sqlite3_reset");
995     return GNUNET_SYSERR;
996   }
997
998   return GNUNET_OK;
999 }
1000
1001 /**
1002  * Set additional flags for a given message.
1003  *
1004  * They are OR'd with any existing flags set.
1005  *
1006  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1007  */
1008 static int
1009 message_add_flags (void *cls,
1010                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1011                    uint64_t message_id,
1012                    uint64_t psycstore_flags)
1013 {
1014   struct Plugin *plugin = cls;
1015   sqlite3_stmt *stmt = plugin->update_message_flags;
1016   int ret = GNUNET_SYSERR;
1017
1018   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags)
1019       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1020                                          sizeof (*channel_key), SQLITE_STATIC)
1021       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
1022   {
1023     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1024                 "sqlite3_bind");
1025   }
1026   else
1027   {
1028     switch (sqlite3_step (stmt))
1029     {
1030     case SQLITE_DONE:
1031       ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
1032       break;
1033     default:
1034       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1035                   "sqlite3_step");
1036     }
1037   }
1038
1039   if (SQLITE_OK != sqlite3_reset (stmt))
1040   {
1041     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1042                 "sqlite3_reset");
1043     return GNUNET_SYSERR;
1044   }
1045
1046   return ret;
1047 }
1048
1049 static int
1050 fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1051               void *cb_cls)
1052 {
1053   int data_size = sqlite3_column_bytes (stmt, 9);
1054   struct GNUNET_MULTICAST_MessageHeader *msg
1055     = GNUNET_malloc (sizeof (*msg) + data_size);
1056
1057   msg->header.size = htons (sizeof (*msg) + data_size);
1058   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1059   msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
1060   memcpy (&msg->signature,
1061           sqlite3_column_blob (stmt, 1),
1062           sqlite3_column_bytes (stmt, 1));
1063   memcpy (&msg->purpose,
1064           sqlite3_column_blob (stmt, 2),
1065           sqlite3_column_bytes (stmt, 2));
1066   msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
1067   msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
1068   msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
1069   msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
1070   msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
1071   memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
1072
1073   return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1074 }
1075
1076
1077 static int
1078 fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1079                  uint64_t *returned_fragments,
1080                  GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1081 {
1082   int ret = GNUNET_SYSERR;
1083   int sql_ret;
1084
1085   do
1086   {
1087     sql_ret = sqlite3_step (stmt);
1088     switch (sql_ret)
1089     {
1090     case SQLITE_DONE:
1091       if (ret != GNUNET_OK)
1092         ret = GNUNET_NO;
1093       break;
1094     case SQLITE_ROW:
1095       ret = fragment_row (stmt, cb, cb_cls);
1096       (*returned_fragments)++;
1097       if (ret != GNUNET_YES)
1098         sql_ret = SQLITE_DONE;
1099       break;
1100     default:
1101       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1102                   "sqlite3_step");
1103     }
1104   }
1105   while (sql_ret == SQLITE_ROW);
1106
1107   return ret;
1108 }
1109
1110 /**
1111  * Retrieve a message fragment range by fragment ID.
1112  *
1113  * @see GNUNET_PSYCSTORE_fragment_get()
1114  *
1115  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1116  */
1117 static int
1118 fragment_get (void *cls,
1119               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1120               uint64_t first_fragment_id,
1121               uint64_t last_fragment_id,
1122               uint64_t *returned_fragments,
1123               GNUNET_PSYCSTORE_FragmentCallback cb,
1124               void *cb_cls)
1125 {
1126   struct Plugin *plugin = cls;
1127   sqlite3_stmt *stmt = plugin->select_fragments;
1128   int ret = GNUNET_SYSERR;
1129   *returned_fragments = 0;
1130
1131   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1132                                       sizeof (*channel_key),
1133                                       SQLITE_STATIC)
1134       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id)
1135       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id))
1136   {
1137     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1138                 "sqlite3_bind");
1139   }
1140   else
1141   {
1142     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1143   }
1144
1145   if (SQLITE_OK != sqlite3_reset (stmt))
1146   {
1147     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1148                 "sqlite3_reset");
1149   }
1150
1151   return ret;
1152 }
1153
1154
1155 /**
1156  * Retrieve a message fragment range by fragment ID.
1157  *
1158  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1159  *
1160  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1161  */
1162 static int
1163 fragment_get_latest (void *cls,
1164                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1165                      uint64_t fragment_limit,
1166                      uint64_t *returned_fragments,
1167                      GNUNET_PSYCSTORE_FragmentCallback cb,
1168                      void *cb_cls)
1169 {
1170   struct Plugin *plugin = cls;
1171   sqlite3_stmt *stmt = plugin->select_latest_fragments;
1172   int ret = GNUNET_SYSERR;
1173   *returned_fragments = 0;
1174
1175   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1176                                       sizeof (*channel_key),
1177                                       SQLITE_STATIC)
1178       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit))
1179   {
1180     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1181                 "sqlite3_bind");
1182   }
1183   else
1184   {
1185     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1186   }
1187
1188   if (SQLITE_OK != sqlite3_reset (stmt))
1189   {
1190     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1191                 "sqlite3_reset");
1192   }
1193
1194   return ret;
1195 }
1196
1197
1198 /**
1199  * Retrieve all fragments of a message ID range.
1200  *
1201  * @see GNUNET_PSYCSTORE_message_get()
1202  *
1203  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1204  */
1205 static int
1206 message_get (void *cls,
1207              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1208              uint64_t first_message_id,
1209              uint64_t last_message_id,
1210              uint64_t *returned_fragments,
1211              GNUNET_PSYCSTORE_FragmentCallback cb,
1212              void *cb_cls)
1213 {
1214   struct Plugin *plugin = cls;
1215   sqlite3_stmt *stmt = plugin->select_messages;
1216   int ret = GNUNET_SYSERR;
1217   *returned_fragments = 0;
1218
1219   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1220                                       sizeof (*channel_key),
1221                                       SQLITE_STATIC)
1222       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id)
1223       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id))
1224   {
1225     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1226                 "sqlite3_bind");
1227   }
1228   else
1229   {
1230     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1231   }
1232
1233   if (SQLITE_OK != sqlite3_reset (stmt))
1234   {
1235     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1236                 "sqlite3_reset");
1237   }
1238
1239   return ret;
1240 }
1241
1242
1243 /**
1244  * Retrieve all fragments of the latest messages.
1245  *
1246  * @see GNUNET_PSYCSTORE_message_get_latest()
1247  *
1248  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1249  */
1250 static int
1251 message_get_latest (void *cls,
1252                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1253                     uint64_t message_limit,
1254                     uint64_t *returned_fragments,
1255                     GNUNET_PSYCSTORE_FragmentCallback cb,
1256                     void *cb_cls)
1257 {
1258   struct Plugin *plugin = cls;
1259   sqlite3_stmt *stmt = plugin->select_latest_messages;
1260   int ret = GNUNET_SYSERR;
1261   *returned_fragments = 0;
1262
1263   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1264                                       sizeof (*channel_key),
1265                                       SQLITE_STATIC)
1266       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1267                                          sizeof (*channel_key),
1268                                          SQLITE_STATIC)
1269       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit))
1270   {
1271     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1272                 "sqlite3_bind");
1273   }
1274   else
1275   {
1276     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1277   }
1278
1279   if (SQLITE_OK != sqlite3_reset (stmt))
1280   {
1281     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1282                 "sqlite3_reset");
1283   }
1284
1285   return ret;
1286 }
1287
1288
1289 /**
1290  * Retrieve a fragment of message specified by its message ID and fragment
1291  * offset.
1292  *
1293  * @see GNUNET_PSYCSTORE_message_get_fragment()
1294  *
1295  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1296  */
1297 static int
1298 message_get_fragment (void *cls,
1299                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1300                       uint64_t message_id,
1301                       uint64_t fragment_offset,
1302                       GNUNET_PSYCSTORE_FragmentCallback cb,
1303                       void *cb_cls)
1304 {
1305   struct Plugin *plugin = cls;
1306   sqlite3_stmt *stmt = plugin->select_message_fragment;
1307   int ret = GNUNET_SYSERR;
1308
1309   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1310                                       sizeof (*channel_key),
1311                                       SQLITE_STATIC)
1312       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)
1313       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
1314   {
1315     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1316                 "sqlite3_bind");
1317   }
1318   else
1319   {
1320     switch (sqlite3_step (stmt))
1321     {
1322     case SQLITE_DONE:
1323       ret = GNUNET_NO;
1324       break;
1325     case SQLITE_ROW:
1326       ret = fragment_row (stmt, cb, cb_cls);
1327       break;
1328     default:
1329       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1330                   "sqlite3_step");
1331     }
1332   }
1333
1334   if (SQLITE_OK != sqlite3_reset (stmt))
1335   {
1336     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1337                 "sqlite3_reset");
1338   }
1339
1340   return ret;
1341 }
1342
1343 /**
1344  * Retrieve the max. values of message counters for a channel.
1345  *
1346  * @see GNUNET_PSYCSTORE_counters_get()
1347  *
1348  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1349  */
1350 static int
1351 counters_message_get (void *cls,
1352                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1353                       uint64_t *max_fragment_id,
1354                       uint64_t *max_message_id,
1355                       uint64_t *max_group_generation)
1356 {
1357   struct Plugin *plugin = cls;
1358   sqlite3_stmt *stmt = plugin->select_counters_message;
1359   int ret = GNUNET_SYSERR;
1360
1361   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1362                                       sizeof (*channel_key),
1363                                       SQLITE_STATIC))
1364   {
1365     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1366                 "sqlite3_bind");
1367   }
1368   else
1369   {
1370     switch (sqlite3_step (stmt))
1371     {
1372     case SQLITE_DONE:
1373       ret = GNUNET_NO;
1374       break;
1375     case SQLITE_ROW:
1376       *max_fragment_id = sqlite3_column_int64 (stmt, 0);
1377       *max_message_id = sqlite3_column_int64 (stmt, 1);
1378       *max_group_generation = sqlite3_column_int64 (stmt, 2);
1379       ret = GNUNET_OK;
1380       break;
1381     default:
1382       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1383                   "sqlite3_step");
1384     }
1385   }
1386
1387   if (SQLITE_OK != sqlite3_reset (stmt))
1388   {
1389     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1390                 "sqlite3_reset");
1391   }
1392
1393   return ret;
1394 }
1395
1396 /**
1397  * Retrieve the max. values of state counters for a channel.
1398  *
1399  * @see GNUNET_PSYCSTORE_counters_get()
1400  *
1401  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1402  */
1403 static int
1404 counters_state_get (void *cls,
1405                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1406                     uint64_t *max_state_message_id)
1407 {
1408   struct Plugin *plugin = cls;
1409   sqlite3_stmt *stmt = plugin->select_counters_state;
1410   int ret = GNUNET_SYSERR;
1411
1412   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1413                                       sizeof (*channel_key),
1414                                       SQLITE_STATIC))
1415   {
1416     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1417                 "sqlite3_bind");
1418   }
1419   else
1420   {
1421     switch (sqlite3_step (stmt))
1422     {
1423     case SQLITE_DONE:
1424       ret = GNUNET_NO;
1425       break;
1426     case SQLITE_ROW:
1427       *max_state_message_id = sqlite3_column_int64 (stmt, 0);
1428       ret = GNUNET_OK;
1429       break;
1430     default:
1431       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1432                   "sqlite3_step");
1433     }
1434   }
1435
1436   if (SQLITE_OK != sqlite3_reset (stmt))
1437   {
1438     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1439                 "sqlite3_reset");
1440   }
1441
1442   return ret;
1443 }
1444
1445
1446 /**
1447  * Set a state variable to the given value.
1448  *
1449  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1450  */
1451 static int
1452 state_set (struct Plugin *plugin, sqlite3_stmt *stmt,
1453            const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1454            const char *name, const void *value, size_t value_size)
1455 {
1456   int ret = GNUNET_SYSERR;
1457
1458   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1459                                       sizeof (*channel_key), SQLITE_STATIC)
1460       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)
1461       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
1462                                          SQLITE_STATIC))
1463   {
1464     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1465                 "sqlite3_bind");
1466   }
1467   else
1468   {
1469     switch (sqlite3_step (stmt))
1470     {
1471     case SQLITE_DONE:
1472       ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO;
1473       break;
1474     default:
1475       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1476                   "sqlite3_step");
1477     }
1478   }
1479
1480   if (SQLITE_OK != sqlite3_reset (stmt))
1481   {
1482     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1483                 "sqlite3_reset");
1484     return GNUNET_SYSERR;
1485   }
1486
1487   return ret;
1488 }
1489
1490
1491 static int
1492 update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt,
1493                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1494                    uint64_t message_id)
1495 {
1496   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id)
1497       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1498                                          sizeof (*channel_key), SQLITE_STATIC))
1499   {
1500     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1501                 "sqlite3_bind");
1502   }
1503   else if (SQLITE_DONE != sqlite3_step (stmt))
1504   {
1505     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1506                 "sqlite3_step");
1507   }
1508   if (SQLITE_OK != sqlite3_reset (stmt))
1509   {
1510     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1511                 "sqlite3_reset");
1512     return GNUNET_SYSERR;
1513   }
1514   return GNUNET_OK;
1515 }
1516
1517
1518 /**
1519  * Begin modifying current state.
1520  */
1521 static int
1522 state_modify_begin (void *cls,
1523                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1524                     uint64_t message_id, uint64_t state_delta)
1525 {
1526   struct Plugin *plugin = cls;
1527   sqlite3_stmt *stmt = plugin->select_message_state_delta;
1528
1529   if (state_delta > 0)
1530   {
1531     int ret = GNUNET_SYSERR;
1532     if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1533                                         sizeof (*channel_key), SQLITE_STATIC)
1534         || SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
1535                                             message_id - state_delta)
1536         || SQLITE_OK != sqlite3_bind_int64 (stmt, 3,
1537                                             message_id)
1538         || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
1539                                             message_id - state_delta)
1540         || SQLITE_OK != sqlite3_bind_int64 (stmt, 5,
1541                                             GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
1542     {
1543       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1544                   "sqlite3_bind");
1545     }
1546     else
1547     {
1548       switch (sqlite3_step (stmt))
1549       {
1550       case SQLITE_DONE:
1551         ret = GNUNET_NO;
1552         break;
1553       case SQLITE_ROW:
1554         ret = GNUNET_OK;
1555         break;
1556       default:
1557         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1558                     "sqlite3_step");
1559       }
1560     }
1561     if (SQLITE_OK != sqlite3_reset (stmt))
1562     {
1563       ret = GNUNET_SYSERR;
1564       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1565                   "sqlite3_reset");
1566      }
1567     if (GNUNET_OK != ret)
1568       return ret;
1569   }
1570
1571   if (TRANSACTION_NONE != plugin->transaction)
1572       if (GNUNET_OK != transaction_rollback (plugin))
1573           return GNUNET_SYSERR;
1574
1575   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1576 }
1577
1578
1579 /**
1580  * Set the current value of state variable.
1581  *
1582  * @see GNUNET_PSYCSTORE_state_modify()
1583  *
1584  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1585  */
1586 static int
1587 state_modify_set (void *cls,
1588                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1589                   const char *name, const void *value, size_t value_size)
1590 {
1591   struct Plugin *plugin = cls;
1592   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1593
1594   return state_set (plugin, plugin->insert_state_current, channel_key,
1595                     name, value, value_size);
1596
1597 }
1598
1599
1600 /**
1601  * End modifying current state.
1602  */
1603 static int
1604 state_modify_end (void *cls,
1605                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1606                   uint64_t message_id)
1607 {
1608   struct Plugin *plugin = cls;
1609   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1610
1611   return
1612     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1613     && GNUNET_OK == update_message_id (plugin,
1614                                        plugin->update_max_state_message_id,
1615                                        channel_key, message_id)
1616     && GNUNET_OK == transaction_commit (plugin)
1617     ? GNUNET_OK : GNUNET_SYSERR;
1618 }
1619
1620
1621 /**
1622  * Begin state synchronization.
1623  */
1624 static int
1625 state_sync_begin (void *cls,
1626                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1627 {
1628   struct Plugin *plugin = cls;
1629   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1630 }
1631
1632
1633 /**
1634  * Set the current value of state variable.
1635  *
1636  * @see GNUNET_PSYCSTORE_state_modify()
1637  *
1638  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1639  */
1640 static int
1641 state_sync_set (void *cls,
1642                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1643                 const char *name, const void *value, size_t value_size)
1644 {
1645   struct Plugin *plugin = cls;
1646   return state_set (cls, plugin->insert_state_sync, channel_key,
1647                     name, value, value_size);
1648 }
1649
1650
1651 /**
1652  * End modifying current state.
1653  */
1654 static int
1655 state_sync_end (void *cls,
1656                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1657                 uint64_t message_id)
1658 {
1659   struct Plugin *plugin = cls;
1660   int ret = GNUNET_SYSERR;
1661
1662   GNUNET_OK == transaction_begin (plugin, TRANSACTION_NONE)
1663     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1664     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1665                                   channel_key)
1666     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1667                                   channel_key)
1668     && GNUNET_OK == update_message_id (plugin,
1669                                        plugin->update_state_hash_message_id,
1670                                        channel_key, message_id)
1671     && GNUNET_OK == transaction_commit (plugin)
1672     ? ret = GNUNET_OK
1673     : transaction_rollback (plugin);
1674   return ret;
1675 }
1676
1677
1678 /**
1679  * Reset the state of a channel.
1680  *
1681  * @see GNUNET_PSYCSTORE_state_reset()
1682  *
1683  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1684  */
1685 static int
1686 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1687 {
1688   struct Plugin *plugin = cls;
1689   return exec_channel (plugin, plugin->delete_state, channel_key);
1690 }
1691
1692
1693 /**
1694  * Update signed values of state variables in the state store.
1695  *
1696  * @see GNUNET_PSYCSTORE_state_hash_update()
1697  *
1698  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1699  */
1700 static int
1701 state_update_signed (void *cls,
1702                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1703 {
1704   struct Plugin *plugin = cls;
1705   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1706 }
1707
1708
1709 /**
1710  * Retrieve a state variable by name.
1711  *
1712  * @see GNUNET_PSYCSTORE_state_get()
1713  *
1714  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1715  */
1716 static int
1717 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1718            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1719 {
1720   struct Plugin *plugin = cls;
1721   int ret = GNUNET_SYSERR;
1722
1723   sqlite3_stmt *stmt = plugin->select_state_one;
1724
1725   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1726                                       sizeof (*channel_key),
1727                                       SQLITE_STATIC)
1728       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
1729   {
1730     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1731                 "sqlite3_bind");
1732   }
1733   else
1734   {
1735     switch (sqlite3_step (stmt))
1736     {
1737     case SQLITE_DONE:
1738       ret = GNUNET_NO;
1739       break;
1740     case SQLITE_ROW:
1741       ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1742                 sqlite3_column_bytes (stmt, 0));
1743       break;
1744     default:
1745       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1746                   "sqlite3_step");
1747     }
1748   }
1749
1750   if (SQLITE_OK != sqlite3_reset (stmt))
1751   {
1752     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1753                 "sqlite3_reset");
1754   }
1755
1756   return ret;
1757 }
1758
1759
1760 /**
1761  * Retrieve all state variables for a channel with the given prefix.
1762  *
1763  * @see GNUNET_PSYCSTORE_state_get_prefix()
1764  *
1765  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1766  */
1767 static int
1768 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1769                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1770                   void *cb_cls)
1771 {
1772   struct Plugin *plugin = cls;
1773   int ret = GNUNET_SYSERR;
1774
1775   sqlite3_stmt *stmt = plugin->select_state_prefix;
1776   size_t name_len = strlen (name);
1777   char *name_prefix = GNUNET_malloc (name_len + 2);
1778   memcpy (name_prefix, name, name_len);
1779   memcpy (name_prefix + name_len, "_%", 2);
1780
1781   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1782                                       sizeof (*channel_key), SQLITE_STATIC)
1783       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
1784       || SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2,
1785                                          SQLITE_STATIC))
1786   {
1787     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1788                 "sqlite3_bind");
1789   }
1790   else
1791   {
1792     int sql_ret;
1793     do
1794     {
1795       sql_ret = sqlite3_step (stmt);
1796       switch (sql_ret)
1797       {
1798       case SQLITE_DONE:
1799         if (ret != GNUNET_OK)
1800           ret = GNUNET_NO;
1801         break;
1802       case SQLITE_ROW:
1803         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1804                   sqlite3_column_blob (stmt, 1),
1805                   sqlite3_column_bytes (stmt, 1));
1806         if (ret != GNUNET_YES)
1807           sql_ret = SQLITE_DONE;
1808         break;
1809       default:
1810         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1811                     "sqlite3_step");
1812       }
1813     }
1814     while (sql_ret == SQLITE_ROW);
1815   }
1816
1817   if (SQLITE_OK != sqlite3_reset (stmt))
1818   {
1819     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1820                 "sqlite3_reset");
1821   }
1822
1823   return ret;
1824 }
1825
1826
1827 /**
1828  * Retrieve all signed state variables for a channel.
1829  *
1830  * @see GNUNET_PSYCSTORE_state_get_signed()
1831  *
1832  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1833  */
1834 static int
1835 state_get_signed (void *cls,
1836                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1837                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1838 {
1839   struct Plugin *plugin = cls;
1840   int ret = GNUNET_SYSERR;
1841
1842   sqlite3_stmt *stmt = plugin->select_state_signed;
1843
1844   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1845                                       sizeof (*channel_key), SQLITE_STATIC))
1846   {
1847     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1848                 "sqlite3_bind");
1849   }
1850   else
1851   {
1852     int sql_ret;
1853     do
1854     {
1855       sql_ret = sqlite3_step (stmt);
1856       switch (sql_ret)
1857       {
1858       case SQLITE_DONE:
1859         if (ret != GNUNET_OK)
1860           ret = GNUNET_NO;
1861         break;
1862       case SQLITE_ROW:
1863         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1864                   sqlite3_column_blob (stmt, 1),
1865                   sqlite3_column_bytes (stmt, 1));
1866         if (ret != GNUNET_YES)
1867           sql_ret = SQLITE_DONE;
1868         break;
1869       default:
1870         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1871                     "sqlite3_step");
1872       }
1873     }
1874     while (sql_ret == SQLITE_ROW);
1875   }
1876
1877   if (SQLITE_OK != sqlite3_reset (stmt))
1878   {
1879     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1880                 "sqlite3_reset");
1881   }
1882
1883   return ret;
1884 }
1885
1886
1887 /**
1888  * Entry point for the plugin.
1889  *
1890  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1891  * @return NULL on error, otherwise the plugin context
1892  */
1893 void *
1894 libgnunet_plugin_psycstore_sqlite_init (void *cls)
1895 {
1896   static struct Plugin plugin;
1897   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1898   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1899
1900   if (NULL != plugin.cfg)
1901     return NULL;                /* can only initialize once! */
1902   memset (&plugin, 0, sizeof (struct Plugin));
1903   plugin.cfg = cfg;
1904   if (GNUNET_OK != database_setup (&plugin))
1905   {
1906     database_shutdown (&plugin);
1907     return NULL;
1908   }
1909   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1910   api->cls = &plugin;
1911   api->membership_store = &membership_store;
1912   api->membership_test = &membership_test;
1913   api->fragment_store = &fragment_store;
1914   api->message_add_flags = &message_add_flags;
1915   api->fragment_get = &fragment_get;
1916   api->fragment_get_latest = &fragment_get_latest;
1917   api->message_get = &message_get;
1918   api->message_get_latest = &message_get_latest;
1919   api->message_get_fragment = &message_get_fragment;
1920   api->counters_message_get = &counters_message_get;
1921   api->counters_state_get = &counters_state_get;
1922   api->state_modify_begin = &state_modify_begin;
1923   api->state_modify_set = &state_modify_set;
1924   api->state_modify_end = &state_modify_end;
1925   api->state_sync_begin = &state_sync_begin;
1926   api->state_sync_set = &state_sync_set;
1927   api->state_sync_end = &state_sync_end;
1928   api->state_reset = &state_reset;
1929   api->state_update_signed = &state_update_signed;
1930   api->state_get = &state_get;
1931   api->state_get_prefix = &state_get_prefix;
1932   api->state_get_signed = &state_get_signed;
1933
1934   LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
1935   return api;
1936 }
1937
1938
1939 /**
1940  * Exit point from the plugin.
1941  *
1942  * @param cls The plugin context (as returned by "init")
1943  * @return Always NULL
1944  */
1945 void *
1946 libgnunet_plugin_psycstore_sqlite_done (void *cls)
1947 {
1948   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1949   struct Plugin *plugin = api->cls;
1950
1951   database_shutdown (plugin);
1952   plugin->cfg = NULL;
1953   GNUNET_free (api);
1954   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
1955   return NULL;
1956 }
1957
1958 /* end of plugin_psycstore_sqlite.c */