src: for every AGPL3.0 file, add SPDX identifier.
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_postgres.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2016 GNUnet e.V.
4  *
5  * GNUnet is free software: you can redistribute it and/or modify it
6  * under the terms of the GNU Affero General Public License as published
7  * by the Free Software Foundation, either version 3 of the License,
8  * or (at your option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Affero General Public License for more details.
14  *
15  * You should have received a copy of the GNU Affero General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL-3.0-or-later
19  */
20
21 /**
22  * @file psycstore/plugin_psycstore_postgres.c
23  * @brief PostgreSQL-based psycstore backend
24  * @author Daniel Golle
25  * @author Gabor X Toth
26  * @author Christian Grothoff
27  * @author Christophe Genevey
28  * @author Jeffrey Burdges
29  */
30
31 #include "platform.h"
32 #include "gnunet_psycstore_plugin.h"
33 #include "gnunet_psycstore_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_crypto_lib.h"
36 #include "gnunet_psyc_util_lib.h"
37 #include "psycstore.h"
38 #include "gnunet_pq_lib.h"
39
/**
 * After how many ms "busy" should a DB operation fail for good?  A
 * low value makes sure that we are more responsive to requests
 * (especially PUTs).  A high value guarantees a higher success rate
 * (SELECTs in iterate can take several seconds despite LIMIT=1).
 *
 * The default value of 1s should ensure that users do not experience
 * huge latencies while at the same time allowing operations to
 * succeed with reasonable probability.
 *
 * NOTE(review): no use of this constant is visible in this part of the
 * file -- confirm that it is still needed by the Postgres backend.
 */
#define BUSY_TIMEOUT_MS 1000

/**
 * Debug switch of this plugin; simply mirrors GNUNET_EXTRA_LOGGING.
 */
#define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING

/**
 * Log a message of the given @a kind via GNUNET_log_from(), tagged with
 * the string "psycstore-postgres".
 */
#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-postgres", __VA_ARGS__)
55
/**
 * Kind of database transaction recorded in `struct Plugin`.
 */
enum Transactions {
  /**
   * No transaction is open; this is the value stored after a
   * successful commit or rollback.
   */
  TRANSACTION_NONE = 0,

  /**
   * Presumably a transaction applying state modifiers -- the code using
   * it is not visible here, confirm against the callers.
   */
  TRANSACTION_STATE_MODIFY,

  /**
   * Presumably a transaction performing a full state sync -- the code
   * using it is not visible here, confirm against the callers.
   */
  TRANSACTION_STATE_SYNC,
};
61
/**
 * Context for all functions in this plugin.
 */
struct Plugin
{

  /**
   * Configuration handle; passed to GNUNET_PQ_connect_with_cfg() when
   * the database connection is opened.
   */
  const struct GNUNET_CONFIGURATION_Handle *cfg;

  /**
   * Native Postgres database handle.
   */
  PGconn *dbh;

  /**
   * Kind of transaction currently in progress; #TRANSACTION_NONE when
   * no transaction is open.
   */
  enum Transactions transaction;

  /**
   * Opaque pointer; not used by any code visible in this part of the
   * file (NOTE(review): confirm its purpose).
   */
  void *cls;
};
79
80
81 /**
82  * Initialize the database connections and associated
83  * data structures (create tables and indices
84  * as needed as well).
85  *
86  * @param plugin the plugin context (state for this module)
87  * @return #GNUNET_OK on success
88  */
89 static int
90 database_setup (struct Plugin *plugin)
91 {
92   struct GNUNET_PQ_ExecuteStatement es[] = {
93     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS channels (\n"
94                             " id SERIAL,\n"
95                             " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
96                             " max_state_message_id BIGINT,\n"
97                             " state_hash_message_id BIGINT,\n"
98                             " PRIMARY KEY(id)\n"
99                             ")"
100                             "WITH OIDS"),
101     GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n"
102                             " ON channels (pub_key)"),
103     GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n"
104                             " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
105                             "RETURNS NULL ON NULL INPUT"),
106     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS slaves (\n"
107                             " id SERIAL,\n"
108                             " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
109                             " PRIMARY KEY(id)\n"
110                             ")"
111                             "WITH OIDS"),
112     GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n"
113                             " ON slaves (pub_key)"),
114     GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n"
115                             " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
116                             "RETURNS NULL ON NULL INPUT"),
117     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS membership (\n"
118                             "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
119                             "  slave_id BIGINT NOT NULL REFERENCES slaves(id),\n"
120                             "  did_join INT NOT NULL,\n"
121                             "  announced_at BIGINT NOT NULL,\n"
122                             "  effective_since BIGINT NOT NULL,\n"
123                             "  group_generation BIGINT NOT NULL\n"
124                             ")"
125                             "WITH OIDS"),
126     GNUNET_PQ_make_execute ("CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
127                             "ON membership (channel_id, slave_id)"),
128     /** @todo messages table: add method_name column */
129     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS messages (\n"
130                             "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
131                             "  hop_counter INT NOT NULL,\n"
132                             "  signature BYTEA CHECK (LENGTH(signature)=64),\n"
133                             "  purpose BYTEA CHECK (LENGTH(purpose)=8),\n"
134                             "  fragment_id BIGINT NOT NULL,\n"
135                             "  fragment_offset BIGINT NOT NULL,\n"
136                             "  message_id BIGINT NOT NULL,\n"
137                             "  group_generation BIGINT NOT NULL,\n"
138                             "  multicast_flags INT NOT NULL,\n"
139                             "  psycstore_flags INT NOT NULL,\n"
140                             "  data BYTEA,\n"
141                             "  PRIMARY KEY (channel_id, fragment_id),\n"
142                             "  UNIQUE (channel_id, message_id, fragment_offset)\n"
143                             ")"
144                             "WITH OIDS"),
145     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state (\n"
146                             "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
147                             "  name TEXT NOT NULL,\n"
148                             "  value_current BYTEA,\n"
149                             "  value_signed BYTEA,\n"
150                             "  PRIMARY KEY (channel_id, name)\n"
151                             ")"
152                             "WITH OIDS"),
153     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state_sync (\n"
154                             "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
155                             "  name TEXT NOT NULL,\n"
156                             "  value BYTEA,\n"
157                             "  PRIMARY KEY (channel_id, name)\n"
158                             ")"
159                             "WITH OIDS"),
160     GNUNET_PQ_EXECUTE_STATEMENT_END
161   };
162
163   /* Open database and precompile statements */
164   plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->cfg,
165                                             "psycstore-postgres");
166   if (NULL == plugin->dbh)
167     return GNUNET_SYSERR;
168   if (GNUNET_OK !=
169       GNUNET_PQ_exec_statements (plugin->dbh,
170                                  es))
171   {
172     PQfinish (plugin->dbh);
173     plugin->dbh = NULL;
174     return GNUNET_SYSERR;
175   }
176
177   /* Prepare statements */
178   {
179     struct GNUNET_PQ_PreparedStatement ps[] = {
180       GNUNET_PQ_make_prepare ("transaction_begin",
181                               "BEGIN", 0),
182       GNUNET_PQ_make_prepare ("transaction_commit",
183                               "COMMIT", 0),
184       GNUNET_PQ_make_prepare ("transaction_rollback",
185                               "ROLLBACK", 0),
186       GNUNET_PQ_make_prepare ("insert_channel_key",
187                               "INSERT INTO channels (pub_key) VALUES ($1)"
188                               " ON CONFLICT DO NOTHING", 1),
189       GNUNET_PQ_make_prepare ("insert_slave_key",
190                               "INSERT INTO slaves (pub_key) VALUES ($1)"
191                               " ON CONFLICT DO NOTHING", 1),
192       GNUNET_PQ_make_prepare ("insert_membership",
193                               "INSERT INTO membership\n"
194                               " (channel_id, slave_id, did_join, announced_at,\n"
195                               "  effective_since, group_generation)\n"
196                               "VALUES (get_chan_id($1),\n"
197                               "        get_slave_id($2),\n"
198                               "        $3, $4, $5, $6)", 6),
199       GNUNET_PQ_make_prepare ("select_membership",
200                               "SELECT did_join FROM membership\n"
201                               "WHERE channel_id = get_chan_id($1)\n"
202                               "      AND slave_id = get_slave_id($2)\n"
203                               "      AND effective_since <= $3 AND did_join = 1\n"
204                               "ORDER BY announced_at DESC LIMIT 1", 3),
205       GNUNET_PQ_make_prepare ("insert_fragment",
206                               "INSERT INTO messages\n"
207                               " (channel_id, hop_counter, signature, purpose,\n"
208                               "  fragment_id, fragment_offset, message_id,\n"
209                               "  group_generation, multicast_flags, psycstore_flags, data)\n"
210                               "VALUES (get_chan_id($1),\n"
211                               "        $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
212                               "ON CONFLICT DO NOTHING", 11),
213       GNUNET_PQ_make_prepare ("update_message_flags",
214                               "UPDATE messages\n"
215                               "SET psycstore_flags = psycstore_flags | $1\n"
216                               "WHERE channel_id = get_chan_id($2) \n"
217                               "      AND message_id = $3 AND fragment_offset = 0", 3),
218       GNUNET_PQ_make_prepare ("select_fragments",
219                               "SELECT hop_counter, signature, purpose, fragment_id,\n"
220                               "       fragment_offset, message_id, group_generation,\n"
221                               "       multicast_flags, psycstore_flags, data\n"
222                               "FROM messages\n"
223                               "WHERE channel_id = get_chan_id($1) \n"
224                               "      AND $2 <= fragment_id AND fragment_id <= $3", 3),
225       /** @todo select_messages: add method_prefix filter */
226       GNUNET_PQ_make_prepare ("select_messages",
227                               "SELECT hop_counter, signature, purpose, fragment_id,\n"
228                               "       fragment_offset, message_id, group_generation,\n"
229                               "       multicast_flags, psycstore_flags, data\n"
230                               "FROM messages\n"
231                               "WHERE channel_id = get_chan_id($1) \n"
232                               "      AND $2 <= message_id AND message_id <= $3\n"
233                               "LIMIT $4;", 4),
234       /** @todo select_latest_messages: add method_prefix filter */
235       GNUNET_PQ_make_prepare ("select_latest_fragments",
236                               "SELECT  rev.hop_counter AS hop_counter,\n"
237                               "        rev.signature AS signature,\n"
238                               "        rev.purpose AS purpose,\n"
239                               "        rev.fragment_id AS fragment_id,\n"
240                               "        rev.fragment_offset AS fragment_offset,\n"
241                               "        rev.message_id AS message_id,\n"
242                               "        rev.group_generation AS group_generation,\n"
243                               "        rev.multicast_flags AS multicast_flags,\n"
244                               "        rev.psycstore_flags AS psycstore_flags,\n"
245                               "        rev.data AS data\n"
246                               " FROM\n"
247                               " (SELECT hop_counter, signature, purpose, fragment_id,\n"
248                               "        fragment_offset, message_id, group_generation,\n"
249                               "        multicast_flags, psycstore_flags, data \n"
250                               "  FROM messages\n"
251                               "  WHERE channel_id = get_chan_id($1) \n"
252                               "  ORDER BY fragment_id DESC\n"
253                               "  LIMIT $2) AS rev\n"
254                               " ORDER BY rev.fragment_id;", 2),
255       GNUNET_PQ_make_prepare ("select_latest_messages",
256                               "SELECT hop_counter, signature, purpose, fragment_id,\n"
257                               "       fragment_offset, message_id, group_generation,\n"
258                               "        multicast_flags, psycstore_flags, data\n"
259                               "FROM messages\n"
260                               "WHERE channel_id = get_chan_id($1)\n"
261                               "      AND message_id IN\n"
262                               "      (SELECT message_id\n"
263                               "       FROM messages\n"
264                               "       WHERE channel_id = get_chan_id($2) \n"
265                               "       GROUP BY message_id\n"
266                               "       ORDER BY message_id\n"
267                               "       DESC LIMIT $3)\n"
268                               "ORDER BY fragment_id", 3),
269       GNUNET_PQ_make_prepare ("select_message_fragment",
270                               "SELECT hop_counter, signature, purpose, fragment_id,\n"
271                               "       fragment_offset, message_id, group_generation,\n"
272                               "       multicast_flags, psycstore_flags, data\n"
273                               "FROM messages\n"
274                               "WHERE channel_id = get_chan_id($1) \n"
275                               "      AND message_id = $2 AND fragment_offset = $3", 3),
276       GNUNET_PQ_make_prepare ("select_counters_message",
277                               "SELECT fragment_id, message_id, group_generation\n"
278                               "FROM messages\n"
279                               "WHERE channel_id = get_chan_id($1)\n"
280                               "ORDER BY fragment_id DESC LIMIT 1", 1),
281       GNUNET_PQ_make_prepare ("select_counters_state",
282                               "SELECT max_state_message_id\n"
283                               "FROM channels\n"
284                               "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1),
285       GNUNET_PQ_make_prepare ("update_max_state_message_id",
286                               "UPDATE channels\n"
287                               "SET max_state_message_id = $1\n"
288                               "WHERE pub_key = $2", 2),
289
290       GNUNET_PQ_make_prepare ("update_state_hash_message_id",
291                               "UPDATE channels\n"
292                               "SET state_hash_message_id = $1\n"
293                               "WHERE pub_key = $2", 2),
294       GNUNET_PQ_make_prepare ("insert_state_current",
295                               "INSERT INTO state\n"
296                               "  (channel_id, name, value_current, value_signed)\n"
297                               "SELECT new.channel_id, new.name,\n"
298                               "       new.value_current, old.value_signed\n"
299                               "FROM (SELECT get_chan_id($1) AS channel_id,\n"
300                               "             $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n"
301                               "LEFT JOIN (SELECT channel_id, name, value_signed\n"
302                               "           FROM state) AS old\n"
303                               "ON new.channel_id = old.channel_id AND new.name = old.name\n"
304                               "ON CONFLICT (channel_id, name)\n"
305                               "   DO UPDATE SET value_current = EXCLUDED.value_current,\n"
306                               "                 value_signed = EXCLUDED.value_signed", 3),
307       GNUNET_PQ_make_prepare ("delete_state_empty",
308                               "DELETE FROM state\n"
309                               "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n"
310                               "      AND (value_current IS NULL OR length(value_current) = 0)\n"
311                               "      AND (value_signed IS NULL OR length(value_signed) = 0)", 1),
312       GNUNET_PQ_make_prepare ("update_state_signed",
313                               "UPDATE state\n"
314                               "SET value_signed = value_current\n"
315                               "WHERE channel_id = get_chan_id($1) ", 1),
316       GNUNET_PQ_make_prepare ("delete_state",
317                               "DELETE FROM state\n"
318                               "WHERE channel_id = get_chan_id($1) ", 1),
319       GNUNET_PQ_make_prepare ("insert_state_sync",
320                               "INSERT INTO state_sync (channel_id, name, value)\n"
321                               "VALUES (get_chan_id($1), $2, $3)", 3),
322       GNUNET_PQ_make_prepare ("insert_state_from_sync",
323                               "INSERT INTO state\n"
324                               " (channel_id, name, value_current, value_signed)\n"
325                               "SELECT channel_id, name, value, value\n"
326                               "FROM state_sync\n"
327                               "WHERE channel_id = get_chan_id($1)", 1),
328       GNUNET_PQ_make_prepare ("delete_state_sync",
329                               "DELETE FROM state_sync\n"
330                               "WHERE channel_id = get_chan_id($1)", 1),
331       GNUNET_PQ_make_prepare ("select_state_one",
332                               "SELECT value_current\n"
333                               "FROM state\n"
334                               "WHERE channel_id = get_chan_id($1)\n"
335                               "      AND name = $2", 2),
336       GNUNET_PQ_make_prepare ("select_state_prefix",
337                               "SELECT name, value_current\n"
338                               "FROM state\n"
339                               "WHERE channel_id = get_chan_id($1)\n"
340                               "      AND (name = $2 OR substr(name, 1, $3) = $4)", 4),
341       GNUNET_PQ_make_prepare ("select_state_signed",
342                               "SELECT name, value_signed\n"
343                               "FROM state\n"
344                               "WHERE channel_id = get_chan_id($1)\n"
345                               "      AND value_signed IS NOT NULL", 1),
346       GNUNET_PQ_PREPARED_STATEMENT_END
347     };
348
349     if (GNUNET_OK !=
350         GNUNET_PQ_prepare_statements (plugin->dbh,
351                                       ps))
352     {
353       PQfinish (plugin->dbh);
354       plugin->dbh = NULL;
355       return GNUNET_SYSERR;
356     }
357   }
358
359   return GNUNET_OK;
360 }
361
362
363 /**
364  * Shutdown database connection and associate data
365  * structures.
366  * @param plugin the plugin context (state for this module)
367  */
368 static void
369 database_shutdown (struct Plugin *plugin)
370 {
371   PQfinish (plugin->dbh);
372   plugin->dbh = NULL;
373 }
374
375
376 /**
377  * Execute a prepared statement with a @a channel_key argument.
378  *
379  * @param plugin Plugin handle.
380  * @param stmt Statement to execute.
381  * @param channel_key Public key of the channel.
382  *
383  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
384  */
385 static int
386 exec_channel (struct Plugin *plugin, const char *stmt,
387               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
388 {
389   struct GNUNET_PQ_QueryParam params[] = {
390     GNUNET_PQ_query_param_auto_from_type (channel_key),
391     GNUNET_PQ_query_param_end
392   };
393
394   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
395       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
396     return GNUNET_SYSERR;
397
398   return GNUNET_OK;
399 }
400
401
402 /**
403  * Begin a transaction.
404  */
405 static int
406 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
407 {
408   struct GNUNET_PQ_QueryParam params[] = {
409     GNUNET_PQ_query_param_end
410   };
411
412   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
413       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_begin", params))
414     return GNUNET_SYSERR;
415
416   plugin->transaction = transaction;
417   return GNUNET_OK;
418 }
419
420
421 /**
422  * Commit current transaction.
423  */
424 static int
425 transaction_commit (struct Plugin *plugin)
426 {
427   struct GNUNET_PQ_QueryParam params[] = {
428     GNUNET_PQ_query_param_end
429   };
430
431   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
432       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_commit", params))
433     return GNUNET_SYSERR;
434
435   plugin->transaction = TRANSACTION_NONE;
436   return GNUNET_OK;
437 }
438
439
440 /**
441  * Roll back current transaction.
442  */
443 static int
444 transaction_rollback (struct Plugin *plugin)
445 {
446   struct GNUNET_PQ_QueryParam params[] = {
447     GNUNET_PQ_query_param_end
448   };
449
450   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
451       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_rollback", params))
452     return GNUNET_SYSERR;
453
454   plugin->transaction = TRANSACTION_NONE;
455   return GNUNET_OK;
456 }
457
458
459 static int
460 channel_key_store (struct Plugin *plugin,
461                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
462 {
463   struct GNUNET_PQ_QueryParam params[] = {
464     GNUNET_PQ_query_param_auto_from_type (channel_key),
465     GNUNET_PQ_query_param_end
466   };
467
468   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
469       GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
470                                           "insert_channel_key",
471                                           params))
472     return GNUNET_SYSERR;
473
474   return GNUNET_OK;
475 }
476
477
478 static int
479 slave_key_store (struct Plugin *plugin,
480                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
481 {
482   struct GNUNET_PQ_QueryParam params[] = {
483     GNUNET_PQ_query_param_auto_from_type (slave_key),
484     GNUNET_PQ_query_param_end
485   };
486
487   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
488       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_slave_key", params))
489     return GNUNET_SYSERR;
490
491   return GNUNET_OK;
492 }
493
494
495 /**
496  * Store join/leave events for a PSYC channel in order to be able to answer
497  * membership test queries later.
498  *
499  * @see GNUNET_PSYCSTORE_membership_store()
500  *
501  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
502  */
503 static int
504 postgres_membership_store (void *cls,
505                            const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
506                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
507                            int did_join,
508                            uint64_t announced_at,
509                            uint64_t effective_since,
510                            uint64_t group_generation)
511 {
512   struct Plugin *plugin = cls;
513   uint32_t idid_join = (uint32_t) did_join;
514
515   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
516
517   if ( (announced_at > INT64_MAX) ||
518        (effective_since > INT64_MAX) ||
519        (group_generation > INT64_MAX) )
520   {
521     GNUNET_break (0);
522     return GNUNET_SYSERR;
523   }
524
525   if ( (GNUNET_OK !=
526         channel_key_store (plugin, channel_key)) ||
527        (GNUNET_OK !=
528         slave_key_store (plugin, slave_key)) )
529     return GNUNET_SYSERR;
530
531   struct GNUNET_PQ_QueryParam params[] = {
532     GNUNET_PQ_query_param_auto_from_type (channel_key),
533     GNUNET_PQ_query_param_auto_from_type (slave_key),
534     GNUNET_PQ_query_param_uint32 (&idid_join),
535     GNUNET_PQ_query_param_uint64 (&announced_at),
536     GNUNET_PQ_query_param_uint64 (&effective_since),
537     GNUNET_PQ_query_param_uint64 (&group_generation),
538     GNUNET_PQ_query_param_end
539   };
540
541   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
542       GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
543                                           "insert_membership",
544                                           params))
545     return GNUNET_SYSERR;
546
547   return GNUNET_OK;
548 }
549
550 /**
551  * Test if a member was admitted to the channel at the given message ID.
552  *
553  * @see GNUNET_PSYCSTORE_membership_test()
554  *
555  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
556  *         #GNUNET_SYSERR if there was en error.
557  */
558 static int
559 membership_test (void *cls,
560                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
561                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
562                  uint64_t message_id)
563 {
564   struct Plugin *plugin = cls;
565
566   uint32_t did_join = 0;
567
568   struct GNUNET_PQ_QueryParam params_select[] = {
569     GNUNET_PQ_query_param_auto_from_type (channel_key),
570     GNUNET_PQ_query_param_auto_from_type (slave_key),
571     GNUNET_PQ_query_param_uint64 (&message_id),
572     GNUNET_PQ_query_param_end
573   };
574
575   struct GNUNET_PQ_ResultSpec results_select[] = {
576     GNUNET_PQ_result_spec_uint32 ("did_join", &did_join),
577     GNUNET_PQ_result_spec_end
578   };
579
580   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
581       GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, "select_membership", 
582                                                 params_select, results_select))
583      return GNUNET_SYSERR;
584
585   return GNUNET_OK;
586 }
587
588 /**
589  * Store a message fragment sent to a channel.
590  *
591  * @see GNUNET_PSYCSTORE_fragment_store()
592  *
593  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
594  */
595 static int
596 fragment_store (void *cls,
597                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
598                 const struct GNUNET_MULTICAST_MessageHeader *msg,
599                 uint32_t psycstore_flags)
600 {
601   struct Plugin *plugin = cls;
602
603   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
604
605   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
606
607   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
608   uint64_t message_id = GNUNET_ntohll (msg->message_id);
609   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
610
611   uint32_t hop_counter = ntohl(msg->hop_counter);
612   uint32_t flags = ntohl(msg->flags);
613
614   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
615       message_id > INT64_MAX || group_generation > INT64_MAX)
616   {
617     LOG(GNUNET_ERROR_TYPE_ERROR,
618          "Tried to store fragment with a field > INT64_MAX: "
619          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
620          message_id, group_generation);
621     GNUNET_break (0);
622     return GNUNET_SYSERR;
623   }
624
625   if (GNUNET_OK != channel_key_store (plugin, channel_key))
626     return GNUNET_SYSERR;
627
628   struct GNUNET_PQ_QueryParam params_insert[] = {
629     GNUNET_PQ_query_param_auto_from_type (channel_key),
630     GNUNET_PQ_query_param_uint32 (&hop_counter),
631     GNUNET_PQ_query_param_auto_from_type (&msg->signature),
632     GNUNET_PQ_query_param_auto_from_type (&msg->purpose),
633     GNUNET_PQ_query_param_uint64 (&fragment_id),
634     GNUNET_PQ_query_param_uint64 (&fragment_offset),
635     GNUNET_PQ_query_param_uint64 (&message_id),
636     GNUNET_PQ_query_param_uint64 (&group_generation),
637     GNUNET_PQ_query_param_uint32 (&flags),
638     GNUNET_PQ_query_param_uint32 (&psycstore_flags),
639     GNUNET_PQ_query_param_fixed_size (&msg[1], ntohs (msg->header.size) - sizeof (*msg)),
640     GNUNET_PQ_query_param_end
641   };
642
643   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
644       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_fragment", params_insert))
645     return GNUNET_SYSERR;
646
647   return GNUNET_OK;
648 }
649
650 /**
651  * Set additional flags for a given message.
652  *
653  * They are OR'd with any existing flags set.
654  *
655  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
656  */
657 static int
658 message_add_flags (void *cls,
659                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
660                    uint64_t message_id,
661                    uint32_t psycstore_flags)
662 {
663   struct Plugin *plugin = cls;
664
665   struct GNUNET_PQ_QueryParam params_update[] = {
666     GNUNET_PQ_query_param_uint32 (&psycstore_flags),
667     GNUNET_PQ_query_param_auto_from_type (channel_key),
668     GNUNET_PQ_query_param_uint64 (&message_id),
669     GNUNET_PQ_query_param_end
670   };
671
672   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
673       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "update_message_flags", params_update))
674     return GNUNET_SYSERR;
675
676   return GNUNET_OK;
677 }
678
679
/**
 * Closure for #fragment_rows.
 */
