psycstore: fix puckish copy&paste in postgres plugin
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_postgres.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2016 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_postgres.c
23  * @brief PostgreSQL-based psycstore backend
24  * @author Daniel Golle
25  * @author Gabor X Toth
26  * @author Christian Grothoff
27  * @author Christophe Genevey
28  */
29
30 #include "platform.h"
31 #include "gnunet_psycstore_plugin.h"
32 #include "gnunet_psycstore_service.h"
33 #include "gnunet_multicast_service.h"
34 #include "gnunet_crypto_lib.h"
35 #include "gnunet_psyc_util_lib.h"
36 #include "psycstore.h"
37 #include "gnunet_postgres_lib.h"
38 #include "gnunet_pq_lib.h"
39
40 /**
41  * After how many ms "busy" should a DB operation fail for good?  A
42  * low value makes sure that we are more responsive to requests
43  * (especially PUTs).  A high value guarantees a higher success rate
44  * (SELECTs in iterate can take several seconds despite LIMIT=1).
45  *
46  * The default value of 1s should ensure that users do not experience
47  * huge latencies while at the same time allowing operations to
48  * succeed with reasonable probability.
49  */
#define BUSY_TIMEOUT_MS 1000

/**
 * Debug switch for this plugin; expands to GNUNET_EXTRA_LOGGING.
 */
#define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING

/**
 * Log a message through GNUNET_log_from() using the fixed component
 * name "psycstore-postgres".
 */
#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-postgres", __VA_ARGS__)
55
/**
 * Kind of database transaction recorded in the plugin context.
 */
enum Transactions
{
  /**
   * No transaction is open.  This is the value stored after a commit
   * or rollback.
   */
  TRANSACTION_NONE = 0,

  /**
   * NOTE(review): presumably a state modification is in progress;
   * the users of this value are outside the code visible here.
   */
  TRANSACTION_STATE_MODIFY = 1,

