Do non-select statements
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_postgres.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2016 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_postgres.c
23  * @brief PostgresQL-based psycstore backend
24  * @author Daniel Golle
25  * @author Gabor X Toth
26  * @author Christian Grothoff
27  * @author Christophe Genevey
28  */
29
30 #include "platform.h"
31 #include "gnunet_psycstore_plugin.h"
32 #include "gnunet_psycstore_service.h"
33 #include "gnunet_multicast_service.h"
34 #include "gnunet_crypto_lib.h"
35 #include "gnunet_psyc_util_lib.h"
36 #include "psycstore.h"
37 #include "gnunet_postgres_lib.h"
38 #include "gnunet_pq_lib.h"
39
40 /**
41  * After how many ms "busy" should a DB operation fail for good?  A
42  * low value makes sure that we are more responsive to requests
43  * (especially PUTs).  A high value guarantees a higher success rate
44  * (SELECTs in iterate can take several seconds despite LIMIT=1).
45  *
46  * The default value of 1s should ensure that users do not experience
47  * huge latencies while at the same time allowing operations to
48  * succeed with reasonable probability.
49  */
50 #define BUSY_TIMEOUT_MS 1000
51
52 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
53
54 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-postgres", __VA_ARGS__)
55
56 enum Transactions {
57   TRANSACTION_NONE = 0,
58   TRANSACTION_STATE_MODIFY,
59   TRANSACTION_STATE_SYNC,
60 };
61
62 /**
63  * Context for all functions in this plugin.
64  */
65 struct Plugin
66 {
67
68   const struct GNUNET_CONFIGURATION_Handle *cfg;
69
70   /**
71    * Native Postgres database handle.
72    */
73   PGconn *dbh;
74
75   enum Transactions transaction;
76
77   void *cls;
78 };
79
80
81 /**
82  * Initialize the database connections and associated
83  * data structures (create tables and indices
84  * as needed as well).
85  *
86  * @param plugin the plugin context (state for this module)
87  * @return #GNUNET_OK on success
88  */
89 static int
90 database_setup (struct Plugin *plugin)
91 {
92   struct GNUNET_PQ_ExecuteStatement es[] = {
93     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS channels (\n"
94                             " id SERIAL,\n"
95                             " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
96                             " max_state_message_id BIGINT,\n"
97                             " state_hash_message_id BIGINT,\n"
98                             " PRIMARY KEY(id)\n"
99                             ")"
100                             "WITH OIDS"),
101     GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n"
102                             " ON channels (pub_key)"),
103     GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n"
104                             " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
105                             "RETURNS NULL ON NULL INPUT"),
106     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS slaves (\n"
107                             " id SERIAL,\n"
108                             " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
109                             " PRIMARY KEY(id)\n"
110                             ")"
111                             "WITH OIDS"),
112     GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n"
113                             " ON slaves (pub_key)"),
114     GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n"
115                             " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
116                             "RETURNS NULL ON NULL INPUT"),
117     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS membership (\n"
118                             "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
119                             "  slave_id BIGINT NOT NULL REFERENCES slaves(id),\n"
120                             "  did_join INT NOT NULL,\n"
121                             "  announced_at BIGINT NOT NULL,\n"
122                             "  effective_since BIGINT NOT NULL,\n"
123                             "  group_generation BIGINT NOT NULL\n"
124                             ")"
125                             "WITH OIDS"),
126     GNUNET_PQ_make_execute ("CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
127                             "ON membership (channel_id, slave_id)"),
128     /** @todo messages table: add method_name column */
129     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS messages (\n"
130                             "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
131                             "  hop_counter INT NOT NULL,\n"
132                             "  signature BYTEA CHECK (LENGTH(signature)=64),\n"
133                             "  purpose BYTEA CHECK (LENGTH(purpose)=8),\n"
134                             "  fragment_id BIGINT NOT NULL,\n"
135                             "  fragment_offset BIGINT NOT NULL,\n"
136                             "  message_id BIGINT NOT NULL,\n"
137                             "  group_generation BIGINT NOT NULL,\n"
138                             "  multicast_flags INT NOT NULL,\n"
139                             "  psycstore_flags INT NOT NULL,\n"
140                             "  data BYTEA,\n"
141                             "  PRIMARY KEY (channel_id, fragment_id),\n"
142                             "  UNIQUE (channel_id, message_id, fragment_offset)\n"
143                             ")"
144                             "WITH OIDS"),
145     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state (\n"
146                             "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
147                             "  name TEXT NOT NULL,\n"
148                             "  value_current BYTEA,\n"
149                             "  value_signed BYTEA,\n"
150                             "  PRIMARY KEY (channel_id, name)\n"
151                             ")"
152                             "WITH OIDS"),
153     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state_sync (\n"
154                             "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
155                             "  name TEXT NOT NULL,\n"
156                             "  value BYTEA,\n"
157                             "  PRIMARY KEY (channel_id, name)\n"
158                             ")"
159                             "WITH OIDS"),
160     GNUNET_PQ_EXECUTE_STATEMENT_END
161   };
162
163   /* Open database and precompile statements */
164   plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->cfg,
165                                             "psycstore-postgres");
166   if (NULL == plugin->dbh)
167     return GNUNET_SYSERR;
168   if (GNUNET_OK !=
169       GNUNET_PQ_exec_statements (plugin->dbh,
170                                  es))
171   {
172     PQfinish (plugin->dbh);
173     plugin->dbh = NULL;
174     return GNUNET_SYSERR;
175   }
176
177   /* Prepare statements */
178   {
179     struct GNUNET_PQ_PreparedStatement ps[] = {
180       GNUNET_PQ_make_prepare ("transaction_begin",
181                               "BEGIN", 0),
182       GNUNET_PQ_make_prepare ("transaction_commit",
183                               "COMMIT", 0),
184       GNUNET_PQ_make_prepare ("transaction_rollback",
185                               "ROLLBACK", 0),
186       GNUNET_PQ_make_prepare ("insert_channel_key",
187                               "INSERT INTO channels (pub_key) VALUES ($1)"
188                               " ON CONFLICT DO NOTHING", 1),
189       GNUNET_PQ_make_prepare ("insert_slave_key",
190                               "INSERT INTO slaves (pub_key) VALUES ($1)"
191                               " ON CONFLICT DO NOTHING", 1),
192       GNUNET_PQ_make_prepare ("insert_membership",
193                               "INSERT INTO membership\n"
194                               " (channel_id, slave_id, did_join, announced_at,\n"
195                               "  effective_since, group_generation)\n"
196                               "VALUES (get_chan_id($1),\n"
197                               "        get_slave_id($2),\n"
198                               "        $3, $4, $5, $6)", 6),
199       GNUNET_PQ_make_prepare ("select_membership",
200                               "SELECT did_join FROM membership\n"
201                               "WHERE channel_id = get_chan_id($1)\n"
202                               "      AND slave_id = get_slave_id($2)\n"
203                               "      AND effective_since <= $3 AND did_join = 1\n"
204                               "ORDER BY announced_at DESC LIMIT 1", 3),
205       GNUNET_PQ_make_prepare ("insert_fragment",
206                               "INSERT INTO messages\n"
207                               " (channel_id, hop_counter, signature, purpose,\n"
208                               "  fragment_id, fragment_offset, message_id,\n"
209                               "  group_generation, multicast_flags, psycstore_flags, data)\n"
210                               "VALUES (get_chan_id($1),\n"
211                               "        $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
212                               "ON CONFLICT DO NOTHING", 11),
213       GNUNET_PQ_make_prepare ("update_message_flags",
214                               "UPDATE messages\n"
215                               "SET psycstore_flags = psycstore_flags | $1\n"
216                               "WHERE channel_id = get_chan_id($2) \n"
217                               "      AND message_id = $3 AND fragment_offset = 0", 3),
218       GNUNET_PQ_make_prepare ("select_fragments",
219                               "SELECT hop_counter, signature, purpose, fragment_id,\n"
220                               "       fragment_offset, message_id, group_generation,\n"
221                               "       multicast_flags, psycstore_flags, data\n"
222                               "FROM messages\n"
223                               "WHERE channel_id = get_chan_id($1) \n"
224                               "      AND $2 <= fragment_id AND fragment_id <= $3", 3),
225       /** @todo select_messages: add method_prefix filter */
226       GNUNET_PQ_make_prepare ("select_messages",
227                               "SELECT hop_counter, signature, purpose, fragment_id,\n"
228                               "       fragment_offset, message_id, group_generation,\n"
229                               "       multicast_flags, psycstore_flags, data\n"
230                               "FROM messages\n"
231                               "WHERE channel_id = get_chan_id($1) \n"
232                               "      AND $2 <= message_id AND message_id <= $3\n"
233                               "LIMIT $4;", 4),
234       /** @todo select_latest_messages: add method_prefix filter */
235       GNUNET_PQ_make_prepare ("select_latest_fragments",
236                               "SELECT  rev.hop_counter AS hop_counter,\n"
237                               "        rev.signature AS signature,\n"
238                               "        rev.purpose AS purpose,\n"
239                               "        rev.fragment_id AS fragment_id,\n"
240                               "        rev.fragment_offset AS fragment_offset,\n"
241                               "        rev.message_id AS message_id,\n"
242                               "        rev.group_generation AS group_generation,\n"
243                               "        rev.multicast_flags AS multicast_flags,\n"
244                               "        rev.psycstore_flags AS psycstore_flags,\n"
245                               "        rev.data AS data\n"
246                               " FROM\n"
247                               " (SELECT hop_counter, signature, purpose, fragment_id,\n"
248                               "        fragment_offset, message_id, group_generation,\n"
249                               "        multicast_flags, psycstore_flags, data \n"
250                               "  FROM messages\n"
251                               "  WHERE channel_id = get_chan_id($1) \n"
252                               "  ORDER BY fragment_id DESC\n"
253                               "  LIMIT $2) AS rev\n"
254                               " ORDER BY rev.fragment_id;", 2),
255       GNUNET_PQ_make_prepare ("select_latest_messages",
256                               "SELECT hop_counter, signature, purpose, fragment_id,\n"
257                               "       fragment_offset, message_id, group_generation,\n"
258                               "        multicast_flags, psycstore_flags, data\n"
259                               "FROM messages\n"
260                               "WHERE channel_id = get_chan_id($1)\n"
261                               "      AND message_id IN\n"
262                               "      (SELECT message_id\n"
263                               "       FROM messages\n"
264                               "       WHERE channel_id = get_chan_id($2) \n"
265                               "       GROUP BY message_id\n"
266                               "       ORDER BY message_id\n"
267                               "       DESC LIMIT $3)\n"
268                               "ORDER BY fragment_id", 3),
269       GNUNET_PQ_make_prepare ("select_message_fragment",
270                               "SELECT hop_counter, signature, purpose, fragment_id,\n"
271                               "       fragment_offset, message_id, group_generation,\n"
272                               "       multicast_flags, psycstore_flags, data\n"
273                               "FROM messages\n"
274                               "WHERE channel_id = get_chan_id($1) \n"
275                               "      AND message_id = $2 AND fragment_offset = $3", 3),
276       GNUNET_PQ_make_prepare ("select_counters_message",
277                               "SELECT fragment_id, message_id, group_generation\n"
278                               "FROM messages\n"
279                               "WHERE channel_id = get_chan_id($1)\n"
280                               "ORDER BY fragment_id DESC LIMIT 1", 1),
281       GNUNET_PQ_make_prepare ("select_counters_state",
282                               "SELECT max_state_message_id\n"
283                               "FROM channels\n"
284                               "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1),
285       GNUNET_PQ_make_prepare ("update_max_state_message_id",
286                               "UPDATE channels\n"
287                               "SET max_state_message_id = $1\n"
288                               "WHERE pub_key = $2", 2),
289
290       GNUNET_PQ_make_prepare ("update_state_hash_message_id",
291                               "UPDATE channels\n"
292                               "SET state_hash_message_id = $1\n"
293                               "WHERE pub_key = $2", 2),
294       GNUNET_PQ_make_prepare ("insert_state_current",
295                               "INSERT INTO state\n"
296                               "  (channel_id, name, value_current, value_signed)\n"
297                               "SELECT new.channel_id, new.name,\n"
298                               "       new.value_current, old.value_signed\n"
299                               "FROM (SELECT get_chan_id($1) AS channel_id,\n"
300                               "             $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n"
301                               "LEFT JOIN (SELECT channel_id, name, value_signed\n"
302                               "           FROM state) AS old\n"
303                               "ON new.channel_id = old.channel_id AND new.name = old.name\n"
304                               "ON CONFLICT (channel_id, name)\n"
305                               "   DO UPDATE SET value_current = EXCLUDED.value_current,\n"
306                               "                 value_signed = EXCLUDED.value_signed", 3),
307       GNUNET_PQ_make_prepare ("delete_state_empty",
308                               "DELETE FROM state\n"
309                               "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n"
310                               "      AND (value_current IS NULL OR length(value_current) = 0)\n"
311                               "      AND (value_signed IS NULL OR length(value_signed) = 0)", 1),
312       GNUNET_PQ_make_prepare ("update_state_signed",
313                               "UPDATE state\n"
314                               "SET value_signed = value_current\n"
315                               "WHERE channel_id = get_chan_id($1) ", 1),
316       GNUNET_PQ_make_prepare ("delete_state",
317                               "DELETE FROM state\n"
318                               "WHERE channel_id = get_chan_id($1) ", 1),
319       GNUNET_PQ_make_prepare ("insert_state_sync",
320                               "INSERT INTO state_sync (channel_id, name, value)\n"
321                               "VALUES (get_chan_id($1), $2, $3)", 3),
322       GNUNET_PQ_make_prepare ("insert_state_from_sync",
323                               "INSERT INTO state\n"
324                               " (channel_id, name, value_current, value_signed)\n"
325                               "SELECT channel_id, name, value, value\n"
326                               "FROM state_sync\n"
327                               "WHERE channel_id = get_chan_id($1)", 1),
328       GNUNET_PQ_make_prepare ("delete_state_sync",
329                               "DELETE FROM state_sync\n"
330                               "WHERE channel_id = get_chan_id($1)", 1),
331       GNUNET_PQ_make_prepare ("select_state_one",
332                               "SELECT value_current\n"
333                               "FROM state\n"
334                               "WHERE channel_id = get_chan_id($1)\n"
335                               "      AND name = $2", 2),
336       GNUNET_PQ_make_prepare ("select_state_prefix",
337                               "SELECT name, value_current\n"
338                               "FROM state\n"
339                               "WHERE channel_id = get_chan_id($1)\n"
340                               "      AND (name = $2 OR substr(name, 1, $3) = $4)", 4),
341       GNUNET_PQ_make_prepare ("select_state_signed",
342                               "SELECT name, value_signed\n"
343                               "FROM state\n"
344                               "WHERE channel_id = get_chan_id($1)\n"
345                               "      AND value_signed IS NOT NULL", 1),
346       GNUNET_PQ_PREPARED_STATEMENT_END
347     };
348
349     if (GNUNET_OK !=
350         GNUNET_PQ_prepare_statements (plugin->dbh,
351                                       ps))
352     {
353       PQfinish (plugin->dbh);
354       plugin->dbh = NULL;
355       return GNUNET_SYSERR;
356     }
357   }
358
359   return GNUNET_OK;
360 }
361
362
363 /**
364  * Shutdown database connection and associate data
365  * structures.
366  * @param plugin the plugin context (state for this module)
367  */
368 static void
369 database_shutdown (struct Plugin *plugin)
370 {
371   PQfinish (plugin->dbh);
372   plugin->dbh = NULL;
373 }
374
375
376 /**
377  * Execute a prepared statement with a @a channel_key argument.
378  *
379  * @param plugin Plugin handle.
380  * @param stmt Statement to execute.
381  * @param channel_key Public key of the channel.
382  *
383  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
384  */
385 static int
386 exec_channel (struct Plugin *plugin, const char *stmt,
387               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
388 {
389   struct GNUNET_PQ_QueryParam params[] = {
390     GNUNET_PQ_query_param_auto_from_type (channel_key),
391     GNUNET_PQ_query_param_end
392   };
393
394   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
395       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
396     return GNUNET_SYSERR;
397
398   return GNUNET_OK;
399 }
400
401
402 /**
403  * Begin a transaction.
404  */
405 static int
406 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
407 {
408   struct GNUNET_PQ_QueryParam params[] = {
409     GNUNET_PQ_query_param_end
410   };
411
412   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
413       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_begin", params))
414     return GNUNET_SYSERR;
415
416   plugin->transaction = transaction;
417   return GNUNET_OK;
418 }
419
420
421 /**
422  * Commit current transaction.
423  */
424 static int
425 transaction_commit (struct Plugin *plugin)
426 {
427   struct GNUNET_PQ_QueryParam params[] = {
428     GNUNET_PQ_query_param_end
429   };
430
431   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
432       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_commit", params))
433     return GNUNET_SYSERR;
434
435   plugin->transaction = TRANSACTION_NONE;
436   return GNUNET_OK;
437 }
438
439
440 /**
441  * Roll back current transaction.
442  */
443 static int
444 transaction_rollback (struct Plugin *plugin)
445 {
446   struct GNUNET_PQ_QueryParam params[] = {
447     GNUNET_PQ_query_param_end
448   };
449
450   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
451       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_rollback", params))
452     return GNUNET_SYSERR;
453
454   plugin->transaction = TRANSACTION_NONE;
455   return GNUNET_OK;
456 }
457
458
459 static int
460 channel_key_store (struct Plugin *plugin,
461                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
462 {
463   struct GNUNET_PQ_QueryParam params[] = {
464     GNUNET_PQ_query_param_auto_from_type (channel_key),
465     GNUNET_PQ_query_param_end
466   };
467
468   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
469       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_channel_key", params))
470     return GNUNET_SYSERR;
471
472   return GNUNET_OK;
473 }
474
475
476 static int
477 slave_key_store (struct Plugin *plugin,
478                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
479 {
480   struct GNUNET_PQ_QueryParam params[] = {
481     GNUNET_PQ_query_param_auto_from_type (slave_key),
482     GNUNET_PQ_query_param_end
483   };
484
485   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
486       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_slave_key", params))
487     return GNUNET_SYSERR;
488
489   return GNUNET_OK;
490 }
491
492
493 /**
494  * Store join/leave events for a PSYC channel in order to be able to answer
495  * membership test queries later.
496  *
497  * @see GNUNET_PSYCSTORE_membership_store()
498  *
499  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
500  */
501 static int
502 postgres_membership_store (void *cls,
503                            const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
504                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
505                            int did_join,
506                            uint64_t announced_at,
507                            uint64_t effective_since,
508                            uint64_t group_generation)
509 {
510   struct Plugin *plugin = cls;
511
512   uint32_t idid_join = (uint32_t)did_join;
513
514   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
515
516   if (announced_at > INT64_MAX ||
517       effective_since > INT64_MAX ||
518       group_generation > INT64_MAX)
519   {
520     GNUNET_break (0);
521     return GNUNET_SYSERR;
522   }
523
524   if (GNUNET_OK != channel_key_store (plugin, channel_key)
525       || GNUNET_OK != slave_key_store (plugin, slave_key))
526     return GNUNET_SYSERR;
527
528   struct GNUNET_PQ_QueryParam params[] = {
529     GNUNET_PQ_query_param_auto_from_type (channel_key),
530     GNUNET_PQ_query_param_auto_from_type (slave_key),
531     GNUNET_PQ_query_param_uint32 (&idid_join),
532     GNUNET_PQ_query_param_uint64 (&announced_at),
533     GNUNET_PQ_query_param_uint64 (&effective_since),
534     GNUNET_PQ_query_param_uint64 (&group_generation),
535     GNUNET_PQ_query_param_end
536   };
537
538   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
539       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_membership", params))
540     return GNUNET_SYSERR;
541
542   return GNUNET_OK;
543 }
544
545 /**
546  * Test if a member was admitted to the channel at the given message ID.
547  *
548  * @see GNUNET_PSYCSTORE_membership_test()
549  *
550  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
551  *         #GNUNET_SYSERR if there was en error.
552  */
553 static int
554 membership_test (void *cls,
555                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
556                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
557                  uint64_t message_id)
558 {
559   PGresult *res;
560   struct Plugin *plugin = cls;
561
562   uint32_t did_join = 0;
563
564   int ret = GNUNET_SYSERR;
565
566   struct GNUNET_PQ_QueryParam params_select[] = {
567     GNUNET_PQ_query_param_auto_from_type (channel_key),
568     GNUNET_PQ_query_param_auto_from_type (slave_key),
569     GNUNET_PQ_query_param_uint64 (&message_id),
570     GNUNET_PQ_query_param_end
571   };
572
573   res = GNUNET_PQ_exec_prepared (plugin->dbh, "select_membership", params_select);
574   if (GNUNET_OK !=
575       GNUNET_POSTGRES_check_result (plugin->dbh,
576                                     res,
577                                     PGRES_TUPLES_OK,
578                                     "PQexecPrepared", "select_membership"))
579   {
580     return GNUNET_SYSERR;
581   }
582
583   struct GNUNET_PQ_ResultSpec results_select[] = {
584     GNUNET_PQ_result_spec_uint32 ("did_join", &did_join),
585     GNUNET_PQ_result_spec_end
586   };
587
588   switch (GNUNET_PQ_extract_result (res, results_select, 0))
589   {
590     case GNUNET_OK:
591       ret = GNUNET_YES;
592       break;
593
594     default:
595       ret = GNUNET_NO;
596       break;
597   }
598
599   PQclear (res);
600
601   return ret;
602 }
603
604 /**
605  * Store a message fragment sent to a channel.
606  *
607  * @see GNUNET_PSYCSTORE_fragment_store()
608  *
609  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
610  */
611 static int
612 fragment_store (void *cls,
613                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
614                 const struct GNUNET_MULTICAST_MessageHeader *msg,
615                 uint32_t psycstore_flags)
616 {
617   struct Plugin *plugin = cls;
618
619   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
620
621   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
622
623   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
624   uint64_t message_id = GNUNET_ntohll (msg->message_id);
625   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
626
627   uint32_t hop_counter = ntohl(msg->hop_counter);
628   uint32_t flags = ntohl(msg->flags);
629
630   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
631       message_id > INT64_MAX || group_generation > INT64_MAX)
632   {
633     LOG(GNUNET_ERROR_TYPE_ERROR,
634          "Tried to store fragment with a field > INT64_MAX: "
635          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
636          message_id, group_generation);
637     GNUNET_break (0);
638     return GNUNET_SYSERR;
639   }
640
641   if (GNUNET_OK != channel_key_store (plugin, channel_key))
642     return GNUNET_SYSERR;
643
644   struct GNUNET_PQ_QueryParam params_insert[] = {
645     GNUNET_PQ_query_param_auto_from_type (channel_key),
646     GNUNET_PQ_query_param_uint32 (&hop_counter),
647     GNUNET_PQ_query_param_auto_from_type (&msg->signature),
648     GNUNET_PQ_query_param_auto_from_type (&msg->purpose),
649     GNUNET_PQ_query_param_uint64 (&fragment_id),
650     GNUNET_PQ_query_param_uint64 (&fragment_offset),
651     GNUNET_PQ_query_param_uint64 (&message_id),
652     GNUNET_PQ_query_param_uint64 (&group_generation),
653     GNUNET_PQ_query_param_uint32 (&flags),
654     GNUNET_PQ_query_param_uint32 (&psycstore_flags),
655     GNUNET_PQ_query_param_fixed_size (&msg[1], ntohs (msg->header.size) - sizeof (*msg)),
656     GNUNET_PQ_query_param_end
657   };
658
659   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
660       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_fragment", params_insert))
661     return GNUNET_SYSERR;
662
663   return GNUNET_OK;
664 }
665
666 /**
667  * Set additional flags for a given message.
668  *
669  * They are OR'd with any existing flags set.
670  *
671  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
672  */
673 static int
674 message_add_flags (void *cls,
675                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
676                    uint64_t message_id,
677                    uint32_t psycstore_flags)
678 {
679   struct Plugin *plugin = cls;
680
681   struct GNUNET_PQ_QueryParam params_update[] = {
682     GNUNET_PQ_query_param_uint32 (&psycstore_flags),
683     GNUNET_PQ_query_param_auto_from_type (channel_key),
684     GNUNET_PQ_query_param_uint64 (&message_id),
685     GNUNET_PQ_query_param_end
686   };
687
688   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
689       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "update_message_flags", params_update))
690     return GNUNET_SYSERR;
691
692   return GNUNET_OK;
693 }
694
695
696 static int
697 fragment_row (struct Plugin *plugin,
698               const char *stmt,
699               PGresult *res,
700               GNUNET_PSYCSTORE_FragmentCallback cb,
701               void *cb_cls,
702               uint64_t *returned_fragments)
703 {
704   uint32_t hop_counter;
705   void *signature = NULL;
706   void *purpose = NULL;
707   size_t signature_size;
708   size_t purpose_size;
709   uint64_t fragment_id;
710   uint64_t fragment_offset;
711   uint64_t message_id;
712   uint64_t group_generation;
713   uint32_t flags;
714   void *buf;
715   size_t buf_size;
716   int ret = GNUNET_SYSERR;
717   struct GNUNET_MULTICAST_MessageHeader *mp;
718   uint32_t msg_flags;
719   struct GNUNET_PQ_ResultSpec results[] = {
720     GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
721     GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size),
722     GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size),
723     GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id),
724     GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
725     GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
726     GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
727     GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags),
728     GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags),
729     GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size),
730     GNUNET_PQ_result_spec_end
731   };
732
733   if (GNUNET_OK !=
734       GNUNET_POSTGRES_check_result (plugin->dbh, res, PGRES_TUPLES_OK,
735                                     "PQexecPrepared",
736                                     stmt))
737   {
738     LOG (GNUNET_ERROR_TYPE_DEBUG,
739          "Failing fragment lookup (postgres error)\n");
740     return GNUNET_SYSERR;
741   }
742
743   int nrows = PQntuples (res);
744   for (int row = 0; row < nrows; row++)
745   {
746     if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
747     {
748       break;
749     }
750
751     mp = GNUNET_malloc (sizeof (*mp) + buf_size);
752
753     mp->header.size = htons (sizeof (*mp) + buf_size);
754     mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
755     mp->hop_counter = htonl (hop_counter);
756     GNUNET_memcpy (&mp->signature,
757                    signature,
758                    signature_size);
759     GNUNET_memcpy (&mp->purpose,
760                    purpose,
761                    purpose_size);
762     mp->fragment_id = GNUNET_htonll (fragment_id);
763     mp->fragment_offset = GNUNET_htonll (fragment_offset);
764     mp->message_id = GNUNET_htonll (message_id);
765     mp->group_generation = GNUNET_htonll (group_generation);
766     mp->flags = htonl(msg_flags);
767
768     GNUNET_memcpy (&mp[1],
769                    buf,
770                    buf_size);
771     GNUNET_PQ_cleanup_result(results);
772     ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
773     if (NULL != returned_fragments)
774       (*returned_fragments)++;
775   }
776
777   return ret;
778 }
779
780
781 static int
782 fragment_select (struct Plugin *plugin,
783                  const char *stmt,
784                  struct GNUNET_PQ_QueryParam *params,
785                  uint64_t *returned_fragments,
786                  GNUNET_PSYCSTORE_FragmentCallback cb,
787                  void *cb_cls)
788 {
789   PGresult *res;
790   int ret = GNUNET_SYSERR;
791
792   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
793   if (GNUNET_YES ==
794       GNUNET_POSTGRES_check_result (plugin->dbh,
795                                     res,
796                                     PGRES_TUPLES_OK,
797                                     "PQexecPrepared", stmt))
798   {
799     if (PQntuples (res) == 0)
800       ret = GNUNET_NO;
801     else
802     {
803       ret = fragment_row (plugin, stmt, res, cb, cb_cls, returned_fragments);
804     }
805     PQclear (res);
806   }
807
808   return ret;
809 }
810
811 /**
812  * Retrieve a message fragment range by fragment ID.
813  *
814  * @see GNUNET_PSYCSTORE_fragment_get()
815  *
816  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
817  */
818 static int
819 fragment_get (void *cls,
820               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
821               uint64_t first_fragment_id,
822               uint64_t last_fragment_id,
823               uint64_t *returned_fragments,
824               GNUNET_PSYCSTORE_FragmentCallback cb,
825               void *cb_cls)
826 {
827   struct Plugin *plugin = cls;
828   struct GNUNET_PQ_QueryParam params_select[] = {
829     GNUNET_PQ_query_param_auto_from_type (channel_key),
830     GNUNET_PQ_query_param_uint64 (&first_fragment_id),
831     GNUNET_PQ_query_param_uint64 (&last_fragment_id),
832     GNUNET_PQ_query_param_end
833   };
834
835   *returned_fragments = 0;
836   return fragment_select (plugin,
837                           "select_fragments",
838                           params_select,
839                           returned_fragments,
840                           cb, cb_cls);
841 }
842
843
844 /**
845  * Retrieve a message fragment range by fragment ID.
846  *
847  * @see GNUNET_PSYCSTORE_fragment_get_latest()
848  *
849  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
850  */
851 static int
852 fragment_get_latest (void *cls,
853                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
854                      uint64_t fragment_limit,
855                      uint64_t *returned_fragments,
856                      GNUNET_PSYCSTORE_FragmentCallback cb,
857                      void *cb_cls)
858 {
859   struct Plugin *plugin = cls;
860
861   *returned_fragments = 0;
862
863   struct GNUNET_PQ_QueryParam params_select[] = {
864     GNUNET_PQ_query_param_auto_from_type (channel_key),
865     GNUNET_PQ_query_param_uint64 (&fragment_limit),
866     GNUNET_PQ_query_param_end
867   };
868
869   return fragment_select (plugin,
870                           "select_latest_fragments",
871                           params_select,
872                           returned_fragments,
873                           cb, cb_cls);
874 }
875
876
877 /**
878  * Retrieve all fragments of a message ID range.
879  *
880  * @see GNUNET_PSYCSTORE_message_get()
881  *
882  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
883  */
884 static int
885 message_get (void *cls,
886              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
887              uint64_t first_message_id,
888              uint64_t last_message_id,
889              uint64_t fragment_limit,
890              uint64_t *returned_fragments,
891              GNUNET_PSYCSTORE_FragmentCallback cb,
892              void *cb_cls)
893 {
894   struct Plugin *plugin = cls;
895   struct GNUNET_PQ_QueryParam params_select[] = {
896     GNUNET_PQ_query_param_auto_from_type (channel_key),
897     GNUNET_PQ_query_param_uint64 (&first_message_id),
898     GNUNET_PQ_query_param_uint64 (&last_message_id),
899     GNUNET_PQ_query_param_uint64 (&fragment_limit),
900     GNUNET_PQ_query_param_end
901   };
902
903   if (0 == fragment_limit)
904     fragment_limit = INT64_MAX;
905   *returned_fragments = 0;
906   return fragment_select (plugin,
907                           "select_messages",
908                           params_select,
909                           returned_fragments,
910                           cb, cb_cls);
911 }
912
913
914 /**
915  * Retrieve all fragments of the latest messages.
916  *
917  * @see GNUNET_PSYCSTORE_message_get_latest()
918  *
919  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
920  */
921 static int
922 message_get_latest (void *cls,
923                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
924                     uint64_t message_limit,
925                     uint64_t *returned_fragments,
926                     GNUNET_PSYCSTORE_FragmentCallback cb,
927                     void *cb_cls)
928 {
929   struct Plugin *plugin = cls;
930   struct GNUNET_PQ_QueryParam params_select[] = {
931     GNUNET_PQ_query_param_auto_from_type (channel_key),
932     GNUNET_PQ_query_param_auto_from_type (channel_key),
933     GNUNET_PQ_query_param_uint64 (&message_limit),
934     GNUNET_PQ_query_param_end
935   };
936
937   *returned_fragments = 0;
938   return fragment_select (plugin,
939                           "select_latest_messages",
940                           params_select,
941                           returned_fragments,
942                           cb, cb_cls);
943 }
944
945
946 /**
947  * Retrieve a fragment of message specified by its message ID and fragment
948  * offset.
949  *
950  * @see GNUNET_PSYCSTORE_message_get_fragment()
951  *
952  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
953  */
954 static int
955 message_get_fragment (void *cls,
956                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
957                       uint64_t message_id,
958                       uint64_t fragment_offset,
959                       GNUNET_PSYCSTORE_FragmentCallback cb,
960                       void *cb_cls)
961 {
962   PGresult *res;
963   struct Plugin *plugin = cls;
964   int ret = GNUNET_SYSERR;
965   const char *stmt = "select_message_fragment";
966
967   struct GNUNET_PQ_QueryParam params_select[] = {
968     GNUNET_PQ_query_param_auto_from_type (channel_key),
969     GNUNET_PQ_query_param_uint64 (&message_id),
970     GNUNET_PQ_query_param_uint64 (&fragment_offset),
971     GNUNET_PQ_query_param_end
972   };
973
974   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
975   if (GNUNET_OK == GNUNET_POSTGRES_check_result (plugin->dbh,
976                                                  res,
977                                                  PGRES_TUPLES_OK,
978                                                  "PQexecPrepared", stmt))
979   {
980     if (PQntuples (res) == 0)
981       ret = GNUNET_NO;
982     else
983       ret = fragment_row (plugin, stmt, res, cb, cb_cls, NULL);
984
985     PQclear (res);
986   }
987
988   return ret;
989 }
990
991 /**
992  * Retrieve the max. values of message counters for a channel.
993  *
994  * @see GNUNET_PSYCSTORE_counters_get()
995  *
996  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
997  */
998 static int
999 counters_message_get (void *cls,
1000                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1001                       uint64_t *max_fragment_id,
1002                       uint64_t *max_message_id,
1003                       uint64_t *max_group_generation)
1004 {
1005   PGresult *res;
1006   struct Plugin *plugin = cls;
1007
1008   const char *stmt = "select_counters_message";
1009
1010   struct GNUNET_PQ_QueryParam params_select[] = {
1011     GNUNET_PQ_query_param_auto_from_type (channel_key),
1012     GNUNET_PQ_query_param_end
1013   };
1014
1015   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1016   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1017                                                  res,
1018                                                  PGRES_TUPLES_OK,
1019                                                  "PQexecPrepared", stmt))
1020   {
1021     return GNUNET_SYSERR;
1022   }
1023
1024   struct GNUNET_PQ_ResultSpec results_select[] = {
1025     GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id),
1026     GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id),
1027     GNUNET_PQ_result_spec_uint64 ("group_generation", max_group_generation),
1028     GNUNET_PQ_result_spec_end
1029   };
1030
1031   if (GNUNET_OK != GNUNET_PQ_extract_result (res, results_select, 0))
1032   {
1033     PQclear (res);
1034     return GNUNET_SYSERR;
1035   }
1036
1037   GNUNET_PQ_cleanup_result(results_select);
1038   PQclear (res);
1039
1040   return GNUNET_OK;
1041 }
1042
1043 /**
1044  * Retrieve the max. values of state counters for a channel.
1045  *
1046  * @see GNUNET_PSYCSTORE_counters_get()
1047  *
1048  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1049  */
1050 static int
1051 counters_state_get (void *cls,
1052                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1053                     uint64_t *max_state_message_id)
1054 {
1055   PGresult *res;
1056   struct Plugin *plugin = cls;
1057
1058   const char *stmt = "select_counters_state";
1059
1060   int ret = GNUNET_SYSERR;
1061
1062   struct GNUNET_PQ_QueryParam params_select[] = {
1063     GNUNET_PQ_query_param_auto_from_type (channel_key),
1064     GNUNET_PQ_query_param_end
1065   };
1066
1067   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1068   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1069                                                  res,
1070                                                  PGRES_TUPLES_OK,
1071                                                  "PQexecPrepared", stmt))
1072   {
1073     return GNUNET_SYSERR;
1074   }
1075
1076   struct GNUNET_PQ_ResultSpec results_select[] = {
1077     GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id),
1078     GNUNET_PQ_result_spec_end
1079   };
1080
1081   ret = GNUNET_PQ_extract_result (res, results_select, 0);
1082
1083   if (GNUNET_OK != ret)
1084   {
1085     PQclear (res);
1086     return GNUNET_SYSERR;
1087   }
1088
1089   GNUNET_PQ_cleanup_result(results_select);
1090   PQclear (res);
1091
1092   return ret;
1093 }
1094
1095
1096 /**
1097  * Assign a value to a state variable.
1098  *
1099  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1100  */
1101 static int
1102 state_assign (struct Plugin *plugin, const char *stmt,
1103               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1104               const char *name, const void *value, size_t value_size)
1105 {
1106   struct GNUNET_PQ_QueryParam params[] = {
1107     GNUNET_PQ_query_param_auto_from_type (channel_key),
1108     GNUNET_PQ_query_param_string (name),
1109     GNUNET_PQ_query_param_fixed_size (value, value_size),
1110     GNUNET_PQ_query_param_end
1111   };
1112
1113   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
1114       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
1115     return GNUNET_SYSERR;
1116
1117   return GNUNET_OK;
1118 }
1119
1120
1121 static int
1122 update_message_id (struct Plugin *plugin,
1123                    const char *stmt,
1124                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1125                    uint64_t message_id)
1126 {
1127   struct GNUNET_PQ_QueryParam params[] = {
1128     GNUNET_PQ_query_param_uint64 (&message_id),
1129     GNUNET_PQ_query_param_auto_from_type (channel_key),
1130     GNUNET_PQ_query_param_end
1131   };
1132
1133   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
1134       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
1135     return GNUNET_SYSERR;
1136
1137   return GNUNET_OK;
1138 }
1139
1140
1141 /**
1142  * Begin modifying current state.
1143  */
1144 static int
1145 state_modify_begin (void *cls,
1146                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1147                     uint64_t message_id, uint64_t state_delta)
1148 {
1149   struct Plugin *plugin = cls;
1150
1151   if (state_delta > 0)
1152   {
1153     /**
1154      * We can only apply state modifiers in the current message if modifiers in
1155      * the previous stateful message (message_id - state_delta) were already
1156      * applied.
1157      */
1158
1159     uint64_t max_state_message_id = 0;
1160     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1161     switch (ret)
1162     {
1163     case GNUNET_OK:
1164     case GNUNET_NO: // no state yet
1165       ret = GNUNET_OK;
1166       break;
1167
1168     default:
1169       return ret;
1170     }
1171
1172     if (max_state_message_id < message_id - state_delta)
1173       return GNUNET_NO; /* some stateful messages not yet applied */
1174     else if (message_id - state_delta < max_state_message_id)
1175       return GNUNET_NO; /* changes already applied */
1176   }
1177
1178   if (TRANSACTION_NONE != plugin->transaction)
1179   {
1180     /** @todo FIXME: wait for other transaction to finish  */
1181     return GNUNET_SYSERR;
1182   }
1183   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1184 }
1185
1186
1187 /**
1188  * Set the current value of state variable.
1189  *
1190  * @see GNUNET_PSYCSTORE_state_modify()
1191  *
1192  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1193  */
1194 static int
1195 state_modify_op (void *cls,
1196                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1197                  enum GNUNET_PSYC_Operator op,
1198                  const char *name, const void *value, size_t value_size)
1199 {
1200   struct Plugin *plugin = cls;
1201   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1202
1203   switch (op)
1204   {
1205   case GNUNET_PSYC_OP_ASSIGN:
1206     return state_assign (plugin, "insert_state_current",
1207                          channel_key, name, value, value_size);
1208
1209   default: /** @todo implement more state operations */
1210     GNUNET_break (0);
1211     return GNUNET_SYSERR;
1212   }
1213 }
1214
1215
1216 /**
1217  * End modifying current state.
1218  */
1219 static int
1220 state_modify_end (void *cls,
1221                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1222                   uint64_t message_id)
1223 {
1224   struct Plugin *plugin = cls;
1225   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1226
1227   return
1228     GNUNET_OK == exec_channel (plugin, "delete_state_empty", channel_key)
1229     && GNUNET_OK == update_message_id (plugin,
1230                                        "update_max_state_message_id",
1231                                        channel_key, message_id)
1232     && GNUNET_OK == transaction_commit (plugin)
1233     ? GNUNET_OK : GNUNET_SYSERR;
1234 }
1235
1236
1237 /**
1238  * Begin state synchronization.
1239  */
1240 static int
1241 state_sync_begin (void *cls,
1242                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1243 {
1244   struct Plugin *plugin = cls;
1245   return exec_channel (plugin, "delete_state_sync", channel_key);
1246 }
1247
1248
1249 /**
1250  * Assign current value of a state variable.
1251  *
1252  * @see GNUNET_PSYCSTORE_state_modify()
1253  *
1254  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1255  */
1256 static int
1257 state_sync_assign (void *cls,
1258                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1259                 const char *name, const void *value, size_t value_size)
1260 {
1261   struct Plugin *plugin = cls;
1262   return state_assign (plugin, "insert_state_sync",
1263                        channel_key, name, value, value_size);
1264 }
1265
1266
1267 /**
1268  * End modifying current state.
1269  */
1270 static int
1271 state_sync_end (void *cls,
1272                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1273                 uint64_t max_state_message_id,
1274                 uint64_t state_hash_message_id)
1275 {
1276   struct Plugin *plugin = cls;
1277   int ret = GNUNET_SYSERR;
1278
1279   if (TRANSACTION_NONE != plugin->transaction)
1280   {
1281     /** @todo FIXME: wait for other transaction to finish  */
1282     return GNUNET_SYSERR;
1283   }
1284
1285   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1286     && GNUNET_OK == exec_channel (plugin, "delete_state", channel_key)
1287     && GNUNET_OK == exec_channel (plugin, "insert_state_from_sync",
1288                                   channel_key)
1289     && GNUNET_OK == exec_channel (plugin, "delete_state_sync",
1290                                   channel_key)
1291     && GNUNET_OK == update_message_id (plugin,
1292                                        "update_state_hash_message_id",
1293                                        channel_key, state_hash_message_id)
1294     && GNUNET_OK == update_message_id (plugin,
1295                                        "update_max_state_message_id",
1296                                        channel_key, max_state_message_id)
1297     && GNUNET_OK == transaction_commit (plugin)
1298     ? ret = GNUNET_OK
1299     : transaction_rollback (plugin);
1300   return ret;
1301 }
1302
1303
1304 /**
1305  * Delete the whole state.
1306  *
1307  * @see GNUNET_PSYCSTORE_state_reset()
1308  *
1309  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1310  */
1311 static int
1312 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1313 {
1314   struct Plugin *plugin = cls;
1315   return exec_channel (plugin, "delete_state", channel_key);
1316 }
1317
1318
1319 /**
1320  * Update signed values of state variables in the state store.
1321  *
1322  * @see GNUNET_PSYCSTORE_state_hash_update()
1323  *
1324  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1325  */
1326 static int
1327 state_update_signed (void *cls,
1328                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1329 {
1330   struct Plugin *plugin = cls;
1331   return exec_channel (plugin, "update_state_signed", channel_key);
1332 }
1333
1334
1335 /**
1336  * Retrieve a state variable by name.
1337  *
1338  * @see GNUNET_PSYCSTORE_state_get()
1339  *
1340  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1341  */
1342 static int
1343 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1344            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1345 {
1346   PGresult *res;
1347
1348   struct Plugin *plugin = cls;
1349   int ret = GNUNET_SYSERR;
1350
1351   const char *stmt = "select_state_one";
1352
1353   struct GNUNET_PQ_QueryParam params_select[] = {
1354     GNUNET_PQ_query_param_auto_from_type (channel_key),
1355     GNUNET_PQ_query_param_string (name),
1356     GNUNET_PQ_query_param_end
1357   };
1358
1359   void *value_current = NULL;
1360   size_t value_size = 0;
1361
1362   struct GNUNET_PQ_ResultSpec results[] = {
1363     GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1364     GNUNET_PQ_result_spec_end
1365   };
1366
1367   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1368   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1369                                                  res,
1370                                                  PGRES_TUPLES_OK,
1371                                                  "PQexecPrepared", stmt))
1372   {
1373     return GNUNET_SYSERR;
1374   }
1375
1376   if (PQntuples (res) == 0)
1377   {
1378     PQclear (res);
1379     ret = GNUNET_NO;
1380   }
1381
1382   ret = GNUNET_PQ_extract_result (res, results, 0);
1383
1384   if (GNUNET_OK != ret)
1385   {
1386     PQclear (res);
1387     return GNUNET_SYSERR;
1388   }
1389
1390   ret = cb (cb_cls, name, value_current,
1391             value_size);
1392
1393   GNUNET_PQ_cleanup_result(results);
1394   PQclear (res);
1395
1396   return ret;
1397 }
1398
1399
1400 /**
1401  * Retrieve all state variables for a channel with the given prefix.
1402  *
1403  * @see GNUNET_PSYCSTORE_state_get_prefix()
1404  *
1405  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1406  */
1407 static int
1408 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1409                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1410                   void *cb_cls)
1411 {
1412   PGresult *res;
1413   struct Plugin *plugin = cls;
1414   int ret = GNUNET_NO;
1415
1416   const char *stmt = "select_state_prefix";
1417
1418   uint32_t name_len = (uint32_t) strlen (name);
1419
1420   struct GNUNET_PQ_QueryParam params_select[] = {
1421     GNUNET_PQ_query_param_auto_from_type (channel_key),
1422     GNUNET_PQ_query_param_string (name),
1423     GNUNET_PQ_query_param_uint32 (&name_len),
1424     GNUNET_PQ_query_param_string (name),
1425     GNUNET_PQ_query_param_end
1426   };
1427
1428   char *name2 = "";
1429   void *value_current = NULL;
1430   size_t value_size = 0;
1431
1432   struct GNUNET_PQ_ResultSpec results[] = {
1433     GNUNET_PQ_result_spec_string ("name", &name2),
1434     GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1435     GNUNET_PQ_result_spec_end
1436   };
1437
1438   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1439   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1440                                                  res,
1441                                                  PGRES_TUPLES_OK,
1442                                                  "PQexecPrepared", stmt))
1443   {
1444     return GNUNET_SYSERR;
1445   }
1446
1447   int nrows = PQntuples (res);
1448   for (int row = 0; row < nrows; row++)
1449   {
1450     if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
1451     {
1452       break;
1453     }
1454
1455     ret = cb (cb_cls, (const char *) name2,
1456               value_current,
1457               value_size);
1458     GNUNET_PQ_cleanup_result(results);
1459   }
1460
1461   PQclear (res);
1462
1463   return ret;
1464 }
1465
1466
1467 /**
1468  * Retrieve all signed state variables for a channel.
1469  *
1470  * @see GNUNET_PSYCSTORE_state_get_signed()
1471  *
1472  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1473  */
1474 static int
1475 state_get_signed (void *cls,
1476                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1477                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1478 {
1479   PGresult *res;
1480   struct Plugin *plugin = cls;
1481   int ret = GNUNET_NO;
1482
1483   const char *stmt = "select_state_signed";
1484
1485   struct GNUNET_PQ_QueryParam params_select[] = {
1486     GNUNET_PQ_query_param_auto_from_type (channel_key),
1487     GNUNET_PQ_query_param_end
1488   };
1489
1490   char *name = "";
1491   void *value_signed = NULL;
1492   size_t value_size = 0;
1493
1494   struct GNUNET_PQ_ResultSpec results[] = {
1495     GNUNET_PQ_result_spec_string ("name", &name),
1496     GNUNET_PQ_result_spec_variable_size ("value_signed", &value_signed, &value_size),
1497     GNUNET_PQ_result_spec_end
1498   };
1499
1500   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1501   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1502                                                  res,
1503                                                  PGRES_TUPLES_OK,
1504                                                  "PQexecPrepared", stmt))
1505   {
1506     return GNUNET_SYSERR;
1507   }
1508
1509   int nrows = PQntuples (res);
1510   for (int row = 0; row < nrows; row++)
1511   {
1512     if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
1513     {
1514       break;
1515     }
1516
1517     ret = cb (cb_cls, (const char *) name,
1518               value_signed,
1519               value_size);
1520
1521     GNUNET_PQ_cleanup_result (results);
1522   }
1523
1524   PQclear (res);
1525
1526   return ret;
1527 }
1528
1529
1530 /**
1531  * Entry point for the plugin.
1532  *
1533  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1534  * @return NULL on error, otherwise the plugin context
1535  */
1536 void *
1537 libgnunet_plugin_psycstore_postgres_init (void *cls)
1538 {
1539   static struct Plugin plugin;
1540   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1541   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1542
1543   if (NULL != plugin.cfg)
1544     return NULL;                /* can only initialize once! */
1545   memset (&plugin, 0, sizeof (struct Plugin));
1546   plugin.cfg = cfg;
1547   if (GNUNET_OK != database_setup (&plugin))
1548   {
1549     database_shutdown (&plugin);
1550     return NULL;
1551   }
1552   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1553   api->cls = &plugin;
1554   api->membership_store = &postgres_membership_store;
1555   api->membership_test = &membership_test;
1556   api->fragment_store = &fragment_store;
1557   api->message_add_flags = &message_add_flags;
1558   api->fragment_get = &fragment_get;
1559   api->fragment_get_latest = &fragment_get_latest;
1560   api->message_get = &message_get;
1561   api->message_get_latest = &message_get_latest;
1562   api->message_get_fragment = &message_get_fragment;
1563   api->counters_message_get = &counters_message_get;
1564   api->counters_state_get = &counters_state_get;
1565   api->state_modify_begin = &state_modify_begin;
1566   api->state_modify_op = &state_modify_op;
1567   api->state_modify_end = &state_modify_end;
1568   api->state_sync_begin = &state_sync_begin;
1569   api->state_sync_assign = &state_sync_assign;
1570   api->state_sync_end = &state_sync_end;
1571   api->state_reset = &state_reset;
1572   api->state_update_signed = &state_update_signed;
1573   api->state_get = &state_get;
1574   api->state_get_prefix = &state_get_prefix;
1575   api->state_get_signed = &state_get_signed;
1576
1577   LOG (GNUNET_ERROR_TYPE_INFO, _("Postgres database running\n"));
1578   return api;
1579 }
1580
1581
1582 /**
1583  * Exit point from the plugin.
1584  *
1585  * @param cls The plugin context (as returned by "init")
1586  * @return Always NULL
1587  */
1588 void *
1589 libgnunet_plugin_psycstore_postgres_done (void *cls)
1590 {
1591   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1592   struct Plugin *plugin = api->cls;
1593
1594   database_shutdown (plugin);
1595   plugin->cfg = NULL;
1596   GNUNET_free (api);
1597   LOG (GNUNET_ERROR_TYPE_DEBUG, "Postgres plugin has finished\n");
1598   return NULL;
1599 }
1600
1601 /* end of plugin_psycstore_postgres.c */