struct FragmentRowsContext {
  /**
   * Callback invoked by #fragment_rows for every fragment it extracts.
   */
  GNUNET_PSYCSTORE_FragmentCallback cb;

  /**
   * Closure passed as first argument to @e cb.
   */
  void *cb_cls;

  /**
   * Counter incremented once for every fragment handed to @e cb;
   * may be NULL, in which case nothing is counted.
   */
  uint64_t *returned_fragments;

  /* Return value of the most recent invocation of @e cb.
   * I preserved this but I do not see the point since
   * it cannot stop the loop early and gets overwritten ?? */
  int ret;
};
693
694
695 /**
696  * Callback that retrieves the results of a SELECT statement
697  * reading form the messages table.
698  *
699  * Only passed to GNUNET_PQ_eval_prepared_multi_select and
700  * has type GNUNET_PQ_PostgresResultHandler.
701  *
702  * @param cls closure
703  * @param result the postgres result
704  * @param num_result the number of results in @a result
705  */
706 void fragment_rows (void *cls,
707                     PGresult *res,
708                     unsigned int num_results)
709 {
710   struct FragmentRowsContext *c = cls;
711
712   for (unsigned int i=0;i<num_results;i++)
713   {
714     uint32_t hop_counter;
715     void *signature = NULL;
716     void *purpose = NULL;
717     size_t signature_size;
718     size_t purpose_size;
719     uint64_t fragment_id;
720     uint64_t fragment_offset;
721     uint64_t message_id;
722     uint64_t group_generation;
723     uint32_t flags;
724     void *buf;
725     size_t buf_size;
726     uint32_t msg_flags;
727     struct GNUNET_PQ_ResultSpec results[] = {
728       GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
729       GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size),
730       GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size),
731       GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id),
732       GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
733       GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
734       GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
735       GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags),
736       GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags),
737       GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size),
738       GNUNET_PQ_result_spec_end
739     };
740     struct GNUNET_MULTICAST_MessageHeader *mp;
741
742     if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i))
743     {
744       GNUNET_PQ_cleanup_result(results);  /* missing previously, a memory leak?? */
745       break;  /* nothing more?? */
746     }
747
748     mp = GNUNET_malloc (sizeof (*mp) + buf_size);
749
750     mp->header.size = htons (sizeof (*mp) + buf_size);
751     mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
752     mp->hop_counter = htonl (hop_counter);
753     GNUNET_memcpy (&mp->signature,
754                    signature, signature_size);
755     GNUNET_memcpy (&mp->purpose,
756                    purpose, purpose_size);
757     mp->fragment_id = GNUNET_htonll (fragment_id);
758     mp->fragment_offset = GNUNET_htonll (fragment_offset);
759     mp->message_id = GNUNET_htonll (message_id);
760     mp->group_generation = GNUNET_htonll (group_generation);
761     mp->flags = htonl(msg_flags);
762
763     GNUNET_memcpy (&mp[1],
764                    buf, buf_size);
765     GNUNET_PQ_cleanup_result(results);
766     c->ret = c->cb (c->cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
767     if (NULL != c->returned_fragments)
768       (*c->returned_fragments)++;
769   }
770 }
771
772
773 static int
774 fragment_select (struct Plugin *plugin,
775                  const char *stmt,
776                  struct GNUNET_PQ_QueryParam *params,
777                  uint64_t *returned_fragments,
778                  GNUNET_PSYCSTORE_FragmentCallback cb,
779                  void *cb_cls)
780 {
781   /* Stack based closure */
782   struct FragmentRowsContext frc = {
783     .cb = cb,
784     .cb_cls = cb_cls,
785     .returned_fragments = returned_fragments,
786     .ret = GNUNET_SYSERR
787   };
788
789   if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
790                                                 stmt, params,
791                                                 &fragment_rows, &frc))
792     return GNUNET_SYSERR;
793   return frc.ret;  /* GNUNET_OK ?? */
794 }
795
796 /**
797  * Retrieve a message fragment range by fragment ID.
798  *
799  * @see GNUNET_PSYCSTORE_fragment_get()
800  *
801  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
802  */
803 static int
804 fragment_get (void *cls,
805               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
806               uint64_t first_fragment_id,
807               uint64_t last_fragment_id,
808               uint64_t *returned_fragments,
809               GNUNET_PSYCSTORE_FragmentCallback cb,
810               void *cb_cls)
811 {
812   struct Plugin *plugin = cls;
813   struct GNUNET_PQ_QueryParam params_select[] = {
814     GNUNET_PQ_query_param_auto_from_type (channel_key),
815     GNUNET_PQ_query_param_uint64 (&first_fragment_id),
816     GNUNET_PQ_query_param_uint64 (&last_fragment_id),
817     GNUNET_PQ_query_param_end
818   };
819
820   *returned_fragments = 0;
821   return fragment_select (plugin,
822                           "select_fragments",
823                           params_select,
824                           returned_fragments,
825                           cb, cb_cls);
826 }
827
828
829 /**
830  * Retrieve a message fragment range by fragment ID.
831  *
832  * @see GNUNET_PSYCSTORE_fragment_get_latest()
833  *
834  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
835  */
836 static int
837 fragment_get_latest (void *cls,
838                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
839                      uint64_t fragment_limit,
840                      uint64_t *returned_fragments,
841                      GNUNET_PSYCSTORE_FragmentCallback cb,
842                      void *cb_cls)
843 {
844   struct Plugin *plugin = cls;
845
846   *returned_fragments = 0;
847
848   struct GNUNET_PQ_QueryParam params_select[] = {
849     GNUNET_PQ_query_param_auto_from_type (channel_key),
850     GNUNET_PQ_query_param_uint64 (&fragment_limit),
851     GNUNET_PQ_query_param_end
852   };
853
854   return fragment_select (plugin,
855                           "select_latest_fragments",
856                           params_select,
857                           returned_fragments,
858                           cb, cb_cls);
859 }
860
861
862 /**
863  * Retrieve all fragments of a message ID range.
864  *
865  * @see GNUNET_PSYCSTORE_message_get()
866  *
867  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
868  */
869 static int
870 message_get (void *cls,
871              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
872              uint64_t first_message_id,
873              uint64_t last_message_id,
874              uint64_t fragment_limit,
875              uint64_t *returned_fragments,
876              GNUNET_PSYCSTORE_FragmentCallback cb,
877              void *cb_cls)
878 {
879   struct Plugin *plugin = cls;
880   struct GNUNET_PQ_QueryParam params_select[] = {
881     GNUNET_PQ_query_param_auto_from_type (channel_key),
882     GNUNET_PQ_query_param_uint64 (&first_message_id),
883     GNUNET_PQ_query_param_uint64 (&last_message_id),
884     GNUNET_PQ_query_param_uint64 (&fragment_limit),
885     GNUNET_PQ_query_param_end
886   };
887
888   if (0 == fragment_limit)
889     fragment_limit = INT64_MAX;
890   *returned_fragments = 0;
891   return fragment_select (plugin,
892                           "select_messages",
893                           params_select,
894                           returned_fragments,
895                           cb, cb_cls);
896 }
897
898
899 /**
900  * Retrieve all fragments of the latest messages.
901  *
902  * @see GNUNET_PSYCSTORE_message_get_latest()
903  *
904  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
905  */
906 static int
907 message_get_latest (void *cls,
908                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
909                     uint64_t message_limit,
910                     uint64_t *returned_fragments,
911                     GNUNET_PSYCSTORE_FragmentCallback cb,
912                     void *cb_cls)
913 {
914   struct Plugin *plugin = cls;
915   struct GNUNET_PQ_QueryParam params_select[] = {
916     GNUNET_PQ_query_param_auto_from_type (channel_key),
917     GNUNET_PQ_query_param_auto_from_type (channel_key),
918     GNUNET_PQ_query_param_uint64 (&message_limit),
919     GNUNET_PQ_query_param_end
920   };
921
922   *returned_fragments = 0;
923   return fragment_select (plugin,
924                           "select_latest_messages",
925                           params_select,
926                           returned_fragments,
927                           cb, cb_cls);
928 }
929
930
931 /**
932  * Retrieve a fragment of message specified by its message ID and fragment
933  * offset.
934  *
935  * @see GNUNET_PSYCSTORE_message_get_fragment()
936  *
937  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
938  */
939 static int
940 message_get_fragment (void *cls,
941                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
942                       uint64_t message_id,
943                       uint64_t fragment_offset,
944                       GNUNET_PSYCSTORE_FragmentCallback cb,
945                       void *cb_cls)
946 {
947   struct Plugin *plugin = cls;
948   const char *stmt = "select_message_fragment";
949
950   struct GNUNET_PQ_QueryParam params_select[] = {
951     GNUNET_PQ_query_param_auto_from_type (channel_key),
952     GNUNET_PQ_query_param_uint64 (&message_id),
953     GNUNET_PQ_query_param_uint64 (&fragment_offset),
954     GNUNET_PQ_query_param_end
955   };
956
957   /* Stack based closure */
958   struct FragmentRowsContext frc = {
959     .cb = cb,
960     .cb_cls = cb_cls,
961     .returned_fragments = NULL,
962     .ret = GNUNET_SYSERR
963   };
964
965   if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
966                                                 stmt, params_select,
967                                                 &fragment_rows, &frc))
968     return GNUNET_SYSERR;
969   return frc.ret;  /* GNUNET_OK ?? */
970 }
971
972 /**
973  * Retrieve the max. values of message counters for a channel.
974  *
975  * @see GNUNET_PSYCSTORE_counters_get()
976  *
977  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
978  */
979 static int
980 counters_message_get (void *cls,
981                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
982                       uint64_t *max_fragment_id,
983                       uint64_t *max_message_id,
984                       uint64_t *max_group_generation)
985 {
986   struct Plugin *plugin = cls;
987
988   const char *stmt = "select_counters_message";
989
990   struct GNUNET_PQ_QueryParam params_select[] = {
991     GNUNET_PQ_query_param_auto_from_type (channel_key),
992     GNUNET_PQ_query_param_end
993   };
994
995   struct GNUNET_PQ_ResultSpec results_select[] = {
996     GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id),
997     GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id),
998     GNUNET_PQ_result_spec_uint64 ("group_generation", max_group_generation),
999     GNUNET_PQ_result_spec_end
1000   };
1001
1002   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
1003       GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt, 
1004                                                 params_select, results_select))
1005      return GNUNET_SYSERR;
1006
1007   return GNUNET_OK;
1008 }
1009
1010 /**
1011  * Retrieve the max. values of state counters for a channel.
1012  *
1013  * @see GNUNET_PSYCSTORE_counters_get()
1014  *
1015  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1016  */
1017 static int
1018 counters_state_get (void *cls,
1019                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1020                     uint64_t *max_state_message_id)
1021 {
1022   struct Plugin *plugin = cls;
1023
1024   const char *stmt = "select_counters_state";
1025
1026   struct GNUNET_PQ_QueryParam params_select[] = {
1027     GNUNET_PQ_query_param_auto_from_type (channel_key),
1028     GNUNET_PQ_query_param_end
1029   };
1030
1031   struct GNUNET_PQ_ResultSpec results_select[] = {
1032     GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id),
1033     GNUNET_PQ_result_spec_end
1034   };
1035
1036   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
1037       GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt, 
1038                                                 params_select, results_select))
1039      return GNUNET_SYSERR;
1040
1041   return GNUNET_OK;
1042 }
1043
1044
1045 /**
1046  * Assign a value to a state variable.
1047  *
1048  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1049  */
1050 static int
1051 state_assign (struct Plugin *plugin, const char *stmt,
1052               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1053               const char *name, const void *value, size_t value_size)
1054 {
1055   struct GNUNET_PQ_QueryParam params[] = {
1056     GNUNET_PQ_query_param_auto_from_type (channel_key),
1057     GNUNET_PQ_query_param_string (name),
1058     GNUNET_PQ_query_param_fixed_size (value, value_size),
1059     GNUNET_PQ_query_param_end
1060   };
1061
1062   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
1063       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
1064     return GNUNET_SYSERR;
1065
1066   return GNUNET_OK;
1067 }
1068
1069
1070 static int
1071 update_message_id (struct Plugin *plugin,
1072                    const char *stmt,
1073                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1074                    uint64_t message_id)
1075 {
1076   struct GNUNET_PQ_QueryParam params[] = {
1077     GNUNET_PQ_query_param_uint64 (&message_id),
1078     GNUNET_PQ_query_param_auto_from_type (channel_key),
1079     GNUNET_PQ_query_param_end
1080   };
1081
1082   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
1083       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
1084     return GNUNET_SYSERR;
1085
1086   return GNUNET_OK;
1087 }
1088
1089
1090 /**
1091  * Begin modifying current state.
1092  */
1093 static int
1094 state_modify_begin (void *cls,
1095                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1096                     uint64_t message_id, uint64_t state_delta)
1097 {
1098   struct Plugin *plugin = cls;
1099
1100   if (state_delta > 0)
1101   {
1102     /**
1103      * We can only apply state modifiers in the current message if modifiers in
1104      * the previous stateful message (message_id - state_delta) were already
1105      * applied.
1106      */
1107
1108     uint64_t max_state_message_id = 0;
1109     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1110     switch (ret)
1111     {
1112     case GNUNET_OK:
1113     case GNUNET_NO: // no state yet
1114       ret = GNUNET_OK;
1115       break;
1116
1117     default:
1118       return ret;
1119     }
1120
1121     if (max_state_message_id < message_id - state_delta)
1122       return GNUNET_NO; /* some stateful messages not yet applied */
1123     else if (message_id - state_delta < max_state_message_id)
1124       return GNUNET_NO; /* changes already applied */
1125   }
1126
1127   if (TRANSACTION_NONE != plugin->transaction)
1128   {
1129     /** @todo FIXME: wait for other transaction to finish  */
1130     return GNUNET_SYSERR;
1131   }
1132   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1133 }
1134
1135
1136 /**
1137  * Set the current value of state variable.
1138  *
1139  * @see GNUNET_PSYCSTORE_state_modify()
1140  *
1141  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1142  */
1143 static int
1144 state_modify_op (void *cls,
1145                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1146                  enum GNUNET_PSYC_Operator op,
1147                  const char *name, const void *value, size_t value_size)
1148 {
1149   struct Plugin *plugin = cls;
1150   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1151
1152   switch (op)
1153   {
1154   case GNUNET_PSYC_OP_ASSIGN:
1155     return state_assign (plugin, "insert_state_current",
1156                          channel_key, name, value, value_size);
1157
1158   default: /** @todo implement more state operations */
1159     GNUNET_break (0);
1160     return GNUNET_SYSERR;
1161   }
1162 }
1163
1164
1165 /**
1166  * End modifying current state.
1167  */
1168 static int
1169 state_modify_end (void *cls,
1170                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1171                   uint64_t message_id)
1172 {
1173   struct Plugin *plugin = cls;
1174   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1175
1176   return
1177     GNUNET_OK == exec_channel (plugin, "delete_state_empty", channel_key)
1178     && GNUNET_OK == update_message_id (plugin,
1179                                        "update_max_state_message_id",
1180                                        channel_key, message_id)
1181     && GNUNET_OK == transaction_commit (plugin)
1182     ? GNUNET_OK : GNUNET_SYSERR;
1183 }
1184
1185
/**
 * Begin state synchronization.
 *
 * @param cls closure, the `struct Plugin *`
 * @param channel_key public key of the channel
 * @return result of running the "delete_state_sync" statement for
 *         @a channel_key
 */
