750fdce16f4a4882b79e5b20b39e78b993d8e806
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_sqlite.c
1  /*
2   * This file is part of GNUnet
3   * (C) 2009-2013 Christian Grothoff (and other contributing authors)
4   *
5   * GNUnet is free software; you can redistribute it and/or modify
6   * it under the terms of the GNU General Public License as published
7   * by the Free Software Foundation; either version 3, or (at your
8   * option) any later version.
9   *
10   * GNUnet is distributed in the hope that it will be useful, but
11   * WITHOUT ANY WARRANTY; without even the implied warranty of
12 t  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   * General Public License for more details.
14   *
15   * You should have received a copy of the GNU General Public License
16   * along with GNUnet; see the file COPYING.  If not, write to the
17   * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   * Boston, MA 02111-1307, USA.
19   */
20
21 /**
22  * @file psycstore/plugin_psycstore_sqlite.c
23  * @brief sqlite-based psycstore backend
24  * @author Gabor X Toth
25  * @author Christian Grothoff
26  */
27
28 /*
29  * FIXME: SQLite3 only supports signed 64-bit integers natively,
30  *        thus it can only store 63 bits of the uint64_t's.
31  */
32
33 #include "platform.h"
34 #include "gnunet_psycstore_plugin.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_multicast_service.h"
37 #include "gnunet_crypto_lib.h"
38 #include "psycstore.h"
39 #include <sqlite3.h>
40
41 /**
42  * After how many ms "busy" should a DB operation fail for good?  A
43  * low value makes sure that we are more responsive to requests
44  * (especially PUTs).  A high value guarantees a higher success rate
45  * (SELECTs in iterate can take several seconds despite LIMIT=1).
46  *
47  * The default value of 1s should ensure that users do not experience
48  * huge latencies while at the same time allowing operations to
49  * succeed with reasonable probability.
50  */
51 #define BUSY_TIMEOUT_MS 1000
52
53 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
54
55 /**
56  * Log an error message at log-level 'level' that indicates
57  * a failure of the command 'cmd' on file 'filename'
58  * with the message given by strerror(errno).
59  */
60 #define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0)
61
62 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
63
64
65 /**
66  * Context for all functions in this plugin.
67  */
68 struct Plugin
69 {
70
71   const struct GNUNET_CONFIGURATION_Handle *cfg;
72
73   /**
74    * Database filename.
75    */
76   char *fn;
77
78   /**
79    * Native SQLite database handle.
80    */
81   sqlite3 *dbh;
82
83   /**
84    * Precompiled SQL for channel_key_store()
85    */
86   sqlite3_stmt *insert_channel_key;
87
88   /**
89    * Precompiled SQL for slave_key_store()
90    */
91   sqlite3_stmt *insert_slave_key;
92
93
94   /**
95    * Precompiled SQL for membership_store()
96    */
97   sqlite3_stmt *insert_membership;
98
99   /**
100    * Precompiled SQL for membership_test()
101    */
102   sqlite3_stmt *select_membership;
103
104
105   /**
106    * Precompiled SQL for fragment_store()
107    */
108   sqlite3_stmt *insert_fragment;
109
110   /**
111    * Precompiled SQL for message_add_flags()
112    */
113   sqlite3_stmt *update_message_flags;
114
115   /**
116    * Precompiled SQL for fragment_get()
117    */
118   sqlite3_stmt *select_fragment;
119
120   /**
121    * Precompiled SQL for message_get()
122    */
123   sqlite3_stmt *select_message;
124
125   /**
126    * Precompiled SQL for message_get_fragment()
127    */
128   sqlite3_stmt *select_message_fragment;
129
130   /**
131    * Precompiled SQL for counters_get_master()
132    */
133   sqlite3_stmt *select_counters_master;
134
135   /**
136    * Precompiled SQL for counters_get_slave()
137    */
138   sqlite3_stmt *select_counters_slave;
139
140
141   /**
142    * Precompiled SQL for state_set()
143    */
144   sqlite3_stmt *insert_state_current;
145
146   /**
147    * Precompiled SQL for state_set_signed()
148    */
149   sqlite3_stmt *update_state_signed;
150
151   /**
152    * Precompiled SQL for state_sync()
153    */
154   sqlite3_stmt *insert_state_sync;
155
156   /**
157    * Precompiled SQL for state_sync()
158    */
159   sqlite3_stmt *delete_state;
160
161   /**
162    * Precompiled SQL for state_sync()
163    */
164   sqlite3_stmt *insert_state_from_sync;
165
166   /**
167    * Precompiled SQL for state_sync()
168    */
169   sqlite3_stmt *delete_state_sync;
170
171   /**
172    * Precompiled SQL for state_get_signed()
173    */
174   sqlite3_stmt *select_state_signed;
175
176   /**
177    * Precompiled SQL for state_get()
178    */
179   sqlite3_stmt *select_state_one;
180
181   /**
182    * Precompiled SQL for state_get_all()
183    */
184   sqlite3_stmt *select_state_prefix;
185
186 };
187
188 #if DEBUG_PSYCSTORE
189
190 static void
191 sql_trace (void *cls, const char *sql)
192 {
193   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQL query:\n%s\n", sql);
194 }
195
196 #endif
197
198 /**
199  * @brief Prepare a SQL statement
200  *
201  * @param dbh handle to the database
202  * @param sql SQL statement, UTF-8 encoded
203  * @param stmt set to the prepared statement
204  * @return 0 on success
205  */
206 static int
207 sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt)
208 {
209   char *tail;
210   int result;
211
212   result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
213                                (const char **) &tail);
214   LOG (GNUNET_ERROR_TYPE_DEBUG, 
215        "Prepared `%s' / %p: %d\n", sql, *stmt, result);
216   if (result != SQLITE_OK)
217     LOG (GNUNET_ERROR_TYPE_ERROR,
218          _("Error preparing SQL query: %s\n  %s\n"),
219          sqlite3_errmsg (dbh), sql);
220   return result;
221 }
222
223
224 /**
225  * @brief Prepare a SQL statement
226  *
227  * @param dbh handle to the database
228  * @param sql SQL statement, UTF-8 encoded
229  * @return 0 on success
230  */
231 static int
232 sql_exec (sqlite3 *dbh, const char *sql)
233 {
234   int result;
235
236   result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
237   LOG (GNUNET_ERROR_TYPE_DEBUG, 
238        "Executed `%s' / %d\n", sql, result);
239   if (result != SQLITE_OK)
240     LOG (GNUNET_ERROR_TYPE_ERROR,
241          _("Error executing SQL query: %s\n  %s\n"),
242          sqlite3_errmsg (dbh), sql);
243   return result;
244 }
245
246
247 /**
248  * Initialize the database connections and associated
249  * data structures (create tables and indices
250  * as needed as well).
251  *
252  * @param plugin the plugin context (state for this module)
253  * @return GNUNET_OK on success
254  */
255 static int
256 database_setup (struct Plugin *plugin)
257 {
258   char *filename;
259
260   if (GNUNET_OK !=
261       GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-sqlite",
262                                                "FILENAME", &filename))
263   {
264     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
265                                "psycstore-sqlite", "FILENAME");
266     return GNUNET_SYSERR;
267   }
268   if (GNUNET_OK != GNUNET_DISK_file_test (filename))
269   {
270     if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
271     {
272       GNUNET_break (0);
273       GNUNET_free (filename);
274       return GNUNET_SYSERR;
275     }
276   }
277   /* filename should be UTF-8-encoded. If it isn't, it's a bug */
278   plugin->fn = filename;
279
280   /* Open database and precompile statements */
281   if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh))
282   {
283     LOG (GNUNET_ERROR_TYPE_ERROR,
284          _("Unable to initialize SQLite: %s.\n"),
285          sqlite3_errmsg (plugin->dbh));
286     return GNUNET_SYSERR;
287   }
288
289 #if DEBUG_PSYCSTORE
290   sqlite3_trace (plugin->dbh, &sql_trace, NULL);
291 #endif
292
293   sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
294   sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
295   sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
296   sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
297   sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
298   sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
299   sql_exec (plugin->dbh, "PRAGMA count_changes=OFF");
300   sql_exec (plugin->dbh, "PRAGMA page_size=4096");
301
302   sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
303
304   /* Create tables */
305
306   sql_exec (plugin->dbh,
307             "CREATE TABLE IF NOT EXISTS channels (\n"
308             "  id INTEGER PRIMARY KEY,\n"
309             "  pub_key BLOB UNIQUE\n"
310             ");");
311
312   sql_exec (plugin->dbh,
313             "CREATE TABLE IF NOT EXISTS slaves (\n"
314             "  id INTEGER PRIMARY KEY,\n"
315             "  pub_key BLOB UNIQUE\n"
316             ");");
317
318   sql_exec (plugin->dbh,
319             "CREATE TABLE IF NOT EXISTS membership (\n"
320             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
321             "  slave_id INTEGER NOT NULL REFERENCES slaves(id),\n"
322             "  did_join INTEGER NOT NULL,\n"
323             "  announced_at INTEGER NOT NULL,\n"
324             "  effective_since INTEGER NOT NULL,\n"
325             "  group_generation INTEGER NOT NULL\n"
326             ");");
327   sql_exec (plugin->dbh,
328             "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
329             "ON membership (channel_id, slave_id);");
330
331   sql_exec (plugin->dbh,
332             "CREATE TABLE IF NOT EXISTS messages (\n"
333             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
334             "  hop_counter INTEGER NOT NULL,\n"
335             "  signature BLOB,\n"
336             "  purpose BLOB,\n"
337             "  fragment_id INTEGER NOT NULL,\n"
338             "  fragment_offset INTEGER NOT NULL,\n"
339             "  message_id INTEGER NOT NULL,\n"
340             "  group_generation INTEGER NOT NULL,\n"
341             "  multicast_flags INTEGER NOT NULL,\n"
342             "  psycstore_flags INTEGER NOT NULL,\n"
343             "  data BLOB,\n"
344             "  PRIMARY KEY (channel_id, fragment_id),\n"
345             "  UNIQUE (channel_id, message_id, fragment_offset)\n"
346             ");");
347
348   sql_exec (plugin->dbh,
349             "CREATE TABLE IF NOT EXISTS state (\n"
350             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
351             "  name TEXT NOT NULL,\n"
352             "  value_current BLOB,\n"
353             "  value_signed BLOB,\n"
354             "  PRIMARY KEY (channel_id, name)\n"
355             ");");
356
357   sql_exec (plugin->dbh,
358             "CREATE TABLE IF NOT EXISTS state_sync (\n"
359             "  channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
360             "  name TEXT NOT NULL,\n"
361             "  value BLOB,\n"
362             "  PRIMARY KEY (channel_id, name)\n"
363             ");");
364
365   /* Prepare statements */
366
367   sql_prepare (plugin->dbh,
368                "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
369                &plugin->insert_channel_key);
370
371   sql_prepare (plugin->dbh,
372                "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);",
373                &plugin->insert_slave_key);
374
375   sql_prepare (plugin->dbh,
376                "INSERT INTO membership\n"
377                " (channel_id, slave_id, did_join, announced_at,\n"
378                "  effective_since, group_generation)\n"
379                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
380                "        (SELECT id FROM slaves WHERE pub_key = ?),\n"
381                "        ?, ?, ?, ?);",
382                &plugin->insert_membership);
383
384   sql_prepare (plugin->dbh,
385                "SELECT did_join FROM membership\n"
386                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
387                "      AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
388                "      AND effective_since <= ? AND did_join = 1\n"
389                "ORDER BY announced_at DESC LIMIT 1;",
390                &plugin->select_membership);
391
392   sql_prepare (plugin->dbh,
393                "INSERT INTO messages\n"
394                " (channel_id, hop_counter, signature, purpose,\n"
395                "  fragment_id, fragment_offset, message_id,\n"
396                "  group_generation, multicast_flags, psycstore_flags, data)\n"
397                "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
398                "        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
399                &plugin->insert_fragment);
400
401   sql_prepare (plugin->dbh,
402                "UPDATE messages\n"
403                "SET psycstore_flags = psycstore_flags | ?\n"
404                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
405                "      AND message_id = ? AND fragment_offset = 0;",
406                &plugin->update_message_flags);
407
408   sql_prepare (plugin->dbh,
409                "SELECT hop_counter, signature, purpose, fragment_id,\n"
410                "       fragment_offset, message_id, group_generation,\n"
411                "       multicast_flags, psycstore_flags, data\n"
412                "FROM messages\n"
413                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
414                "      AND fragment_id = ?;",
415                &plugin->select_fragment);
416
417   sql_prepare (plugin->dbh,
418                "SELECT hop_counter, signature, purpose, fragment_id,\n"
419                "       fragment_offset, message_id, group_generation,\n"
420                "       multicast_flags, psycstore_flags, data\n"
421                "FROM messages\n"
422                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
423                "      AND message_id = ?;",
424                &plugin->select_message);
425
426   sql_prepare (plugin->dbh,
427                "SELECT hop_counter, signature, purpose, fragment_id,\n"
428                "       fragment_offset, message_id, group_generation,\n"
429                "       multicast_flags, psycstore_flags, data\n"
430                "FROM messages\n"
431                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
432                "      AND message_id = ? AND fragment_offset = ?;",
433                &plugin->select_message_fragment);
434
435   sql_prepare (plugin->dbh,
436                "SELECT fragment_id, message_id, group_generation\n"
437                "FROM messages\n"
438                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
439                "ORDER BY fragment_id DESC LIMIT 1;",
440                &plugin->select_counters_master);
441
442   sql_prepare (plugin->dbh,
443                "SELECT message_id\n"
444                "FROM messages\n"
445                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
446                "      AND psycstore_flags & ?\n"
447                "ORDER BY message_id DESC LIMIT 1",
448                &plugin->select_counters_slave);
449
450   sql_prepare (plugin->dbh,
451                "INSERT OR REPLACE INTO state\n"
452                "  (channel_id, name, value_current, value_signed)\n"
453                "SELECT new.channel_id, new.name,\n"
454                "       new.value_current, old.value_signed\n"
455                "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
456                "             AS channel_id,\n"
457                "             ? AS name, ? AS value_current) AS new\n"
458                "LEFT JOIN (SELECT channel_id, name, value_signed\n"
459                "           FROM state) AS old\n"
460                "ON new.channel_id = old.channel_id AND new.name = old.name;",
461                &plugin->insert_state_current);
462
463   sql_prepare (plugin->dbh,
464                "UPDATE state\n"
465                "SET value_signed = value_current\n"
466                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
467                &plugin->update_state_signed);
468
469   sql_prepare (plugin->dbh,
470                "INSERT INTO state_sync (channel_id, name, value)\n"
471                "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
472                &plugin->insert_state_sync);
473
474   sql_prepare (plugin->dbh,
475                "DELETE FROM state\n"
476                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
477                &plugin->delete_state);
478
479   sql_prepare (plugin->dbh,
480                "INSERT INTO state\n"
481                " (channel_id, name, value_current, value_signed)\n"
482                "SELECT channel_id, name, value, value\n"
483                "FROM state_sync\n"
484                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
485                &plugin->insert_state_from_sync);
486
487   sql_prepare (plugin->dbh,
488                "DELETE FROM state_sync\n"
489                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
490                &plugin->delete_state_sync);
491
492   sql_prepare (plugin->dbh,
493                "SELECT value_current\n"
494                "FROM state\n"
495                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
496                "      AND name = ?;",
497                &plugin->select_state_one);
498
499   sql_prepare (plugin->dbh,
500                "SELECT name, value_current\n"
501                "FROM state\n"
502                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
503                "      AND name = ? OR name LIKE ?;",
504                &plugin->select_state_prefix);
505
506   sql_prepare (plugin->dbh,
507                "SELECT name, value_signed\n"
508                "FROM state\n"
509                "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
510                "      AND value_signed IS NOT NULL;",
511                &plugin->select_state_signed);
512
513   return GNUNET_OK;
514 }
515
516
517 /**
518  * Shutdown database connection and associate data
519  * structures.
520  * @param plugin the plugin context (state for this module)
521  */
522 static void
523 database_shutdown (struct Plugin *plugin)
524 {
525   int result;
526   sqlite3_stmt *stmt;
527
528   if (NULL != plugin->insert_channel_key)
529     sqlite3_finalize (plugin->insert_channel_key);
530
531   if (NULL != plugin->insert_slave_key)
532     sqlite3_finalize (plugin->insert_slave_key);
533
534   if (NULL != plugin->insert_membership)
535     sqlite3_finalize (plugin->insert_membership);
536
537   if (NULL != plugin->select_membership)
538     sqlite3_finalize (plugin->select_membership);
539
540   if (NULL != plugin->insert_fragment)
541     sqlite3_finalize (plugin->insert_fragment);
542
543   if (NULL != plugin->update_message_flags)
544     sqlite3_finalize (plugin->update_message_flags);
545
546   if (NULL != plugin->select_fragment)
547     sqlite3_finalize (plugin->select_fragment);
548
549   if (NULL != plugin->select_message)
550     sqlite3_finalize (plugin->select_message);
551
552   if (NULL != plugin->select_message_fragment)
553     sqlite3_finalize (plugin->select_message_fragment);
554
555   if (NULL != plugin->select_counters_master)
556     sqlite3_finalize (plugin->select_counters_master);
557
558   if (NULL != plugin->select_counters_slave)
559     sqlite3_finalize (plugin->select_counters_slave);
560
561   if (NULL != plugin->insert_state_current)
562     sqlite3_finalize (plugin->insert_state_current);
563
564   if (NULL != plugin->update_state_signed)
565     sqlite3_finalize (plugin->update_state_signed);
566
567   if (NULL != plugin->insert_state_sync)
568     sqlite3_finalize (plugin->insert_state_sync);
569
570   if (NULL != plugin->delete_state)
571     sqlite3_finalize (plugin->delete_state);
572
573   if (NULL != plugin->insert_state_from_sync)
574     sqlite3_finalize (plugin->insert_state_from_sync);
575
576   if (NULL != plugin->delete_state_sync)
577     sqlite3_finalize (plugin->delete_state_sync);
578
579   if (NULL != plugin->select_state_one)
580     sqlite3_finalize (plugin->select_state_one);
581
582   if (NULL != plugin->select_state_prefix)
583     sqlite3_finalize (plugin->select_state_prefix);
584
585   result = sqlite3_close (plugin->dbh);
586   if (result == SQLITE_BUSY)
587   {
588     LOG (GNUNET_ERROR_TYPE_WARNING,
589          _("Tried to close sqlite without finalizing all prepared statements.\n"));
590     stmt = sqlite3_next_stmt (plugin->dbh, NULL);
591     while (stmt != NULL)
592     {
593       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite",
594                        "Closing statement %p\n", stmt);
595       result = sqlite3_finalize (stmt);
596       if (result != SQLITE_OK)
597         GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "sqlite",
598                          "Failed to close statement %p: %d\n", stmt, result);
599       stmt = sqlite3_next_stmt (plugin->dbh, NULL);
600     }
601     result = sqlite3_close (plugin->dbh);
602   }
603   if (SQLITE_OK != result)
604     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
605
606   GNUNET_free_non_null (plugin->fn);
607 }
608
609
610 static int
611 channel_key_store (struct Plugin *plugin,
612                    const struct GNUNET_CRYPTO_EccPublicKey *channel_key)
613 {
614   sqlite3_stmt *stmt = plugin->insert_channel_key;
615
616   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
617                                       sizeof (*channel_key), SQLITE_STATIC))
618   {
619     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
620                 "insert_channel_key (bind)");
621   }
622   else if (SQLITE_DONE != sqlite3_step (stmt))
623   {
624     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
625                 "insert_channel_key (step)");
626   }
627
628   if (SQLITE_OK != sqlite3_reset (stmt))
629   {
630     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
631                 "insert_channel_key (reset)");
632     return GNUNET_SYSERR;
633   }
634
635   return GNUNET_OK;
636 }
637
638
639 static int
640 slave_key_store (struct Plugin *plugin,
641                  const struct GNUNET_CRYPTO_EccPublicKey *slave_key)
642 {
643   sqlite3_stmt *stmt = plugin->insert_slave_key;
644
645   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, slave_key,
646                                       sizeof (*slave_key), SQLITE_STATIC))
647   {
648     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
649                 "insert_slave_key (bind)");
650   }
651   else if (SQLITE_DONE != sqlite3_step (stmt))
652   {
653     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
654                 "insert_slave_key (step)");
655   }
656
657
658   if (SQLITE_OK != sqlite3_reset (stmt))
659   {
660     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
661                 "insert_slave_key (reset)");
662     return GNUNET_SYSERR;
663   }
664
665   return GNUNET_OK;
666 }
667
668
669 /** 
670  * Store join/leave events for a PSYC channel in order to be able to answer
671  * membership test queries later.
672  *
673  * @see GNUNET_PSYCSTORE_membership_store()
674  *
675  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
676  */
677 static int
678 membership_store (void *cls,
679                   const struct GNUNET_CRYPTO_EccPublicKey *channel_key,
680                   const struct GNUNET_CRYPTO_EccPublicKey *slave_key,
681                   int did_join,
682                   uint64_t announced_at,
683                   uint64_t effective_since,
684                   uint64_t group_generation)
685 {
686   if (announced_at > INT64_MAX ||
687       effective_since > INT64_MAX ||
688       group_generation > INT64_MAX)
689   {
690     GNUNET_break (0);
691     return GNUNET_SYSERR;
692   }
693
694   struct Plugin *plugin = cls;
695   sqlite3_stmt *stmt = plugin->insert_membership;
696
697   if (GNUNET_OK != channel_key_store (plugin, channel_key) ||
698       GNUNET_OK != slave_key_store (plugin, slave_key))
699     return GNUNET_SYSERR;
700
701   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
702                                       sizeof (*channel_key), SQLITE_STATIC) ||
703       SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
704                                       sizeof (*slave_key), SQLITE_STATIC) ||
705       SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join) ||
706       SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at) ||
707       SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since) ||
708       SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
709   {
710     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
711                 "insert_membership (bind)");
712   }
713   else if (SQLITE_DONE != sqlite3_step (stmt))
714   {
715     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
716                 "insert_membership (step)");
717   }
718
719   if (SQLITE_OK != sqlite3_reset (stmt))
720   {
721     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
722                 "insert_membership (reset)");
723     return GNUNET_SYSERR;
724   }
725
726   return GNUNET_OK;
727 }
728
729 /** 
730  * Test if a member was admitted to the channel at the given message ID.
731  *
732  * @see GNUNET_PSYCSTORE_membership_test()
733  * 
734  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
735  *         #GNUNET_SYSERR if there was en error.
736  */
737 static int
738 membership_test (void *cls,
739                  const struct GNUNET_CRYPTO_EccPublicKey *channel_key,
740                  const struct GNUNET_CRYPTO_EccPublicKey *slave_key,
741                  uint64_t message_id)
742 {
743   struct Plugin *plugin = cls;
744   sqlite3_stmt *stmt = plugin->select_membership;
745   int ret = GNUNET_SYSERR;
746
747   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
748                                       sizeof (*channel_key), SQLITE_STATIC) ||
749       SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
750                                       sizeof (*slave_key), SQLITE_STATIC) ||
751       SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
752   {
753     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
754                 "select_membership (bind)");
755   }
756   else
757   {
758     switch (sqlite3_step (stmt))
759     {
760     case SQLITE_DONE:
761       ret = GNUNET_NO;
762       break;
763     case SQLITE_ROW:
764       ret = GNUNET_YES;
765     }
766   }
767
768   if (SQLITE_OK != sqlite3_reset (stmt))
769   {
770     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
771                 "select_membership (reset)");
772   }
773
774   return ret;
775 }
776
777 /** 
778  * Store a message fragment sent to a channel.
779  *
780  * @see GNUNET_PSYCSTORE_fragment_store()
781  * 
782  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
783  */
784 static int
785 fragment_store (void *cls,
786                 const struct GNUNET_CRYPTO_EccPublicKey *channel_key,
787                 const struct GNUNET_MULTICAST_MessageHeader *msg,
788                 uint32_t psycstore_flags)
789 {
790   if (msg->fragment_id > INT64_MAX ||
791       msg->fragment_offset > INT64_MAX ||
792       msg->message_id > INT64_MAX ||
793       msg->group_generation > INT64_MAX)
794   {
795     GNUNET_break (0);
796     return GNUNET_SYSERR;
797   }
798
799   struct Plugin *plugin = cls;
800   sqlite3_stmt *stmt = plugin->insert_fragment;
801
802   if (GNUNET_OK != channel_key_store (plugin, channel_key))
803     return GNUNET_SYSERR;
804
805   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
806                                       sizeof (*channel_key), SQLITE_STATIC) ||
807       SQLITE_OK != sqlite3_bind_int64 (stmt, 2, msg->hop_counter ) ||
808       SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
809                                       sizeof (msg->signature), SQLITE_STATIC) ||
810       SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
811                                       sizeof (msg->purpose), SQLITE_STATIC) ||
812       SQLITE_OK != sqlite3_bind_int64 (stmt, 5, msg->fragment_id) ||
813       SQLITE_OK != sqlite3_bind_int64 (stmt, 6, msg->fragment_offset) ||
814       SQLITE_OK != sqlite3_bind_int64 (stmt, 7, msg->message_id) ||
815       SQLITE_OK != sqlite3_bind_int64 (stmt, 8, msg->group_generation) ||
816       SQLITE_OK != sqlite3_bind_int64 (stmt, 9, msg->flags) ||
817       SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags) ||
818       SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
819                                       ntohs (msg->header.size) - sizeof (*msg),
820                                       SQLITE_STATIC))
821   {
822     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
823                 "insert_fragment (bind)");
824   }
825   else if (SQLITE_DONE != sqlite3_step (stmt))
826   {
827     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
828                 "insert_fragment (step)");
829   }
830
831   if (SQLITE_OK != sqlite3_reset (stmt))
832   {
833     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
834                 "insert_fragment (reset)");
835     return GNUNET_SYSERR;
836   }
837
838   return GNUNET_OK;
839 }
840
841 /** 
842  * Set additional flags for a given message.
843  *
844  * They are OR'd with any existing flags set.
845  * 
846  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
847  */
848 static int
849 message_add_flags (void *cls,
850                    const struct GNUNET_CRYPTO_EccPublicKey *channel_key,
851                    uint64_t message_id,
852                    uint64_t psycstore_flags)
853 {
854   struct Plugin *plugin = cls;
855   sqlite3_stmt *stmt = plugin->update_message_flags;
856   int ret = GNUNET_SYSERR;
857
858   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags) ||
859       SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
860                                       sizeof (*channel_key), SQLITE_STATIC) ||
861       SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
862   {
863     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
864                 "update_message_flags (bind)");
865   }
866   else
867   {
868     switch (sqlite3_step (stmt))
869     {
870     case SQLITE_DONE:
871       ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
872       break;
873     default:
874       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
875                   "update_message_flags (step)");
876     }
877   }
878
879   if (SQLITE_OK != sqlite3_reset (stmt))
880   {
881     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
882                 "update_message_flags (reset)");
883     return GNUNET_SYSERR;
884   }
885
886   return ret;
887 }
888
889 static int
890 fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
891               void *cb_cls)
892 {
893   int data_size = sqlite3_column_bytes (stmt, 9);
894   struct GNUNET_MULTICAST_MessageHeader *msg
895     = GNUNET_malloc (sizeof (*msg) + data_size);
896
897   msg->header.size = htons (sizeof (*msg) + data_size);
898   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
899   msg->hop_counter = (uint32_t) sqlite3_column_int64 (stmt, 0);
900   memcpy (&msg->signature,
901           sqlite3_column_blob (stmt, 1),
902           sqlite3_column_bytes (stmt, 1));
903   memcpy (&msg->purpose,
904           sqlite3_column_blob (stmt, 2),
905           sqlite3_column_bytes (stmt, 2));
906   msg->fragment_id = sqlite3_column_int64 (stmt, 3);
907   msg->fragment_offset = sqlite3_column_int64 (stmt, 4);
908   msg->message_id = sqlite3_column_int64 (stmt, 5);
909   msg->group_generation = sqlite3_column_int64 (stmt, 6);
910   msg->flags = sqlite3_column_int64 (stmt, 7);
911   memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
912
913   return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
914 }
915
916 /** 
917  * Retrieve a message fragment by fragment ID.
918  *
919  * @see GNUNET_PSYCSTORE_fragment_get()
920  * 
921  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
922  */
923 static int
924 fragment_get (void *cls,
925               const struct
926               GNUNET_CRYPTO_EccPublicKey *channel_key,
927               uint64_t fragment_id,
928               GNUNET_PSYCSTORE_FragmentCallback cb,
929               void *cb_cls)
930 {
931   struct Plugin *plugin = cls;
932   sqlite3_stmt *stmt = plugin->select_fragment;
933   int ret = GNUNET_SYSERR;
934
935   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
936                                       sizeof (*channel_key),
937                                       SQLITE_STATIC) ||
938       SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_id))
939   {
940     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
941                 "select_fragment (bind)");
942   }
943   else
944   {
945     switch (sqlite3_step (stmt))
946     {
947     case SQLITE_DONE:
948       ret = GNUNET_NO;
949       break;
950     case SQLITE_ROW:
951       ret = fragment_row (stmt, cb, cb_cls);
952       break;
953     default:
954       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
955                   "select_fragment (step)");
956     }
957   }
958
959   if (SQLITE_OK != sqlite3_reset (stmt))
960   {
961     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
962                 "select_fragment (reset)");
963   }
964
965   return ret;
966 }
967
968 /** 
969  * Retrieve all fragments of a message.
970  *
971  * @see GNUNET_PSYCSTORE_message_get()
972  * 
973  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
974  */
975 static int
976 message_get (void *cls,
977              const struct GNUNET_CRYPTO_EccPublicKey *channel_key,
978              uint64_t message_id,
979              GNUNET_PSYCSTORE_FragmentCallback cb,
980              void *cb_cls)
981 {
982   struct Plugin *plugin = cls;
983   sqlite3_stmt *stmt = plugin->select_message;
984   int ret = GNUNET_SYSERR;
985
986   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
987                                       sizeof (*channel_key),
988                                       SQLITE_STATIC) ||
989       SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id))
990   {
991     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
992                 "select_message (bind)");
993   }
994   else
995   {
996     int sql_ret;
997     do
998     {
999       sql_ret = sqlite3_step (stmt);
1000       switch (sql_ret)
1001       {
1002       case SQLITE_DONE:
1003         if (ret != GNUNET_OK)
1004           ret = GNUNET_NO;
1005         break;
1006       case SQLITE_ROW:
1007         ret = fragment_row (stmt, cb, cb_cls);
1008         if (ret != GNUNET_YES)
1009           sql_ret = SQLITE_DONE;
1010         break;
1011       default:
1012         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1013                     "select_message (step)");
1014       }
1015     }
1016     while (sql_ret == SQLITE_ROW);
1017   }
1018
1019   if (SQLITE_OK != sqlite3_reset (stmt))
1020   {
1021     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1022                 "select_message (reset)");
1023   }
1024
1025   return ret;
1026 }
1027
1028 /** 
1029  * Retrieve a fragment of message specified by its message ID and fragment
1030  * offset.
1031  *
1032  * @see GNUNET_PSYCSTORE_message_get_fragment()
1033  * 
1034  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1035  */
1036 static int
1037 message_get_fragment (void *cls,
1038                       const struct GNUNET_CRYPTO_EccPublicKey *channel_key,
1039                       uint64_t message_id,
1040                       uint64_t fragment_offset,
1041                       GNUNET_PSYCSTORE_FragmentCallback cb,
1042                       void *cb_cls)
1043 {
1044   struct Plugin *plugin = cls;
1045   int ret = GNUNET_SYSERR;
1046
1047   sqlite3_stmt *stmt = plugin->select_message_fragment;
1048
1049   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1050                                       sizeof (*channel_key),
1051                                       SQLITE_STATIC) ||
1052       SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id) ||
1053       SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
1054   {
1055     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1056                 "select_message_fragment (bind)");
1057   }
1058   else
1059   {
1060     switch (sqlite3_step (stmt))
1061     {
1062     case SQLITE_DONE:
1063       ret = GNUNET_NO;
1064       break;
1065     case SQLITE_ROW:
1066       ret = fragment_row (stmt, cb, cb_cls);
1067       break;
1068     default:
1069       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1070                   "select_message_fragment (step)");
1071     }
1072   }
1073
1074   if (SQLITE_OK != sqlite3_reset (stmt))
1075   {
1076     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1077                 "select_message_fragment (reset)");
1078   }
1079
1080   return ret;
1081 }
1082
1083 /** 
1084  * Retrieve latest values of counters for a channel master.
1085  *
1086  * @see GNUNET_PSYCSTORE_counters_get_master()
1087  * 
1088  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1089  */
1090 static int
1091 counters_get_master (void *cls,
1092                      const struct GNUNET_CRYPTO_EccPublicKey *channel_key,
1093                      uint64_t *fragment_id,
1094                      uint64_t *message_id,
1095                      uint64_t *group_generation)
1096 {
1097   struct Plugin *plugin = cls;
1098   sqlite3_stmt *stmt = plugin->select_counters_master;
1099   int ret = GNUNET_SYSERR;
1100
1101   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1102                                       sizeof (*channel_key),
1103                                       SQLITE_STATIC))
1104   {
1105     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1106                 "select_counters_master (bind)");
1107   }
1108   else
1109   {
1110     switch (sqlite3_step (stmt))
1111     {
1112     case SQLITE_DONE:
1113       ret = GNUNET_NO;
1114       break;
1115     case SQLITE_ROW:
1116       *fragment_id = sqlite3_column_int64 (stmt, 0);
1117       *message_id = sqlite3_column_int64 (stmt, 1);
1118       *group_generation = sqlite3_column_int64 (stmt, 2);
1119       ret = GNUNET_OK;
1120       break;
1121     default:
1122       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1123                   "select_counters_master (step)");
1124     }
1125   }
1126
1127   if (SQLITE_OK != sqlite3_reset (stmt))
1128   {
1129     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1130                 "select_counters_master (reset)");
1131   }
1132
1133   return ret;
1134 }
1135
1136 /** 
1137  * Retrieve latest values of counters for a channel slave. 
1138  *
1139  * @see GNUNET_PSYCSTORE_counters_get_slave()
1140  * 
1141  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1142  */
1143 static int
1144 counters_get_slave (void *cls,
1145                     const struct GNUNET_CRYPTO_EccPublicKey *channel_key,
1146                     uint64_t *max_state_msg_id)
1147 {
1148   struct Plugin *plugin = cls;
1149   sqlite3_stmt *stmt = plugin->select_counters_slave;
1150   int ret = GNUNET_SYSERR;
1151
1152   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1153                                       sizeof (*channel_key),
1154                                       SQLITE_STATIC) ||
1155       SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
1156                                        GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
1157   {
1158     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1159                 "select_counters_slave (bind)");
1160   }
1161   else
1162   {
1163     switch (sqlite3_step (stmt))
1164     {
1165     case SQLITE_DONE:
1166       ret = GNUNET_NO;
1167       break;
1168     case SQLITE_ROW:
1169       *max_state_msg_id = sqlite3_column_int64 (stmt, 0);
1170       ret = GNUNET_OK;
1171       break;
1172     default:
1173       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1174                   "select_counters_slave (step)");
1175     }
1176   }
1177
1178   if (SQLITE_OK != sqlite3_reset (stmt))
1179   {
1180     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1181                 "select_counters_slave (reset)");
1182   }
1183
1184   return ret;
1185 }
1186
1187 /** 
1188  * Set a state variable to the given value.
1189  *
1190  * @see GNUNET_PSYCSTORE_state_modify()
1191  * 
1192  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1193  */
1194 static int
1195 state_set (void *cls,
1196            const struct GNUNET_CRYPTO_EccPublicKey *channel_key,
1197            const char *name, const void *value, size_t value_size)
1198 {
1199   struct Plugin *plugin = cls;
1200   int ret = GNUNET_SYSERR;
1201
1202   sqlite3_stmt *stmt = plugin->insert_state_current;
1203
1204   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1205                                       sizeof (*channel_key), SQLITE_STATIC) ||
1206       SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC) ||
1207       SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
1208                                       SQLITE_STATIC))
1209   {
1210     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1211                 "insert_state_current (bind)");
1212   }
1213   else
1214   {
1215     switch (sqlite3_step (stmt))
1216     {
1217     case SQLITE_DONE:
1218       ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
1219       break;
1220     default:
1221       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1222                   "insert_state_current (step)");
1223     }
1224   }
1225
1226   if (SQLITE_OK != sqlite3_reset (stmt))
1227   {
1228     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1229                 "insert_state_current (reset)");
1230     return GNUNET_SYSERR;
1231   }
1232
1233   return ret;
1234 }
1235
1236
1237 /** 
1238  * Reset the state of a channel.
1239  *
1240  * @see GNUNET_PSYCSTORE_state_reset()
1241  * 
1242  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1243  */
1244 static int
1245 state_reset (void *cls, const struct GNUNET_CRYPTO_EccPublicKey *channel_key)
1246 {
1247   struct Plugin *plugin = cls;
1248   sqlite3_stmt *stmt = plugin->delete_state;
1249
1250   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1251                                       sizeof (*channel_key), SQLITE_STATIC))
1252   {
1253     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1254                 "delete_state (bind)");
1255   }
1256   else if (SQLITE_DONE != sqlite3_step (stmt))
1257   {
1258     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1259                 "delete_state (step)");
1260   }
1261
1262   if (SQLITE_OK != sqlite3_reset (stmt))
1263   {
1264     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1265                 "delete_state (reset)");
1266     return GNUNET_SYSERR;
1267   }
1268
1269   return GNUNET_OK;
1270 }
1271
1272
1273 /** 
1274  * Update signed values of state variables in the state store.
1275  *
1276  * @see GNUNET_PSYCSTORE_state_hash_update()
1277  * 
1278  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1279  */
1280 static int
1281 state_update_signed (void *cls,
1282                      const struct GNUNET_CRYPTO_EccPublicKey *channel_key)
1283 {
1284   struct Plugin *plugin = cls;
1285   sqlite3_stmt *stmt = plugin->update_state_signed;
1286
1287   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1288                                       sizeof (*channel_key), SQLITE_STATIC))
1289   {
1290     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1291                 "update_state_signed (bind)");
1292   }
1293   else if (SQLITE_DONE != sqlite3_step (stmt))
1294   {
1295     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1296                 "update_state_signed (step)");
1297   }
1298
1299   if (SQLITE_OK != sqlite3_reset (stmt))
1300   {
1301     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1302                 "update_state_signed (reset)");
1303     return GNUNET_SYSERR;
1304   }
1305
1306   return GNUNET_OK;
1307 }
1308
1309
1310 /** 
1311  * Retrieve a state variable by name.
1312  *
1313  * @see GNUNET_PSYCSTORE_state_get()
1314  * 
1315  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1316  */
1317 static int
1318 state_get (void *cls, const struct GNUNET_CRYPTO_EccPublicKey *channel_key,
1319            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1320 {
1321   struct Plugin *plugin = cls;
1322   int ret = GNUNET_SYSERR;
1323
1324   sqlite3_stmt *stmt = plugin->select_state_one;
1325
1326   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1327                                       sizeof (*channel_key),
1328                                       SQLITE_STATIC) ||
1329       SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
1330   {
1331     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1332                 "select_state_one (bind)");
1333   }
1334   else
1335   {
1336     switch (sqlite3_step (stmt))
1337     {
1338     case SQLITE_DONE:
1339       ret = GNUNET_NO;
1340       break;
1341     case SQLITE_ROW:
1342       ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1343                 sqlite3_column_bytes (stmt, 0));
1344       break;
1345     default:
1346       LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1347                   "select_state_one (step)");
1348     }
1349   }
1350
1351   if (SQLITE_OK != sqlite3_reset (stmt))
1352   {
1353     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1354                 "select_state_one (reset)");
1355   }
1356   
1357
1358   return ret;
1359 }
1360
1361
1362 /** 
1363  * Retrieve all state variables for a channel with the given prefix.
1364  *
1365  * @see GNUNET_PSYCSTORE_state_get_all()
1366  * 
1367  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1368  */
1369 static int
1370 state_get_all (void *cls, const struct GNUNET_CRYPTO_EccPublicKey *channel_key,
1371                const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1372                void *cb_cls)
1373 {
1374   struct Plugin *plugin = cls;
1375   int ret = GNUNET_SYSERR;
1376
1377   sqlite3_stmt *stmt = plugin->select_state_prefix;
1378   size_t name_len = strlen (name);
1379   char *name_prefix = GNUNET_malloc (name_len + 2);
1380   memcpy (name_prefix, name, name_len);
1381   memcpy (name_prefix + name_len, "_%", 2);
1382
1383   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1384                                       sizeof (*channel_key), SQLITE_STATIC) ||
1385       SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC) ||
1386       SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2,
1387                                       SQLITE_STATIC))
1388   {
1389     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1390                 "select_state_prefix (bind)");
1391   }
1392   else
1393   {
1394     int sql_ret;
1395     do
1396     {
1397       sql_ret = sqlite3_step (stmt);
1398       switch (sql_ret)
1399       {
1400       case SQLITE_DONE:
1401         if (ret != GNUNET_OK)
1402           ret = GNUNET_NO;
1403         break;
1404       case SQLITE_ROW:
1405         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1406                   sqlite3_column_blob (stmt, 1),
1407                   sqlite3_column_bytes (stmt, 1));
1408         if (ret != GNUNET_YES)
1409           sql_ret = SQLITE_DONE;
1410         break;
1411       default:
1412         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1413                     "select_state_prefix (step)");
1414       }
1415     }
1416     while (sql_ret == SQLITE_ROW);
1417   }
1418
1419   if (SQLITE_OK != sqlite3_reset (stmt))
1420   {
1421     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1422                 "select_state_prefix (reset)");
1423   }
1424
1425   return ret;
1426 }
1427
1428
1429 /** 
1430  * Retrieve all signed state variables for a channel.
1431  *
1432  * @see GNUNET_PSYCSTORE_state_get_signed()
1433  * 
1434  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1435  */
1436 static int
1437 state_get_signed (void *cls,
1438                   const struct GNUNET_CRYPTO_EccPublicKey *channel_key,
1439                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1440 {
1441   struct Plugin *plugin = cls;
1442   int ret = GNUNET_SYSERR;
1443
1444   sqlite3_stmt *stmt = plugin->select_state_signed;
1445
1446   if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1447                                       sizeof (*channel_key), SQLITE_STATIC))
1448   {
1449     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1450                 "select_state_signed (bind)");
1451   }
1452   else
1453   {
1454     int sql_ret;
1455     do
1456     {
1457       sql_ret = sqlite3_step (stmt);
1458       switch (sql_ret)
1459       {
1460       case SQLITE_DONE:
1461         if (ret != GNUNET_OK)
1462           ret = GNUNET_NO;
1463         break;
1464       case SQLITE_ROW:
1465         ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1466                   sqlite3_column_blob (stmt, 1),
1467                   sqlite3_column_bytes (stmt, 1));
1468         if (ret != GNUNET_YES)
1469           sql_ret = SQLITE_DONE;
1470         break;
1471       default:
1472         LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1473                     "select_state_signed (step)");
1474       }
1475     }
1476     while (sql_ret == SQLITE_ROW);
1477   }
1478
1479   if (SQLITE_OK != sqlite3_reset (stmt))
1480   {
1481     LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1482                 "select_state_signed (reset)");
1483   }
1484
1485   return ret;
1486 }
1487
1488
1489 /** 
1490  * Entry point for the plugin.
1491  *
1492  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1493  * @return NULL on error, otherwise the plugin context
1494  */
1495 void *
1496 libgnunet_plugin_psycstore_sqlite_init (void *cls)
1497 {
1498   static struct Plugin plugin;
1499   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1500   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1501
1502   if (NULL != plugin.cfg)
1503     return NULL;                /* can only initialize once! */
1504   memset (&plugin, 0, sizeof (struct Plugin));
1505   plugin.cfg = cfg;  
1506   if (GNUNET_OK != database_setup (&plugin))
1507   {
1508     database_shutdown (&plugin);
1509     return NULL;
1510   }
1511   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1512   api->cls = &plugin;
1513   api->membership_store = &membership_store;
1514   api->membership_test = &membership_test;
1515   api->fragment_store = &fragment_store;
1516   api->message_add_flags = &message_add_flags;
1517   api->fragment_get = &fragment_get;
1518   api->message_get = &message_get;
1519   api->message_get_fragment = &message_get_fragment;
1520   api->counters_get_master = &counters_get_master;
1521   api->counters_get_slave = &counters_get_slave;
1522   api->state_set = &state_set;
1523   api->state_reset = &state_reset;
1524   api->state_update_signed = &state_update_signed;
1525   api->state_get = &state_get;
1526   api->state_get_all = &state_get_all;
1527   api->state_get_signed = &state_get_signed;
1528
1529   LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
1530   return api;
1531 }
1532
1533
1534 /**
1535  * Exit point from the plugin.
1536  *
1537  * @param cls The plugin context (as returned by "init")
1538  * @return Always NULL
1539  */
1540 void *
1541 libgnunet_plugin_psycstore_sqlite_done (void *cls)
1542 {
1543   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1544   struct Plugin *plugin = api->cls;
1545
1546   database_shutdown (plugin);
1547   plugin->cfg = NULL;
1548   GNUNET_free (api);
1549   LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
1550   return NULL;
1551 }
1552
1553 /* end of plugin_psycstore_sqlite.c */