  /**
   * NOTE(review): presumably a state synchronisation is in progress;
   * the users of this value are outside the code visible here.
   */
  TRANSACTION_STATE_SYNC = 2
};
61
62 /**
63  * Context for all functions in this plugin.
64  */
65 struct Plugin
66 {
67
68   const struct GNUNET_CONFIGURATION_Handle *cfg;
69
70   /**
71    * Native Postgres database handle.
72    */
73   PGconn *dbh;
74
75   enum Transactions transaction;
76
77   void *cls;
78 };
79
80
81 /**
82  * Initialize the database connections and associated
83  * data structures (create tables and indices
84  * as needed as well).
85  *
86  * @param plugin the plugin context (state for this module)
87  * @return GNUNET_OK on success
88  */
89 static int
90 database_setup (struct Plugin *plugin)
91 {
92   /* Open database and precompile statements */
93   plugin->dbh = GNUNET_POSTGRES_connect (plugin->cfg,
94                                          "psycstore-postgres");
95   if (NULL == plugin->dbh)
96     return GNUNET_SYSERR;
97
98   /* Create tables */
99   if ((GNUNET_OK !=
100          GNUNET_POSTGRES_exec(plugin->dbh,
101                               "CREATE TABLE IF NOT EXISTS channels (\n"
102                               " id SERIAL,\n"
103                               " pub_key BYTEA,\n"
104                               " max_state_message_id INT,\n"
105                               " state_hash_message_id INT,\n"
106                               " PRIMARY KEY(id)\n"
107                               ")" "WITH OIDS")) ||
108
109       (GNUNET_OK !=
110          GNUNET_POSTGRES_exec(plugin->dbh,
111                               "CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n"
112                               " ON channels (substring(pub_key from 1 for 5)\n"
113                               ")")) ||
114
115       (GNUNET_OK !=
116          GNUNET_POSTGRES_exec(plugin->dbh,
117                               "CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n"
118                               " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
119                               "RETURNS NULL ON NULL INPUT")) ||
120
121       (GNUNET_OK !=
122          GNUNET_POSTGRES_exec(plugin->dbh,
123                               "CREATE TABLE IF NOT EXISTS slaves (\n"
124                               " id SERIAL,\n"
125                               " pub_key BYTEA,\n"
126                               " PRIMARY KEY(id)\n"
127                               ")" "WITH OIDS")) ||
128
129       (GNUNET_OK !=
130          GNUNET_POSTGRES_exec(plugin->dbh,
131                               "CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n"
132                               " ON slaves (substring(pub_key from 1 for 5)\n"
133                               ")")) ||
134
135       (GNUNET_OK !=
136          GNUNET_POSTGRES_exec(plugin->dbh,
137                               "CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n"
138                               " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
139                               "RETURNS NULL ON NULL INPUT")) ||
140
141       (GNUNET_OK !=
142          GNUNET_POSTGRES_exec(plugin->dbh,
143                               "CREATE TABLE IF NOT EXISTS membership (\n"
144                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
145                               "  slave_id INT NOT NULL REFERENCES slaves(id),\n"
146                               "  did_join INT NOT NULL,\n"
147                               "  announced_at BIGINT NOT NULL,\n"
148                               "  effective_since BIGINT NOT NULL,\n"
149                               "  group_generation BIGINT NOT NULL\n"
150                               ")" "WITH OIDS")) ||
151
152       (GNUNET_OK !=
153          GNUNET_POSTGRES_exec(plugin->dbh,
154                               "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
155                               "ON membership (channel_id, slave_id)")) ||
156
157   /** @todo messages table: add method_name column */
158       (GNUNET_OK !=
159          GNUNET_POSTGRES_exec(plugin->dbh,
160                               "CREATE TABLE IF NOT EXISTS messages (\n"
161                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
162                               "  hop_counter BIGINT NOT NULL,\n"
163                               "  signature BYTEA,\n"
164                               "  purpose BYTEA,\n"
165                               "  fragment_id BIGINT NOT NULL,\n"
166                               "  fragment_offset BIGINT NOT NULL,\n"
167                               "  message_id BIGINT NOT NULL,\n"
168                               "  group_generation BIGINT NOT NULL,\n"
169                               "  multicast_flags BIGINT NOT NULL,\n"
170                               "  psycstore_flags BIGINT NOT NULL,\n"
171                               "  data BYTEA,\n"
172                               "  PRIMARY KEY (channel_id, fragment_id),\n"
173                               "  UNIQUE (channel_id, message_id, fragment_offset)\n"
174                               ")" "WITH OIDS")) ||
175
176       (GNUNET_OK !=
177          GNUNET_POSTGRES_exec(plugin->dbh,
178                               "CREATE TABLE IF NOT EXISTS state (\n"
179                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
180                               "  name TEXT NOT NULL,\n"
181                               "  value_current BYTEA,\n"
182                               "  value_signed BYTEA\n"
183                               ")" "WITH OIDS")) ||
184
185       (GNUNET_OK !=
186          GNUNET_POSTGRES_exec(plugin->dbh,
187                               "CREATE UNIQUE INDEX IF NOT EXISTS state_uniq_idx \n"
188                               " ON state (channel_id, substring(name from 1 for 5)\n"
189                               ")")) ||
190
191       (GNUNET_OK !=
192          GNUNET_POSTGRES_exec(plugin->dbh,
193                               "CREATE TABLE IF NOT EXISTS state_sync (\n"
194                               "  channel_id INT NOT NULL REFERENCES channels(id),\n"
195                               "  name TEXT NOT NULL,\n"
196                               "  value BYTEA,\n"
197                               "  PRIMARY KEY (channel_id)\n"
198                               ")" "WITH OIDS")) ||
199
200       (GNUNET_OK !=
201          GNUNET_POSTGRES_exec(plugin->dbh,
202                               "CREATE UNIQUE INDEX IF NOT EXISTS state_sync_name_idx \n"
203                               " ON state_sync (substring(name from 1 for 5)\n"
204                               ")")))
205   {
206     PQfinish (plugin->dbh);
207     plugin->dbh = NULL;
208     return GNUNET_SYSERR;
209   }
210
211
212   /* Prepare statements */
213   if ((GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
214                            "transaction_begin",
215                            "BEGIN", 0)) ||
216
217       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
218                            "transaction_commit",
219                            "COMMIT", 0)) ||
220
221       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
222                            "transaction_rollback",
223                            "ROLLBACK", 0)) ||
224
225       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
226                            "insert_channel_key",
227                            "INSERT INTO channels (pub_key) VALUES ($1)"
228                            " ON CONFLICT DO NOTHING", 1)) ||
229
230       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
231                            "insert_slave_key",
232                            "INSERT INTO slaves (pub_key) VALUES ($1)"
233                            " ON CONFLICT DO NOTHING", 1)) ||
234
235       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
236                            "insert_membership",
237                            "INSERT INTO membership\n"
238                            " (channel_id, slave_id, did_join, announced_at,\n"
239                            "  effective_since, group_generation)\n"
240                            "VALUES (get_chan_id($1),\n"
241                            "        get_slave_id($2),\n"
242                            "        $3, $4, $5, $6)", 6)) ||
243
244       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
245                            "select_membership",
246                            "SELECT did_join FROM membership\n"
247                            "WHERE channel_id = get_chan_id($1)\n"
248                            "      AND slave_id = get_slave_id($2)\n"
249                            "      AND effective_since <= $3 AND did_join = 1\n"
250                            "ORDER BY announced_at DESC LIMIT 1", 3)) ||
251
252       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
253                            "insert_fragment",
254                            "INSERT INTO messages\n"
255                            " (channel_id, hop_counter, signature, purpose,\n"
256                            "  fragment_id, fragment_offset, message_id,\n"
257                            "  group_generation, multicast_flags, psycstore_flags, data)\n"
258                            "VALUES (get_chan_id($1),\n"
259                            "        $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
260                            "ON CONFLICT DO NOTHING", 11)) ||
261
262       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
263                            "update_message_flags",
264                            "UPDATE messages\n"
265                            "SET psycstore_flags = psycstore_flags | $1\n"
266                            "WHERE channel_id = get_chan_id($2) \n"
267                            "      AND message_id = $3 AND fragment_offset = 0", 3)) ||
268
269       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
270                            "select_fragments",
271                            "SELECT hop_counter, signature, purpose, fragment_id,\n"
272                            "       fragment_offset, message_id, group_generation,\n"
273                            "       multicast_flags, psycstore_flags, data\n"
274                            "FROM messages\n"
275                            "WHERE channel_id = get_chan_id($1) \n"
276                            "      AND $2 <= fragment_id AND fragment_id <= $3", 3)) ||
277
278       /** @todo select_messages: add method_prefix filter */
279       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
280                 "select_messages",
281                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
282                 "       fragment_offset, message_id, group_generation,\n"
283                 "       multicast_flags, psycstore_flags, data\n"
284                 "FROM messages\n"
285                 "WHERE channel_id = get_chan_id($1) \n"
286                 "      AND $2 <= message_id AND message_id <= $3"
287                 "LIMIT $4;", 4)) ||
288
289       /** @todo select_latest_messages: add method_prefix filter */
290       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
291                 "select_latest_fragments",
292                 "SELECT  rev.hop_counter AS hop_counter,\n"
293                 "        rev.signature AS signature,\n"
294                 "        rev.purpose AS purpose,\n"
295                 "        rev.fragment_id AS fragment_id,\n"
296                 "        rev.fragment_offset AS fragment_offset,\n"
297                 "        rev.message_id AS message_id,\n"
298                 "        rev.group_generation AS group_generation,\n"
299                 "        rev.multicast_flags AS multicast_flags,\n"
300                 "        rev.psycstore_flags AS psycstore_flags,\n"
301                 "        rev.data AS data\n"
302                 " FROM\n"
303                 " (SELECT hop_counter, signature, purpose, fragment_id,\n"
304                 "        fragment_offset, message_id, group_generation,\n"
305                 "        multicast_flags, psycstore_flags, data \n"
306                 "  FROM messages\n"
307                 "  WHERE channel_id = get_chan_id($1) \n"
308                 "  ORDER BY fragment_id DESC\n"
309                 "  LIMIT $2) AS rev\n"
310                 " ORDER BY rev.fragment_id;", 2)) ||
311
312       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
313                 "select_latest_messages",
314                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
315                 "       fragment_offset, message_id, group_generation,\n"
316                 "        multicast_flags, psycstore_flags, data\n"
317                 "FROM messages\n"
318                 "WHERE channel_id = get_chan_id($1)\n"
319                 "      AND message_id IN\n"
320                 "      (SELECT message_id\n"
321                 "       FROM messages\n"
322                 "       WHERE channel_id = get_chan_id($2) \n"
323                 "       GROUP BY message_id\n"
324                 "       ORDER BY message_id\n"
325                 "       DESC LIMIT $3)\n"
326                 "ORDER BY fragment_id", 3)) ||
327
328       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
329                 "select_message_fragment",
330                 "SELECT hop_counter, signature, purpose, fragment_id,\n"
331                 "       fragment_offset, message_id, group_generation,\n"
332                 "       multicast_flags, psycstore_flags, data\n"
333                 "FROM messages\n"
334                 "WHERE channel_id = get_chan_id($1) \n"
335                 "      AND message_id = $2 AND fragment_offset = $3", 3)) ||
336
337       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
338                 "select_counters_message",
339                 "SELECT fragment_id, message_id, group_generation\n"
340                 "FROM messages\n"
341                 "WHERE channel_id = get_chan_id($1)\n"
342                 "ORDER BY fragment_id DESC LIMIT 1", 1)) ||
343
344       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
345                 "select_counters_state",
346                 "SELECT max_state_message_id\n"
347                 "FROM channels\n"
348                 "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1)) ||
349
350       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
351                 "update_max_state_message_id",
352                 "UPDATE channels\n"
353                 "SET max_state_message_id = $1\n"
354                 "WHERE pub_key = $2", 2)) ||
355
356       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
357                 "update_state_hash_message_id",
358                 "UPDATE channels\n"
359                 "SET state_hash_message_id = $1\n"
360                 "WHERE pub_key = $2", 2)) ||
361
362       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
363                 "insert_state_current",
364                 "INSERT INTO state\n"
365                 "  (channel_id, name, value_current, value_signed)\n"
366                 "SELECT new.channel_id, new.name,\n"
367                 "       new.value_current, old.value_signed\n"
368                 "FROM (SELECT get_chan_id($1) AS channel_id,\n"
369                 "             $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n"
370                 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
371                 "           FROM state) AS old\n"
372                 "ON new.channel_id = old.channel_id AND new.name = old.name\n"
373                 "ON CONFLICT ( channel_id, substring(name from 1 for 5) )\n"
374                 "   DO UPDATE SET value_current = EXCLUDED.value_current,\n"
375                 "                 value_signed = EXCLUDED.value_signed", 3)) ||
376
377       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
378                 "delete_state_empty",
379                 "DELETE FROM state\n"
380                 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n"
381                 "      AND (value_current IS NULL OR length(value_current) = 0)\n"
382                 "      AND (value_signed IS NULL OR length(value_signed) = 0)", 1)) ||
383
384       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
385                 "update_state_signed",
386                 "UPDATE state\n"
387                 "SET value_signed = value_current\n"
388                 "WHERE channel_id = get_chan_id($1) ", 1)) ||
389
390       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
391                 "delete_state",
392                 "DELETE FROM state\n"
393                 "WHERE channel_id = get_chan_id($1) ", 1)) ||
394
395       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
396                 "insert_state_sync",
397                 "INSERT INTO state_sync (channel_id, name, value)\n"
398                 "VALUES (get_chan_id($1), $2, $3)", 3)) ||
399
400       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
401                 "insert_state_from_sync",
402                 "INSERT INTO state\n"
403                 " (channel_id, name, value_current, value_signed)\n"
404                 "SELECT channel_id, name, value, value\n"
405                 "FROM state_sync\n"
406                 "WHERE channel_id = get_chan_id($1)", 1)) ||
407
408       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
409                 "delete_state_sync",
410                 "DELETE FROM state_sync\n"
411                 "WHERE channel_id = get_chan_id($1)", 1)) ||
412
413       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
414                 "select_state_one",
415                 "SELECT value_current\n"
416                 "FROM state\n"
417                 "WHERE channel_id = get_chan_id($1)\n"
418                 "      AND name = $2", 2)) ||
419
420       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
421                 "select_state_prefix",
422                 "SELECT name, value_current\n"
423                 "FROM state\n"
424                 "WHERE channel_id = get_chan_id($1)\n"
425                 "      AND (name = $2 OR substr(name, 1, $3) = $4 || '_')", 4)) ||
426
427       (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
428                 "select_state_signed",
429                 "SELECT name, value_signed\n"
430                 "FROM state\n"
431                 "WHERE channel_id = get_chan_id($1)\n"
432                 "      AND value_signed IS NOT NULL", 1)))
433   {
434     PQfinish (plugin->dbh);
435     plugin->dbh = NULL;
436     return GNUNET_SYSERR;
437   }
438
439   return GNUNET_OK;
440 }
441
442
443 /**
444  * Shutdown database connection and associate data
445  * structures.
446  * @param plugin the plugin context (state for this module)
447  */
448 static void
449 database_shutdown (struct Plugin *plugin)
450 {
451   PQfinish (plugin->dbh);
452   plugin->dbh = NULL;
453 }
454
455
456 /**
457  * Execute a prepared statement with a @a channel_key argument.
458  *
459  * @param plugin Plugin handle.
460  * @param stmt Statement to execute.
461  * @param channel_key Public key of the channel.
462  *
463  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
464  */
465 static int
466 exec_channel (struct Plugin *plugin, const char *stmt,
467               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
468 {
469   PGresult *ret;
470   struct GNUNET_PQ_QueryParam params[] = {
471     GNUNET_PQ_query_param_auto_from_type (channel_key),
472     GNUNET_PQ_query_param_end
473   };
474
475   ret = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
476   if (GNUNET_OK !=
477       GNUNET_POSTGRES_check_result (plugin->dbh,
478                                     ret,
479                                     PGRES_COMMAND_OK,
480                                     "PQexecPrepared", stmt))
481     return GNUNET_SYSERR;
482
483   PQclear (ret);
484
485   return GNUNET_OK;
486 }
487
488
489 /**
490  * Begin a transaction.
491  */
492 static int
493 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
494 {
495   PGresult *ret;
496   struct GNUNET_PQ_QueryParam params[] = {
497     GNUNET_PQ_query_param_end
498   };
499
500   ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_begin", params);
501   if (GNUNET_OK !=
502       GNUNET_POSTGRES_check_result (plugin->dbh,
503                                     ret,
504                                     PGRES_COMMAND_OK,
505                                     "PQexecPrepared", "transaction_begin"))
506   {
507     return GNUNET_SYSERR;
508   }
509
510   plugin->transaction = transaction;
511   PQclear (ret);
512   return GNUNET_OK;
513 }
514
515
516 /**
517  * Commit current transaction.
518  */
519 static int
520 transaction_commit (struct Plugin *plugin)
521 {
522   PGresult *ret;
523
524   struct GNUNET_PQ_QueryParam params[] = {
525     GNUNET_PQ_query_param_end
526   };
527
528   ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_commit", params);
529   if (GNUNET_OK !=
530       GNUNET_POSTGRES_check_result (plugin->dbh,
531                                     ret,
532                                     PGRES_COMMAND_OK,
533                                     "PQexecPrepared", "transaction_commit"))
534   {
535     return GNUNET_SYSERR;
536   }
537
538   PQclear (ret);
539   plugin->transaction = TRANSACTION_NONE;
540   return GNUNET_OK;
541 }
542
543
544 /**
545  * Roll back current transaction.
546  */
547 static int
548 transaction_rollback (struct Plugin *plugin)
549 {
550   PGresult *ret;
551
552   struct GNUNET_PQ_QueryParam params[] = {
553     GNUNET_PQ_query_param_end
554   };
555
556   ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_rollback", params);
557   if (GNUNET_OK !=
558       GNUNET_POSTGRES_check_result (plugin->dbh,
559                                     ret,
560                                     PGRES_COMMAND_OK,
561                                     "PQexecPrepared", "transaction_rollback"))
562   {
563     return GNUNET_SYSERR;
564   }
565
566   PQclear (ret);
567   plugin->transaction = TRANSACTION_NONE;
568   return GNUNET_OK;
569 }
570
571
572 static int
573 channel_key_store (struct Plugin *plugin,
574                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
575 {
576   PGresult *ret;
577
578   struct GNUNET_PQ_QueryParam params[] = {
579     GNUNET_PQ_query_param_auto_from_type (channel_key),
580     GNUNET_PQ_query_param_end
581   };
582
583   ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_channel_key", params);
584   if (GNUNET_OK !=
585       GNUNET_POSTGRES_check_result (plugin->dbh,
586                                     ret,
587                                     PGRES_COMMAND_OK,
588                                     "PQexecPrepared", "insert_channel_key"))
589   {
590     return GNUNET_SYSERR;
591   }
592
593   PQclear (ret);
594   return GNUNET_OK;
595 }
596
597
598 static int
599 slave_key_store (struct Plugin *plugin,
600                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
601 {
602   PGresult *ret;
603
604   struct GNUNET_PQ_QueryParam params[] = {
605     GNUNET_PQ_query_param_auto_from_type (slave_key),
606     GNUNET_PQ_query_param_end
607   };
608
609   ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_slave_key", params);
610   if (GNUNET_OK !=
611       GNUNET_POSTGRES_check_result (plugin->dbh,
612                                     ret,
613                                     PGRES_COMMAND_OK,
614                                     "PQexecPrepared", "insert_slave_key"))
615   {
616     return GNUNET_SYSERR;
617   }
618
619   PQclear (ret);
620   return GNUNET_OK;
621 }
622
623
624 /**
625  * Store join/leave events for a PSYC channel in order to be able to answer
626  * membership test queries later.
627  *
628  * @see GNUNET_PSYCSTORE_membership_store()
629  *
630  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
631  */
632 static int
633 postgres_membership_store (void *cls,
634                            const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
635                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
636                            int did_join,
637                            uint64_t announced_at,
638                            uint64_t effective_since,
639                            uint64_t group_generation)
640 {
641   PGresult *ret;
642   struct Plugin *plugin = cls;
643
644   uint32_t idid_join = (uint32_t)did_join;
645
646   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
647
648   if (announced_at > INT64_MAX ||
649       effective_since > INT64_MAX ||
650       group_generation > INT64_MAX)
651   {
652     GNUNET_break (0);
653     return GNUNET_SYSERR;
654   }
655
656   if (GNUNET_OK != channel_key_store (plugin, channel_key)
657       || GNUNET_OK != slave_key_store (plugin, slave_key))
658     return GNUNET_SYSERR;
659
660   struct GNUNET_PQ_QueryParam params[] = {
661     GNUNET_PQ_query_param_auto_from_type (channel_key),
662     GNUNET_PQ_query_param_auto_from_type (slave_key),
663     GNUNET_PQ_query_param_uint32 (&idid_join),
664     GNUNET_PQ_query_param_uint64 (&announced_at),
665     GNUNET_PQ_query_param_uint64 (&effective_since),
666     GNUNET_PQ_query_param_uint64 (&group_generation),
667     GNUNET_PQ_query_param_end
668   };
669
670   ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_membership", params);
671   if (GNUNET_OK !=
672       GNUNET_POSTGRES_check_result (plugin->dbh,
673                                     ret,
674                                     PGRES_COMMAND_OK,
675                                     "PQexecPrepared", "insert_membership"))
676   {
677     return GNUNET_SYSERR;
678   }
679
680   PQclear (ret);
681   return GNUNET_OK;
682 }
683
684 /**
685  * Test if a member was admitted to the channel at the given message ID.
686  *
687  * @see GNUNET_PSYCSTORE_membership_test()
688  *
689  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
690  *         #GNUNET_SYSERR if there was en error.
691  */
692 static int
693 membership_test (void *cls,
694                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
695                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
696                  uint64_t message_id)
697 {
698   PGresult *res;
699   struct Plugin *plugin = cls;
700
701   uint32_t did_join = 0;
702
703   int ret = GNUNET_SYSERR;
704
705   struct GNUNET_PQ_QueryParam params_select[] = {
706     GNUNET_PQ_query_param_auto_from_type (channel_key),
707     GNUNET_PQ_query_param_auto_from_type (slave_key),
708     GNUNET_PQ_query_param_uint64 (&message_id),
709     GNUNET_PQ_query_param_end
710   };
711
712   res = GNUNET_PQ_exec_prepared (plugin->dbh, "select_membership", params_select);
713   if (GNUNET_OK !=
714       GNUNET_POSTGRES_check_result (plugin->dbh,
715                                     res,
716                                     PGRES_COMMAND_OK,
717                                     "PQexecPrepared", "select_membership"))
718   {
719     return GNUNET_SYSERR;
720   }
721
722   struct GNUNET_PQ_ResultSpec results_select[] = {
723     GNUNET_PQ_result_spec_uint32 ("did_join", &did_join),
724     GNUNET_PQ_result_spec_end
725   };
726
727   switch(GNUNET_PQ_extract_result (res, results_select, 0))
728   {
729     case GNUNET_OK:
730       ret = GNUNET_YES;
731       break;
732     default:
733       ret = GNUNET_NO;
734       break;
735   }
736
737   PQclear (res);
738
739   return ret;
740 }
741
742 /**
743  * Store a message fragment sent to a channel.
744  *
745  * @see GNUNET_PSYCSTORE_fragment_store()
746  *
747  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
748  */
749 static int
750 fragment_store (void *cls,
751                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
752                 const struct GNUNET_MULTICAST_MessageHeader *msg,
753                 uint32_t psycstore_flags)
754 {
755   PGresult *res;
756   struct Plugin *plugin = cls;
757
758   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
759
760   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
761
762   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
763   uint64_t message_id = GNUNET_ntohll (msg->message_id);
764   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
765
766   uint64_t hop_counter = ntohl(msg->hop_counter);
767   uint64_t flags = ntohl(msg->flags);
768
769   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
770       message_id > INT64_MAX || group_generation > INT64_MAX)
771   {
772     LOG(GNUNET_ERROR_TYPE_ERROR,
773          "Tried to store fragment with a field > INT64_MAX: "
774          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
775          message_id, group_generation);
776     GNUNET_break (0);
777     return GNUNET_SYSERR;
778   }
779
780   if (GNUNET_OK != channel_key_store (plugin, channel_key))
781     return GNUNET_SYSERR;
782
783   struct GNUNET_PQ_QueryParam params_insert[] = {
784     GNUNET_PQ_query_param_auto_from_type (channel_key),
785     GNUNET_PQ_query_param_uint64 (&hop_counter),
786     GNUNET_PQ_query_param_auto_from_type (&msg->signature),
787     GNUNET_PQ_query_param_auto_from_type (&msg->purpose),
788     GNUNET_PQ_query_param_uint64 (&fragment_id),
789     GNUNET_PQ_query_param_uint64 (&fragment_offset),
790     GNUNET_PQ_query_param_uint64 (&message_id),
791     GNUNET_PQ_query_param_uint64 (&group_generation),
792     GNUNET_PQ_query_param_uint64 (&flags),
793     GNUNET_PQ_query_param_uint32 (&psycstore_flags),
794     GNUNET_PQ_query_param_fixed_size (&msg[1], ntohs (msg->header.size)
795                                                   - sizeof (*msg)),
796     GNUNET_PQ_query_param_end
797   };
798
799   res = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_fragment", params_insert);
800   if (GNUNET_OK !=
801       GNUNET_POSTGRES_check_result (plugin->dbh,
802                                     res,
803                                     PGRES_COMMAND_OK,
804                                     "PQexecPrepared", "insert_fragment"))
805     return GNUNET_SYSERR;
806
807   PQclear (res);
808   return GNUNET_OK;
809 }
810
811 /**
812  * Set additional flags for a given message.
813  *
814  * They are OR'd with any existing flags set.
815  *
816  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
817  */
818 static int
819 message_add_flags (void *cls,
820                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
821                    uint64_t message_id,
822                    uint64_t psycstore_flags)
823 {
824   PGresult *res;
825   struct Plugin *plugin = cls;
826
827   int ret = GNUNET_SYSERR;
828
829   struct GNUNET_PQ_QueryParam params_update[] = {
830     GNUNET_PQ_query_param_uint64 (&psycstore_flags),
831     GNUNET_PQ_query_param_auto_from_type (channel_key),
832     GNUNET_PQ_query_param_uint64 (&message_id),
833     GNUNET_PQ_query_param_end
834   };
835
836   res = GNUNET_PQ_exec_prepared (plugin->dbh, "update_message_flags", params_update);
837   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
838                                       res,
839                                       PGRES_COMMAND_OK,
840                                       "PQexecPrepared", "update_message_flags"))
841     return ret;
842
843   PQclear (res);
844   return ret;
845 }
846
847
848 static int
849 fragment_row (struct Plugin *plugin,
850               const char *stmt,
851               PGresult *res,
852               GNUNET_PSYCSTORE_FragmentCallback cb,
853               void *cb_cls)
854 {
855   uint32_t hop_counter;
856   void *signature = NULL;
857   void *purpose = NULL;
858   size_t signature_size;
859   size_t purpose_size;
860
861   uint64_t fragment_id;
862   uint64_t fragment_offset;
863   uint64_t message_id;
864   uint64_t group_generation;
865   uint64_t flags;
866   void *buf;
867   size_t buf_size;
868   int ret = GNUNET_SYSERR;
869   struct GNUNET_MULTICAST_MessageHeader *mp;
870
871   uint64_t msg_flags;
872   unsigned int cnt;
873
874   struct GNUNET_PQ_ResultSpec results[] = {
875     GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
876     GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size),
877     GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size),
878     GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id),
879     GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
880     GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
881     GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
882     GNUNET_PQ_result_spec_uint64 ("msg_flags", &msg_flags),
883     GNUNET_PQ_result_spec_uint64 ("flags", &flags),
884     GNUNET_PQ_result_spec_variable_size ("data", &buf,
885                                          &buf_size),
886     GNUNET_PQ_result_spec_end
887   };
888
889   if (GNUNET_OK !=
890       GNUNET_POSTGRES_check_result (plugin->dbh, res, PGRES_TUPLES_OK,
891                                     "PQexecPrepared",
892                                     stmt))
893   {
894     LOG (GNUNET_ERROR_TYPE_DEBUG,
895          "Failing fragment lookup (postgres error)\n");
896     return GNUNET_SYSERR;
897   }
898
899   cnt = PQntuples (res);
900   if (cnt == 0)
901   {
902     ret = GNUNET_NO;
903   }
904   else
905   {
906     if (GNUNET_OK != GNUNET_PQ_extract_result(res, results, 0)) {
907       PQclear (res);
908       return GNUNET_SYSERR;
909     }
910
911     mp = GNUNET_malloc (sizeof (*mp) + buf_size);
912
913     mp->header.size = htons (sizeof (*mp) + buf_size);
914     mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
915     mp->hop_counter = htonl (hop_counter);
916     GNUNET_memcpy (&mp->signature,
917                    signature,
918                    signature_size);
919     GNUNET_memcpy (&mp->purpose,
920                    purpose,
921                    purpose_size);
922     mp->fragment_id = GNUNET_htonll (fragment_id);
923     mp->fragment_offset = GNUNET_htonll (fragment_offset);
924     mp->message_id = GNUNET_htonll (message_id);
925     mp->group_generation = GNUNET_htonll (group_generation);
926     mp->flags = htonl(msg_flags);
927
928     GNUNET_memcpy (&mp[1],
929                    buf,
930                    buf_size);
931     GNUNET_PQ_cleanup_result(results);
932     ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
933   }
934
935   PQclear (res);
936   return ret;
937 }
938
939
940 static int
941 fragment_select (struct Plugin *plugin, const char *stmt,
942                  struct GNUNET_PQ_QueryParam *params,
943                  uint64_t *returned_fragments,
944                  GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
945 {
946   PGresult *res;
947   int ret = GNUNET_SYSERR;
948
949   // FIXME
950   if (NULL == plugin->dbh || NULL == stmt || NULL == params)
951   {
952     fprintf(stderr, "%p %p %p\n", plugin->dbh, stmt, params);
953     return GNUNET_SYSERR;
954   }
955
956   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
957   if (GNUNET_YES ==
958       GNUNET_POSTGRES_check_result (plugin->dbh,
959                                     res,
960                                     PGRES_COMMAND_OK,
961                                     "PQexecPrepared", stmt))
962   {
963     if (PQntuples (res) == 0)
964       ret = GNUNET_NO;
965     else
966     {
967       ret = fragment_row (plugin, stmt, res, cb, cb_cls);
968       (*returned_fragments)++;
969     }
970     PQclear (res);
971   }
972
973   return ret;
974 }
975
976 /**
977  * Retrieve a message fragment range by fragment ID.
978  *
979  * @see GNUNET_PSYCSTORE_fragment_get()
980  *
981  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
982  */
983 static int
984 fragment_get (void *cls,
985               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
986               uint64_t first_fragment_id,
987               uint64_t last_fragment_id,
988               uint64_t *returned_fragments,
989               GNUNET_PSYCSTORE_FragmentCallback cb,
990               void *cb_cls)
991 {
992   struct Plugin *plugin = cls;
993   *returned_fragments = 0;
994
995   struct GNUNET_PQ_QueryParam params_select[] = {
996     GNUNET_PQ_query_param_auto_from_type (channel_key),
997     GNUNET_PQ_query_param_uint64 (&first_fragment_id),
998     GNUNET_PQ_query_param_uint64 (&last_fragment_id),
999     GNUNET_PQ_query_param_end
1000   };
1001
1002   return fragment_select (plugin, "select_fragments", params_select, returned_fragments, cb, cb_cls);
1003 }
1004
1005
1006 /**
1007  * Retrieve a message fragment range by fragment ID.
1008  *
1009  * @see GNUNET_PSYCSTORE_fragment_get_latest()
1010  *
1011  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1012  */
1013 static int
1014 fragment_get_latest (void *cls,
1015                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1016                      uint64_t fragment_limit,
1017                      uint64_t *returned_fragments,
1018                      GNUNET_PSYCSTORE_FragmentCallback cb,
1019                      void *cb_cls)
1020 {
1021   struct Plugin *plugin = cls;
1022
1023   *returned_fragments = 0;
1024
1025   struct GNUNET_PQ_QueryParam params_select[] = {
1026     GNUNET_PQ_query_param_auto_from_type (channel_key),
1027     GNUNET_PQ_query_param_uint64 (&fragment_limit),
1028     GNUNET_PQ_query_param_end
1029   };
1030
1031   return fragment_select (plugin, "select_latest_fragments", params_select, returned_fragments, cb, cb_cls);
1032 }
1033
1034
1035 /**
1036  * Retrieve all fragments of a message ID range.
1037  *
1038  * @see GNUNET_PSYCSTORE_message_get()
1039  *
1040  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1041  */
1042 static int
1043 message_get (void *cls,
1044              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1045              uint64_t first_message_id,
1046              uint64_t last_message_id,
1047              uint64_t fragment_limit,
1048              uint64_t *returned_fragments,
1049              GNUNET_PSYCSTORE_FragmentCallback cb,
1050              void *cb_cls)
1051 {
1052   struct Plugin *plugin = cls;
1053   *returned_fragments = 0;
1054
1055   struct GNUNET_PQ_QueryParam params_select[] = {
1056     GNUNET_PQ_query_param_auto_from_type (channel_key),
1057     GNUNET_PQ_query_param_uint64 (&first_message_id),
1058     GNUNET_PQ_query_param_uint64 (&last_message_id),
1059     GNUNET_PQ_query_param_uint64 (&fragment_limit),
1060     GNUNET_PQ_query_param_end
1061   };
1062
1063   return fragment_select (plugin, "select_messages", params_select, returned_fragments, cb, cb_cls);
1064 }
1065
1066
1067 /**
1068  * Retrieve all fragments of the latest messages.
1069  *
1070  * @see GNUNET_PSYCSTORE_message_get_latest()
1071  *
1072  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1073  */
1074 static int
1075 message_get_latest (void *cls,
1076                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1077                     uint64_t message_limit,
1078                     uint64_t *returned_fragments,
1079                     GNUNET_PSYCSTORE_FragmentCallback cb,
1080                     void *cb_cls)
1081 {
1082   struct Plugin *plugin = cls;
1083   *returned_fragments = 0;
1084
1085   struct GNUNET_PQ_QueryParam params_select[] = {
1086     GNUNET_PQ_query_param_auto_from_type (channel_key),
1087     GNUNET_PQ_query_param_auto_from_type (channel_key),
1088     GNUNET_PQ_query_param_uint64 (&message_limit),
1089     GNUNET_PQ_query_param_end
1090   };
1091
1092   return fragment_select (plugin, "select_latest_messages", params_select, returned_fragments, cb, cb_cls);
1093 }
1094
1095
1096 /**
1097  * Retrieve a fragment of message specified by its message ID and fragment
1098  * offset.
1099  *
1100  * @see GNUNET_PSYCSTORE_message_get_fragment()
1101  *
1102  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1103  */
1104 static int
1105 message_get_fragment (void *cls,
1106                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1107                       uint64_t message_id,
1108                       uint64_t fragment_offset,
1109                       GNUNET_PSYCSTORE_FragmentCallback cb,
1110                       void *cb_cls)
1111 {
1112   PGresult *res;
1113   struct Plugin *plugin = cls;
1114   int ret = GNUNET_SYSERR;
1115   const char *stmt = "select_message_fragment";
1116
1117   struct GNUNET_PQ_QueryParam params_select[] = {
1118     GNUNET_PQ_query_param_auto_from_type (channel_key),
1119     GNUNET_PQ_query_param_uint64 (&message_id),
1120     GNUNET_PQ_query_param_uint64 (&fragment_offset),
1121     GNUNET_PQ_query_param_end
1122   };
1123
1124   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1125   if (GNUNET_OK == GNUNET_POSTGRES_check_result (plugin->dbh,
1126                                       res,
1127                                       PGRES_COMMAND_OK,
1128                                       "PQexecPrepared", stmt))
1129   {
1130     if (PQntuples (res) == 0)
1131       ret = GNUNET_NO;
1132     else
1133       ret = fragment_row (plugin, stmt, res, cb, cb_cls);
1134
1135     PQclear (res);
1136   }
1137
1138   return ret;
1139 }
1140
1141 /**
1142  * Retrieve the max. values of message counters for a channel.
1143  *
1144  * @see GNUNET_PSYCSTORE_counters_get()
1145  *
1146  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1147  */
1148 static int
1149 counters_message_get (void *cls,
1150                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1151                       uint64_t *max_fragment_id,
1152                       uint64_t *max_message_id,
1153                       uint64_t *max_group_generation)
1154 {
1155   PGresult *res;
1156   struct Plugin *plugin = cls;
1157
1158   const char *stmt = "select_counters_message";
1159
1160   struct GNUNET_PQ_QueryParam params_select[] = {
1161     GNUNET_PQ_query_param_auto_from_type (channel_key),
1162     GNUNET_PQ_query_param_end
1163   };
1164
1165   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1166   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1167                                       res,
1168                                       PGRES_COMMAND_OK,
1169                                       "PQexecPrepared", stmt))
1170   {
1171     return GNUNET_SYSERR;
1172   }
1173
1174   struct GNUNET_PQ_ResultSpec results_select[] = {
1175     GNUNET_PQ_result_spec_uint64 ("max_fragment_id", max_fragment_id),
1176     GNUNET_PQ_result_spec_uint64 ("max_message_id", max_message_id),
1177     GNUNET_PQ_result_spec_uint64 ("max_group_generation", max_group_generation),
1178     GNUNET_PQ_result_spec_end
1179   };
1180
1181   if (GNUNET_OK != GNUNET_PQ_extract_result (res,
1182                                   results_select, 0))
1183   {
1184     PQclear (res);
1185     return GNUNET_SYSERR;
1186   }
1187
1188   GNUNET_PQ_cleanup_result(results_select);
1189   PQclear (res);
1190
1191   return GNUNET_OK;
1192 }
1193
1194 /**
1195  * Retrieve the max. values of state counters for a channel.
1196  *
1197  * @see GNUNET_PSYCSTORE_counters_get()
1198  *
1199  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1200  */
1201 static int
1202 counters_state_get (void *cls,
1203                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1204                     uint64_t *max_state_message_id)
1205 {
1206   PGresult *res;
1207   struct Plugin *plugin = cls;
1208
1209   const char *stmt = "select_counters_state";
1210
1211   int ret = GNUNET_SYSERR;
1212
1213   struct GNUNET_PQ_QueryParam params_select[] = {
1214     GNUNET_PQ_query_param_auto_from_type (channel_key),
1215     GNUNET_PQ_query_param_end
1216   };
1217
1218   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1219   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1220                                       res,
1221                                       PGRES_COMMAND_OK,
1222                                       "PQexecPrepared", stmt))
1223   {
1224     return GNUNET_SYSERR;
1225   }
1226
1227   struct GNUNET_PQ_ResultSpec results_select[] = {
1228     GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id),
1229     GNUNET_PQ_result_spec_end
1230   };
1231
1232   ret = GNUNET_PQ_extract_result (res,
1233                                   results_select, 0);
1234
1235   if (GNUNET_OK != ret)
1236   {
1237     PQclear (res);
1238     return GNUNET_SYSERR;
1239   }
1240
1241   GNUNET_PQ_cleanup_result(results_select);
1242   PQclear (res);
1243
1244   return ret;
1245 }
1246
1247
1248 /**
1249  * Assign a value to a state variable.
1250  *
1251  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1252  */
1253 static int
1254 state_assign (struct Plugin *plugin, const char *stmt,
1255               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1256               const char *name, const void *value, size_t value_size)
1257 {
1258   PGresult *res;
1259
1260   struct GNUNET_PQ_QueryParam params[] = {
1261     GNUNET_PQ_query_param_auto_from_type (channel_key),
1262     GNUNET_PQ_query_param_string (name),
1263     GNUNET_PQ_query_param_auto_from_type (value),
1264     GNUNET_PQ_query_param_end
1265   };
1266
1267   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
1268   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1269                                       res,
1270                                       PGRES_COMMAND_OK,
1271                                       "PQexecPrepared", stmt))
1272   {
1273     return GNUNET_SYSERR;
1274   }
1275
1276   PQclear (res);
1277
1278   return GNUNET_OK;
1279 }
1280
1281
1282 static int
1283 update_message_id (struct Plugin *plugin, const char *stmt,
1284                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1285                    uint64_t message_id)
1286 {
1287   PGresult *res;
1288
1289   struct GNUNET_PQ_QueryParam params[] = {
1290     GNUNET_PQ_query_param_uint64 (&message_id),
1291     GNUNET_PQ_query_param_auto_from_type (channel_key),
1292     GNUNET_PQ_query_param_end
1293   };
1294
1295   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
1296   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1297                                       res,
1298                                       PGRES_COMMAND_OK,
1299                                       "PQexecPrepared", stmt))
1300   {
1301     return GNUNET_SYSERR;
1302   }
1303
1304   PQclear (res);
1305
1306   return GNUNET_OK;
1307 }
1308
1309
1310 /**
1311  * Begin modifying current state.
1312  */
1313 static int
1314 state_modify_begin (void *cls,
1315                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1316                     uint64_t message_id, uint64_t state_delta)
1317 {
1318   struct Plugin *plugin = cls;
1319
1320   if (state_delta > 0)
1321   {
1322     /**
1323      * We can only apply state modifiers in the current message if modifiers in
1324      * the previous stateful message (message_id - state_delta) were already
1325      * applied.
1326      */
1327
1328     uint64_t max_state_message_id = 0;
1329     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1330     switch (ret)
1331     {
1332     case GNUNET_OK:
1333     case GNUNET_NO: // no state yet
1334       ret = GNUNET_OK;
1335       break;
1336     default:
1337       return ret;
1338     }
1339
1340     if (max_state_message_id < message_id - state_delta)
1341       return GNUNET_NO; /* some stateful messages not yet applied */
1342     else if (message_id - state_delta < max_state_message_id)
1343       return GNUNET_NO; /* changes already applied */
1344   }
1345
1346   if (TRANSACTION_NONE != plugin->transaction)
1347   {
1348     /** @todo FIXME: wait for other transaction to finish  */
1349     return GNUNET_SYSERR;
1350   }
1351   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1352 }
1353
1354
1355 /**
1356  * Set the current value of state variable.
1357  *
1358  * @see GNUNET_PSYCSTORE_state_modify()
1359  *
1360  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1361  */
1362 static int
1363 state_modify_op (void *cls,
1364                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1365                  enum GNUNET_PSYC_Operator op,
1366                  const char *name, const void *value, size_t value_size)
1367 {
1368   struct Plugin *plugin = cls;
1369   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1370
1371   switch (op)
1372   {
1373   case GNUNET_PSYC_OP_ASSIGN:
1374     return state_assign (plugin, "insert_state_current", channel_key,
1375                          name, value, value_size);
1376
1377   default: /** @todo implement more state operations */
1378     GNUNET_break (0);
1379     return GNUNET_SYSERR;
1380   }
1381 }
1382
1383
1384 /**
1385  * End modifying current state.
1386  */
1387 static int
1388 state_modify_end (void *cls,
1389                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1390                   uint64_t message_id)
1391 {
1392   struct Plugin *plugin = cls;
1393   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1394
1395   return
1396     GNUNET_OK == exec_channel (plugin, "delete_state_empty", channel_key)
1397     && GNUNET_OK == update_message_id (plugin,
1398                                        "update_max_state_message_id",
1399                                        channel_key, message_id)
1400     && GNUNET_OK == transaction_commit (plugin)
1401     ? GNUNET_OK : GNUNET_SYSERR;
1402 }
1403
1404
/**
 * Begin state synchronization.
 *
 * Runs the "delete_state_sync" statement for the channel.
 *
 * @return the result of exec_channel()
 */