static int
state_sync_begin (void *cls,
                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  struct Plugin *db = cls;

  return exec_channel (db, "delete_state_sync", channel_key);
}
1196
1197
/**
 * Assign the value of a state variable during state synchronization.
 *
 * @see GNUNET_PSYCSTORE_state_modify()
 *
 * @param cls closure, the `struct Plugin *`
 * @param channel_key public key of the channel
 * @param name name of the state variable
 * @param value value to store
 * @param value_size number of bytes in @a value
 * @return result of state_assign() using the "insert_state_sync"
 *         statement
 */
static int
state_sync_assign (void *cls,
                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                   const char *name, const void *value, size_t value_size)
{
  struct Plugin *db = cls;

  return state_assign (db, "insert_state_sync",
                       channel_key, name, value, value_size);
}
1214
1215
1216 /**
1217  * End modifying current state.
1218  */
1219 static int
1220 state_sync_end (void *cls,
1221                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1222                 uint64_t max_state_message_id,
1223                 uint64_t state_hash_message_id)
1224 {
1225   struct Plugin *plugin = cls;
1226   int ret = GNUNET_SYSERR;
1227
1228   if (TRANSACTION_NONE != plugin->transaction)
1229   {
1230     /** @todo FIXME: wait for other transaction to finish  */
1231     return GNUNET_SYSERR;
1232   }
1233
1234   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1235     && GNUNET_OK == exec_channel (plugin, "delete_state", channel_key)
1236     && GNUNET_OK == exec_channel (plugin, "insert_state_from_sync",
1237                                   channel_key)
1238     && GNUNET_OK == exec_channel (plugin, "delete_state_sync",
1239                                   channel_key)
1240     && GNUNET_OK == update_message_id (plugin,
1241                                        "update_state_hash_message_id",
1242                                        channel_key, state_hash_message_id)
1243     && GNUNET_OK == update_message_id (plugin,
1244                                        "update_max_state_message_id",
1245                                        channel_key, max_state_message_id)
1246     && GNUNET_OK == transaction_commit (plugin)
1247     ? ret = GNUNET_OK
1248     : transaction_rollback (plugin);
1249   return ret;
1250 }
1251
1252
/**
 * Delete the whole state.
 *
 * @see GNUNET_PSYCSTORE_state_reset()
 *
 * @param cls closure, the `struct Plugin *`
 * @param channel_key public key of the channel
 * @return result of running the "delete_state" statement for
 *         @a channel_key
 */
