glitch in the license text detected by hyazinthe, thank you!
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_postgres.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2016 GNUnet e.V.
4  *
5  * GNUnet is free software: you can redistribute it and/or modify it
6  * under the terms of the GNU Affero General Public License as published
7  * by the Free Software Foundation, either version 3 of the License,
8  * or (at your option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Affero General Public License for more details.
14  */
15
16 /**
17  * @file psycstore/plugin_psycstore_postgres.c
18  * @brief PostgreSQL-based psycstore backend
19  * @author Daniel Golle
20  * @author Gabor X Toth
21  * @author Christian Grothoff
22  * @author Christophe Genevey
23  * @author Jeffrey Burdges
24  */
25
26 #include "platform.h"
27 #include "gnunet_psycstore_plugin.h"
28 #include "gnunet_psycstore_service.h"
29 #include "gnunet_multicast_service.h"
30 #include "gnunet_crypto_lib.h"
31 #include "gnunet_psyc_util_lib.h"
32 #include "psycstore.h"
33 #include "gnunet_pq_lib.h"
34
35 /**
36  * After how many ms "busy" should a DB operation fail for good?  A
37  * low value makes sure that we are more responsive to requests
38  * (especially PUTs).  A high value guarantees a higher success rate
39  * (SELECTs in iterate can take several seconds despite LIMIT=1).
40  *
41  * The default value of 1s should ensure that users do not experience
42  * huge latencies while at the same time allowing operations to
43  * succeed with reasonable probability.
44  */
#define BUSY_TIMEOUT_MS 1000

/**
 * Debug switch for this plugin; simply mirrors GNUNET_EXTRA_LOGGING.
 */
#define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING

/**
 * Log a message under the "psycstore-postgres" component name.
 *
 * @param kind log level handed through to GNUNET_log_from()
 * @param ... format string followed by its arguments
 */
#define LOG(kind, ...) \
  GNUNET_log_from (kind, "psycstore-postgres", __VA_ARGS__)
50
/**
 * Kind of database transaction the plugin currently has open.
 */
enum Transactions
{
  /**
   * No transaction is in progress.
   */
  TRANSACTION_NONE = 0,

  /**
   * NOTE(review): presumably set while a state modification is in
   * progress -- confirm against the users of this value.
   */
  TRANSACTION_STATE_MODIFY,

  /**
   * NOTE(review): presumably set while a state synchronization is in
   * progress -- confirm against the users of this value.
   */
  TRANSACTION_STATE_SYNC
};
56
57 /**
58  * Context for all functions in this plugin.
59  */
60 struct Plugin
61 {
62
63   const struct GNUNET_CONFIGURATION_Handle *cfg;
64
65   /**
66    * Native Postgres database handle.
67    */
68   PGconn *dbh;
69
70   enum Transactions transaction;
71
72   void *cls;
73 };
74
75
76 /**
77  * Initialize the database connections and associated
78  * data structures (create tables and indices
79  * as needed as well).
80  *
81  * @param plugin the plugin context (state for this module)
82  * @return #GNUNET_OK on success
83  */
84 static int
85 database_setup (struct Plugin *plugin)
86 {
87   struct GNUNET_PQ_ExecuteStatement es[] = {
88     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS channels (\n"
89                             " id SERIAL,\n"
90                             " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
91                             " max_state_message_id BIGINT,\n"
92                             " state_hash_message_id BIGINT,\n"
93                             " PRIMARY KEY(id)\n"
94                             ")"
95                             "WITH OIDS"),
96     GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n"
97                             " ON channels (pub_key)"),
98     GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n"
99                             " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
100                             "RETURNS NULL ON NULL INPUT"),
101     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS slaves (\n"
102                             " id SERIAL,\n"
103                             " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
104                             " PRIMARY KEY(id)\n"
105                             ")"
106                             "WITH OIDS"),
107     GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n"
108                             " ON slaves (pub_key)"),
109     GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n"
110                             " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
111                             "RETURNS NULL ON NULL INPUT"),
112     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS membership (\n"
113                             "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
114                             "  slave_id BIGINT NOT NULL REFERENCES slaves(id),\n"
115                             "  did_join INT NOT NULL,\n"
116                             "  announced_at BIGINT NOT NULL,\n"
117                             "  effective_since BIGINT NOT NULL,\n"
118                             "  group_generation BIGINT NOT NULL\n"
119                             ")"
120                             "WITH OIDS"),
121     GNUNET_PQ_make_execute ("CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
122                             "ON membership (channel_id, slave_id)"),
123     /** @todo messages table: add method_name column */
124     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS messages (\n"
125                             "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
126                             "  hop_counter INT NOT NULL,\n"
127                             "  signature BYTEA CHECK (LENGTH(signature)=64),\n"
128                             "  purpose BYTEA CHECK (LENGTH(purpose)=8),\n"
129                             "  fragment_id BIGINT NOT NULL,\n"
130                             "  fragment_offset BIGINT NOT NULL,\n"
131                             "  message_id BIGINT NOT NULL,\n"
132                             "  group_generation BIGINT NOT NULL,\n"
133                             "  multicast_flags INT NOT NULL,\n"
134                             "  psycstore_flags INT NOT NULL,\n"
135                             "  data BYTEA,\n"
136                             "  PRIMARY KEY (channel_id, fragment_id),\n"
137                             "  UNIQUE (channel_id, message_id, fragment_offset)\n"
138                             ")"
139                             "WITH OIDS"),
140     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state (\n"
141                             "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
142                             "  name TEXT NOT NULL,\n"
143                             "  value_current BYTEA,\n"
144                             "  value_signed BYTEA,\n"
145                             "  PRIMARY KEY (channel_id, name)\n"
146                             ")"
147                             "WITH OIDS"),
148     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state_sync (\n"
149                             "  channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
150                             "  name TEXT NOT NULL,\n"
151                             "  value BYTEA,\n"
152                             "  PRIMARY KEY (channel_id, name)\n"
153                             ")"
154                             "WITH OIDS"),
155     GNUNET_PQ_EXECUTE_STATEMENT_END
156   };
157
158   /* Open database and precompile statements */
159   plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->cfg,
160                                             "psycstore-postgres");
161   if (NULL == plugin->dbh)
162     return GNUNET_SYSERR;
163   if (GNUNET_OK !=
164       GNUNET_PQ_exec_statements (plugin->dbh,
165                                  es))
166   {
167     PQfinish (plugin->dbh);
168     plugin->dbh = NULL;
169     return GNUNET_SYSERR;
170   }
171
172   /* Prepare statements */
173   {
174     struct GNUNET_PQ_PreparedStatement ps[] = {
175       GNUNET_PQ_make_prepare ("transaction_begin",
176                               "BEGIN", 0),
177       GNUNET_PQ_make_prepare ("transaction_commit",
178                               "COMMIT", 0),
179       GNUNET_PQ_make_prepare ("transaction_rollback",
180                               "ROLLBACK", 0),
181       GNUNET_PQ_make_prepare ("insert_channel_key",
182                               "INSERT INTO channels (pub_key) VALUES ($1)"
183                               " ON CONFLICT DO NOTHING", 1),
184       GNUNET_PQ_make_prepare ("insert_slave_key",
185                               "INSERT INTO slaves (pub_key) VALUES ($1)"
186                               " ON CONFLICT DO NOTHING", 1),
187       GNUNET_PQ_make_prepare ("insert_membership",
188                               "INSERT INTO membership\n"
189                               " (channel_id, slave_id, did_join, announced_at,\n"
190                               "  effective_since, group_generation)\n"
191                               "VALUES (get_chan_id($1),\n"
192                               "        get_slave_id($2),\n"
193                               "        $3, $4, $5, $6)", 6),
194       GNUNET_PQ_make_prepare ("select_membership",
195                               "SELECT did_join FROM membership\n"
196                               "WHERE channel_id = get_chan_id($1)\n"
197                               "      AND slave_id = get_slave_id($2)\n"
198                               "      AND effective_since <= $3 AND did_join = 1\n"
199                               "ORDER BY announced_at DESC LIMIT 1", 3),
200       GNUNET_PQ_make_prepare ("insert_fragment",
201                               "INSERT INTO messages\n"
202                               " (channel_id, hop_counter, signature, purpose,\n"
203                               "  fragment_id, fragment_offset, message_id,\n"
204                               "  group_generation, multicast_flags, psycstore_flags, data)\n"
205                               "VALUES (get_chan_id($1),\n"
206                               "        $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
207                               "ON CONFLICT DO NOTHING", 11),
208       GNUNET_PQ_make_prepare ("update_message_flags",
209                               "UPDATE messages\n"
210                               "SET psycstore_flags = psycstore_flags | $1\n"
211                               "WHERE channel_id = get_chan_id($2) \n"
212                               "      AND message_id = $3 AND fragment_offset = 0", 3),
213       GNUNET_PQ_make_prepare ("select_fragments",
214                               "SELECT hop_counter, signature, purpose, fragment_id,\n"
215                               "       fragment_offset, message_id, group_generation,\n"
216                               "       multicast_flags, psycstore_flags, data\n"
217                               "FROM messages\n"
218                               "WHERE channel_id = get_chan_id($1) \n"
219                               "      AND $2 <= fragment_id AND fragment_id <= $3", 3),
220       /** @todo select_messages: add method_prefix filter */
221       GNUNET_PQ_make_prepare ("select_messages",
222                               "SELECT hop_counter, signature, purpose, fragment_id,\n"
223                               "       fragment_offset, message_id, group_generation,\n"
224                               "       multicast_flags, psycstore_flags, data\n"
225                               "FROM messages\n"
226                               "WHERE channel_id = get_chan_id($1) \n"
227                               "      AND $2 <= message_id AND message_id <= $3\n"
228                               "LIMIT $4;", 4),
229       /** @todo select_latest_messages: add method_prefix filter */
230       GNUNET_PQ_make_prepare ("select_latest_fragments",
231                               "SELECT  rev.hop_counter AS hop_counter,\n"
232                               "        rev.signature AS signature,\n"
233                               "        rev.purpose AS purpose,\n"
234                               "        rev.fragment_id AS fragment_id,\n"
235                               "        rev.fragment_offset AS fragment_offset,\n"
236                               "        rev.message_id AS message_id,\n"
237                               "        rev.group_generation AS group_generation,\n"
238                               "        rev.multicast_flags AS multicast_flags,\n"
239                               "        rev.psycstore_flags AS psycstore_flags,\n"
240                               "        rev.data AS data\n"
241                               " FROM\n"
242                               " (SELECT hop_counter, signature, purpose, fragment_id,\n"
243                               "        fragment_offset, message_id, group_generation,\n"
244                               "        multicast_flags, psycstore_flags, data \n"
245                               "  FROM messages\n"
246                               "  WHERE channel_id = get_chan_id($1) \n"
247                               "  ORDER BY fragment_id DESC\n"
248                               "  LIMIT $2) AS rev\n"
249                               " ORDER BY rev.fragment_id;", 2),
250       GNUNET_PQ_make_prepare ("select_latest_messages",
251                               "SELECT hop_counter, signature, purpose, fragment_id,\n"
252                               "       fragment_offset, message_id, group_generation,\n"
253                               "        multicast_flags, psycstore_flags, data\n"
254                               "FROM messages\n"
255                               "WHERE channel_id = get_chan_id($1)\n"
256                               "      AND message_id IN\n"
257                               "      (SELECT message_id\n"
258                               "       FROM messages\n"
259                               "       WHERE channel_id = get_chan_id($2) \n"
260                               "       GROUP BY message_id\n"
261                               "       ORDER BY message_id\n"
262                               "       DESC LIMIT $3)\n"
263                               "ORDER BY fragment_id", 3),
264       GNUNET_PQ_make_prepare ("select_message_fragment",
265                               "SELECT hop_counter, signature, purpose, fragment_id,\n"
266                               "       fragment_offset, message_id, group_generation,\n"
267                               "       multicast_flags, psycstore_flags, data\n"
268                               "FROM messages\n"
269                               "WHERE channel_id = get_chan_id($1) \n"
270                               "      AND message_id = $2 AND fragment_offset = $3", 3),
271       GNUNET_PQ_make_prepare ("select_counters_message",
272                               "SELECT fragment_id, message_id, group_generation\n"
273                               "FROM messages\n"
274                               "WHERE channel_id = get_chan_id($1)\n"
275                               "ORDER BY fragment_id DESC LIMIT 1", 1),
276       GNUNET_PQ_make_prepare ("select_counters_state",
277                               "SELECT max_state_message_id\n"
278                               "FROM channels\n"
279                               "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1),
280       GNUNET_PQ_make_prepare ("update_max_state_message_id",
281                               "UPDATE channels\n"
282                               "SET max_state_message_id = $1\n"
283                               "WHERE pub_key = $2", 2),
284
285       GNUNET_PQ_make_prepare ("update_state_hash_message_id",
286                               "UPDATE channels\n"
287                               "SET state_hash_message_id = $1\n"
288                               "WHERE pub_key = $2", 2),
289       GNUNET_PQ_make_prepare ("insert_state_current",
290                               "INSERT INTO state\n"
291                               "  (channel_id, name, value_current, value_signed)\n"
292                               "SELECT new.channel_id, new.name,\n"
293                               "       new.value_current, old.value_signed\n"
294                               "FROM (SELECT get_chan_id($1) AS channel_id,\n"
295                               "             $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n"
296                               "LEFT JOIN (SELECT channel_id, name, value_signed\n"
297                               "           FROM state) AS old\n"
298                               "ON new.channel_id = old.channel_id AND new.name = old.name\n"
299                               "ON CONFLICT (channel_id, name)\n"
300                               "   DO UPDATE SET value_current = EXCLUDED.value_current,\n"
301                               "                 value_signed = EXCLUDED.value_signed", 3),
302       GNUNET_PQ_make_prepare ("delete_state_empty",
303                               "DELETE FROM state\n"
304                               "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n"
305                               "      AND (value_current IS NULL OR length(value_current) = 0)\n"
306                               "      AND (value_signed IS NULL OR length(value_signed) = 0)", 1),
307       GNUNET_PQ_make_prepare ("update_state_signed",
308                               "UPDATE state\n"
309                               "SET value_signed = value_current\n"
310                               "WHERE channel_id = get_chan_id($1) ", 1),
311       GNUNET_PQ_make_prepare ("delete_state",
312                               "DELETE FROM state\n"
313                               "WHERE channel_id = get_chan_id($1) ", 1),
314       GNUNET_PQ_make_prepare ("insert_state_sync",
315                               "INSERT INTO state_sync (channel_id, name, value)\n"
316                               "VALUES (get_chan_id($1), $2, $3)", 3),
317       GNUNET_PQ_make_prepare ("insert_state_from_sync",
318                               "INSERT INTO state\n"
319                               " (channel_id, name, value_current, value_signed)\n"
320                               "SELECT channel_id, name, value, value\n"
321                               "FROM state_sync\n"
322                               "WHERE channel_id = get_chan_id($1)", 1),
323       GNUNET_PQ_make_prepare ("delete_state_sync",
324                               "DELETE FROM state_sync\n"
325                               "WHERE channel_id = get_chan_id($1)", 1),
326       GNUNET_PQ_make_prepare ("select_state_one",
327                               "SELECT value_current\n"
328                               "FROM state\n"
329                               "WHERE channel_id = get_chan_id($1)\n"
330                               "      AND name = $2", 2),
331       GNUNET_PQ_make_prepare ("select_state_prefix",
332                               "SELECT name, value_current\n"
333                               "FROM state\n"
334                               "WHERE channel_id = get_chan_id($1)\n"
335                               "      AND (name = $2 OR substr(name, 1, $3) = $4)", 4),
336       GNUNET_PQ_make_prepare ("select_state_signed",
337                               "SELECT name, value_signed\n"
338                               "FROM state\n"
339                               "WHERE channel_id = get_chan_id($1)\n"
340                               "      AND value_signed IS NOT NULL", 1),
341       GNUNET_PQ_PREPARED_STATEMENT_END
342     };
343
344     if (GNUNET_OK !=
345         GNUNET_PQ_prepare_statements (plugin->dbh,
346                                       ps))
347     {
348       PQfinish (plugin->dbh);
349       plugin->dbh = NULL;
350       return GNUNET_SYSERR;
351     }
352   }
353
354   return GNUNET_OK;
355 }
356
357
358 /**
359  * Shutdown database connection and associate data
360  * structures.
361  * @param plugin the plugin context (state for this module)
362  */
363 static void
364 database_shutdown (struct Plugin *plugin)
365 {
366   PQfinish (plugin->dbh);
367   plugin->dbh = NULL;
368 }
369
370
371 /**
372  * Execute a prepared statement with a @a channel_key argument.
373  *
374  * @param plugin Plugin handle.
375  * @param stmt Statement to execute.
376  * @param channel_key Public key of the channel.
377  *
378  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
379  */
380 static int
381 exec_channel (struct Plugin *plugin, const char *stmt,
382               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
383 {
384   struct GNUNET_PQ_QueryParam params[] = {
385     GNUNET_PQ_query_param_auto_from_type (channel_key),
386     GNUNET_PQ_query_param_end
387   };
388
389   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
390       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
391     return GNUNET_SYSERR;
392
393   return GNUNET_OK;
394 }
395
396
397 /**
398  * Begin a transaction.
399  */
400 static int
401 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
402 {
403   struct GNUNET_PQ_QueryParam params[] = {
404     GNUNET_PQ_query_param_end
405   };
406
407   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
408       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_begin", params))
409     return GNUNET_SYSERR;
410
411   plugin->transaction = transaction;
412   return GNUNET_OK;
413 }
414
415
416 /**
417  * Commit current transaction.
418  */
419 static int
420 transaction_commit (struct Plugin *plugin)
421 {
422   struct GNUNET_PQ_QueryParam params[] = {
423     GNUNET_PQ_query_param_end
424   };
425
426   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
427       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_commit", params))
428     return GNUNET_SYSERR;
429
430   plugin->transaction = TRANSACTION_NONE;
431   return GNUNET_OK;
432 }
433
434
435 /**
436  * Roll back current transaction.
437  */
438 static int
439 transaction_rollback (struct Plugin *plugin)
440 {
441   struct GNUNET_PQ_QueryParam params[] = {
442     GNUNET_PQ_query_param_end
443   };
444
445   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
446       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_rollback", params))
447     return GNUNET_SYSERR;
448
449   plugin->transaction = TRANSACTION_NONE;
450   return GNUNET_OK;
451 }
452
453
454 static int
455 channel_key_store (struct Plugin *plugin,
456                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
457 {
458   struct GNUNET_PQ_QueryParam params[] = {
459     GNUNET_PQ_query_param_auto_from_type (channel_key),
460     GNUNET_PQ_query_param_end
461   };
462
463   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
464       GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
465                                           "insert_channel_key",
466                                           params))
467     return GNUNET_SYSERR;
468
469   return GNUNET_OK;
470 }
471
472
473 static int
474 slave_key_store (struct Plugin *plugin,
475                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
476 {
477   struct GNUNET_PQ_QueryParam params[] = {
478     GNUNET_PQ_query_param_auto_from_type (slave_key),
479     GNUNET_PQ_query_param_end
480   };
481
482   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
483       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_slave_key", params))
484     return GNUNET_SYSERR;
485
486   return GNUNET_OK;
487 }
488
489
490 /**
491  * Store join/leave events for a PSYC channel in order to be able to answer
492  * membership test queries later.
493  *
494  * @see GNUNET_PSYCSTORE_membership_store()
495  *
496  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
497  */
498 static int
499 postgres_membership_store (void *cls,
500                            const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
501                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
502                            int did_join,
503                            uint64_t announced_at,
504                            uint64_t effective_since,
505                            uint64_t group_generation)
506 {
507   struct Plugin *plugin = cls;
508   uint32_t idid_join = (uint32_t) did_join;
509
510   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
511
512   if ( (announced_at > INT64_MAX) ||
513        (effective_since > INT64_MAX) ||
514        (group_generation > INT64_MAX) )
515   {
516     GNUNET_break (0);
517     return GNUNET_SYSERR;
518   }
519
520   if ( (GNUNET_OK !=
521         channel_key_store (plugin, channel_key)) ||
522        (GNUNET_OK !=
523         slave_key_store (plugin, slave_key)) )
524     return GNUNET_SYSERR;
525
526   struct GNUNET_PQ_QueryParam params[] = {
527     GNUNET_PQ_query_param_auto_from_type (channel_key),
528     GNUNET_PQ_query_param_auto_from_type (slave_key),
529     GNUNET_PQ_query_param_uint32 (&idid_join),
530     GNUNET_PQ_query_param_uint64 (&announced_at),
531     GNUNET_PQ_query_param_uint64 (&effective_since),
532     GNUNET_PQ_query_param_uint64 (&group_generation),
533     GNUNET_PQ_query_param_end
534   };
535
536   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
537       GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
538                                           "insert_membership",
539                                           params))
540     return GNUNET_SYSERR;
541
542   return GNUNET_OK;
543 }
544
545 /**
546  * Test if a member was admitted to the channel at the given message ID.
547  *
548  * @see GNUNET_PSYCSTORE_membership_test()
549  *
550  * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
551  *         #GNUNET_SYSERR if there was en error.
552  */
553 static int
554 membership_test (void *cls,
555                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
556                  const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
557                  uint64_t message_id)
558 {
559   struct Plugin *plugin = cls;
560
561   uint32_t did_join = 0;
562
563   struct GNUNET_PQ_QueryParam params_select[] = {
564     GNUNET_PQ_query_param_auto_from_type (channel_key),
565     GNUNET_PQ_query_param_auto_from_type (slave_key),
566     GNUNET_PQ_query_param_uint64 (&message_id),
567     GNUNET_PQ_query_param_end
568   };
569
570   struct GNUNET_PQ_ResultSpec results_select[] = {
571     GNUNET_PQ_result_spec_uint32 ("did_join", &did_join),
572     GNUNET_PQ_result_spec_end
573   };
574
575   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
576       GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, "select_membership", 
577                                                 params_select, results_select))
578      return GNUNET_SYSERR;
579
580   return GNUNET_OK;
581 }
582
583 /**
584  * Store a message fragment sent to a channel.
585  *
586  * @see GNUNET_PSYCSTORE_fragment_store()
587  *
588  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
589  */
590 static int
591 fragment_store (void *cls,
592                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
593                 const struct GNUNET_MULTICAST_MessageHeader *msg,
594                 uint32_t psycstore_flags)
595 {
596   struct Plugin *plugin = cls;
597
598   GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
599
600   uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
601
602   uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
603   uint64_t message_id = GNUNET_ntohll (msg->message_id);
604   uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
605
606   uint32_t hop_counter = ntohl(msg->hop_counter);
607   uint32_t flags = ntohl(msg->flags);
608
609   if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
610       message_id > INT64_MAX || group_generation > INT64_MAX)
611   {
612     LOG(GNUNET_ERROR_TYPE_ERROR,
613          "Tried to store fragment with a field > INT64_MAX: "
614          "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
615          message_id, group_generation);
616     GNUNET_break (0);
617     return GNUNET_SYSERR;
618   }
619
620   if (GNUNET_OK != channel_key_store (plugin, channel_key))
621     return GNUNET_SYSERR;
622
623   struct GNUNET_PQ_QueryParam params_insert[] = {
624     GNUNET_PQ_query_param_auto_from_type (channel_key),
625     GNUNET_PQ_query_param_uint32 (&hop_counter),
626     GNUNET_PQ_query_param_auto_from_type (&msg->signature),
627     GNUNET_PQ_query_param_auto_from_type (&msg->purpose),
628     GNUNET_PQ_query_param_uint64 (&fragment_id),
629     GNUNET_PQ_query_param_uint64 (&fragment_offset),
630     GNUNET_PQ_query_param_uint64 (&message_id),
631     GNUNET_PQ_query_param_uint64 (&group_generation),
632     GNUNET_PQ_query_param_uint32 (&flags),
633     GNUNET_PQ_query_param_uint32 (&psycstore_flags),
634     GNUNET_PQ_query_param_fixed_size (&msg[1], ntohs (msg->header.size) - sizeof (*msg)),
635     GNUNET_PQ_query_param_end
636   };
637
638   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
639       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_fragment", params_insert))
640     return GNUNET_SYSERR;
641
642   return GNUNET_OK;
643 }
644
645 /**
646  * Set additional flags for a given message.
647  *
648  * They are OR'd with any existing flags set.
649  *
650  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
651  */
652 static int
653 message_add_flags (void *cls,
654                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
655                    uint64_t message_id,
656                    uint32_t psycstore_flags)
657 {
658   struct Plugin *plugin = cls;
659
660   struct GNUNET_PQ_QueryParam params_update[] = {
661     GNUNET_PQ_query_param_uint32 (&psycstore_flags),
662     GNUNET_PQ_query_param_auto_from_type (channel_key),
663     GNUNET_PQ_query_param_uint64 (&message_id),
664     GNUNET_PQ_query_param_end
665   };
666
667   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
668       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "update_message_flags", params_update))
669     return GNUNET_SYSERR;
670
671   return GNUNET_OK;
672 }
673
674
675 /**
676  * Closure for #fragment_rows.
677  */
678 struct FragmentRowsContext {
679   GNUNET_PSYCSTORE_FragmentCallback cb;
680   void *cb_cls;
681
682   uint64_t *returned_fragments;
683
684   /* I preserved this but I do not see the point since
685    * it cannot stop the loop early and gets overwritten ?? */
686   int ret;
687 };
688
689
690 /**
691  * Callback that retrieves the results of a SELECT statement
692  * reading form the messages table.
693  *
694  * Only passed to GNUNET_PQ_eval_prepared_multi_select and
695  * has type GNUNET_PQ_PostgresResultHandler.
696  *
697  * @param cls closure
698  * @param result the postgres result
699  * @param num_result the number of results in @a result
700  */
701 void fragment_rows (void *cls,
702                     PGresult *res,
703                     unsigned int num_results)
704 {
705   struct FragmentRowsContext *c = cls;
706
707   for (unsigned int i=0;i<num_results;i++)
708   {
709     uint32_t hop_counter;
710     void *signature = NULL;
711     void *purpose = NULL;
712     size_t signature_size;
713     size_t purpose_size;
714     uint64_t fragment_id;
715     uint64_t fragment_offset;
716     uint64_t message_id;
717     uint64_t group_generation;
718     uint32_t flags;
719     void *buf;
720     size_t buf_size;
721     uint32_t msg_flags;
722     struct GNUNET_PQ_ResultSpec results[] = {
723       GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
724       GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size),
725       GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size),
726       GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id),
727       GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
728       GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
729       GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
730       GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags),
731       GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags),
732       GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size),
733       GNUNET_PQ_result_spec_end
734     };
735     struct GNUNET_MULTICAST_MessageHeader *mp;
736
737     if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i))
738     {
739       GNUNET_PQ_cleanup_result(results);  /* missing previously, a memory leak?? */
740       break;  /* nothing more?? */
741     }
742
743     mp = GNUNET_malloc (sizeof (*mp) + buf_size);
744
745     mp->header.size = htons (sizeof (*mp) + buf_size);
746     mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
747     mp->hop_counter = htonl (hop_counter);
748     GNUNET_memcpy (&mp->signature,
749                    signature, signature_size);
750     GNUNET_memcpy (&mp->purpose,
751                    purpose, purpose_size);
752     mp->fragment_id = GNUNET_htonll (fragment_id);
753     mp->fragment_offset = GNUNET_htonll (fragment_offset);
754     mp->message_id = GNUNET_htonll (message_id);
755     mp->group_generation = GNUNET_htonll (group_generation);
756     mp->flags = htonl(msg_flags);
757
758     GNUNET_memcpy (&mp[1],
759                    buf, buf_size);
760     GNUNET_PQ_cleanup_result(results);
761     c->ret = c->cb (c->cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
762     if (NULL != c->returned_fragments)
763       (*c->returned_fragments)++;
764   }
765 }
766
767
768 static int
769 fragment_select (struct Plugin *plugin,
770                  const char *stmt,
771                  struct GNUNET_PQ_QueryParam *params,
772                  uint64_t *returned_fragments,
773                  GNUNET_PSYCSTORE_FragmentCallback cb,
774                  void *cb_cls)
775 {
776   /* Stack based closure */
777   struct FragmentRowsContext frc = {
778     .cb = cb,
779     .cb_cls = cb_cls,
780     .returned_fragments = returned_fragments,
781     .ret = GNUNET_SYSERR
782   };
783
784   if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
785                                                 stmt, params,
786                                                 &fragment_rows, &frc))
787     return GNUNET_SYSERR;
788   return frc.ret;  /* GNUNET_OK ?? */
789 }
790
791 /**
792  * Retrieve a message fragment range by fragment ID.
793  *
794  * @see GNUNET_PSYCSTORE_fragment_get()
795  *
796  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
797  */
798 static int
799 fragment_get (void *cls,
800               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
801               uint64_t first_fragment_id,
802               uint64_t last_fragment_id,
803               uint64_t *returned_fragments,
804               GNUNET_PSYCSTORE_FragmentCallback cb,
805               void *cb_cls)
806 {
807   struct Plugin *plugin = cls;
808   struct GNUNET_PQ_QueryParam params_select[] = {
809     GNUNET_PQ_query_param_auto_from_type (channel_key),
810     GNUNET_PQ_query_param_uint64 (&first_fragment_id),
811     GNUNET_PQ_query_param_uint64 (&last_fragment_id),
812     GNUNET_PQ_query_param_end
813   };
814
815   *returned_fragments = 0;
816   return fragment_select (plugin,
817                           "select_fragments",
818                           params_select,
819                           returned_fragments,
820                           cb, cb_cls);
821 }
822
823
824 /**
825  * Retrieve a message fragment range by fragment ID.
826  *
827  * @see GNUNET_PSYCSTORE_fragment_get_latest()
828  *
829  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
830  */
831 static int
832 fragment_get_latest (void *cls,
833                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
834                      uint64_t fragment_limit,
835                      uint64_t *returned_fragments,
836                      GNUNET_PSYCSTORE_FragmentCallback cb,
837                      void *cb_cls)
838 {
839   struct Plugin *plugin = cls;
840
841   *returned_fragments = 0;
842
843   struct GNUNET_PQ_QueryParam params_select[] = {
844     GNUNET_PQ_query_param_auto_from_type (channel_key),
845     GNUNET_PQ_query_param_uint64 (&fragment_limit),
846     GNUNET_PQ_query_param_end
847   };
848
849   return fragment_select (plugin,
850                           "select_latest_fragments",
851                           params_select,
852                           returned_fragments,
853                           cb, cb_cls);
854 }
855
856
857 /**
858  * Retrieve all fragments of a message ID range.
859  *
860  * @see GNUNET_PSYCSTORE_message_get()
861  *
862  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
863  */
864 static int
865 message_get (void *cls,
866              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
867              uint64_t first_message_id,
868              uint64_t last_message_id,
869              uint64_t fragment_limit,
870              uint64_t *returned_fragments,
871              GNUNET_PSYCSTORE_FragmentCallback cb,
872              void *cb_cls)
873 {
874   struct Plugin *plugin = cls;
875   struct GNUNET_PQ_QueryParam params_select[] = {
876     GNUNET_PQ_query_param_auto_from_type (channel_key),
877     GNUNET_PQ_query_param_uint64 (&first_message_id),
878     GNUNET_PQ_query_param_uint64 (&last_message_id),
879     GNUNET_PQ_query_param_uint64 (&fragment_limit),
880     GNUNET_PQ_query_param_end
881   };
882
883   if (0 == fragment_limit)
884     fragment_limit = INT64_MAX;
885   *returned_fragments = 0;
886   return fragment_select (plugin,
887                           "select_messages",
888                           params_select,
889                           returned_fragments,
890                           cb, cb_cls);
891 }
892
893
894 /**
895  * Retrieve all fragments of the latest messages.
896  *
897  * @see GNUNET_PSYCSTORE_message_get_latest()
898  *
899  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
900  */
901 static int
902 message_get_latest (void *cls,
903                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
904                     uint64_t message_limit,
905                     uint64_t *returned_fragments,
906                     GNUNET_PSYCSTORE_FragmentCallback cb,
907                     void *cb_cls)
908 {
909   struct Plugin *plugin = cls;
910   struct GNUNET_PQ_QueryParam params_select[] = {
911     GNUNET_PQ_query_param_auto_from_type (channel_key),
912     GNUNET_PQ_query_param_auto_from_type (channel_key),
913     GNUNET_PQ_query_param_uint64 (&message_limit),
914     GNUNET_PQ_query_param_end
915   };
916
917   *returned_fragments = 0;
918   return fragment_select (plugin,
919                           "select_latest_messages",
920                           params_select,
921                           returned_fragments,
922                           cb, cb_cls);
923 }
924
925
926 /**
927  * Retrieve a fragment of message specified by its message ID and fragment
928  * offset.
929  *
930  * @see GNUNET_PSYCSTORE_message_get_fragment()
931  *
932  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
933  */
934 static int
935 message_get_fragment (void *cls,
936                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
937                       uint64_t message_id,
938                       uint64_t fragment_offset,
939                       GNUNET_PSYCSTORE_FragmentCallback cb,
940                       void *cb_cls)
941 {
942   struct Plugin *plugin = cls;
943   const char *stmt = "select_message_fragment";
944
945   struct GNUNET_PQ_QueryParam params_select[] = {
946     GNUNET_PQ_query_param_auto_from_type (channel_key),
947     GNUNET_PQ_query_param_uint64 (&message_id),
948     GNUNET_PQ_query_param_uint64 (&fragment_offset),
949     GNUNET_PQ_query_param_end
950   };
951
952   /* Stack based closure */
953   struct FragmentRowsContext frc = {
954     .cb = cb,
955     .cb_cls = cb_cls,
956     .returned_fragments = NULL,
957     .ret = GNUNET_SYSERR
958   };
959
960   if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
961                                                 stmt, params_select,
962                                                 &fragment_rows, &frc))
963     return GNUNET_SYSERR;
964   return frc.ret;  /* GNUNET_OK ?? */
965 }
966
967 /**
968  * Retrieve the max. values of message counters for a channel.
969  *
970  * @see GNUNET_PSYCSTORE_counters_get()
971  *
972  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
973  */
974 static int
975 counters_message_get (void *cls,
976                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
977                       uint64_t *max_fragment_id,
978                       uint64_t *max_message_id,
979                       uint64_t *max_group_generation)
980 {
981   struct Plugin *plugin = cls;
982
983   const char *stmt = "select_counters_message";
984
985   struct GNUNET_PQ_QueryParam params_select[] = {
986     GNUNET_PQ_query_param_auto_from_type (channel_key),
987     GNUNET_PQ_query_param_end
988   };
989
990   struct GNUNET_PQ_ResultSpec results_select[] = {
991     GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id),
992     GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id),
993     GNUNET_PQ_result_spec_uint64 ("group_generation", max_group_generation),
994     GNUNET_PQ_result_spec_end
995   };
996
997   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
998       GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt, 
999                                                 params_select, results_select))
1000      return GNUNET_SYSERR;
1001
1002   return GNUNET_OK;
1003 }
1004
1005 /**
1006  * Retrieve the max. values of state counters for a channel.
1007  *
1008  * @see GNUNET_PSYCSTORE_counters_get()
1009  *
1010  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1011  */
1012 static int
1013 counters_state_get (void *cls,
1014                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1015                     uint64_t *max_state_message_id)
1016 {
1017   struct Plugin *plugin = cls;
1018
1019   const char *stmt = "select_counters_state";
1020
1021   struct GNUNET_PQ_QueryParam params_select[] = {
1022     GNUNET_PQ_query_param_auto_from_type (channel_key),
1023     GNUNET_PQ_query_param_end
1024   };
1025
1026   struct GNUNET_PQ_ResultSpec results_select[] = {
1027     GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id),
1028     GNUNET_PQ_result_spec_end
1029   };
1030
1031   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
1032       GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt, 
1033                                                 params_select, results_select))
1034      return GNUNET_SYSERR;
1035
1036   return GNUNET_OK;
1037 }
1038
1039
1040 /**
1041  * Assign a value to a state variable.
1042  *
1043  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1044  */
1045 static int
1046 state_assign (struct Plugin *plugin, const char *stmt,
1047               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1048               const char *name, const void *value, size_t value_size)
1049 {
1050   struct GNUNET_PQ_QueryParam params[] = {
1051     GNUNET_PQ_query_param_auto_from_type (channel_key),
1052     GNUNET_PQ_query_param_string (name),
1053     GNUNET_PQ_query_param_fixed_size (value, value_size),
1054     GNUNET_PQ_query_param_end
1055   };
1056
1057   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
1058       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
1059     return GNUNET_SYSERR;
1060
1061   return GNUNET_OK;
1062 }
1063
1064
1065 static int
1066 update_message_id (struct Plugin *plugin,
1067                    const char *stmt,
1068                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1069                    uint64_t message_id)
1070 {
1071   struct GNUNET_PQ_QueryParam params[] = {
1072     GNUNET_PQ_query_param_uint64 (&message_id),
1073     GNUNET_PQ_query_param_auto_from_type (channel_key),
1074     GNUNET_PQ_query_param_end
1075   };
1076
1077   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
1078       GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
1079     return GNUNET_SYSERR;
1080
1081   return GNUNET_OK;
1082 }
1083
1084
1085 /**
1086  * Begin modifying current state.
1087  */
1088 static int
1089 state_modify_begin (void *cls,
1090                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1091                     uint64_t message_id, uint64_t state_delta)
1092 {
1093   struct Plugin *plugin = cls;
1094
1095   if (state_delta > 0)
1096   {
1097     /**
1098      * We can only apply state modifiers in the current message if modifiers in
1099      * the previous stateful message (message_id - state_delta) were already
1100      * applied.
1101      */
1102
1103     uint64_t max_state_message_id = 0;
1104     int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1105     switch (ret)
1106     {
1107     case GNUNET_OK:
1108     case GNUNET_NO: // no state yet
1109       ret = GNUNET_OK;
1110       break;
1111
1112     default:
1113       return ret;
1114     }
1115
1116     if (max_state_message_id < message_id - state_delta)
1117       return GNUNET_NO; /* some stateful messages not yet applied */
1118     else if (message_id - state_delta < max_state_message_id)
1119       return GNUNET_NO; /* changes already applied */
1120   }
1121
1122   if (TRANSACTION_NONE != plugin->transaction)
1123   {
1124     /** @todo FIXME: wait for other transaction to finish  */
1125     return GNUNET_SYSERR;
1126   }
1127   return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1128 }
1129
1130
1131 /**
1132  * Set the current value of state variable.
1133  *
1134  * @see GNUNET_PSYCSTORE_state_modify()
1135  *
1136  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1137  */
1138 static int
1139 state_modify_op (void *cls,
1140                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1141                  enum GNUNET_PSYC_Operator op,
1142                  const char *name, const void *value, size_t value_size)
1143 {
1144   struct Plugin *plugin = cls;
1145   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1146
1147   switch (op)
1148   {
1149   case GNUNET_PSYC_OP_ASSIGN:
1150     return state_assign (plugin, "insert_state_current",
1151                          channel_key, name, value, value_size);
1152
1153   default: /** @todo implement more state operations */
1154     GNUNET_break (0);
1155     return GNUNET_SYSERR;
1156   }
1157 }
1158
1159
1160 /**
1161  * End modifying current state.
1162  */
1163 static int
1164 state_modify_end (void *cls,
1165                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1166                   uint64_t message_id)
1167 {
1168   struct Plugin *plugin = cls;
1169   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1170
1171   return
1172     GNUNET_OK == exec_channel (plugin, "delete_state_empty", channel_key)
1173     && GNUNET_OK == update_message_id (plugin,
1174                                        "update_max_state_message_id",
1175                                        channel_key, message_id)
1176     && GNUNET_OK == transaction_commit (plugin)
1177     ? GNUNET_OK : GNUNET_SYSERR;
1178 }
1179
1180
/**
 * Begin state synchronization by executing the "delete_state_sync"
 * statement for the channel.
 *
 * @param cls the `struct Plugin *`
 * @param channel_key public key of the channel
 * @return result of exec_channel() for the statement
 */