static int
state_sync_begin (void *cls,
                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  return exec_channel ((struct Plugin *) cls,
                       "delete_state_sync",
                       channel_key);
}
1415
1416
/**
 * Assign the value of a state variable during state synchronization.
 *
 * Forwards to state_assign() using the "insert_state_sync" statement.
 *
 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
 */
static int
state_sync_assign (void *cls,
                const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                const char *name, const void *value, size_t value_size)
{
  return state_assign ((struct Plugin *) cls,
                       "insert_state_sync",
                       channel_key,
                       name,
                       value,
                       value_size);
}
1433
1434
1435 /**
1436  * End modifying current state.
1437  */
1438 static int
1439 state_sync_end (void *cls,
1440                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1441                 uint64_t max_state_message_id,
1442                 uint64_t state_hash_message_id)
1443 {
1444   struct Plugin *plugin = cls;
1445   int ret = GNUNET_SYSERR;
1446
1447   if (TRANSACTION_NONE != plugin->transaction)
1448   {
1449     /** @todo FIXME: wait for other transaction to finish  */
1450     return GNUNET_SYSERR;
1451   }
1452
1453   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1454     && GNUNET_OK == exec_channel (plugin, "delete_state", channel_key)
1455     && GNUNET_OK == exec_channel (plugin, "insert_state_from_sync",
1456                                   channel_key)
1457     && GNUNET_OK == exec_channel (plugin, "delete_state_sync",
1458                                   channel_key)
1459     && GNUNET_OK == update_message_id (plugin,
1460                                        "update_state_hash_message_id",
1461                                        channel_key, state_hash_message_id)
1462     && GNUNET_OK == update_message_id (plugin,
1463                                        "update_max_state_message_id",
1464                                        channel_key, max_state_message_id)
1465     && GNUNET_OK == transaction_commit (plugin)
1466     ? ret = GNUNET_OK
1467     : transaction_rollback (plugin);
1468   return ret;
1469 }
1470
1471
/**
 * Delete the whole state.
 *
 * Runs the "delete_state" statement for the channel.
 *
 * @see GNUNET_PSYCSTORE_state_reset()
 *
 * @return the result of exec_channel()
 */