static int
state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  struct Plugin *db = cls;

  return exec_channel (db, "delete_state", channel_key);
}
1266
1267
/**
 * Update signed values of state variables in the state store.
 *
 * @see GNUNET_PSYCSTORE_state_hash_update()
 *
 * @param cls closure, the `struct Plugin *`
 * @param channel_key public key of the channel
 * @return result of running the "update_state_signed" statement for
 *         @a channel_key
 */
static int
state_update_signed (void *cls,
                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  struct Plugin *db = cls;

  return exec_channel (db, "update_state_signed", channel_key);
}
1282
1283
1284 /**
1285  * Retrieve a state variable by name.
1286  *
1287  * @see GNUNET_PSYCSTORE_state_get()
1288  *
1289  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1290  */
1291 static int
1292 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1293            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1294 {
1295   struct Plugin *plugin = cls;
1296
1297   const char *stmt = "select_state_one";
1298
1299   struct GNUNET_PQ_QueryParam params_select[] = {
1300     GNUNET_PQ_query_param_auto_from_type (channel_key),
1301     GNUNET_PQ_query_param_string (name),
1302     GNUNET_PQ_query_param_end
1303   };
1304
1305   void *value_current = NULL;
1306   size_t value_size = 0;
1307
1308   struct GNUNET_PQ_ResultSpec results_select[] = {
1309     GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1310     GNUNET_PQ_result_spec_end
1311   };
1312
1313   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
1314       GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt, 
1315                                                 params_select, results_select))
1316      return GNUNET_SYSERR;
1317
1318   return cb (cb_cls, name, value_current,
1319             value_size);
1320 }
1321
1322
1323
1324 /**
1325  * Closure for #get_state_cb.
1326  */
1327 struct GetStateContext {
1328   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key;
1329   // const char *name,
1330   GNUNET_PSYCSTORE_StateCallback cb;
1331   void *cb_cls;
1332
1333   const char *value_id;
1334
1335   /* I preserved this but I do not see the point since
1336    * it cannot stop the loop early and gets overwritten ?? */
1337   int ret;
1338 };
1339
1340
1341 /**
1342  * Callback that retrieves the results of a SELECT statement
1343  * reading form the state table.
1344  *
1345  * Only passed to GNUNET_PQ_eval_prepared_multi_select and
1346  * has type GNUNET_PQ_PostgresResultHandler.
1347  *
1348  * @param cls closure
1349  * @param result the postgres result
1350  * @param num_result the number of results in @a result
1351  */
1352 static void 
1353 get_state_cb (void *cls,
1354                 PGresult *res,
1355                 unsigned int num_results)
1356 {
1357   struct GetStateContext *c = cls;
1358
1359   for (unsigned int i=0;i<num_results;i++)
1360   {
1361     char *name = "";
1362     void *value = NULL;
1363     size_t value_size = 0;
1364
1365     struct GNUNET_PQ_ResultSpec results[] = {
1366       GNUNET_PQ_result_spec_string ("name", &name),
1367       GNUNET_PQ_result_spec_variable_size (c->value_id, &value, &value_size),
1368       GNUNET_PQ_result_spec_end
1369     };
1370
1371     if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i))
1372     {
1373       GNUNET_PQ_cleanup_result(results);  /* previously invoked via PQclear?? */
1374       break;  /* nothing more?? */
1375     }
1376
1377     c->ret = c->cb (c->cb_cls, (const char *) name, value, value_size);
1378     GNUNET_PQ_cleanup_result(results);
1379   }
1380 }
1381
1382 /**
1383  * Retrieve all state variables for a channel with the given prefix.
1384  *
1385  * @see GNUNET_PSYCSTORE_state_get_prefix()
1386  *
1387  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1388  */
1389 static int
1390 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1391                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1392                   void *cb_cls)
1393 {
1394   struct Plugin *plugin = cls;
1395
1396   const char *stmt = "select_state_prefix";
1397
1398   uint32_t name_len = (uint32_t) strlen (name);
1399
1400   struct GNUNET_PQ_QueryParam params_select[] = {
1401     GNUNET_PQ_query_param_auto_from_type (channel_key),
1402     GNUNET_PQ_query_param_string (name),
1403     GNUNET_PQ_query_param_uint32 (&name_len),
1404     GNUNET_PQ_query_param_string (name),
1405     GNUNET_PQ_query_param_end
1406   };
1407
1408   struct GetStateContext gsc = {
1409     .cb = cb,
1410     .cb_cls = cb_cls,
1411     .value_id = "value_current",
1412     .ret = GNUNET_NO
1413   };
1414
1415   if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
1416                                                 stmt, params_select,
1417                                                 &get_state_cb, &gsc))
1418     return GNUNET_SYSERR;
1419   return gsc.ret;  /* GNUNET_OK ?? */
1420 }
1421
1422
1423 /**
1424  * Retrieve all signed state variables for a channel.
1425  *
1426  * @see GNUNET_PSYCSTORE_state_get_signed()
1427  *
1428  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1429  */
1430 static int
1431 state_get_signed (void *cls,
1432                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1433                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1434 {
1435   struct Plugin *plugin = cls;
1436
1437   const char *stmt = "select_state_signed";
1438
1439   struct GNUNET_PQ_QueryParam params_select[] = {
1440     GNUNET_PQ_query_param_auto_from_type (channel_key),
1441     GNUNET_PQ_query_param_end
1442   };
1443
1444   struct GetStateContext gsc = {
1445     .cb = cb,
1446     .cb_cls = cb_cls,
1447     .value_id = "value_signed",
1448     .ret = GNUNET_NO
1449   };
1450
1451   if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
1452                                                 stmt, params_select,
1453                                                 &get_state_cb, &gsc))
1454     return GNUNET_SYSERR;
1455   return gsc.ret;  /* GNUNET_OK ?? */
1456 }
1457
1458
1459 /**
1460  * Entry point for the plugin.
1461  *
1462  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1463  * @return NULL on error, otherwise the plugin context
1464  */
1465 void *
1466 libgnunet_plugin_psycstore_postgres_init (void *cls)
1467 {
1468   static struct Plugin plugin;
1469   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1470   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1471
1472   if (NULL != plugin.cfg)
1473     return NULL;                /* can only initialize once! */
1474   memset (&plugin, 0, sizeof (struct Plugin));
1475   plugin.cfg = cfg;
1476   if (GNUNET_OK != database_setup (&plugin))
1477   {
1478     database_shutdown (&plugin);
1479     return NULL;
1480   }
1481   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1482   api->cls = &plugin;
1483   api->membership_store = &postgres_membership_store;
1484   api->membership_test = &membership_test;
1485   api->fragment_store = &fragment_store;
1486   api->message_add_flags = &message_add_flags;
1487   api->fragment_get = &fragment_get;
1488   api->fragment_get_latest = &fragment_get_latest;
1489   api->message_get = &message_get;
1490   api->message_get_latest = &message_get_latest;
1491   api->message_get_fragment = &message_get_fragment;
1492   api->counters_message_get = &counters_message_get;
1493   api->counters_state_get = &counters_state_get;
1494   api->state_modify_begin = &state_modify_begin;
1495   api->state_modify_op = &state_modify_op;
1496   api->state_modify_end = &state_modify_end;
1497   api->state_sync_begin = &state_sync_begin;
1498   api->state_sync_assign = &state_sync_assign;
1499   api->state_sync_end = &state_sync_end;
1500   api->state_reset = &state_reset;
1501   api->state_update_signed = &state_update_signed;
1502   api->state_get = &state_get;
1503   api->state_get_prefix = &state_get_prefix;
1504   api->state_get_signed = &state_get_signed;
1505
1506   LOG (GNUNET_ERROR_TYPE_INFO, _("Postgres database running\n"));
1507   return api;
1508 }
1509
1510
1511 /**
1512  * Exit point from the plugin.
1513  *
1514  * @param cls The plugin context (as returned by "init")
1515  * @return Always NULL
1516  */
1517 void *
1518 libgnunet_plugin_psycstore_postgres_done (void *cls)
1519 {
1520   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1521   struct Plugin *plugin = api->cls;
1522
1523   database_shutdown (plugin);
1524   plugin->cfg = NULL;
1525   GNUNET_free (api);
1526   LOG (GNUNET_ERROR_TYPE_DEBUG, "Postgres plugin has finished\n");
1527   return NULL;
1528 }
1529
1530 /* end of plugin_psycstore_postgres.c */