542c4bfc9ef21af911f39ab1a2ced675964f0832
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_sqlite.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_sqlite.c
23  * @brief sqlite-based psycstore backend
24  * @author Gabor X Toth
25  * @author Christian Grothoff
26  */
27
28 /*
29  * FIXME: SQLite3 only supports signed 64-bit integers natively,
30  *        thus it can only store 63 bits of the uint64_t's.
31  */
32
33 #include "platform.h"
34 #include "gnunet_psycstore_plugin.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_multicast_service.h"
37 #include "gnunet_crypto_lib.h"
38 #include "psycstore.h"
39 #include <sqlite3.h>
40
41 /**
42  * After how many ms "busy" should a DB operation fail for good?  A
43  * low value makes sure that we are more responsive to requests
44  * (especially PUTs).  A high value guarantees a higher success rate
45  * (SELECTs in iterate can take several seconds despite LIMIT=1).
46  *
47  * The default value of 1s should ensure that users do not experience
48  * huge latencies while at the same time allowing operations to
49  * succeed with reasonable probability.
50  */
51 #define BUSY_TIMEOUT_MS 1000
52
53 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
54
55 /**
56  * Log an error message at log-level 'level' that indicates
57  * a failure of the command 'cmd' on file 'filename'
58  * with the message given by strerror(errno).
59  */
60 #define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0)
61
62 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
63
64 enum Transactions {
65   TRANSACTION_NONE = 0,
66   TRANSACTION_STATE_MODIFY
67 };
68
69 /**
70  * Context for all functions in this plugin.
71  */
72 struct Plugin
73 {
74
75   const struct GNUNET_CONFIGURATION_Handle *cfg;
76
77   /**
78    * Database filename.
79    */
80   char *fn;
81
82   /**
83    * Native SQLite database handle.
84    */
85   sqlite3 *dbh;
86
87   /**
88    * Current transaction.
89    */
90   enum Transactions transaction;
91
92   sqlite3_stmt *transaction_begin;
93
94   sqlite3_stmt *transaction_commit;
95
96   sqlite3_stmt *transaction_rollback;
97
98   /**
99    * Precompiled SQL for channel_key_store()
100    */
101   sqlite3_stmt *insert_channel_key;
102
103   /**
104    * Precompiled SQL for slave_key_store()
105    */
106   sqlite3_stmt *insert_slave_key;
107
108
109   /**
110    * Precompiled SQL for membership_store()
111    */
112   sqlite3_stmt *insert_membership;
113
114   /**
115    * Precompiled SQL for membership_test()
116    */
117   sqlite3_stmt *select_membership;
118
119
120   /**
121    * Precompiled SQL for fragment_store()
122    */
123   sqlite3_stmt *insert_fragment;
124
125   /**
126    * Precompiled SQL for message_add_flags()
127    */
128   sqlite3_stmt *update_message_flags;
129
130   /**
131    * Precompiled SQL for fragment_get()
132    */
133   sqlite3_stmt *select_fragments;
134
135   /**
136    * Precompiled SQL for fragment_get()
137    */
138   sqlite3_stmt *select_latest_fragments;
139
140   /**
141    * Precompiled SQL for message_get()
142    */
143   sqlite3_stmt *select_messages;
144
145   /**
146    * Precompiled SQL for message_get()
147    */
148   sqlite3_stmt *select_latest_messages;
149
150   /**
151    * Precompiled SQL for message_get_fragment()
152    */
153   sqlite3_stmt *select_message_fragment;
154
155   /**
156    * Precompiled SQL for counters_get_message()
157    */
158   sqlite3_stmt *select_counters_message;
159
160   /**
161    * Precompiled SQL for counters_get_state()
162    */
163   sqlite3_stmt *select_counters_state;
164
165   /**
166    * Precompiled SQL for state_modify_end()
167    */
168   sqlite3_stmt *update_state_hash_message_id;
169
170   /**
171    * Precompiled SQL for state_sync_end()
172    */
173   sqlite3_stmt *update_max_state_message_id;
174
175
176   /**
177    * Precompiled SQL for message_modify_begin()
178    */
179   sqlite3_stmt *select_message_state_delta;
180
181   /**
182    * Precompiled SQL for state_modify_set()
183    */
184   sqlite3_stmt *insert_state_current;
185
186   /**
187    * Precompiled SQL for state_modify_end()
188    */
189   sqlite3_stmt *delete_state_empty;
190
191   /**
192    * Precompiled SQL for state_set_signed()
193    */
194   sqlite3_stmt *update_state_signed;
195
196   /**
197    * Precompiled SQL for state_sync()
198    */
199   sqlite3_stmt *insert_state_sync;
200
201   /**
202    * Precompiled SQL for state_sync()
203    */
204   sqlite3_stmt *delete_state;
205
206   /**
207    * Precompiled SQL for state_sync()
208    */
209   sqlite3_stmt *insert_state_from_sync;
210
211   /**
212    * Precompiled SQL for state_sync()
213    */
214   sqlite3_stmt *delete_state_sync;
215
216   /**
217    * Precompiled SQL for state_get_signed()
218    */
219   sqlite3_stmt *select_state_signed;
220
221   /**
222    * Precompiled SQL for state_get()
223    */
224   sqlite3_stmt *select_state_one;
225
226   /**
227    * Precompiled SQL for state_get_prefix()
228    */
229   sqlite3_stmt *select_state_prefix;
230
231 };
232
233 #if DEBUG_PSYCSTORE
234
235 static void
236 sql_trace (void *cls, const char *sql)
237 {
238   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQL query:\n%s\n", sql);
239 }
240
241 #endif
242
243 /**
244  * @brief Prepare a SQL statement
245  *
246  * @param dbh handle to the database
247  * @param sql SQL statement, UTF-8 encoded
248  * @param stmt set to the prepared statement
249  * @return 0 on success
250  */
251 static int
252 sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt)
253 {
254   char *tail;
255   int result;
256
257   result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
258                                (const char **) &tail);
259   LOG (GNUNET_ERROR_TYPE_DEBUG,
260        "Prepared `%s' / %p: %d\n", sql, *stmt, result);
261   if (result != SQLITE_OK)
262     LOG (GNUNET_ERROR_TYPE_ERROR,
263          _("Error preparing SQL query: %s\n  %s\n"),
264          sqlite3_errmsg (dbh), sql);
265   return result;
266 }
267
268
269 /**
270  * @brief Prepare a SQL statement
271  *
272  * @param dbh handle to the database
273  * @param sql SQL statement, UTF-8 encoded
274  * @return 0 on success
275  */
276 static int
277 sql_exec (sqlite3 *dbh, const char *sql)
278 {
279   int result;
280
281   result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
282   LOG (GNUNET_ERROR_TYPE_DEBUG,
283        "Executed `%s' / %d\n", sql, result);
284   if (result != SQLITE_OK)
285     LOG (GNUNET_ERROR_TYPE_ERROR,
286          _("Error executing SQL query: %s\n  %s\n"),
287          sqlite3_errmsg (dbh), sql);
288   return result;
289 }
290
291
292 /**
293  * Initialize the database connections and associated
294  * data structures (create tables and indices
295  * as needed as well).
296  *
297  * @param plugin the plugin context (state for this module)
298  * @return GNUNET_OK on success
299  */
300 static int
301 database_setup (struct Plugin *plugin)
302 {
303   char *filename;
304
305   if (GNUNET_OK !=
306       GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-sqlite",
307                                                "FILENAME", &filename))
308   {
309     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
310                                "psycstore-sqlite", "FILENAME");
311     return GNUNET_SYSERR;
312   }
313   if (GNUNET_OK != GNUNET_DISK_file_test (filename))
314   {
315     if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
316     {
317       GNUNET_break (0);
318       GNUNET_free (filename);
319       return GNUNET_SYSERR;
320     }
321   }
322   /* filename should be UTF-8-encoded. If it isn't, it's a bug */
323   plugin->fn = filename;
324
325   /* Open database and precompile statements */
326   if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh))
327   {
328     LOG (GNUNET_ERROR_TYPE_ERROR,
329          _("Unable to initialize SQLite: %s.\n"),
330          sqlite3_errmsg (plugin->dbh));
331     return GNUNET_SYSERR;
332   }
333
334 #if DEBUG_PSYCSTORE
335   sqlite3_trace (plugin->dbh, &sql_trace, NULL);
336 #endif
337
338   sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
339   sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
340   sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
341   sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
342   sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
343 #if ! DEBUG_PSYCSTORE
344   sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
345 #endif
346   sql_exec (plugin->dbh, "PRAGMA page_size=4096");
347
348   sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
349
350   /* Create tables */
351
352   sql_exec (plugin->dbh,
353             "CREATE TABLE IF NOT EXISTS channels (\n"
354             "  id INTEGER PRIMARY KEY,\n"
355             "  pub_key BLOB UNIQUE,\n"
356             "  max_state_message_id INTEGER,\n"
357             "  state_hash_message_id INTEGER\n"
358             ");");
359
360   sql_exec (plugin->dbh,
361             "CREATE TABLE IF NOT EXISTS slaves (\n"
362             "  id INTEGER PRIMARY KEY,\n"
363             "  pub_key BLOB UNIQUE\n"
364             ");");
365
366   sql_exec (plugin->dbh,
367             "CREATE TABLE IF NOT EXISTS membership (\n"
368             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
369             "  slave_id INTEGER NOT NULL REFERENCES slaves(id),\n"
370             "  did_join INTEGER NOT NULL,\n"
371             "  announced_at INTEGER NOT NULL,\n"
372             "  effective_since INTEGER NOT NULL,\n"
373             "  group_generation INTEGER NOT NULL\n"
374             ");");
375   sql_exec (plugin->dbh,
376             "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
377             "ON membership (channel_id, slave_id);");
378
379   /** @todo messages table: add method_name column */
380   sql_exec (plugin->dbh,
381             "CREATE TABLE IF NOT EXISTS messages (\n"
382             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
383             "  hop_counter INTEGER NOT NULL,\n"
384             "  signature BLOB,\n"
385             "  purpose BLOB,\n"
386             "  fragment_id INTEGER NOT NULL,\n"
387             "  fragment_offset INTEGER NOT NULL,\n"
388             "  message_id INTEGER NOT NULL,\n"
389             "  group_generation INTEGER NOT NULL,\n"
390             "  multicast_flags INTEGER NOT NULL,\n"
391             "  psycstore_flags INTEGER NOT NULL,\n"
392             "  data BLOB,\n"
393             "  PRIMARY KEY (channel_id, fragment_id),\n"
394             "  UNIQUE (channel_id, message_id, fragment_offset)\n"
395             ");");
396
397   sql_exec (plugin->dbh,
398             "CREATE TABLE IF NOT EXISTS state (\n"
399             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
400             "  name TEXT NOT NULL,\n"
401             "  value_current BLOB,\n"
402             "  value_signed BLOB,\n"
403             "  PRIMARY KEY (channel_id, name)\n"
404             ");");
405
406   sql_exec (plugin->dbh,
407             "CREATE TABLE IF NOT EXISTS state_sync (\n"
408             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
409             "  name TEXT NOT NULL,\n"
410             "  value BLOB,\n"
411             "  PRIMARY KEY (channel_id, name)\n"
412             ");");
413
414   /* Prepare statements */
415
416   sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin);
417
418   sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit);
419
420   sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback);
421
422   sql_prepare (plugin->dbh,
423                "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
424                &plugin->insert_channel_key);
425
426   sql_prepare (plugin->dbh,
427                "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);",
428                &plugin->insert_slave_key);
429
430   sql_prepare (plugin->dbh,
431                "INSERT INTO membership\n"
432                " (channel_id, slave_id, did_join, announced_at,\n"
433                "  effective_since, group_generation)\n"
434                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
435                "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
436                "        ?, ?, ?, ?);",
437                &plugin->insert_membership);
438
439   sql_prepare (plugin->dbh,
440                "SELECT did_join FROM membership\n"
441                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
442                "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
443                "      AND effective_since <= ? AND did_join = 1\n"
444                "ORDER BY announced_at DESC LIMIT 1;",
445                &plugin->select_membership);
446
447   sql_prepare (plugin->dbh,
448                "INSERT OR IGNORE INTO messages\n"
449                " (channel_id, hop_counter, signature, purpose,\n"
450                "  fragment_id, fragment_offset, message_id,\n"
451                "  group_generation, multicast_flags, psycstore_flags, data)\n"
452                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
453                "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
454                &plugin->insert_fragment);
455
456   sql_prepare (plugin->dbh,
457                "UPDATE messages\n"
458                "SET psycstore_flags = psycstore_flags | ?\n"
459                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
460                "      AND message_id = ? AND fragment_offset = 0;",
461                &plugin->update_message_flags);
462
463   sql_prepare (plugin->dbh,
464                "SELECT hop_counter, signature, purpose, fragment_id,\n"
465                "       fragment_offset, message_id, group_generation,\n"
466                "       multicast_flags, psycstore_flags, data\n"
467                "FROM messages\n"
468                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
469                "      AND ? <= fragment_id AND fragment_id <= ?;",
470                &plugin->select_fragments);
471
472   /** @todo select_messages: add method_prefix filter */
473   sql_prepare (plugin->dbh,
474                "SELECT hop_counter, signature, purpose, fragment_id,\n"
475                "       fragment_offset, message_id, group_generation,\n"
476                "       multicast_flags, psycstore_flags, data\n"
477                "FROM messages\n"
478                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
479                "      AND ? <= message_id AND message_id <= ?;",
480                &plugin->select_messages);
481
482   sql_prepare (plugin->dbh,
483                "SELECT * FROM\n"
484                "(SELECT hop_counter, signature, purpose, fragment_id,\n"
485                "        fragment_offset, message_id, group_generation,\n"
486                "        multicast_flags, psycstore_flags, data\n"
487                " FROM messages\n"
488                " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
489                " ORDER BY fragment_id DESC\n"
490                " LIMIT ?)\n"
491                "ORDER BY fragment_id;",
492                &plugin->select_latest_fragments);
493
494   /** @todo select_latest_messages: add method_prefix filter */
495   sql_prepare (plugin->dbh,
496                "SELECT hop_counter, signature, purpose, fragment_id,\n"
497                "       fragment_offset, message_id, group_generation,\n"
498                "        multicast_flags, psycstore_flags, data\n"
499                "FROM messages\n"
500                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
501                "      AND message_id IN\n"
502                "      (SELECT message_id\n"
503                "       FROM messages\n"
504                "       WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
505                "       GROUP BY message_id\n"
506                "       ORDER BY message_id\n"
507                "       DESC LIMIT ?)\n"
508                "ORDER BY fragment_id;",
509                &plugin->select_latest_messages);
510
511   sql_prepare (plugin->dbh,
512                "SELECT hop_counter, signature, purpose, fragment_id,\n"
513                "       fragment_offset, message_id, group_generation,\n"
514                "       multicast_flags, psycstore_flags, data\n"
515                "FROM messages\n"
516                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
517                "      AND message_id = ? AND fragment_offset = ?;",
518                &plugin->select_message_fragment);
519
520   sql_prepare (plugin->dbh,
521                "SELECT fragment_id, message_id, group_generation\n"
522                "FROM messages\n"
523                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
524                "ORDER BY fragment_id DESC LIMIT 1;",
525                &plugin->select_counters_message);
526
527   sql_prepare (plugin->dbh,
528                "SELECT max_state_message_id\n"
529                "FROM channels\n"
530                "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
531                &plugin->select_counters_state);
532
533   sql_prepare (plugin->dbh,
534                "UPDATE channels\n"
535                "SET max_state_message_id = ?\n"
536                "WHERE pub_key = ?;",
537                &plugin->update_max_state_message_id);
538
539   sql_prepare (plugin->dbh,
540                "UPDATE channels\n"
541                "SET state_hash_message_id = ?\n"
542                "WHERE pub_key = ?;",
543                &plugin->update_state_hash_message_id);
544
545   sql_prepare (plugin->dbh,
546                "SELECT 1\n"
547                "FROM channels AS c\n"
548                "LEFT JOIN messages AS m\n"
549                "ON c.id = m.channel_id\n"
550                "WHERE c.pub_key = ?\n"
551                "      AND ((? < c.state_hash_message_id AND c.state_hash_message_id < ?)\n"
552                "           OR (m.message_id = ? AND m.psycstore_flags & ?))\n"
553                "LIMIT 1;",
554                &plugin->select_message_state_delta);
555
556   sql_prepare (plugin->dbh,
557                "INSERT OR REPLACE INTO state\n"
558                "  (channel_id, name, value_current, value_signed)\n"
559                "SELECT new.channel_id, new.name,\n"
560                "       new.value_current, old.value_signed\n"
561                "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
562                "             AS channel_id,\n"
563                "             ? AS name, ? AS value_current) AS new\n"
564                "LEFT JOIN (SELECT channel_id, name, value_signed\n"
565                "           FROM state) AS old\n"
566                "ON new.channel_id = old.channel_id AND new.name = old.name;",
567                &plugin->insert_state_current);
568
569   sql_prepare (plugin->dbh,
570                "DELETE FROM state\n"
571                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
572                "      AND (value_current IS NULL OR length(value_current) = 0)\n"
573                "      AND (value_signed IS NULL OR length(value_signed) = 0);",
574                &plugin->delete_state_empty);
575
576   sql_prepare (plugin->dbh,
577                "UPDATE state\n"
578                "SET value_signed = value_current\n"
579                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
580                &plugin->update_state_signed);
581
582   sql_prepare (plugin->dbh,
583                "DELETE FROM state\n"
584                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
585                &plugin->delete_state);
586
587   sql_prepare (plugin->dbh,
588                "INSERT INTO state_sync (channel_id, name, value)\n"
589                "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
590                &plugin->insert_state_sync);
591
592   sql_prepare (plugin->dbh,
593                "INSERT INTO state\n"
594                " (channel_id, name, value_current, value_signed)\n"
595                "SELECT channel_id, name, value, value\n"
596                "FROM state_sync\n"
597                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
598                &plugin->insert_state_from_sync);
599
600   sql_prepare (plugin->dbh,
601                "DELETE FROM state_sync\n"
602                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
603                &plugin->delete_state_sync);
604
605   sql_prepare (plugin->dbh,
606                "SELECT value_current\n"
607                "FROM state\n"
608                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
609                "      AND name = ?;",
610                &plugin->select_state_one);
611
612   sql_prepare (plugin->dbh,
613                "SELECT name, value_current\n"
614                "FROM state\n"
615                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
616                "      AND (name = ? OR name LIKE ?);",
617                &plugin->select_state_prefix);
618
619   sql_prepare (plugin->dbh,
620                "SELECT name, value_signed\n"
621                "FROM state\n"
622                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
623                "      AND value_signed IS NOT NULL;",
624                &plugin->select_state_signed);
625
626   return GNUNET_OK;
627 }
628
629
630 /**
631  * Shutdown database connection and associate data
632  * structures.
633  * @param plugin the plugin context (state for this module)
634  */
635 static void
636 database_shutdown (struct Plugin *plugin)
637 {
638   int result;
639   sqlite3_stmt *stmt;
640   while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
641   {
642     result = sqlite3_finalize (stmt);
643     if (SQLITE_OK != result)
644       LOG (GNUNET_ERROR_TYPE_WARNING,
645            "Failed to close statement %p: %d\n", stmt, result);
646   }
647   if (SQLITE_OK != sqlite3_close (plugin->dbh))
648     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
649
650   GNUNET_free_non_null (plugin->fn);
651 }
652
653 /**
654  * Execute a prepared statement with a @a channel_key argument.
655  *
656  * @param plugin Plugin handle.
657  * @param stmt Statement to execute.
658  * @param channel_key Public key of the channel.
659  *
660  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
661  */
662 static int
663 exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt,
664               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
665 {
666   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
667                                       sizeof (*channel_key), SQLITE_STATIC))
668   {
669     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
670                 "sqlite3_bind");
671   }
672   else if (SQLITE_DONE != sqlite3_step (stmt))
673   {
674     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
675                 "sqlite3_step");
676   }
677
678   if (SQLITE_OK != sqlite3_reset (stmt))
679   {
680     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
681                 "sqlite3_reset");
682     return GNUNET_SYSERR;
683   }
684
685   return GNUNET_OK;
686 }
687
688 /**
689  * Begin a transaction.
690  */
691 static int
692 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
693 {
694   sqlite3_stmt *stmt = plugin->transaction_begin;
695
696   if (SQLITE_DONE != sqlite3_step (stmt))
697   {
698     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
699                 "sqlite3_step");
700   }
701   if (SQLITE_OK != sqlite3_reset (stmt))
702   {
703     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
704                 "sqlite3_reset");
705     return GNUNET_SYSERR;
706   }
707
708   plugin->transaction = transaction;
709   return GNUNET_OK;
710 }
711
712
713 /**
714  * Commit current transaction.
715  */
716 static int
717 transaction_commit (struct Plugin *plugin)
718 {
719   sqlite3_stmt *stmt = plugin->transaction_commit;
720
721   if (SQLITE_DONE != sqlite3_step (stmt))
722   {
723     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
724                 "sqlite3_step");
725   }
726   if (SQLITE_OK != sqlite3_reset (stmt))
727   {
728     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
729                 "sqlite3_reset");
730     return GNUNET_SYSERR;
731   }
732
733   plugin->transaction = TRANSACTION_NONE;
734   return GNUNET_OK;
735 }
736
737
738 /**
739  * Roll back current transaction.
740  */
741 static int
742 transaction_rollback (struct Plugin *plugin)
743 {
744   sqlite3_stmt *stmt = plugin->transaction_rollback;
745
746   if (SQLITE_DONE != sqlite3_step (stmt))
747   {
748     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
749                 "sqlite3_step");
750   }
751   if (SQLITE_OK != sqlite3_reset (stmt))
752   {
753     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
754                 "sqlite3_reset");
755     return GNUNET_SYSERR;
756   }
757   plugin->transaction = TRANSACTION_NONE;
758   return GNUNET_OK;
759 }
760
761
762 static int
763 channel_key_store (struct Plugin *plugin,
764                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
765 {
766   sqlite3_stmt *stmt = plugin->insert_channel_key;
767
768   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
769                                       sizeof (*channel_key), SQLITE_STATIC))
770   {
771     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
772                 "sqlite3_bind");
773   }
774   else if (SQLITE_DONE != sqlite3_step (stmt))
775   {
776     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
777                 "sqlite3_step");
778   }
779
780   if (SQLITE_OK != sqlite3_reset (stmt))
781   {
782     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
783                 "sqlite3_reset");
784     return GNUNET_SYSERR;
785   }
786
787   return GNUNET_OK;
788 }
789
790
791 static int
792 slave_key_store (struct Plugin *plugin,
793                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
794 {
795   sqlite3_stmt *stmt = plugin->insert_slave_key;
796
797   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, slave_key,
798                                       sizeof (*slave_key), SQLITE_STATIC))
799   {
800     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
801                 "sqlite3_bind");
802   }
803   else if (SQLITE_DONE != sqlite3_step (stmt))
804   {
805     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
806                 "sqlite3_step");
807   }
808
809
810   if (SQLITE_OK != sqlite3_reset (stmt))
811   {
812     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
813                 "sqlite3_reset");
814     return GNUNET_SYSERR;
815   }
816
817   return GNUNET_OK;
818 }
819
820
821 /**
822  * Store join/leave events for a PSYC channel in order to be able to answer
823  * membership test queries later.
824  *
825  * @see GNUNET_PSYCSTORE_membership_store()
826  *
827  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
828  */
829 static int
830 membership_store (void *cls,
831                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
832                   const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
833                   int did_join,
834                   uint64_t announced_at,
835                   uint64_t effective_since,
836                   uint64_t group_generation)
837 {
838   struct Plugin *plugin = cls;
839   sqlite3_stmt *stmt = plugin->insert_membership;
840
841   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
842
843   if (announced_at > INT64_MAX ||
844       effective_since > INT64_MAX ||
845       group_generation > INT64_MAX)
846   {
847     GNUNET_break (0);
848     return GNUNET_SYSERR;
849   }
850
851   if (GNUNET_OK != channel_key_store (plugin, channel_key)
852       || GNUNET_OK != slave_key_store (plugin, slave_key))
853     return GNUNET_SYSERR;
854
855   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
856                                       sizeof (*channel_key), SQLITE_STATIC)
857       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
858                                          sizeof (*slave_key), SQLITE_STATIC)
859       || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join)
860       || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at)
861       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since)
862       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
863   {
864     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
865                 "sqlite3_bind");
866   }
867   else if (SQLITE_DONE != sqlite3_step (stmt))
868   {
869     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
870                 "sqlite3_step");
871   }
872
873   if (SQLITE_OK != sqlite3_reset (stmt))
874   {
875     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
876                 "sqlite3_reset");
877     return GNUNET_SYSERR;
878   }
879
880   return GNUNET_OK;
881 }
882
883 /**
884  * Test if a member was admitted to the channel at the given message ID.
885  *
886  * @see GNUNET_PSYCSTORE_membership_test()
887  *
888  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
889  *         #GNUNET_SYSERR if there was en error.
890  */
891 static int
892 membership_test (void *cls,
893                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
894                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
895                  uint64_t message_id)
896 {
897   struct Plugin *plugin = cls;
898   sqlite3_stmt *stmt = plugin->select_membership;
899   int ret = GNUNET_SYSERR;
900
901   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
902                                       sizeof (*channel_key), SQLITE_STATIC)
903       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
904                                          sizeof (*slave_key), SQLITE_STATIC)
905       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
906   {
907     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
908                 "sqlite3_bind");
909   }
910   else
911   {
912     switch (sqlite3_step (stmt))
913     {
914     case SQLITE_DONE:
915       ret = GNUNET_NO;
916       break;
917     case SQLITE_ROW:
918       ret = GNUNET_YES;
919     }
920   }
921
922   if (SQLITE_OK != sqlite3_reset (stmt))
923   {
924     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
925                 "sqlite3_reset");
926   }
927
928   return ret;
929 }
930
931 /**
932  * Store a message fragment sent to a channel.
933  *
934  * @see GNUNET_PSYCSTORE_fragment_store()
935  *
936  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
937  */
938 static int
939 fragment_store (void *cls,
940                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
941                 const struct GNUNET_MULTICAST_MessageHeader *msg,
942                 uint32_t psycstore_flags)
943 {
944   struct Plugin *plugin = cls;
945   sqlite3_stmt *stmt = plugin->insert_fragment;
946
947   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
948
949   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
950   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
951   uint64_t message_id = GNUNET_ntohll (msg->message_id);
952   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
953
954   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
955       message_id > INT64_MAX || group_generation > INT64_MAX)
956   {
957     LOG (GNUNET_ERROR_TYPE_ERROR,
958          "Tried to store fragment with a field > INT64_MAX: "
959          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
960          message_id, group_generation);
961     GNUNET_break (0);
962     return GNUNET_SYSERR;
963   }
964
965   if (GNUNET_OK != channel_key_store (plugin, channel_key))
966     return GNUNET_SYSERR;
967
968   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
969                                       sizeof (*channel_key), SQLITE_STATIC)
970       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) )
971       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
972                                          sizeof (msg->signature), SQLITE_STATIC)
973       || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
974                                          sizeof (msg->purpose), SQLITE_STATIC)
975       || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id)
976       || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset)
977       || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id)
978       || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation)
979       || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags))
980       || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags)
981       || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
982                                          ntohs (msg->header.size)
983                                          - sizeof (*msg), SQLITE_STATIC))
984   {
985     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
986                 "sqlite3_bind");
987   }
988   else if (SQLITE_DONE != sqlite3_step (stmt))
989   {
990     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
991                 "sqlite3_step");
992   }
993
994   if (SQLITE_OK != sqlite3_reset (stmt))
995   {
996     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
997                 "sqlite3_reset");
998     return GNUNET_SYSERR;
999   }
1000
1001   return GNUNET_OK;
1002 }
1003
1004 /**
1005  * Set additional flags for a given message.
1006  *
1007  * They are OR'd with any existing flags set.
1008  *
1009  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1010  */
1011 static int
1012 message_add_flags (void *cls,
1013                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1014                    uint64_t message_id,
1015                    uint64_t psycstore_flags)
1016 {
1017   struct Plugin *plugin = cls;
1018   sqlite3_stmt *stmt = plugin->update_message_flags;
1019   int ret = GNUNET_SYSERR;
1020
1021   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags)
1022       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1023                                          sizeof (*channel_key), SQLITE_STATIC)
1024       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
1025   {
1026     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1027                 "sqlite3_bind");
1028   }
1029   else
1030   {
1031     switch (sqlite3_step (stmt))
1032     {
1033     case SQLITE_DONE:
1034       ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
1035       break;
1036     default:
1037       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1038                   "sqlite3_step");
1039     }
1040   }
1041
1042   if (SQLITE_OK != sqlite3_reset (stmt))
1043   {
1044     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1045                 "sqlite3_reset");
1046     return GNUNET_SYSERR;
1047   }
1048
1049   return ret;
1050 }
1051
1052 static int
1053 fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1054               void *cb_cls)
1055 {
1056   int data_size = sqlite3_column_bytes (stmt, 9);
1057   struct GNUNET_MULTICAST_MessageHeader *msg
1058     = GNUNET_malloc (sizeof (*msg) + data_size);
1059
1060   msg->header.size = htons (sizeof (*msg) + data_size);
1061   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1062   msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
1063   memcpy (&msg->signature,
1064           sqlite3_column_blob (stmt, 1),
1065           sqlite3_column_bytes (stmt, 1));
1066   memcpy (&msg->purpose,
1067           sqlite3_column_blob (stmt, 2),
1068           sqlite3_column_bytes (stmt, 2));
1069   msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
1070   msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
1071   msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
1072   msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
1073   msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
1074   memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
1075
1076   return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1077 }
1078
1079
1080 static int
1081 fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1082                  uint64_t *returned_fragments,
1083                  GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1084 {
1085   int ret = GNUNET_SYSERR;
1086   int sql_ret;
1087
1088   do
1089   {
1090     sql_ret = sqlite3_step (stmt);
1091     switch (sql_ret)
1092     {
1093     case SQLITE_DONE:
1094       if (ret != GNUNET_OK)
1095         ret = GNUNET_NO;
1096       break;
1097     case SQLITE_ROW:
1098       ret = fragment_row (stmt, cb, cb_cls);
1099       (*returned_fragments)++;
1100       if (ret != GNUNET_YES)
1101         sql_ret = SQLITE_DONE;
1102       break;
1103     default:
1104       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1105                   "sqlite3_step");
1106     }
1107   }
1108   while (sql_ret == SQLITE_ROW);
1109
1110   return ret;
1111 }
1112
1113 /**
1114  * Retrieve a message fragment range by fragment ID.
1115  *
1116  * @see GNUNET_PSYCSTORE_fragment_get()
1117  *
1118  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1119  */
1120 static int
1121 fragment_get (void *cls,
1122               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1123               uint64_t first_fragment_id,
1124               uint64_t last_fragment_id,
1125               uint64_t *returned_fragments,
1126               GNUNET_PSYCSTORE_FragmentCallback cb,
1127               void *cb_cls)
1128 {
1129   struct Plugin *plugin = cls;
1130   sqlite3_stmt *stmt = plugin->select_fragments;
1131   int ret = GNUNET_SYSERR;
1132   *returned_fragments = 0;
1133
1134   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1135                                       sizeof (*channel_key),
1136                                       SQLITE_STATIC)
1137       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id)
1138       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id))
1139   {
1140     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1141                 "sqlite3_bind");
1142   }
1143   else
1144   {
1145     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1146   }
1147
1148   if (SQLITE_OK != sqlite3_reset (stmt))
1149   {
1150     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1151                 "sqlite3_reset");
1152   }
1153
1154   return ret;
1155 }
1156
1157
1158 /**
1159  * Retrieve a message fragment range by fragment ID.
1160  *
1161  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1162  *
1163  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1164  */
1165 static int
1166 fragment_get_latest (void *cls,
1167                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1168                      uint64_t fragment_limit,
1169                      uint64_t *returned_fragments,
1170                      GNUNET_PSYCSTORE_FragmentCallback cb,
1171                      void *cb_cls)
1172 {
1173   struct Plugin *plugin = cls;
1174   sqlite3_stmt *stmt = plugin->select_latest_fragments;
1175   int ret = GNUNET_SYSERR;
1176   *returned_fragments = 0;
1177
1178   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1179                                       sizeof (*channel_key),
1180                                       SQLITE_STATIC)
1181       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit))
1182   {
1183     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1184                 "sqlite3_bind");
1185   }
1186   else
1187   {
1188     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1189   }
1190
1191   if (SQLITE_OK != sqlite3_reset (stmt))
1192   {
1193     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1194                 "sqlite3_reset");
1195   }
1196
1197   return ret;
1198 }
1199
1200
1201 /**
1202  * Retrieve all fragments of a message ID range.
1203  *
1204  * @see GNUNET_PSYCSTORE_message_get()
1205  *
1206  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1207  */
1208 static int
1209 message_get (void *cls,
1210              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1211              uint64_t first_message_id,
1212              uint64_t last_message_id,
1213              uint64_t *returned_fragments,
1214              GNUNET_PSYCSTORE_FragmentCallback cb,
1215              void *cb_cls)
1216 {
1217   struct Plugin *plugin = cls;
1218   sqlite3_stmt *stmt = plugin->select_messages;
1219   int ret = GNUNET_SYSERR;
1220   *returned_fragments = 0;
1221
1222   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1223                                       sizeof (*channel_key),
1224                                       SQLITE_STATIC)
1225       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id)
1226       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id))
1227   {
1228     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1229                 "sqlite3_bind");
1230   }
1231   else
1232   {
1233     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1234   }
1235
1236   if (SQLITE_OK != sqlite3_reset (stmt))
1237   {
1238     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1239                 "sqlite3_reset");
1240   }
1241
1242   return ret;
1243 }
1244
1245
1246 /**
1247  * Retrieve all fragments of the latest messages.
1248  *
1249  * @see GNUNET_PSYCSTORE_message_get_latest()
1250  *
1251  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1252  */
1253 static int
1254 message_get_latest (void *cls,
1255                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1256                     uint64_t message_limit,
1257                     uint64_t *returned_fragments,
1258                     GNUNET_PSYCSTORE_FragmentCallback cb,
1259                     void *cb_cls)
1260 {
1261   struct Plugin *plugin = cls;
1262   sqlite3_stmt *stmt = plugin->select_latest_messages;
1263   int ret = GNUNET_SYSERR;
1264   *returned_fragments = 0;
1265
1266   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1267                                       sizeof (*channel_key),
1268                                       SQLITE_STATIC)
1269       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1270                                          sizeof (*channel_key),
1271                                          SQLITE_STATIC)
1272       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit))
1273   {
1274     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1275                 "sqlite3_bind");
1276   }
1277   else
1278   {
1279     ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1280   }
1281
1282   if (SQLITE_OK != sqlite3_reset (stmt))
1283   {
1284     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1285                 "sqlite3_reset");
1286   }
1287
1288   return ret;
1289 }
1290
1291
1292 /**
1293  * Retrieve a fragment of message specified by its message ID and fragment
1294  * offset.
1295  *
1296  * @see GNUNET_PSYCSTORE_message_get_fragment()
1297  *
1298  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1299  */
1300 static int
1301 message_get_fragment (void *cls,
1302                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1303                       uint64_t message_id,
1304                       uint64_t fragment_offset,
1305                       GNUNET_PSYCSTORE_FragmentCallback cb,
1306                       void *cb_cls)
1307 {
1308   struct Plugin *plugin = cls;
1309   sqlite3_stmt *stmt = plugin->select_message_fragment;
1310   int ret = GNUNET_SYSERR;
1311
1312   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1313                                       sizeof (*channel_key),
1314                                       SQLITE_STATIC)
1315       || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)
1316       || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
1317   {
1318     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1319                 "sqlite3_bind");
1320   }
1321   else
1322   {
1323     switch (sqlite3_step (stmt))
1324     {
1325     case SQLITE_DONE:
1326       ret = GNUNET_NO;
1327       break;
1328     case SQLITE_ROW:
1329       ret = fragment_row (stmt, cb, cb_cls);
1330       break;
1331     default:
1332       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1333                   "sqlite3_step");
1334     }
1335   }
1336
1337   if (SQLITE_OK != sqlite3_reset (stmt))
1338   {
1339     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1340                 "sqlite3_reset");
1341   }
1342
1343   return ret;
1344 }
1345
1346 /**
1347  * Retrieve the max. values of message counters for a channel.
1348  *
1349  * @see GNUNET_PSYCSTORE_counters_get()
1350  *
1351  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1352  */
1353 static int
1354 counters_message_get (void *cls,
1355                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1356                       uint64_t *max_fragment_id,
1357                       uint64_t *max_message_id,
1358                       uint64_t *max_group_generation)
1359 {
1360   struct Plugin *plugin = cls;
1361   sqlite3_stmt *stmt = plugin->select_counters_message;
1362   int ret = GNUNET_SYSERR;
1363
1364   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1365                                       sizeof (*channel_key),
1366                                       SQLITE_STATIC))
1367   {
1368     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1369                 "sqlite3_bind");
1370   }
1371   else
1372   {
1373     switch (sqlite3_step (stmt))
1374     {
1375     case SQLITE_DONE:
1376       ret = GNUNET_NO;
1377       break;
1378     case SQLITE_ROW:
1379       *max_fragment_id = sqlite3_column_int64 (stmt, 0);
1380       *max_message_id = sqlite3_column_int64 (stmt, 1);
1381       *max_group_generation = sqlite3_column_int64 (stmt, 2);
1382       ret = GNUNET_OK;
1383       break;
1384     default:
1385       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1386                   "sqlite3_step");
1387     }
1388   }
1389
1390   if (SQLITE_OK != sqlite3_reset (stmt))
1391   {
1392     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1393                 "sqlite3_reset");
1394   }
1395
1396   return ret;
1397 }
1398
1399 /**
1400  * Retrieve the max. values of state counters for a channel.
1401  *
1402  * @see GNUNET_PSYCSTORE_counters_get()
1403  *
1404  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1405  */
1406 static int
1407 counters_state_get (void *cls,
1408                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1409                     uint64_t *max_state_message_id)
1410 {
1411   struct Plugin *plugin = cls;
1412   sqlite3_stmt *stmt = plugin->select_counters_state;
1413   int ret = GNUNET_SYSERR;
1414
1415   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1416                                       sizeof (*channel_key),
1417                                       SQLITE_STATIC))
1418   {
1419     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1420                 "sqlite3_bind");
1421   }
1422   else
1423   {
1424     switch (sqlite3_step (stmt))
1425     {
1426     case SQLITE_DONE:
1427       ret = GNUNET_NO;
1428       break;
1429     case SQLITE_ROW:
1430       *max_state_message_id = sqlite3_column_int64 (stmt, 0);
1431       ret = GNUNET_OK;
1432       break;
1433     default:
1434       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1435                   "sqlite3_step");
1436     }
1437   }
1438
1439   if (SQLITE_OK != sqlite3_reset (stmt))
1440   {
1441     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1442                 "sqlite3_reset");
1443   }
1444
1445   return ret;
1446 }
1447
1448
1449 /**
1450  * Set a state variable to the given value.
1451  *
1452  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1453  */
1454 static int
1455 state_set (struct Plugin *plugin, sqlite3_stmt *stmt,
1456            const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1457            const char *name, const void *value, size_t value_size)
1458 {
1459   int ret = GNUNET_SYSERR;
1460
1461   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1462                                       sizeof (*channel_key), SQLITE_STATIC)
1463       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)
1464       || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
1465                                          SQLITE_STATIC))
1466   {
1467     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1468                 "sqlite3_bind");
1469   }
1470   else
1471   {
1472     switch (sqlite3_step (stmt))
1473     {
1474     case SQLITE_DONE:
1475       ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO;
1476       break;
1477     default:
1478       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1479                   "sqlite3_step");
1480     }
1481   }
1482
1483   if (SQLITE_OK != sqlite3_reset (stmt))
1484   {
1485     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1486                 "sqlite3_reset");
1487     return GNUNET_SYSERR;
1488   }
1489
1490   return ret;
1491 }
1492
1493
1494 static int
1495 update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt,
1496                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1497                    uint64_t message_id)
1498 {
1499   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id)
1500       || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1501                                          sizeof (*channel_key), SQLITE_STATIC))
1502   {
1503     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1504                 "sqlite3_bind");
1505   }
1506   else if (SQLITE_DONE != sqlite3_step (stmt))
1507   {
1508     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1509                 "sqlite3_step");
1510   }
1511   if (SQLITE_OK != sqlite3_reset (stmt))
1512   {
1513     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1514                 "sqlite3_reset");
1515     return GNUNET_SYSERR;
1516   }
1517   return GNUNET_OK;
1518 }
1519
1520
1521 /**
1522  * Begin modifying current state.
1523  */
1524 static int
1525 state_modify_begin (void *cls,
1526                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1527                     uint64_t message_id, uint64_t state_delta)
1528 {
1529   struct Plugin *plugin = cls;
1530   sqlite3_stmt *stmt = plugin->select_message_state_delta;
1531
1532   if (state_delta > 0)
1533   {
1534     int ret = GNUNET_SYSERR;
1535     if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1536                                         sizeof (*channel_key), SQLITE_STATIC)
1537         || SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
1538                                             message_id - state_delta)
1539         || SQLITE_OK != sqlite3_bind_int64 (stmt, 3,
1540                                             message_id)
1541         || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
1542                                             message_id - state_delta)
1543         || SQLITE_OK != sqlite3_bind_int64 (stmt, 5,
1544                                             GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
1545     {
1546       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1547                   "sqlite3_bind");
1548     }
1549     else
1550     {
1551       switch (sqlite3_step (stmt))
1552       {
1553       case SQLITE_DONE:
1554         ret = GNUNET_NO;
1555         break;
1556       case SQLITE_ROW:
1557         ret = GNUNET_OK;
1558         break;
1559       default:
1560         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1561                     "sqlite3_step");
1562       }
1563     }
1564     if (SQLITE_OK != sqlite3_reset (stmt))
1565     {
1566       ret = GNUNET_SYSERR;
1567       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1568                   "sqlite3_reset");
1569      }
1570     if (GNUNET_OK != ret)
1571       return ret;
1572   }
1573
1574   if (TRANSACTION_NONE != plugin->transaction)
1575       if (GNUNET_OK != transaction_rollback (plugin))
1576           return GNUNET_SYSERR;
1577
1578   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1579 }
1580
1581
1582 /**
1583  * Set the current value of state variable.
1584  *
1585  * @see GNUNET_PSYCSTORE_state_modify()
1586  *
1587  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1588  */
1589 static int
1590 state_modify_set (void *cls,
1591                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1592                   const char *name, const void *value, size_t value_size)
1593 {
1594   struct Plugin *plugin = cls;
1595   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1596
1597   return state_set (plugin, plugin->insert_state_current, channel_key,
1598                     name, value, value_size);
1599
1600 }
1601
1602
1603 /**
1604  * End modifying current state.
1605  */
1606 static int
1607 state_modify_end (void *cls,
1608                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1609                   uint64_t message_id)
1610 {
1611   struct Plugin *plugin = cls;
1612   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1613
1614   return
1615     GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1616     && GNUNET_OK == update_message_id (plugin,
1617                                        plugin->update_max_state_message_id,
1618                                        channel_key, message_id)
1619     && GNUNET_OK == transaction_commit (plugin)
1620     ? GNUNET_OK : GNUNET_SYSERR;
1621 }
1622
1623
1624 /**
1625  * Begin state synchronization.
1626  */
1627 static int
1628 state_sync_begin (void *cls,
1629                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1630 {
1631   struct Plugin *plugin = cls;
1632   return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1633 }
1634
1635
1636 /**
1637  * Set the current value of state variable.
1638  *
1639  * @see GNUNET_PSYCSTORE_state_modify()
1640  *
1641  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1642  */
1643 static int
1644 state_sync_set (void *cls,
1645                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1646                 const char *name, const void *value, size_t value_size)
1647 {
1648   struct Plugin *plugin = cls;
1649   return state_set (cls, plugin->insert_state_sync, channel_key,
1650                     name, value, value_size);
1651 }
1652
1653
1654 /**
1655  * End modifying current state.
1656  */
1657 static int
1658 state_sync_end (void *cls,
1659                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1660                 uint64_t message_id)
1661 {
1662   struct Plugin *plugin = cls;
1663   int ret = GNUNET_SYSERR;
1664
1665   GNUNET_OK == transaction_begin (plugin, TRANSACTION_NONE)
1666     && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1667     && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1668                                   channel_key)
1669     && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1670                                   channel_key)
1671     && GNUNET_OK == update_message_id (plugin,
1672                                        plugin->update_state_hash_message_id,
1673                                        channel_key, message_id)
1674     && GNUNET_OK == transaction_commit (plugin)
1675     ? ret = GNUNET_OK
1676     : transaction_rollback (plugin);
1677   return ret;
1678 }
1679
1680
1681 /**
1682  * Reset the state of a channel.
1683  *
1684  * @see GNUNET_PSYCSTORE_state_reset()
1685  *
1686  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1687  */
1688 static int
1689 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1690 {
1691   struct Plugin *plugin = cls;
1692   return exec_channel (plugin, plugin->delete_state, channel_key);
1693 }
1694
1695
1696 /**
1697  * Update signed values of state variables in the state store.
1698  *
1699  * @see GNUNET_PSYCSTORE_state_hash_update()
1700  *
1701  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1702  */
1703 static int
1704 state_update_signed (void *cls,
1705                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1706 {
1707   struct Plugin *plugin = cls;
1708   return exec_channel (plugin, plugin->update_state_signed, channel_key);
1709 }
1710
1711
1712 /**
1713  * Retrieve a state variable by name.
1714  *
1715  * @see GNUNET_PSYCSTORE_state_get()
1716  *
1717  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1718  */
1719 static int
1720 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1721            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1722 {
1723   struct Plugin *plugin = cls;
1724   int ret = GNUNET_SYSERR;
1725
1726   sqlite3_stmt *stmt = plugin->select_state_one;
1727
1728   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1729                                       sizeof (*channel_key),
1730                                       SQLITE_STATIC)
1731       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
1732   {
1733     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1734                 "sqlite3_bind");
1735   }
1736   else
1737   {
1738     switch (sqlite3_step (stmt))
1739     {
1740     case SQLITE_DONE:
1741       ret = GNUNET_NO;
1742       break;
1743     case SQLITE_ROW:
1744       ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1745                 sqlite3_column_bytes (stmt, 0));
1746       break;
1747     default:
1748       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1749                   "sqlite3_step");
1750     }
1751   }
1752
1753   if (SQLITE_OK != sqlite3_reset (stmt))
1754   {
1755     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1756                 "sqlite3_reset");
1757   }
1758
1759   return ret;
1760 }
1761
1762
1763 /**
1764  * Retrieve all state variables for a channel with the given prefix.
1765  *
1766  * @see GNUNET_PSYCSTORE_state_get_prefix()
1767  *
1768  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1769  */
1770 static int
1771 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1772                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1773                   void *cb_cls)
1774 {
1775   struct Plugin *plugin = cls;
1776   int ret = GNUNET_SYSERR;
1777   sqlite3_stmt *stmt = plugin->select_state_prefix;
1778   size_t name_len = strlen (name);
1779   char *name_prefix;
1780
1781   GNUNET_asprintf (&name_prefix,
1782                    "%s_%%",
1783                    name);
1784   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1785                                       sizeof (*channel_key), SQLITE_STATIC)
1786       || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
1787       || SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2,
1788                                          SQLITE_STATIC))
1789   {
1790     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1791                 "sqlite3_bind");
1792   }
1793   else
1794   {
1795     int sql_ret;
1796     do
1797     {
1798       sql_ret = sqlite3_step (stmt);
1799       switch (sql_ret)
1800       {
1801       case SQLITE_DONE:
1802         if (ret != GNUNET_OK)
1803           ret = GNUNET_NO;
1804         break;
1805       case SQLITE_ROW:
1806         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1807                   sqlite3_column_blob (stmt, 1),
1808                   sqlite3_column_bytes (stmt, 1));
1809         if (ret != GNUNET_YES)
1810           sql_ret = SQLITE_DONE;
1811         break;
1812       default:
1813         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1814                     "sqlite3_step");
1815       }
1816     }
1817     while (sql_ret == SQLITE_ROW);
1818   }
1819   if (SQLITE_OK != sqlite3_reset (stmt))
1820   {
1821     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1822                 "sqlite3_reset");
1823   }
1824   GNUNET_free (name_prefix);
1825   return ret;
1826 }
1827
1828
1829 /**
1830  * Retrieve all signed state variables for a channel.
1831  *
1832  * @see GNUNET_PSYCSTORE_state_get_signed()
1833  *
1834  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1835  */
1836 static int
1837 state_get_signed (void *cls,
1838                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1839                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1840 {
1841   struct Plugin *plugin = cls;
1842   int ret = GNUNET_SYSERR;
1843
1844   sqlite3_stmt *stmt = plugin->select_state_signed;
1845
1846   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1847                                       sizeof (*channel_key), SQLITE_STATIC))
1848   {
1849     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1850                 "sqlite3_bind");
1851   }
1852   else
1853   {
1854     int sql_ret;
1855     do
1856     {
1857       sql_ret = sqlite3_step (stmt);
1858       switch (sql_ret)
1859       {
1860       case SQLITE_DONE:
1861         if (ret != GNUNET_OK)
1862           ret = GNUNET_NO;
1863         break;
1864       case SQLITE_ROW:
1865         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1866                   sqlite3_column_blob (stmt, 1),
1867                   sqlite3_column_bytes (stmt, 1));
1868         if (ret != GNUNET_YES)
1869           sql_ret = SQLITE_DONE;
1870         break;
1871       default:
1872         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1873                     "sqlite3_step");
1874       }
1875     }
1876     while (sql_ret == SQLITE_ROW);
1877   }
1878
1879   if (SQLITE_OK != sqlite3_reset (stmt))
1880   {
1881     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1882                 "sqlite3_reset");
1883   }
1884
1885   return ret;
1886 }
1887
1888
1889 /**
1890  * Entry point for the plugin.
1891  *
1892  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1893  * @return NULL on error, otherwise the plugin context
1894  */
1895 void *
1896 libgnunet_plugin_psycstore_sqlite_init (void *cls)
1897 {
1898   static struct Plugin plugin;
1899   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1900   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1901
1902   if (NULL != plugin.cfg)
1903     return NULL;                /* can only initialize once! */
1904   memset (&plugin, 0, sizeof (struct Plugin));
1905   plugin.cfg = cfg;
1906   if (GNUNET_OK != database_setup (&plugin))
1907   {
1908     database_shutdown (&plugin);
1909     return NULL;
1910   }
1911   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1912   api->cls = &plugin;
1913   api->membership_store = &membership_store;
1914   api->membership_test = &membership_test;
1915   api->fragment_store = &fragment_store;
1916   api->message_add_flags = &message_add_flags;
1917   api->fragment_get = &fragment_get;
1918   api->fragment_get_latest = &fragment_get_latest;
1919   api->message_get = &message_get;
1920   api->message_get_latest = &message_get_latest;
1921   api->message_get_fragment = &message_get_fragment;
1922   api->counters_message_get = &counters_message_get;
1923   api->counters_state_get = &counters_state_get;
1924   api->state_modify_begin = &state_modify_begin;
1925   api->state_modify_set = &state_modify_set;
1926   api->state_modify_end = &state_modify_end;
1927   api->state_sync_begin = &state_sync_begin;
1928   api->state_sync_set = &state_sync_set;
1929   api->state_sync_end = &state_sync_end;
1930   api->state_reset = &state_reset;
1931   api->state_update_signed = &state_update_signed;
1932   api->state_get = &state_get;
1933   api->state_get_prefix = &state_get_prefix;
1934   api->state_get_signed = &state_get_signed;
1935
1936   LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
1937   return api;
1938 }
1939
1940
1941 /**
1942  * Exit point from the plugin.
1943  *
1944  * @param cls The plugin context (as returned by "init")
1945  * @return Always NULL
1946  */
1947 void *
1948 libgnunet_plugin_psycstore_sqlite_done (void *cls)
1949 {
1950   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1951   struct Plugin *plugin = api->cls;
1952
1953   database_shutdown (plugin);
1954   plugin->cfg = NULL;
1955   GNUNET_free (api);
1956   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
1957   return NULL;
1958 }
1959
1960 /* end of plugin_psycstore_sqlite.c */