static int
state_sync_begin (void *cls,
                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  return exec_channel ((struct Plugin *) cls,
                       "delete_state_sync",
                       channel_key);
}
1191
1192
/**
 * Assign the value of a state variable during state synchronization,
 * using the "insert_state_sync" statement.
 *
 * @see GNUNET_PSYCSTORE_state_sync()
 *
 * @param cls the `struct Plugin *`
 * @param channel_key public key of the channel
 * @param name name of the state variable
 * @param value value of the variable
 * @param value_size number of bytes in @a value
 * @return result of state_assign() for the statement
 */
static int
state_sync_assign (void *cls,
                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                   const char *name, const void *value, size_t value_size)
{
  return state_assign ((struct Plugin *) cls,
                       "insert_state_sync",
                       channel_key,
                       name,
                       value, value_size);
}
1209
1210
1211 /**
1212  * End modifying current state.
1213  */
1214 static int
1215 state_sync_end (void *cls,
1216                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1217                 uint64_t max_state_message_id,
1218                 uint64_t state_hash_message_id)
1219 {
1220   struct Plugin *plugin = cls;
1221   int ret = GNUNET_SYSERR;
1222
1223   if (TRANSACTION_NONE != plugin->transaction)
1224   {
1225     /** @todo FIXME: wait for other transaction to finish  */
1226     return GNUNET_SYSERR;
1227   }
1228
1229   GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1230     && GNUNET_OK == exec_channel (plugin, "delete_state", channel_key)
1231     && GNUNET_OK == exec_channel (plugin, "insert_state_from_sync",
1232                                   channel_key)
1233     && GNUNET_OK == exec_channel (plugin, "delete_state_sync",
1234                                   channel_key)
1235     && GNUNET_OK == update_message_id (plugin,
1236                                        "update_state_hash_message_id",
1237                                        channel_key, state_hash_message_id)
1238     && GNUNET_OK == update_message_id (plugin,
1239                                        "update_max_state_message_id",
1240                                        channel_key, max_state_message_id)
1241     && GNUNET_OK == transaction_commit (plugin)
1242     ? ret = GNUNET_OK
1243     : transaction_rollback (plugin);
1244   return ret;
1245 }
1246
1247
/**
 * Delete the whole state of a channel by executing the
 * "delete_state" statement.
 *
 * @see GNUNET_PSYCSTORE_state_reset()
 *
 * @param cls the `struct Plugin *`
 * @param channel_key public key of the channel
 * @return result of exec_channel() for the statement
 */