static int
state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  return exec_channel ((struct Plugin *) cls,
                       "delete_state",
                       channel_key);
}
1485
1486
/**
 * Update signed values of state variables in the state store.
 *
 * Runs the "update_state_signed" statement for the channel.
 *
 * @see GNUNET_PSYCSTORE_state_hash_update()
 *
 * @return the result of exec_channel()
 */
static int
state_update_signed (void *cls,
                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  return exec_channel ((struct Plugin *) cls,
                       "update_state_signed",
                       channel_key);
}
1501
1502
1503 /**
1504  * Retrieve a state variable by name.
1505  *
1506  * @see GNUNET_PSYCSTORE_state_get()
1507  *
1508  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1509  */
1510 static int
1511 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1512            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1513 {
1514   PGresult *res;
1515
1516   struct Plugin *plugin = cls;
1517   int ret = GNUNET_SYSERR;
1518
1519   const char *stmt = "select_state_one";
1520
1521   struct GNUNET_PQ_QueryParam params_select[] = {
1522     GNUNET_PQ_query_param_auto_from_type (channel_key),
1523     GNUNET_PQ_query_param_string (name),
1524     GNUNET_PQ_query_param_end
1525   };
1526
1527   void *value_current = NULL;
1528   size_t value_size = 0;
1529
1530   struct GNUNET_PQ_ResultSpec results[] = {
1531     GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1532     GNUNET_PQ_result_spec_end
1533   };
1534
1535   res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1536   if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1537                                       res,
1538                                       PGRES_COMMAND_OK,
1539                                       "PQexecPrepared", stmt))
1540   {
1541     return GNUNET_SYSERR;
1542   }
1543
1544   if (PQntuples (res) == 0)
1545   {
1546     PQclear (res);
1547     ret = GNUNET_NO;
1548   }
1549
1550   ret = GNUNET_PQ_extract_result (res,
1551                                   results, 0);
1552
1553   if (GNUNET_OK != ret)
1554   {
1555     PQclear (res);
1556     return GNUNET_SYSERR;
1557   }
1558
1559   ret = cb (cb_cls, name, value_current,
1560             value_size);
1561
1562   GNUNET_PQ_cleanup_result(results);
1563   PQclear (res);
1564
1565   return ret;
1566 }
1567
1568
1569 /**
1570  * Retrieve all state variables for a channel with the given prefix.
1571  *
1572  * @see GNUNET_PSYCSTORE_state_get_prefix()
1573  *
1574  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1575  */
1576 static int
1577 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1578                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1579                   void *cb_cls)
1580 {
1581   PGresult *res;
1582   struct Plugin *plugin = cls;
1583   int ret = GNUNET_SYSERR;
1584
1585   const char *stmt = "select_state_prefix";
1586
1587   uint32_t name_len = (uint32_t) strlen (name);
1588
1589   struct GNUNET_PQ_QueryParam params_select[] = {
1590     GNUNET_PQ_query_param_auto_from_type (channel_key),
1591     GNUNET_PQ_query_param_string (name),
1592     GNUNET_PQ_query_param_uint32 (&name_len),
1593     GNUNET_PQ_query_param_string (name),
1594     GNUNET_PQ_query_param_end
1595   };
1596
1597   char *name2 = "";
1598   void *value_current = NULL;
1599   size_t value_size = 0;
1600
1601   struct GNUNET_PQ_ResultSpec results[] = {
1602     GNUNET_PQ_result_spec_string ("name", &name2),
1603     GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1604     GNUNET_PQ_result_spec_end
1605   };
1606
1607   do
1608   {
1609     res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1610     if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1611                                       res,
1612                                       PGRES_COMMAND_OK,
1613                                       "PQexecPrepared", stmt))
1614     {
1615       break;
1616     }
1617
1618     if (PQntuples (res) == 0)
1619     {
1620       PQclear (res);
1621       ret = GNUNET_NO;
1622       break;
1623     }
1624
1625     if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, 0))
1626     {
1627       PQclear (res);
1628       break;
1629     }
1630
1631     ret = cb (cb_cls, (const char *) name2,
1632               value_current,
1633               value_size);
1634     GNUNET_PQ_cleanup_result(results);
1635   }
1636   while (ret == GNUNET_YES);
1637
1638   PQclear (res);
1639
1640   return ret;
1641 }
1642
1643
1644 /**
1645  * Retrieve all signed state variables for a channel.
1646  *
1647  * @see GNUNET_PSYCSTORE_state_get_signed()
1648  *
1649  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1650  */
1651 static int
1652 state_get_signed (void *cls,
1653                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1654                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1655 {
1656   PGresult *res;
1657   struct Plugin *plugin = cls;
1658   int ret = GNUNET_SYSERR;
1659
1660   const char *stmt = "select_state_signed";
1661
1662   struct GNUNET_PQ_QueryParam params_select[] = {
1663     GNUNET_PQ_query_param_auto_from_type (channel_key),
1664     GNUNET_PQ_query_param_end
1665   };
1666
1667   char *name = "";
1668   void *value_signed = NULL;
1669   size_t value_size = 0;
1670
1671   struct GNUNET_PQ_ResultSpec results[] = {
1672     GNUNET_PQ_result_spec_string ("name", &name),
1673     GNUNET_PQ_result_spec_variable_size ("value_signed", &value_signed, &value_size),
1674     GNUNET_PQ_result_spec_end
1675   };
1676
1677   do
1678   {
1679     res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1680     if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1681                                       res,
1682                                       PGRES_COMMAND_OK,
1683                                       "PQexecPrepared", stmt))
1684     {
1685       break;
1686     }
1687
1688     if (PQntuples (res) == 0)
1689     {
1690       PQclear (res);
1691       ret = GNUNET_NO;
1692       break;
1693     }
1694
1695     if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, 0))
1696     {
1697       PQclear (res);
1698       break;
1699     }
1700
1701     ret = cb (cb_cls, (const char *) name,
1702               value_signed,
1703               value_size);
1704
1705     GNUNET_PQ_cleanup_result(results);
1706   }
1707   while (ret == GNUNET_YES);
1708
1709   PQclear (res);
1710
1711   return ret;
1712 }
1713
1714
1715 /**
1716  * Entry point for the plugin.
1717  *
1718  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1719  * @return NULL on error, otherwise the plugin context
1720  */
1721 void *
1722 libgnunet_plugin_psycstore_postgres_init (void *cls)
1723 {
1724   static struct Plugin plugin;
1725   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1726   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1727
1728   if (NULL != plugin.cfg)
1729     return NULL;                /* can only initialize once! */
1730   memset (&plugin, 0, sizeof (struct Plugin));
1731   plugin.cfg = cfg;
1732   if (GNUNET_OK != database_setup (&plugin))
1733   {
1734     database_shutdown (&plugin);
1735     return NULL;
1736   }
1737   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1738   api->cls = &plugin;
1739   api->membership_store = &postgres_membership_store;
1740   api->membership_test = &membership_test;
1741   api->fragment_store = &fragment_store;
1742   api->message_add_flags = &message_add_flags;
1743   api->fragment_get = &fragment_get;
1744   api->fragment_get_latest = &fragment_get_latest;
1745   api->message_get = &message_get;
1746   api->message_get_latest = &message_get_latest;
1747   api->message_get_fragment = &message_get_fragment;
1748   api->counters_message_get = &counters_message_get;
1749   api->counters_state_get = &counters_state_get;
1750   api->state_modify_begin = &state_modify_begin;
1751   api->state_modify_op = &state_modify_op;
1752   api->state_modify_end = &state_modify_end;
1753   api->state_sync_begin = &state_sync_begin;
1754   api->state_sync_assign = &state_sync_assign;
1755   api->state_sync_end = &state_sync_end;
1756   api->state_reset = &state_reset;
1757   api->state_update_signed = &state_update_signed;
1758   api->state_get = &state_get;
1759   api->state_get_prefix = &state_get_prefix;
1760   api->state_get_signed = &state_get_signed;
1761
1762   LOG (GNUNET_ERROR_TYPE_INFO, _("Postgres database running\n"));
1763   return api;
1764 }
1765
1766
1767 /**
1768  * Exit point from the plugin.
1769  *
1770  * @param cls The plugin context (as returned by "init")
1771  * @return Always NULL
1772  */
1773 void *
1774 libgnunet_plugin_psycstore_postgres_done (void *cls)
1775 {
1776   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1777   struct Plugin *plugin = api->cls;
1778
1779   database_shutdown (plugin);
1780   plugin->cfg = NULL;
1781   GNUNET_free (api);
1782   LOG (GNUNET_ERROR_TYPE_DEBUG, "Postgres plugin has finished\n");
1783   return NULL;
1784 }
1785
1786 /* end of plugin_psycstore_postgres.c */