- doc
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_postgres.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2016 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_postgres.c
23  * @brief PostgresQL-based psycstore backend
24  * @author Daniel Golle
25  * @author Gabor X Toth
26  * @author Christian Grothoff
27  * @author Christophe Genevey
28  */
29
30 #include "platform.h"
31 #include "gnunet_psycstore_plugin.h"
32 #include "gnunet_psycstore_service.h"
33 #include "gnunet_multicast_service.h"
34 #include "gnunet_crypto_lib.h"
35 #include "gnunet_psyc_util_lib.h"
36 #include "psycstore.h"
37 #include "gnunet_postgres_lib.h"
38 #include "gnunet_pq_lib.h"
39
40 /**
41  * After how many ms "busy" should a DB operation fail for good?  A
42  * low value makes sure that we are more responsive to requests
43  * (especially PUTs).  A high value guarantees a higher success rate
44  * (SELECTs in iterate can take several seconds despite LIMIT=1).
45  *
46  * The default value of 1s should ensure that users do not experience
47  * huge latencies while at the same time allowing operations to
48  * succeed with reasonable probability.
49  */
50 #define BUSY_TIMEOUT_MS 1000
51
52 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
53
54 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-postgres", __VA_ARGS__)
55
56 enum Transactions {
57   TRANSACTION_NONE = 0,
58   TRANSACTION_STATE_MODIFY,
59   TRANSACTION_STATE_SYNC,
60 };
61
62 /**
63  * Context for all functions in this plugin.
64  */
65 struct Plugin
66 {
67
68   const struct GNUNET_CONFIGURATION_Handle *cfg;
69
70   /**
71    * Native Postgres database handle.
72    */
73   PGconn *dbh;
74
75   enum Transactions transaction;
76
77   void *cls;
78 };
79
80
81 /**
82  * Initialize the database connections and associated
83  * data structures (create tables and indices
84  * as needed as well).
85  *
86  * @param plugin the plugin context (state for this module)
87  * @return GNUNET_OK on success
88  */
89 static int
90 database_setup (struct Plugin *plugin)
91 {
92   /* Open database and precompile statements */
93   plugin->dbh = GNUNET_POSTGRES_connect (plugin->cfg,
94                                          "psycstore-postgres");
95   if (NULL == plugin->dbh)
96     return GNUNET_SYSERR;
97
98   /* Create tables */
99   if ((GNUNET_OK !=
100          GNUNET_POSTGRES_exec(plugin->dbh,
101                               "CREATE TABLE IF NOT EXISTS channels (\n"
102                               " id SERIAL,\n"
103                               " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
104                               " max_state_message_id BIGINT,\n"
105                               " state_hash_message_id BIGINT,\n"
106                               " PRIMARY KEY(id)\n"
107                               ")" "WITH OIDS")) ||
108
109       (GNUNET_OK !=
110          GNUNET_POSTGRES_exec(plugin->dbh,
111                               "CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n"
112                               " ON channels (pub_key)")) ||
113
114       (GNUNET_OK !=
115          GNUNET_POSTGRES_exec(plugin->dbh,
116                               "CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n"
117                               " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
118                               "RETURNS NULL ON NULL INPUT")) ||
119
120       (GNUNET_OK !=
121          GNUNET_POSTGRES_exec(plugin->dbh,
122                               "CREATE TABLE IF NOT EXISTS slaves (\n"
123                               " id SERIAL,\n"
124                               " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
125                               " PRIMARY KEY(id)\n"
126                               ")" "WITH OIDS")) ||
127
128       (GNUNET_OK !=
129          GNUNET_POSTGRES_exec(plugin->dbh,
130                               "CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n"
131                               " ON slaves (pub_key)")) ||
132
133       (GNUNET_OK !=
134          GNUNET_POSTGRES_exec(plugin->dbh,
135                               "CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n"
136                               " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
137                               "RETURNS NULL ON NULL INPUT")) ||
138
139       (GNUNET_OK !=
140          GNUNET_POSTGRES_exec(plugin->dbh,
141                               "CREATE TABLE IF NOT EXISTS membership (\n"
142                               "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
143                               "  slave_id BIGINT NOT NULL REFERENCES slaves(id),\n"
144                               "  did_join INT NOT NULL,\n"
145                               "  announced_at BIGINT NOT NULL,\n"
146                               "  effective_since BIGINT NOT NULL,\n"
147                               "  group_generation BIGINT NOT NULL\n"
148                               ")" "WITH OIDS")) ||
149
150       (GNUNET_OK !=
151          GNUNET_POSTGRES_exec(plugin->dbh,
152                               "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
153                               "ON membership (channel_id, slave_id)")) ||
154
155   /** @todo messages table: add method_name column */
156       (GNUNET_OK !=
157          GNUNET_POSTGRES_exec(plugin->dbh,
158                               "CREATE TABLE IF NOT EXISTS messages (\n"
159                               "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
160                               "  hop_counter INT NOT NULL,\n"
161                               "  signature BYTEA CHECK (LENGTH(signature)=64),\n"
162                               "  purpose BYTEA CHECK (LENGTH(purpose)=8),\n"
163                               "  fragment_id BIGINT NOT NULL,\n"
164                               "  fragment_offset BIGINT NOT NULL,\n"
165                               "  message_id BIGINT NOT NULL,\n"
166                               "  group_generation BIGINT NOT NULL,\n"
167                               "  multicast_flags INT NOT NULL,\n"
168                               "  psycstore_flags INT NOT NULL,\n"
169                               "  data BYTEA,\n"
170                               "  PRIMARY KEY (channel_id, fragment_id),\n"
171                               "  UNIQUE (channel_id, message_id, fragment_offset)\n"
172                               ")" "WITH OIDS")) ||
173
174       (GNUNET_OK !=
175          GNUNET_POSTGRES_exec(plugin->dbh,
176                               "CREATE TABLE IF NOT EXISTS state (\n"
177                               "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
178                               "  name TEXT NOT NULL,\n"
179                               "  value_current BYTEA,\n"
180                               "  value_signed BYTEA,\n"
181                               "  PRIMARY KEY (channel_id, name)\n"
182                               ")" "WITH OIDS")) ||
183       (GNUNET_OK !=
184          GNUNET_POSTGRES_exec(plugin->dbh,
185                               "CREATE TABLE IF NOT EXISTS state_sync (\n"
186                               "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
187                               "  name TEXT NOT NULL,\n"
188                               "  value BYTEA,\n"
189                               "  PRIMARY KEY (channel_id, name)\n"
190                               ")" "WITH OIDS")))
191   {
192     PQfinish (plugin->dbh);
193     plugin->dbh = NULL;
194     return GNUNET_SYSERR;
195   }
196
197
198   /* Prepare statements */
199   if ((GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
200                            "transaction_begin",
201                            "BEGIN", 0)) ||
202
203       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
204                            "transaction_commit",
205                            "COMMIT", 0)) ||
206
207       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
208                            "transaction_rollback",
209                            "ROLLBACK", 0)) ||
210
211       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
212                            "insert_channel_key",
213                            "INSERT INTO channels (pub_key) VALUES ($1)"
214                            " ON CONFLICT DO NOTHING", 1)) ||
215
216       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
217                            "insert_slave_key",
218                            "INSERT INTO slaves (pub_key) VALUES ($1)"
219                            " ON CONFLICT DO NOTHING", 1)) ||
220
221       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
222                            "insert_membership",
223                            "INSERT INTO membership\n"
224                            " (channel_id, slave_id, did_join, announced_at,\n"
225                            "  effective_since, group_generation)\n"
226                            "VALUES (get_chan_id($1),\n"
227                            "        get_slave_id($2),\n"
228                            "        $3, $4, $5, $6)", 6)) ||
229
230       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
231                            "select_membership",
232                            "SELECT did_join FROM membership\n"
233                            "WHERE channel_id = get_chan_id($1)\n"
234                            "      AND slave_id = get_slave_id($2)\n"
235                            "      AND effective_since <= $3 AND did_join = 1\n"
236                            "ORDER BY announced_at DESC LIMIT 1", 3)) ||
237
238       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
239                            "insert_fragment",
240                            "INSERT INTO messages\n"
241                            " (channel_id, hop_counter, signature, purpose,\n"
242                            "  fragment_id, fragment_offset, message_id,\n"
243                            "  group_generation, multicast_flags, psycstore_flags, data)\n"
244                            "VALUES (get_chan_id($1),\n"
245                            "        $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
246                            "ON CONFLICT DO NOTHING", 11)) ||
247
248       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
249                            "update_message_flags",
250                            "UPDATE messages\n"
251                            "SET psycstore_flags = psycstore_flags | $1\n"
252                            "WHERE channel_id = get_chan_id($2) \n"
253                            "      AND message_id = $3 AND fragment_offset = 0", 3)) ||
254
255       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
256                            "select_fragments",
257                            "SELECT hop_counter, signature, purpose, fragment_id,\n"
258                            "       fragment_offset, message_id, group_generation,\n"
259                            "       multicast_flags, psycstore_flags, data\n"
260                            "FROM messages\n"
261                            "WHERE channel_id = get_chan_id($1) \n"
262                            "      AND $2 <= fragment_id AND fragment_id <= $3", 3)) ||
263
264       /** @todo select_messages: add method_prefix filter */
265       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
266                            "select_messages",
267                            "SELECT hop_counter, signature, purpose, fragment_id,\n"
268                            "       fragment_offset, message_id, group_generation,\n"
269                            "       multicast_flags, psycstore_flags, data\n"
270                            "FROM messages\n"
271                            "WHERE channel_id = get_chan_id($1) \n"
272                            "      AND $2 <= message_id AND message_id <= $3\n"
273                            "LIMIT $4;", 4)) ||
274
275       /** @todo select_latest_messages: add method_prefix filter */
276       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
277                            "select_latest_fragments",
278                            "SELECT  rev.hop_counter AS hop_counter,\n"
279                            "        rev.signature AS signature,\n"
280                            "        rev.purpose AS purpose,\n"
281                            "        rev.fragment_id AS fragment_id,\n"
282                            "        rev.fragment_offset AS fragment_offset,\n"
283                            "        rev.message_id AS message_id,\n"
284                            "        rev.group_generation AS group_generation,\n"
285                            "        rev.multicast_flags AS multicast_flags,\n"
286                            "        rev.psycstore_flags AS psycstore_flags,\n"
287                            "        rev.data AS data\n"
288                            " FROM\n"
289                            " (SELECT hop_counter, signature, purpose, fragment_id,\n"
290                            "        fragment_offset, message_id, group_generation,\n"
291                            "        multicast_flags, psycstore_flags, data \n"
292                            "  FROM messages\n"
293                            "  WHERE channel_id = get_chan_id($1) \n"
294                            "  ORDER BY fragment_id DESC\n"
295                            "  LIMIT $2) AS rev\n"
296                            " ORDER BY rev.fragment_id;", 2)) ||
297
298       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
299                            "select_latest_messages",
300                            "SELECT hop_counter, signature, purpose, fragment_id,\n"
301                            "       fragment_offset, message_id, group_generation,\n"
302                            "        multicast_flags, psycstore_flags, data\n"
303                            "FROM messages\n"
304                            "WHERE channel_id = get_chan_id($1)\n"
305                            "      AND message_id IN\n"
306                            "      (SELECT message_id\n"
307                            "       FROM messages\n"
308                            "       WHERE channel_id = get_chan_id($2) \n"
309                            "       GROUP BY message_id\n"
310                            "       ORDER BY message_id\n"
311                            "       DESC LIMIT $3)\n"
312                            "ORDER BY fragment_id", 3)) ||
313
314       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
315                            "select_message_fragment",
316                            "SELECT hop_counter, signature, purpose, fragment_id,\n"
317                            "       fragment_offset, message_id, group_generation,\n"
318                            "       multicast_flags, psycstore_flags, data\n"
319                            "FROM messages\n"
320                            "WHERE channel_id = get_chan_id($1) \n"
321                            "      AND message_id = $2 AND fragment_offset = $3", 3)) ||
322
323       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
324                            "select_counters_message",
325                            "SELECT fragment_id, message_id, group_generation\n"
326                            "FROM messages\n"
327                            "WHERE channel_id = get_chan_id($1)\n"
328                            "ORDER BY fragment_id DESC LIMIT 1", 1)) ||
329
330       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
331                            "select_counters_state",
332                            "SELECT max_state_message_id\n"
333                            "FROM channels\n"
334                            "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1)) ||
335
336       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
337                            "update_max_state_message_id",
338                            "UPDATE channels\n"
339                            "SET max_state_message_id = $1\n"
340                            "WHERE pub_key = $2", 2)) ||
341
342       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
343                            "update_state_hash_message_id",
344                            "UPDATE channels\n"
345                            "SET state_hash_message_id = $1\n"
346                            "WHERE pub_key = $2", 2)) ||
347
348       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
349                            "insert_state_current",
350                            "INSERT INTO state\n"
351                            "  (channel_id, name, value_current, value_signed)\n"
352                            "SELECT new.channel_id, new.name,\n"
353                            "       new.value_current, old.value_signed\n"
354                            "FROM (SELECT get_chan_id($1) AS channel_id,\n"
355                            "             $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n"
356                            "LEFT JOIN (SELECT channel_id, name, value_signed\n"
357                            "           FROM state) AS old\n"
358                            "ON new.channel_id = old.channel_id AND new.name = old.name\n"
359                            "ON CONFLICT (channel_id, name)\n"
360                            "   DO UPDATE SET value_current = EXCLUDED.value_current,\n"
361                            "                 value_signed = EXCLUDED.value_signed", 3)) ||
362
363       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
364                            "delete_state_empty",
365                            "DELETE FROM state\n"
366                            "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n"
367                            "      AND (value_current IS NULL OR length(value_current) = 0)\n"
368                            "      AND (value_signed IS NULL OR length(value_signed) = 0)", 1)) ||
369
370       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
371                            "update_state_signed",
372                            "UPDATE state\n"
373                            "SET value_signed = value_current\n"
374                            "WHERE channel_id = get_chan_id($1) ", 1)) ||
375
376       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
377                            "delete_state",
378                            "DELETE FROM state\n"
379                            "WHERE channel_id = get_chan_id($1) ", 1)) ||
380
381       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
382                            "insert_state_sync",
383                            "INSERT INTO state_sync (channel_id, name, value)\n"
384                            "VALUES (get_chan_id($1), $2, $3)", 3)) ||
385
386       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
387                            "insert_state_from_sync",
388                            "INSERT INTO state\n"
389                            " (channel_id, name, value_current, value_signed)\n"
390                            "SELECT channel_id, name, value, value\n"
391                            "FROM state_sync\n"
392                            "WHERE channel_id = get_chan_id($1)", 1)) ||
393
394       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
395                            "delete_state_sync",
396                            "DELETE FROM state_sync\n"
397                            "WHERE channel_id = get_chan_id($1)", 1)) ||
398
399       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
400                            "select_state_one",
401                            "SELECT value_current\n"
402                            "FROM state\n"
403                            "WHERE channel_id = get_chan_id($1)\n"
404                            "      AND name = $2", 2)) ||
405
406       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
407                            "select_state_prefix",
408                            "SELECT name, value_current\n"
409                            "FROM state\n"
410                            "WHERE channel_id = get_chan_id($1)\n"
411                            "      AND (name = $2 OR substr(name, 1, $3) = $4)", 4)) ||
412
413       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
414                            "select_state_signed",
415                            "SELECT name, value_signed\n"
416                            "FROM state\n"
417                            "WHERE channel_id = get_chan_id($1)\n"
418                            "      AND value_signed IS NOT NULL", 1)))
419   {
420     PQfinish (plugin->dbh);
421     plugin->dbh = NULL;
422     return GNUNET_SYSERR;
423   }
424
425   return GNUNET_OK;
426 }
427
428
429 /**
430  * Shutdown database connection and associate data
431  * structures.
432  * @param plugin the plugin context (state for this module)
433  */
434 static void
435 database_shutdown (struct Plugin *plugin)
436 {
437   PQfinish (plugin->dbh);
438   plugin->dbh = NULL;
439 }
440
441
442 /**
443  * Execute a prepared statement with a @a channel_key argument.
444  *
445  * @param plugin Plugin handle.
446  * @param stmt Statement to execute.
447  * @param channel_key Public key of the channel.
448  *
449  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
450  */
451 static int
452 exec_channel (struct Plugin *plugin, const char *stmt,
453               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
454 {
455   PGresult *ret;
456   struct GNUNET_PQ_QueryParam params[] = {
457     GNUNET_PQ_query_param_auto_from_type (channel_key),
458     GNUNET_PQ_query_param_end
459   };
460
461   ret = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
462   if (GNUNET_OK !=
463       GNUNET_POSTGRES_check_result (plugin->dbh,
464                                     ret,
465                                     PGRES_COMMAND_OK,
466                                     "PQexecPrepared", stmt))
467     return GNUNET_SYSERR;
468
469   PQclear (ret);
470
471   return GNUNET_OK;
472 }
473
474
475 /**
476  * Begin a transaction.
477  */
478 static int
479 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
480 {
481   PGresult *ret;
482   struct GNUNET_PQ_QueryParam params[] = {
483     GNUNET_PQ_query_param_end
484   };
485
486   ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_begin", params);
487   if (GNUNET_OK !=
488       GNUNET_POSTGRES_check_result (plugin->dbh,
489                                     ret,
490                                     PGRES_COMMAND_OK,
491                                     "PQexecPrepared", "transaction_begin"))
492   {
493     return GNUNET_SYSERR;
494   }
495
496   plugin->transaction = transaction;
497   PQclear (ret);
498   return GNUNET_OK;
499 }
500
501
502 /**
503  * Commit current transaction.
504  */
505 static int
506 transaction_commit (struct Plugin *plugin)
507 {
508   PGresult *ret;
509
510   struct GNUNET_PQ_QueryParam params[] = {
511     GNUNET_PQ_query_param_end
512   };
513
514   ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_commit", params);
515   if (GNUNET_OK !=
516       GNUNET_POSTGRES_check_result (plugin->dbh,
517                                     ret,
518                                     PGRES_COMMAND_OK,
519                                     "PQexecPrepared", "transaction_commit"))
520   {
521     return GNUNET_SYSERR;
522   }
523
524   PQclear (ret);
525   plugin->transaction = TRANSACTION_NONE;
526   return GNUNET_OK;
527 }
528
529
530 /**
531  * Roll back current transaction.
532  */
533 static int
534 transaction_rollback (struct Plugin *plugin)
535 {
536   PGresult *ret;
537
538   struct GNUNET_PQ_QueryParam params[] = {
539     GNUNET_PQ_query_param_end
540   };
541
542   ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_rollback", params);
543   if (GNUNET_OK !=
544       GNUNET_POSTGRES_check_result (plugin->dbh,
545                                     ret,
546                                     PGRES_COMMAND_OK,
547                                     "PQexecPrepared", "transaction_rollback"))
548   {
549     return GNUNET_SYSERR;
550   }
551
552   PQclear (ret);
553   plugin->transaction = TRANSACTION_NONE;
554   return GNUNET_OK;
555 }
556
557
558 static int
559 channel_key_store (struct Plugin *plugin,
560                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
561 {
562   PGresult *ret;
563
564   struct GNUNET_PQ_QueryParam params[] = {
565     GNUNET_PQ_query_param_auto_from_type (channel_key),
566     GNUNET_PQ_query_param_end
567   };
568
569   ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_channel_key", params);
570   if (GNUNET_OK !=
571       GNUNET_POSTGRES_check_result (plugin->dbh,
572                                     ret,
573                                     PGRES_COMMAND_OK,
574                                     "PQexecPrepared", "insert_channel_key"))
575   {
576     return GNUNET_SYSERR;
577   }
578
579   PQclear (ret);
580   return GNUNET_OK;
581 }
582
583
584 static int
585 slave_key_store (struct Plugin *plugin,
586                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
587 {
588   PGresult *ret;
589
590   struct GNUNET_PQ_QueryParam params[] = {
591     GNUNET_PQ_query_param_auto_from_type (slave_key),
592     GNUNET_PQ_query_param_end
593   };
594
595   ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_slave_key", params);
596   if (GNUNET_OK !=
597       GNUNET_POSTGRES_check_result (plugin->dbh,
598                                     ret,
599                                     PGRES_COMMAND_OK,
600                                     "PQexecPrepared", "insert_slave_key"))
601   {
602     return GNUNET_SYSERR;
603   }
604
605   PQclear (ret);
606   return GNUNET_OK;
607 }
608
609
610 /**
611  * Store join/leave events for a PSYC channel in order to be able to answer
612  * membership test queries later.
613  *
614  * @see GNUNET_PSYCSTORE_membership_store()
615  *
616  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
617  */
618 static int
619 postgres_membership_store (void *cls,
620                            const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
621                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
622                            int did_join,
623                            uint64_t announced_at,
624                            uint64_t effective_since,
625                            uint64_t group_generation)
626 {
627   PGresult *ret;
628   struct Plugin *plugin = cls;
629
630   uint32_t idid_join = (uint32_t)did_join;
631
632   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
633
634   if (announced_at > INT64_MAX ||
635       effective_since > INT64_MAX ||
636       group_generation > INT64_MAX)
637   {
638     GNUNET_break (0);
639     return GNUNET_SYSERR;
640   }
641
642   if (GNUNET_OK != channel_key_store (plugin, channel_key)
643       || GNUNET_OK != slave_key_store (plugin, slave_key))
644     return GNUNET_SYSERR;
645
646   struct GNUNET_PQ_QueryParam params[] = {
647     GNUNET_PQ_query_param_auto_from_type (channel_key),
648     GNUNET_PQ_query_param_auto_from_type (slave_key),
649     GNUNET_PQ_query_param_uint32 (&idid_join),
650     GNUNET_PQ_query_param_uint64 (&announced_at),
651     GNUNET_PQ_query_param_uint64 (&effective_since),
652     GNUNET_PQ_query_param_uint64 (&group_generation),
653     GNUNET_PQ_query_param_end
654   };
655
656   ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_membership", params);
657   if (GNUNET_OK !=
658       GNUNET_POSTGRES_check_result (plugin->dbh,
659                                     ret,
660                                     PGRES_COMMAND_OK,
661                                     "PQexecPrepared", "insert_membership"))
662   {
663     return GNUNET_SYSERR;
664   }
665
666   PQclear (ret);
667   return GNUNET_OK;
668 }
669
670 /**
671  * Test if a member was admitted to the channel at the given message ID.
672  *
673  * @see GNUNET_PSYCSTORE_membership_test()
674  *
675  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
676  *         #GNUNET_SYSERR if there was en error.
677  */
678 static int
679 membership_test (void *cls,
680                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
681                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
682                  uint64_t message_id)
683 {
684   PGresult *res;
685   struct Plugin *plugin = cls;
686
687   uint32_t did_join = 0;
688
689   int ret = GNUNET_SYSERR;
690
691   struct GNUNET_PQ_QueryParam params_select[] = {
692     GNUNET_PQ_query_param_auto_from_type (channel_key),
693     GNUNET_PQ_query_param_auto_from_type (slave_key),
694     GNUNET_PQ_query_param_uint64 (&message_id),
695     GNUNET_PQ_query_param_end
696   };
697
698   res = GNUNET_PQ_exec_prepared (plugin->dbh, "select_membership", params_select);
699   if (GNUNET_OK !=
700       GNUNET_POSTGRES_check_result (plugin->dbh,
701                                     res,
702                                     PGRES_TUPLES_OK,
703                                     "PQexecPrepared", "select_membership"))
704   {
705     return GNUNET_SYSERR;
706   }
707
708   struct GNUNET_PQ_ResultSpec results_select[] = {
709     GNUNET_PQ_result_spec_uint32 ("did_join", &did_join),
710     GNUNET_PQ_result_spec_end
711   };
712
713   switch (GNUNET_PQ_extract_result (res, results_select, 0))
714   {
715     case GNUNET_OK:
716       ret = GNUNET_YES;
717       break;
718
719     default:
720       ret = GNUNET_NO;
721       break;
722   }
723
724   PQclear (res);
725
726   return ret;
727 }
728
729 /**
730  * Store a message fragment sent to a channel.
731  *
732  * @see GNUNET_PSYCSTORE_fragment_store()
733  *
734  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
735  */
736 static int
737 fragment_store (void *cls,
738                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
739                 const struct GNUNET_MULTICAST_MessageHeader *msg,
740                 uint32_t psycstore_flags)
741 {
742   PGresult *res;
743   struct Plugin *plugin = cls;
744
745   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
746
747   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
748
749   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
750   uint64_t message_id = GNUNET_ntohll (msg->message_id);
751   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
752
753   uint32_t hop_counter = ntohl(msg->hop_counter);
754   uint32_t flags = ntohl(msg->flags);
755
756   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
757       message_id > INT64_MAX || group_generation > INT64_MAX)
758   {
759     LOG(GNUNET_ERROR_TYPE_ERROR,
760          "Tried to store fragment with a field > INT64_MAX: "
761          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
762          message_id, group_generation);
763     GNUNET_break (0);
764     return GNUNET_SYSERR;
765   }
766
767   if (GNUNET_OK != channel_key_store (plugin, channel_key))
768     return GNUNET_SYSERR;
769
770   struct GNUNET_PQ_QueryParam params_insert[] = {
771     GNUNET_PQ_query_param_auto_from_type (channel_key),
772     GNUNET_PQ_query_param_uint32 (&hop_counter),
773     GNUNET_PQ_query_param_auto_from_type (&msg->signature),
774     GNUNET_PQ_query_param_auto_from_type (&msg->purpose),
775     GNUNET_PQ_query_param_uint64 (&fragment_id),
776     GNUNET_PQ_query_param_uint64 (&fragment_offset),
777     GNUNET_PQ_query_param_uint64 (&message_id),
778     GNUNET_PQ_query_param_uint64 (&group_generation),
779     GNUNET_PQ_query_param_uint32 (&flags),
780     GNUNET_PQ_query_param_uint32 (&psycstore_flags),
781     GNUNET_PQ_query_param_fixed_size (&msg[1], ntohs (msg->header.size) - sizeof (*msg)),
782     GNUNET_PQ_query_param_end
783   };
784
785   res = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_fragment", params_insert);
786   if (GNUNET_OK !=
787       GNUNET_POSTGRES_check_result (plugin->dbh,
788                                     res,
789                                     PGRES_COMMAND_OK,
790                                     "PQexecPrepared", "insert_fragment"))
791     return GNUNET_SYSERR;
792
793   PQclear (res);
794   return GNUNET_OK;
795 }
796
797 /**
798  * Set additional flags for a given message.
799  *
800  * They are OR'd with any existing flags set.
801  *
802  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
803  */
804 static int
805 message_add_flags (void *cls,
806                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
807                    uint64_t message_id,
808                    uint32_t psycstore_flags)
809 {
810   PGresult *res;
811   struct Plugin *plugin = cls;
812
813   struct GNUNET_PQ_QueryParam params_update[] = {
814     GNUNET_PQ_query_param_uint32 (&psycstore_flags),
815     GNUNET_PQ_query_param_auto_from_type (channel_key),
816     GNUNET_PQ_query_param_uint64 (&message_id),
817     GNUNET_PQ_query_param_end
818   };
819
820   res = GNUNET_PQ_exec_prepared (plugin->dbh, "update_message_flags", params_update);
821   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
822                                                  res,
823                                                  PGRES_COMMAND_OK,
824                                                  "PQexecPrepared","update_message_flags"))
825     return GNUNET_SYSERR;
826
827   PQclear (res);
828   return GNUNET_OK;
829 }
830
831
832 static int
833 fragment_row (struct Plugin *plugin,
834               const char *stmt,
835               PGresult *res,
836               GNUNET_PSYCSTORE_FragmentCallback cb,
837               void *cb_cls,
838               uint64_t *returned_fragments)
839 {
840   uint32_t hop_counter;
841   void *signature = NULL;
842   void *purpose = NULL;
843   size_t signature_size;
844   size_t purpose_size;
845
846   uint64_t fragment_id;
847   uint64_t fragment_offset;
848   uint64_t message_id;
849   uint64_t group_generation;
850   uint32_t flags;
851   void *buf;
852   size_t buf_size;
853   int ret = GNUNET_SYSERR;
854   struct GNUNET_MULTICAST_MessageHeader *mp;
855
856   uint32_t msg_flags;
857
858   struct GNUNET_PQ_ResultSpec results[] = {
859     GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
860     GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size),
861     GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size),
862     GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id),
863     GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
864     GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
865     GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
866     GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags),
867     GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags),
868     GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size),
869     GNUNET_PQ_result_spec_end
870   };
871
872   if (GNUNET_OK !=
873       GNUNET_POSTGRES_check_result (plugin->dbh, res, PGRES_TUPLES_OK,
874                                     "PQexecPrepared",
875                                     stmt))
876   {
877     LOG (GNUNET_ERROR_TYPE_DEBUG,
878          "Failing fragment lookup (postgres error)\n");
879     return GNUNET_SYSERR;
880   }
881
882   int nrows = PQntuples (res);
883   for (int row = 0; row < nrows; row++)
884   {
885     if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
886     {
887       break;
888     }
889
890     mp = GNUNET_malloc (sizeof (*mp) + buf_size);
891
892     mp->header.size = htons (sizeof (*mp) + buf_size);
893     mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
894     mp->hop_counter = htonl (hop_counter);
895     GNUNET_memcpy (&mp->signature,
896                    signature,
897                    signature_size);
898     GNUNET_memcpy (&mp->purpose,
899                    purpose,
900                    purpose_size);
901     mp->fragment_id = GNUNET_htonll (fragment_id);
902     mp->fragment_offset = GNUNET_htonll (fragment_offset);
903     mp->message_id = GNUNET_htonll (message_id);
904     mp->group_generation = GNUNET_htonll (group_generation);
905     mp->flags = htonl(msg_flags);
906
907     GNUNET_memcpy (&mp[1],
908                    buf,
909                    buf_size);
910     GNUNET_PQ_cleanup_result(results);
911     ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
912     if (NULL != returned_fragments)
913       (*returned_fragments)++;
914   }
915
916   return ret;
917 }
918
919
920 static int
921 fragment_select (struct Plugin *plugin,
922                  const char *stmt,
923                  struct GNUNET_PQ_QueryParam *params,
924                  uint64_t *returned_fragments,
925                  GNUNET_PSYCSTORE_FragmentCallback cb,
926                  void *cb_cls)
927 {
928   PGresult *res;
929   int ret = GNUNET_SYSERR;
930
931   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
932   if (GNUNET_YES ==
933       GNUNET_POSTGRES_check_result (plugin->dbh,
934                                     res,
935                                     PGRES_TUPLES_OK,
936                                     "PQexecPrepared", stmt))
937   {
938     if (PQntuples (res) == 0)
939       ret = GNUNET_NO;
940     else
941     {
942       ret = fragment_row (plugin, stmt, res, cb, cb_cls, returned_fragments);
943     }
944     PQclear (res);
945   }
946
947   return ret;
948 }
949
950 /**
951  * Retrieve a message fragment range by fragment ID.
952  *
953  * @see GNUNET_PSYCSTORE_fragment_get()
954  *
955  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
956  */
957 static int
958 fragment_get (void *cls,
959               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
960               uint64_t first_fragment_id,
961               uint64_t last_fragment_id,
962               uint64_t *returned_fragments,
963               GNUNET_PSYCSTORE_FragmentCallback cb,
964               void *cb_cls)
965 {
966   struct Plugin *plugin = cls;
967   *returned_fragments = 0;
968
969   struct GNUNET_PQ_QueryParam params_select[] = {
970     GNUNET_PQ_query_param_auto_from_type (channel_key),
971     GNUNET_PQ_query_param_uint64 (&first_fragment_id),
972     GNUNET_PQ_query_param_uint64 (&last_fragment_id),
973     GNUNET_PQ_query_param_end
974   };
975
976   return fragment_select (plugin, "select_fragments", params_select, returned_fragments, cb, cb_cls);
977 }
978
979
980 /**
981  * Retrieve a message fragment range by fragment ID.
982  *
983  * @see GNUNET_PSYCSTORE_fragment_get_latest()
984  *
985  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
986  */
987 static int
988 fragment_get_latest (void *cls,
989                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
990                      uint64_t fragment_limit,
991                      uint64_t *returned_fragments,
992                      GNUNET_PSYCSTORE_FragmentCallback cb,
993                      void *cb_cls)
994 {
995   struct Plugin *plugin = cls;
996
997   *returned_fragments = 0;
998
999   struct GNUNET_PQ_QueryParam params_select[] = {
1000     GNUNET_PQ_query_param_auto_from_type (channel_key),
1001     GNUNET_PQ_query_param_uint64 (&fragment_limit),
1002     GNUNET_PQ_query_param_end
1003   };
1004
1005   return fragment_select (plugin, "select_latest_fragments", params_select, returned_fragments, cb, cb_cls);
1006 }
1007
1008
1009 /**
1010  * Retrieve all fragments of a message ID range.
1011  *
1012  * @see GNUNET_PSYCSTORE_message_get()
1013  *
1014  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1015  */
1016 static int
1017 message_get (void *cls,
1018              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1019              uint64_t first_message_id,
1020              uint64_t last_message_id,
1021              uint64_t fragment_limit,
1022              uint64_t *returned_fragments,
1023              GNUNET_PSYCSTORE_FragmentCallback cb,
1024              void *cb_cls)
1025 {
1026   struct Plugin *plugin = cls;
1027   *returned_fragments = 0;
1028
1029   if (0 == fragment_limit)
1030     fragment_limit = INT64_MAX;
1031
1032   struct GNUNET_PQ_QueryParam params_select[] = {
1033     GNUNET_PQ_query_param_auto_from_type (channel_key),
1034     GNUNET_PQ_query_param_uint64 (&first_message_id),
1035     GNUNET_PQ_query_param_uint64 (&last_message_id),
1036     GNUNET_PQ_query_param_uint64 (&fragment_limit),
1037     GNUNET_PQ_query_param_end
1038   };
1039
1040   return fragment_select (plugin, "select_messages", params_select, returned_fragments, cb, cb_cls);
1041 }
1042
1043
1044 /**
1045  * Retrieve all fragments of the latest messages.
1046  *
1047  * @see GNUNET_PSYCSTORE_message_get_latest()
1048  *
1049  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1050  */
1051 static int
1052 message_get_latest (void *cls,
1053                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1054                     uint64_t message_limit,
1055                     uint64_t *returned_fragments,
1056                     GNUNET_PSYCSTORE_FragmentCallback cb,
1057                     void *cb_cls)
1058 {
1059   struct Plugin *plugin = cls;
1060   *returned_fragments = 0;
1061
1062   struct GNUNET_PQ_QueryParam params_select[] = {
1063     GNUNET_PQ_query_param_auto_from_type (channel_key),
1064     GNUNET_PQ_query_param_auto_from_type (channel_key),
1065     GNUNET_PQ_query_param_uint64 (&message_limit),
1066     GNUNET_PQ_query_param_end
1067   };
1068
1069   return fragment_select (plugin, "select_latest_messages", params_select, returned_fragments, cb, cb_cls);
1070 }
1071
1072
1073 /**
1074  * Retrieve a fragment of message specified by its message ID and fragment
1075  * offset.
1076  *
1077  * @see GNUNET_PSYCSTORE_message_get_fragment()
1078  *
1079  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1080  */
1081 static int
1082 message_get_fragment (void *cls,
1083                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1084                       uint64_t message_id,
1085                       uint64_t fragment_offset,
1086                       GNUNET_PSYCSTORE_FragmentCallback cb,
1087                       void *cb_cls)
1088 {
1089   PGresult *res;
1090   struct Plugin *plugin = cls;
1091   int ret = GNUNET_SYSERR;
1092   const char *stmt = "select_message_fragment";
1093
1094   struct GNUNET_PQ_QueryParam params_select[] = {
1095     GNUNET_PQ_query_param_auto_from_type (channel_key),
1096     GNUNET_PQ_query_param_uint64 (&message_id),
1097     GNUNET_PQ_query_param_uint64 (&fragment_offset),
1098     GNUNET_PQ_query_param_end
1099   };
1100
1101   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1102   if (GNUNET_OK == GNUNET_POSTGRES_check_result (plugin->dbh,
1103                                                  res,
1104                                                  PGRES_TUPLES_OK,
1105                                                  "PQexecPrepared", stmt))
1106   {
1107     if (PQntuples (res) == 0)
1108       ret = GNUNET_NO;
1109     else
1110       ret = fragment_row (plugin, stmt, res, cb, cb_cls, NULL);
1111
1112     PQclear (res);
1113   }
1114
1115   return ret;
1116 }
1117
1118 /**
1119  * Retrieve the max. values of message counters for a channel.
1120  *
1121  * @see GNUNET_PSYCSTORE_counters_get()
1122  *
1123  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1124  */
1125 static int
1126 counters_message_get (void *cls,
1127                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1128                       uint64_t *max_fragment_id,
1129                       uint64_t *max_message_id,
1130                       uint64_t *max_group_generation)
1131 {
1132   PGresult *res;
1133   struct Plugin *plugin = cls;
1134
1135   const char *stmt = "select_counters_message";
1136
1137   struct GNUNET_PQ_QueryParam params_select[] = {
1138     GNUNET_PQ_query_param_auto_from_type (channel_key),
1139     GNUNET_PQ_query_param_end
1140   };
1141
1142   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1143   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1144                                                  res,
1145                                                  PGRES_TUPLES_OK,
1146                                                  "PQexecPrepared", stmt))
1147   {
1148     return GNUNET_SYSERR;
1149   }
1150
1151   struct GNUNET_PQ_ResultSpec results_select[] = {
1152     GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id),
1153     GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id),
1154     GNUNET_PQ_result_spec_uint64 ("group_generation", max_group_generation),
1155     GNUNET_PQ_result_spec_end
1156   };
1157
1158   if (GNUNET_OK != GNUNET_PQ_extract_result (res, results_select, 0))
1159   {
1160     PQclear (res);
1161     return GNUNET_SYSERR;
1162   }
1163
1164   GNUNET_PQ_cleanup_result(results_select);
1165   PQclear (res);
1166
1167   return GNUNET_OK;
1168 }
1169
1170 /**
1171  * Retrieve the max. values of state counters for a channel.
1172  *
1173  * @see GNUNET_PSYCSTORE_counters_get()
1174  *
1175  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1176  */
1177 static int
1178 counters_state_get (void *cls,
1179                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1180                     uint64_t *max_state_message_id)
1181 {
1182   PGresult *res;
1183   struct Plugin *plugin = cls;
1184
1185   const char *stmt = "select_counters_state";
1186
1187   int ret = GNUNET_SYSERR;
1188
1189   struct GNUNET_PQ_QueryParam params_select[] = {
1190     GNUNET_PQ_query_param_auto_from_type (channel_key),
1191     GNUNET_PQ_query_param_end
1192   };
1193
1194   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1195   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1196                                                  res,
1197                                                  PGRES_TUPLES_OK,
1198                                                  "PQexecPrepared", stmt))
1199   {
1200     return GNUNET_SYSERR;
1201   }
1202
1203   struct GNUNET_PQ_ResultSpec results_select[] = {
1204     GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id),
1205     GNUNET_PQ_result_spec_end
1206   };
1207
1208   ret = GNUNET_PQ_extract_result (res, results_select, 0);
1209
1210   if (GNUNET_OK != ret)
1211   {
1212     PQclear (res);
1213     return GNUNET_SYSERR;
1214   }
1215
1216   GNUNET_PQ_cleanup_result(results_select);
1217   PQclear (res);
1218
1219   return ret;
1220 }
1221
1222
1223 /**
1224  * Assign a value to a state variable.
1225  *
1226  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1227  */
1228 static int
1229 state_assign (struct Plugin *plugin, const char *stmt,
1230               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1231               const char *name, const void *value, size_t value_size)
1232 {
1233   PGresult *res;
1234
1235   struct GNUNET_PQ_QueryParam params[] = {
1236     GNUNET_PQ_query_param_auto_from_type (channel_key),
1237     GNUNET_PQ_query_param_string (name),
1238     GNUNET_PQ_query_param_fixed_size (value, value_size),
1239     GNUNET_PQ_query_param_end
1240   };
1241
1242   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
1243   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1244                                                  res,
1245                                                  PGRES_COMMAND_OK,
1246                                                  "PQexecPrepared", stmt))
1247   {
1248     return GNUNET_SYSERR;
1249   }
1250
1251   PQclear (res);
1252
1253   return GNUNET_OK;
1254 }
1255
1256
1257 static int
1258 update_message_id (struct Plugin *plugin, const char *stmt,
1259                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1260                    uint64_t message_id)
1261 {
1262   PGresult *res;
1263
1264   struct GNUNET_PQ_QueryParam params[] = {
1265     GNUNET_PQ_query_param_uint64 (&message_id),
1266     GNUNET_PQ_query_param_auto_from_type (channel_key),
1267     GNUNET_PQ_query_param_end
1268   };
1269
1270   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
1271   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1272                                                  res,
1273                                                  PGRES_COMMAND_OK,
1274                                                  "PQexecPrepared", stmt))
1275   {
1276     return GNUNET_SYSERR;
1277   }
1278
1279   PQclear (res);
1280
1281   return GNUNET_OK;
1282 }
1283
1284
1285 /**
1286  * Begin modifying current state.
1287  */
1288 static int
1289 state_modify_begin (void *cls,
1290                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1291                     uint64_t message_id, uint64_t state_delta)
1292 {
1293   struct Plugin *plugin = cls;
1294
1295   if (state_delta > 0)
1296   {
1297     /**
1298      * We can only apply state modifiers in the current message if modifiers in
1299      * the previous stateful message (message_id - state_delta) were already
1300      * applied.
1301      */
1302
1303     uint64_t max_state_message_id = 0;
1304     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1305     switch (ret)
1306     {
1307     case GNUNET_OK:
1308     case GNUNET_NO: // no state yet
1309       ret = GNUNET_OK;
1310       break;
1311
1312     default:
1313       return ret;
1314     }
1315
1316     if (max_state_message_id < message_id - state_delta)
1317       return GNUNET_NO; /* some stateful messages not yet applied */
1318     else if (message_id - state_delta < max_state_message_id)
1319       return GNUNET_NO; /* changes already applied */
1320   }
1321
1322   if (TRANSACTION_NONE != plugin->transaction)
1323   {
1324     /** @todo FIXME: wait for other transaction to finish  */
1325     return GNUNET_SYSERR;
1326   }
1327   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1328 }
1329
1330
1331 /**
1332  * Set the current value of state variable.
1333  *
1334  * @see GNUNET_PSYCSTORE_state_modify()
1335  *
1336  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1337  */
1338 static int
1339 state_modify_op (void *cls,
1340                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1341                  enum GNUNET_PSYC_Operator op,
1342                  const char *name, const void *value, size_t value_size)
1343 {
1344   struct Plugin *plugin = cls;
1345   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1346
1347   switch (op)
1348   {
1349   case GNUNET_PSYC_OP_ASSIGN:
1350     return state_assign (plugin, "insert_state_current",
1351                          channel_key, name, value, value_size);
1352
1353   default: /** @todo implement more state operations */
1354     GNUNET_break (0);
1355     return GNUNET_SYSERR;
1356   }
1357 }
1358
1359
1360 /**
1361  * End modifying current state.
1362  */
1363 static int
1364 state_modify_end (void *cls,
1365                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1366                   uint64_t message_id)
1367 {
1368   struct Plugin *plugin = cls;
1369   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1370
1371   return
1372     GNUNET_OK == exec_channel (plugin, "delete_state_empty", channel_key)
1373     && GNUNET_OK == update_message_id (plugin,
1374                                        "update_max_state_message_id",
1375                                        channel_key, message_id)
1376     && GNUNET_OK == transaction_commit (plugin)
1377     ? GNUNET_OK : GNUNET_SYSERR;
1378 }
1379
1380
1381 /**
1382  * Begin state synchronization.
1383  */
1384 static int
1385 state_sync_begin (void *cls,
1386                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1387 {
1388   struct Plugin *plugin = cls;
1389   return exec_channel (plugin, "delete_state_sync", channel_key);
1390 }
1391
1392
1393 /**
1394  * Assign current value of a state variable.
1395  *
1396  * @see GNUNET_PSYCSTORE_state_modify()
1397  *
1398  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1399  */
1400 static int
1401 state_sync_assign (void *cls,
1402                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1403                 const char *name, const void *value, size_t value_size)
1404 {
1405   struct Plugin *plugin = cls;
1406   return state_assign (plugin, "insert_state_sync",
1407                        channel_key, name, value, value_size);
1408 }
1409
1410
1411 /**
1412  * End modifying current state.
1413  */
1414 static int
1415 state_sync_end (void *cls,
1416                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1417                 uint64_t max_state_message_id,
1418                 uint64_t state_hash_message_id)
1419 {
1420   struct Plugin *plugin = cls;
1421   int ret = GNUNET_SYSERR;
1422
1423   if (TRANSACTION_NONE != plugin->transaction)
1424   {
1425     /** @todo FIXME: wait for other transaction to finish  */
1426     return GNUNET_SYSERR;
1427   }
1428
1429   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1430     && GNUNET_OK == exec_channel (plugin, "delete_state", channel_key)
1431     && GNUNET_OK == exec_channel (plugin, "insert_state_from_sync",
1432                                   channel_key)
1433     && GNUNET_OK == exec_channel (plugin, "delete_state_sync",
1434                                   channel_key)
1435     && GNUNET_OK == update_message_id (plugin,
1436                                        "update_state_hash_message_id",
1437                                        channel_key, state_hash_message_id)
1438     && GNUNET_OK == update_message_id (plugin,
1439                                        "update_max_state_message_id",
1440                                        channel_key, max_state_message_id)
1441     && GNUNET_OK == transaction_commit (plugin)
1442     ? ret = GNUNET_OK
1443     : transaction_rollback (plugin);
1444   return ret;
1445 }
1446
1447
1448 /**
1449  * Delete the whole state.
1450  *
1451  * @see GNUNET_PSYCSTORE_state_reset()
1452  *
1453  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1454  */
1455 static int
1456 state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1457 {
1458   struct Plugin *plugin = cls;
1459   return exec_channel (plugin, "delete_state", channel_key);
1460 }
1461
1462
1463 /**
1464  * Update signed values of state variables in the state store.
1465  *
1466  * @see GNUNET_PSYCSTORE_state_hash_update()
1467  *
1468  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1469  */
1470 static int
1471 state_update_signed (void *cls,
1472                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1473 {
1474   struct Plugin *plugin = cls;
1475   return exec_channel (plugin, "update_state_signed", channel_key);
1476 }
1477
1478
1479 /**
1480  * Retrieve a state variable by name.
1481  *
1482  * @see GNUNET_PSYCSTORE_state_get()
1483  *
1484  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1485  */
1486 static int
1487 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1488            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1489 {
1490   PGresult *res;
1491
1492   struct Plugin *plugin = cls;
1493   int ret = GNUNET_SYSERR;
1494
1495   const char *stmt = "select_state_one";
1496
1497   struct GNUNET_PQ_QueryParam params_select[] = {
1498     GNUNET_PQ_query_param_auto_from_type (channel_key),
1499     GNUNET_PQ_query_param_string (name),
1500     GNUNET_PQ_query_param_end
1501   };
1502
1503   void *value_current = NULL;
1504   size_t value_size = 0;
1505
1506   struct GNUNET_PQ_ResultSpec results[] = {
1507     GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1508     GNUNET_PQ_result_spec_end
1509   };
1510
1511   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1512   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1513                                                  res,
1514                                                  PGRES_TUPLES_OK,
1515                                                  "PQexecPrepared", stmt))
1516   {
1517     return GNUNET_SYSERR;
1518   }
1519
1520   if (PQntuples (res) == 0)
1521   {
1522     PQclear (res);
1523     ret = GNUNET_NO;
1524   }
1525
1526   ret = GNUNET_PQ_extract_result (res, results, 0);
1527
1528   if (GNUNET_OK != ret)
1529   {
1530     PQclear (res);
1531     return GNUNET_SYSERR;
1532   }
1533
1534   ret = cb (cb_cls, name, value_current,
1535             value_size);
1536
1537   GNUNET_PQ_cleanup_result(results);
1538   PQclear (res);
1539
1540   return ret;
1541 }
1542
1543
1544 /**
1545  * Retrieve all state variables for a channel with the given prefix.
1546  *
1547  * @see GNUNET_PSYCSTORE_state_get_prefix()
1548  *
1549  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1550  */
1551 static int
1552 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1553                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1554                   void *cb_cls)
1555 {
1556   PGresult *res;
1557   struct Plugin *plugin = cls;
1558   int ret = GNUNET_NO;
1559
1560   const char *stmt = "select_state_prefix";
1561
1562   uint32_t name_len = (uint32_t) strlen (name);
1563
1564   struct GNUNET_PQ_QueryParam params_select[] = {
1565     GNUNET_PQ_query_param_auto_from_type (channel_key),
1566     GNUNET_PQ_query_param_string (name),
1567     GNUNET_PQ_query_param_uint32 (&name_len),
1568     GNUNET_PQ_query_param_string (name),
1569     GNUNET_PQ_query_param_end
1570   };
1571
1572   char *name2 = "";
1573   void *value_current = NULL;
1574   size_t value_size = 0;
1575
1576   struct GNUNET_PQ_ResultSpec results[] = {
1577     GNUNET_PQ_result_spec_string ("name", &name2),
1578     GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1579     GNUNET_PQ_result_spec_end
1580   };
1581
1582   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1583   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1584                                                  res,
1585                                                  PGRES_TUPLES_OK,
1586                                                  "PQexecPrepared", stmt))
1587   {
1588     return GNUNET_SYSERR;
1589   }
1590
1591   int nrows = PQntuples (res);
1592   for (int row = 0; row < nrows; row++)
1593   {
1594     if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
1595     {
1596       break;
1597     }
1598
1599     ret = cb (cb_cls, (const char *) name2,
1600               value_current,
1601               value_size);
1602     GNUNET_PQ_cleanup_result(results);
1603   }
1604
1605   PQclear (res);
1606
1607   return ret;
1608 }
1609
1610
1611 /**
1612  * Retrieve all signed state variables for a channel.
1613  *
1614  * @see GNUNET_PSYCSTORE_state_get_signed()
1615  *
1616  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1617  */
1618 static int
1619 state_get_signed (void *cls,
1620                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1621                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1622 {
1623   PGresult *res;
1624   struct Plugin *plugin = cls;
1625   int ret = GNUNET_NO;
1626
1627   const char *stmt = "select_state_signed";
1628
1629   struct GNUNET_PQ_QueryParam params_select[] = {
1630     GNUNET_PQ_query_param_auto_from_type (channel_key),
1631     GNUNET_PQ_query_param_end
1632   };
1633
1634   char *name = "";
1635   void *value_signed = NULL;
1636   size_t value_size = 0;
1637
1638   struct GNUNET_PQ_ResultSpec results[] = {
1639     GNUNET_PQ_result_spec_string ("name", &name),
1640     GNUNET_PQ_result_spec_variable_size ("value_signed", &value_signed, &value_size),
1641     GNUNET_PQ_result_spec_end
1642   };
1643
1644   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1645   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1646                                                  res,
1647                                                  PGRES_TUPLES_OK,
1648                                                  "PQexecPrepared", stmt))
1649   {
1650     return GNUNET_SYSERR;
1651   }
1652
1653   int nrows = PQntuples (res);
1654   for (int row = 0; row < nrows; row++)
1655   {
1656     if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
1657     {
1658       break;
1659     }
1660
1661     ret = cb (cb_cls, (const char *) name,
1662               value_signed,
1663               value_size);
1664
1665     GNUNET_PQ_cleanup_result (results);
1666   }
1667
1668   PQclear (res);
1669
1670   return ret;
1671 }
1672
1673
1674 /**
1675  * Entry point for the plugin.
1676  *
1677  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1678  * @return NULL on error, otherwise the plugin context
1679  */
1680 void *
1681 libgnunet_plugin_psycstore_postgres_init (void *cls)
1682 {
1683   static struct Plugin plugin;
1684   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1685   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1686
1687   if (NULL != plugin.cfg)
1688     return NULL;                /* can only initialize once! */
1689   memset (&plugin, 0, sizeof (struct Plugin));
1690   plugin.cfg = cfg;
1691   if (GNUNET_OK != database_setup (&plugin))
1692   {
1693     database_shutdown (&plugin);
1694     return NULL;
1695   }
1696   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1697   api->cls = &plugin;
1698   api->membership_store = &postgres_membership_store;
1699   api->membership_test = &membership_test;
1700   api->fragment_store = &fragment_store;
1701   api->message_add_flags = &message_add_flags;
1702   api->fragment_get = &fragment_get;
1703   api->fragment_get_latest = &fragment_get_latest;
1704   api->message_get = &message_get;
1705   api->message_get_latest = &message_get_latest;
1706   api->message_get_fragment = &message_get_fragment;
1707   api->counters_message_get = &counters_message_get;
1708   api->counters_state_get = &counters_state_get;
1709   api->state_modify_begin = &state_modify_begin;
1710   api->state_modify_op = &state_modify_op;
1711   api->state_modify_end = &state_modify_end;
1712   api->state_sync_begin = &state_sync_begin;
1713   api->state_sync_assign = &state_sync_assign;
1714   api->state_sync_end = &state_sync_end;
1715   api->state_reset = &state_reset;
1716   api->state_update_signed = &state_update_signed;
1717   api->state_get = &state_get;
1718   api->state_get_prefix = &state_get_prefix;
1719   api->state_get_signed = &state_get_signed;
1720
1721   LOG (GNUNET_ERROR_TYPE_INFO, _("Postgres database running\n"));
1722   return api;
1723 }
1724
1725
1726 /**
1727  * Exit point from the plugin.
1728  *
1729  * @param cls The plugin context (as returned by "init")
1730  * @return Always NULL
1731  */
1732 void *
1733 libgnunet_plugin_psycstore_postgres_done (void *cls)
1734 {
1735   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1736   struct Plugin *plugin = api->cls;
1737
1738   database_shutdown (plugin);
1739   plugin->cfg = NULL;
1740   GNUNET_free (api);
1741   LOG (GNUNET_ERROR_TYPE_DEBUG, "Postgres plugin has finished\n");
1742   return NULL;
1743 }
1744
1745 /* end of plugin_psycstore_postgres.c */