static int
state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  return exec_channel ((struct Plugin *) cls,
                       "delete_state",
                       channel_key);
}
1261
1262
/**
 * Update the signed values of the state variables of a channel by
 * executing the "update_state_signed" statement.
 *
 * @see GNUNET_PSYCSTORE_state_hash_update()
 *
 * @param cls the `struct Plugin *`
 * @param channel_key public key of the channel
 * @return result of exec_channel() for the statement
 */
static int
state_update_signed (void *cls,
                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  return exec_channel ((struct Plugin *) cls,
                       "update_state_signed",
                       channel_key);
}
1277
1278
1279 /**
1280  * Retrieve a state variable by name.
1281  *
1282  * @see GNUNET_PSYCSTORE_state_get()
1283  *
1284  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1285  */
1286 static int
1287 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1288            const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1289 {
1290   struct Plugin *plugin = cls;
1291
1292   const char *stmt = "select_state_one";
1293
1294   struct GNUNET_PQ_QueryParam params_select[] = {
1295     GNUNET_PQ_query_param_auto_from_type (channel_key),
1296     GNUNET_PQ_query_param_string (name),
1297     GNUNET_PQ_query_param_end
1298   };
1299
1300   void *value_current = NULL;
1301   size_t value_size = 0;
1302
1303   struct GNUNET_PQ_ResultSpec results_select[] = {
1304     GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1305     GNUNET_PQ_result_spec_end
1306   };
1307
1308   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
1309       GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt, 
1310                                                 params_select, results_select))
1311      return GNUNET_SYSERR;
1312
1313   return cb (cb_cls, name, value_current,
1314             value_size);
1315 }
1316
1317
1318
1319 /**
1320  * Closure for #get_state_cb.
1321  */
1322 struct GetStateContext {
1323   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key;
1324   // const char *name,
1325   GNUNET_PSYCSTORE_StateCallback cb;
1326   void *cb_cls;
1327
1328   const char *value_id;
1329
1330   /* I preserved this but I do not see the point since
1331    * it cannot stop the loop early and gets overwritten ?? */
1332   int ret;
1333 };
1334
1335
1336 /**
1337  * Callback that retrieves the results of a SELECT statement
1338  * reading form the state table.
1339  *
1340  * Only passed to GNUNET_PQ_eval_prepared_multi_select and
1341  * has type GNUNET_PQ_PostgresResultHandler.
1342  *
1343  * @param cls closure
1344  * @param result the postgres result
1345  * @param num_result the number of results in @a result
1346  */
1347 static void 
1348 get_state_cb (void *cls,
1349                 PGresult *res,
1350                 unsigned int num_results)
1351 {
1352   struct GetStateContext *c = cls;
1353
1354   for (unsigned int i=0;i<num_results;i++)
1355   {
1356     char *name = "";
1357     void *value = NULL;
1358     size_t value_size = 0;
1359
1360     struct GNUNET_PQ_ResultSpec results[] = {
1361       GNUNET_PQ_result_spec_string ("name", &name),
1362       GNUNET_PQ_result_spec_variable_size (c->value_id, &value, &value_size),
1363       GNUNET_PQ_result_spec_end
1364     };
1365
1366     if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i))
1367     {
1368       GNUNET_PQ_cleanup_result(results);  /* previously invoked via PQclear?? */
1369       break;  /* nothing more?? */
1370     }
1371
1372     c->ret = c->cb (c->cb_cls, (const char *) name, value, value_size);
1373     GNUNET_PQ_cleanup_result(results);
1374   }
1375 }
1376
1377 /**
1378  * Retrieve all state variables for a channel with the given prefix.
1379  *
1380  * @see GNUNET_PSYCSTORE_state_get_prefix()
1381  *
1382  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1383  */
1384 static int
1385 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1386                   const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1387                   void *cb_cls)
1388 {
1389   struct Plugin *plugin = cls;
1390
1391   const char *stmt = "select_state_prefix";
1392
1393   uint32_t name_len = (uint32_t) strlen (name);
1394
1395   struct GNUNET_PQ_QueryParam params_select[] = {
1396     GNUNET_PQ_query_param_auto_from_type (channel_key),
1397     GNUNET_PQ_query_param_string (name),
1398     GNUNET_PQ_query_param_uint32 (&name_len),
1399     GNUNET_PQ_query_param_string (name),
1400     GNUNET_PQ_query_param_end
1401   };
1402
1403   struct GetStateContext gsc = {
1404     .cb = cb,
1405     .cb_cls = cb_cls,
1406     .value_id = "value_current",
1407     .ret = GNUNET_NO
1408   };
1409
1410   if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
1411                                                 stmt, params_select,
1412                                                 &get_state_cb, &gsc))
1413     return GNUNET_SYSERR;
1414   return gsc.ret;  /* GNUNET_OK ?? */
1415 }
1416
1417
1418 /**
1419  * Retrieve all signed state variables for a channel.
1420  *
1421  * @see GNUNET_PSYCSTORE_state_get_signed()
1422  *
1423  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1424  */
1425 static int
1426 state_get_signed (void *cls,
1427                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1428                   GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1429 {
1430   struct Plugin *plugin = cls;
1431
1432   const char *stmt = "select_state_signed";
1433
1434   struct GNUNET_PQ_QueryParam params_select[] = {
1435     GNUNET_PQ_query_param_auto_from_type (channel_key),
1436     GNUNET_PQ_query_param_end
1437   };
1438
1439   struct GetStateContext gsc = {
1440     .cb = cb,
1441     .cb_cls = cb_cls,
1442     .value_id = "value_signed",
1443     .ret = GNUNET_NO
1444   };
1445
1446   if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
1447                                                 stmt, params_select,
1448                                                 &get_state_cb, &gsc))
1449     return GNUNET_SYSERR;
1450   return gsc.ret;  /* GNUNET_OK ?? */
1451 }
1452
1453
1454 /**
1455  * Entry point for the plugin.
1456  *
1457  * @param cls The struct GNUNET_CONFIGURATION_Handle.
1458  * @return NULL on error, otherwise the plugin context
1459  */
1460 void *
1461 libgnunet_plugin_psycstore_postgres_init (void *cls)
1462 {
1463   static struct Plugin plugin;
1464   const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1465   struct GNUNET_PSYCSTORE_PluginFunctions *api;
1466
1467   if (NULL != plugin.cfg)
1468     return NULL;                /* can only initialize once! */
1469   memset (&plugin, 0, sizeof (struct Plugin));
1470   plugin.cfg = cfg;
1471   if (GNUNET_OK != database_setup (&plugin))
1472   {
1473     database_shutdown (&plugin);
1474     return NULL;
1475   }
1476   api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1477   api->cls = &plugin;
1478   api->membership_store = &postgres_membership_store;
1479   api->membership_test = &membership_test;
1480   api->fragment_store = &fragment_store;
1481   api->message_add_flags = &message_add_flags;
1482   api->fragment_get = &fragment_get;
1483   api->fragment_get_latest = &fragment_get_latest;
1484   api->message_get = &message_get;
1485   api->message_get_latest = &message_get_latest;
1486   api->message_get_fragment = &message_get_fragment;
1487   api->counters_message_get = &counters_message_get;
1488   api->counters_state_get = &counters_state_get;
1489   api->state_modify_begin = &state_modify_begin;
1490   api->state_modify_op = &state_modify_op;
1491   api->state_modify_end = &state_modify_end;
1492   api->state_sync_begin = &state_sync_begin;
1493   api->state_sync_assign = &state_sync_assign;
1494   api->state_sync_end = &state_sync_end;
1495   api->state_reset = &state_reset;
1496   api->state_update_signed = &state_update_signed;
1497   api->state_get = &state_get;
1498   api->state_get_prefix = &state_get_prefix;
1499   api->state_get_signed = &state_get_signed;
1500
1501   LOG (GNUNET_ERROR_TYPE_INFO, _("Postgres database running\n"));
1502   return api;
1503 }
1504
1505
1506 /**
1507  * Exit point from the plugin.
1508  *
1509  * @param cls The plugin context (as returned by "init")
1510  * @return Always NULL
1511  */
1512 void *
1513 libgnunet_plugin_psycstore_postgres_done (void *cls)
1514 {
1515   struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1516   struct Plugin *plugin = api->cls;
1517
1518   database_shutdown (plugin);
1519   plugin->cfg = NULL;
1520   GNUNET_free (api);
1521   LOG (GNUNET_ERROR_TYPE_DEBUG, "Postgres plugin has finished\n");
1522   return NULL;
1523 }
1524
1525 /* end of plugin_psycstore_postgres.c */