2 * This file is part of GNUnet
3 * Copyright (C) 2016 GNUnet e.V.
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psycstore/plugin_psycstore_postgres.c
23 * @brief PostgreSQL-based psycstore backend
24 * @author Daniel Golle
25 * @author Gabor X Toth
26 * @author Christian Grothoff
27 * @author Christophe Genevey
31 #include "gnunet_psycstore_plugin.h"
32 #include "gnunet_psycstore_service.h"
33 #include "gnunet_multicast_service.h"
34 #include "gnunet_crypto_lib.h"
35 #include "gnunet_psyc_util_lib.h"
36 #include "psycstore.h"
37 #include "gnunet_postgres_lib.h"
38 #include "gnunet_pq_lib.h"
41 * After how many ms "busy" should a DB operation fail for good? A
42 * low value makes sure that we are more responsive to requests
43 * (especially PUTs). A high value guarantees a higher success rate
44 * (SELECTs in iterate can take several seconds despite LIMIT=1).
46 * The default value of 1s should ensure that users do not experience
47 * huge latencies while at the same time allowing operations to
48 * succeed with reasonable probability.
50 #define BUSY_TIMEOUT_MS 1000
52 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
54 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-postgres", __VA_ARGS__)
58 TRANSACTION_STATE_MODIFY,
59 TRANSACTION_STATE_SYNC,
63 * Context for all functions in this plugin.
68 const struct GNUNET_CONFIGURATION_Handle *cfg;
71 * Native Postgres database handle.
75 enum Transactions transaction;
82 * Initialize the database connections and associated
83 * data structures (create tables and indices
86 * @param plugin the plugin context (state for this module)
87 * @return #GNUNET_OK on success
90 database_setup (struct Plugin *plugin)
92 struct GNUNET_PQ_ExecuteStatement es[] = {
93 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS channels (\n"
95 " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
96 " max_state_message_id BIGINT,\n"
97 " state_hash_message_id BIGINT,\n"
101 GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n"
102 " ON channels (pub_key)"),
103 GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n"
104 " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
105 "RETURNS NULL ON NULL INPUT"),
106 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS slaves (\n"
108 " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
112 GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n"
113 " ON slaves (pub_key)"),
114 GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n"
115 " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
116 "RETURNS NULL ON NULL INPUT"),
117 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS membership (\n"
118 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
119 " slave_id BIGINT NOT NULL REFERENCES slaves(id),\n"
120 " did_join INT NOT NULL,\n"
121 " announced_at BIGINT NOT NULL,\n"
122 " effective_since BIGINT NOT NULL,\n"
123 " group_generation BIGINT NOT NULL\n"
126 GNUNET_PQ_make_execute ("CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
127 "ON membership (channel_id, slave_id)"),
128 /** @todo messages table: add method_name column */
129 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS messages (\n"
130 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
131 " hop_counter INT NOT NULL,\n"
132 " signature BYTEA CHECK (LENGTH(signature)=64),\n"
133 " purpose BYTEA CHECK (LENGTH(purpose)=8),\n"
134 " fragment_id BIGINT NOT NULL,\n"
135 " fragment_offset BIGINT NOT NULL,\n"
136 " message_id BIGINT NOT NULL,\n"
137 " group_generation BIGINT NOT NULL,\n"
138 " multicast_flags INT NOT NULL,\n"
139 " psycstore_flags INT NOT NULL,\n"
141 " PRIMARY KEY (channel_id, fragment_id),\n"
142 " UNIQUE (channel_id, message_id, fragment_offset)\n"
145 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state (\n"
146 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
147 " name TEXT NOT NULL,\n"
148 " value_current BYTEA,\n"
149 " value_signed BYTEA,\n"
150 " PRIMARY KEY (channel_id, name)\n"
153 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state_sync (\n"
154 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
155 " name TEXT NOT NULL,\n"
157 " PRIMARY KEY (channel_id, name)\n"
160 GNUNET_PQ_EXECUTE_STATEMENT_END
163 /* Open database and precompile statements */
164 plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->cfg,
165 "psycstore-postgres");
166 if (NULL == plugin->dbh)
167 return GNUNET_SYSERR;
169 GNUNET_PQ_exec_statements (plugin->dbh,
172 PQfinish (plugin->dbh);
174 return GNUNET_SYSERR;
177 /* Prepare statements */
179 struct GNUNET_PQ_PreparedStatement ps[] = {
180 GNUNET_PQ_make_prepare ("transaction_begin",
182 GNUNET_PQ_make_prepare ("transaction_commit",
184 GNUNET_PQ_make_prepare ("transaction_rollback",
186 GNUNET_PQ_make_prepare ("insert_channel_key",
187 "INSERT INTO channels (pub_key) VALUES ($1)"
188 " ON CONFLICT DO NOTHING", 1),
189 GNUNET_PQ_make_prepare ("insert_slave_key",
190 "INSERT INTO slaves (pub_key) VALUES ($1)"
191 " ON CONFLICT DO NOTHING", 1),
192 GNUNET_PQ_make_prepare ("insert_membership",
193 "INSERT INTO membership\n"
194 " (channel_id, slave_id, did_join, announced_at,\n"
195 " effective_since, group_generation)\n"
196 "VALUES (get_chan_id($1),\n"
197 " get_slave_id($2),\n"
198 " $3, $4, $5, $6)", 6),
199 GNUNET_PQ_make_prepare ("select_membership",
200 "SELECT did_join FROM membership\n"
201 "WHERE channel_id = get_chan_id($1)\n"
202 " AND slave_id = get_slave_id($2)\n"
203 " AND effective_since <= $3 AND did_join = 1\n"
204 "ORDER BY announced_at DESC LIMIT 1", 3),
205 GNUNET_PQ_make_prepare ("insert_fragment",
206 "INSERT INTO messages\n"
207 " (channel_id, hop_counter, signature, purpose,\n"
208 " fragment_id, fragment_offset, message_id,\n"
209 " group_generation, multicast_flags, psycstore_flags, data)\n"
210 "VALUES (get_chan_id($1),\n"
211 " $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
212 "ON CONFLICT DO NOTHING", 11),
213 GNUNET_PQ_make_prepare ("update_message_flags",
215 "SET psycstore_flags = psycstore_flags | $1\n"
216 "WHERE channel_id = get_chan_id($2) \n"
217 " AND message_id = $3 AND fragment_offset = 0", 3),
218 GNUNET_PQ_make_prepare ("select_fragments",
219 "SELECT hop_counter, signature, purpose, fragment_id,\n"
220 " fragment_offset, message_id, group_generation,\n"
221 " multicast_flags, psycstore_flags, data\n"
223 "WHERE channel_id = get_chan_id($1) \n"
224 " AND $2 <= fragment_id AND fragment_id <= $3", 3),
225 /** @todo select_messages: add method_prefix filter */
226 GNUNET_PQ_make_prepare ("select_messages",
227 "SELECT hop_counter, signature, purpose, fragment_id,\n"
228 " fragment_offset, message_id, group_generation,\n"
229 " multicast_flags, psycstore_flags, data\n"
231 "WHERE channel_id = get_chan_id($1) \n"
232 " AND $2 <= message_id AND message_id <= $3\n"
234 /** @todo select_latest_messages: add method_prefix filter */
235 GNUNET_PQ_make_prepare ("select_latest_fragments",
236 "SELECT rev.hop_counter AS hop_counter,\n"
237 " rev.signature AS signature,\n"
238 " rev.purpose AS purpose,\n"
239 " rev.fragment_id AS fragment_id,\n"
240 " rev.fragment_offset AS fragment_offset,\n"
241 " rev.message_id AS message_id,\n"
242 " rev.group_generation AS group_generation,\n"
243 " rev.multicast_flags AS multicast_flags,\n"
244 " rev.psycstore_flags AS psycstore_flags,\n"
245 " rev.data AS data\n"
247 " (SELECT hop_counter, signature, purpose, fragment_id,\n"
248 " fragment_offset, message_id, group_generation,\n"
249 " multicast_flags, psycstore_flags, data \n"
251 " WHERE channel_id = get_chan_id($1) \n"
252 " ORDER BY fragment_id DESC\n"
253 " LIMIT $2) AS rev\n"
254 " ORDER BY rev.fragment_id;", 2),
255 GNUNET_PQ_make_prepare ("select_latest_messages",
256 "SELECT hop_counter, signature, purpose, fragment_id,\n"
257 " fragment_offset, message_id, group_generation,\n"
258 " multicast_flags, psycstore_flags, data\n"
260 "WHERE channel_id = get_chan_id($1)\n"
261 " AND message_id IN\n"
262 " (SELECT message_id\n"
264 " WHERE channel_id = get_chan_id($2) \n"
265 " GROUP BY message_id\n"
266 " ORDER BY message_id\n"
268 "ORDER BY fragment_id", 3),
269 GNUNET_PQ_make_prepare ("select_message_fragment",
270 "SELECT hop_counter, signature, purpose, fragment_id,\n"
271 " fragment_offset, message_id, group_generation,\n"
272 " multicast_flags, psycstore_flags, data\n"
274 "WHERE channel_id = get_chan_id($1) \n"
275 " AND message_id = $2 AND fragment_offset = $3", 3),
276 GNUNET_PQ_make_prepare ("select_counters_message",
277 "SELECT fragment_id, message_id, group_generation\n"
279 "WHERE channel_id = get_chan_id($1)\n"
280 "ORDER BY fragment_id DESC LIMIT 1", 1),
281 GNUNET_PQ_make_prepare ("select_counters_state",
282 "SELECT max_state_message_id\n"
284 "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1),
285 GNUNET_PQ_make_prepare ("update_max_state_message_id",
287 "SET max_state_message_id = $1\n"
288 "WHERE pub_key = $2", 2),
290 GNUNET_PQ_make_prepare ("update_state_hash_message_id",
292 "SET state_hash_message_id = $1\n"
293 "WHERE pub_key = $2", 2),
294 GNUNET_PQ_make_prepare ("insert_state_current",
295 "INSERT INTO state\n"
296 " (channel_id, name, value_current, value_signed)\n"
297 "SELECT new.channel_id, new.name,\n"
298 " new.value_current, old.value_signed\n"
299 "FROM (SELECT get_chan_id($1) AS channel_id,\n"
300 " $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n"
301 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
302 " FROM state) AS old\n"
303 "ON new.channel_id = old.channel_id AND new.name = old.name\n"
304 "ON CONFLICT (channel_id, name)\n"
305 " DO UPDATE SET value_current = EXCLUDED.value_current,\n"
306 " value_signed = EXCLUDED.value_signed", 3),
307 GNUNET_PQ_make_prepare ("delete_state_empty",
308 "DELETE FROM state\n"
309 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n"
310 " AND (value_current IS NULL OR length(value_current) = 0)\n"
311 " AND (value_signed IS NULL OR length(value_signed) = 0)", 1),
312 GNUNET_PQ_make_prepare ("update_state_signed",
314 "SET value_signed = value_current\n"
315 "WHERE channel_id = get_chan_id($1) ", 1),
316 GNUNET_PQ_make_prepare ("delete_state",
317 "DELETE FROM state\n"
318 "WHERE channel_id = get_chan_id($1) ", 1),
319 GNUNET_PQ_make_prepare ("insert_state_sync",
320 "INSERT INTO state_sync (channel_id, name, value)\n"
321 "VALUES (get_chan_id($1), $2, $3)", 3),
322 GNUNET_PQ_make_prepare ("insert_state_from_sync",
323 "INSERT INTO state\n"
324 " (channel_id, name, value_current, value_signed)\n"
325 "SELECT channel_id, name, value, value\n"
327 "WHERE channel_id = get_chan_id($1)", 1),
328 GNUNET_PQ_make_prepare ("delete_state_sync",
329 "DELETE FROM state_sync\n"
330 "WHERE channel_id = get_chan_id($1)", 1),
331 GNUNET_PQ_make_prepare ("select_state_one",
332 "SELECT value_current\n"
334 "WHERE channel_id = get_chan_id($1)\n"
335 " AND name = $2", 2),
336 GNUNET_PQ_make_prepare ("select_state_prefix",
337 "SELECT name, value_current\n"
339 "WHERE channel_id = get_chan_id($1)\n"
340 " AND (name = $2 OR substr(name, 1, $3) = $4)", 4),
341 GNUNET_PQ_make_prepare ("select_state_signed",
342 "SELECT name, value_signed\n"
344 "WHERE channel_id = get_chan_id($1)\n"
345 " AND value_signed IS NOT NULL", 1),
346 GNUNET_PQ_PREPARED_STATEMENT_END
350 GNUNET_PQ_prepare_statements (plugin->dbh,
353 PQfinish (plugin->dbh);
355 return GNUNET_SYSERR;
364 * Shutdown database connection and associate data
366 * @param plugin the plugin context (state for this module)
369 database_shutdown (struct Plugin *plugin)
371 PQfinish (plugin->dbh);
377 * Execute a prepared statement with a @a channel_key argument.
379 * @param plugin Plugin handle.
380 * @param stmt Statement to execute.
381 * @param channel_key Public key of the channel.
383 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
386 exec_channel (struct Plugin *plugin, const char *stmt,
387 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
389 struct GNUNET_PQ_QueryParam params[] = {
390 GNUNET_PQ_query_param_auto_from_type (channel_key),
391 GNUNET_PQ_query_param_end
394 if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
395 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
396 return GNUNET_SYSERR;
403 * Begin a transaction.
406 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
408 struct GNUNET_PQ_QueryParam params[] = {
409 GNUNET_PQ_query_param_end
412 if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
413 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_begin", params))
414 return GNUNET_SYSERR;
416 plugin->transaction = transaction;
422 * Commit current transaction.
425 transaction_commit (struct Plugin *plugin)
427 struct GNUNET_PQ_QueryParam params[] = {
428 GNUNET_PQ_query_param_end
431 if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
432 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_commit", params))
433 return GNUNET_SYSERR;
435 plugin->transaction = TRANSACTION_NONE;
441 * Roll back current transaction.
444 transaction_rollback (struct Plugin *plugin)
446 struct GNUNET_PQ_QueryParam params[] = {
447 GNUNET_PQ_query_param_end
450 if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
451 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_rollback", params))
452 return GNUNET_SYSERR;
454 plugin->transaction = TRANSACTION_NONE;
/**
 * Make sure the channels table has a row for @a channel_key.
 *
 * Runs the "insert_channel_key" prepared statement, which leaves an
 * already known key untouched (ON CONFLICT DO NOTHING).
 *
 * @param plugin Plugin handle.
 * @param channel_key Public key of the channel.
 *
 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
 */
static int
channel_key_store (struct Plugin *plugin,
                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  /* Same "one channel key in, status out" shape as every other
     channel-keyed statement, so reuse the shared helper. */
  return exec_channel (plugin, "insert_channel_key", channel_key);
}
477 slave_key_store (struct Plugin *plugin,
478 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
480 struct GNUNET_PQ_QueryParam params[] = {
481 GNUNET_PQ_query_param_auto_from_type (slave_key),
482 GNUNET_PQ_query_param_end
485 if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
486 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_slave_key", params))
487 return GNUNET_SYSERR;
494 * Store join/leave events for a PSYC channel in order to be able to answer
495 * membership test queries later.
497 * @see GNUNET_PSYCSTORE_membership_store()
499 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
502 postgres_membership_store (void *cls,
503 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
504 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
506 uint64_t announced_at,
507 uint64_t effective_since,
508 uint64_t group_generation)
510 struct Plugin *plugin = cls;
512 uint32_t idid_join = (uint32_t)did_join;
514 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
516 if (announced_at > INT64_MAX ||
517 effective_since > INT64_MAX ||
518 group_generation > INT64_MAX)
521 return GNUNET_SYSERR;
524 if (GNUNET_OK != channel_key_store (plugin, channel_key)
525 || GNUNET_OK != slave_key_store (plugin, slave_key))
526 return GNUNET_SYSERR;
528 struct GNUNET_PQ_QueryParam params[] = {
529 GNUNET_PQ_query_param_auto_from_type (channel_key),
530 GNUNET_PQ_query_param_auto_from_type (slave_key),
531 GNUNET_PQ_query_param_uint32 (&idid_join),
532 GNUNET_PQ_query_param_uint64 (&announced_at),
533 GNUNET_PQ_query_param_uint64 (&effective_since),
534 GNUNET_PQ_query_param_uint64 (&group_generation),
535 GNUNET_PQ_query_param_end
538 if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
539 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_membership", params))
540 return GNUNET_SYSERR;
546 * Test if a member was admitted to the channel at the given message ID.
548 * @see GNUNET_PSYCSTORE_membership_test()
550 * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
551 * #GNUNET_SYSERR if there was an error.
554 membership_test (void *cls,
555 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
556 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
560 struct Plugin *plugin = cls;
562 uint32_t did_join = 0;
564 int ret = GNUNET_SYSERR;
566 struct GNUNET_PQ_QueryParam params_select[] = {
567 GNUNET_PQ_query_param_auto_from_type (channel_key),
568 GNUNET_PQ_query_param_auto_from_type (slave_key),
569 GNUNET_PQ_query_param_uint64 (&message_id),
570 GNUNET_PQ_query_param_end
573 res = GNUNET_PQ_exec_prepared (plugin->dbh, "select_membership", params_select);
575 GNUNET_POSTGRES_check_result (plugin->dbh,
578 "PQexecPrepared", "select_membership"))
580 return GNUNET_SYSERR;
583 struct GNUNET_PQ_ResultSpec results_select[] = {
584 GNUNET_PQ_result_spec_uint32 ("did_join", &did_join),
585 GNUNET_PQ_result_spec_end
588 switch (GNUNET_PQ_extract_result (res, results_select, 0))
605 * Store a message fragment sent to a channel.
607 * @see GNUNET_PSYCSTORE_fragment_store()
609 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
612 fragment_store (void *cls,
613 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
614 const struct GNUNET_MULTICAST_MessageHeader *msg,
615 uint32_t psycstore_flags)
617 struct Plugin *plugin = cls;
619 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
621 uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
623 uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
624 uint64_t message_id = GNUNET_ntohll (msg->message_id);
625 uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
627 uint32_t hop_counter = ntohl(msg->hop_counter);
628 uint32_t flags = ntohl(msg->flags);
630 if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
631 message_id > INT64_MAX || group_generation > INT64_MAX)
633 LOG(GNUNET_ERROR_TYPE_ERROR,
634 "Tried to store fragment with a field > INT64_MAX: "
635 "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
636 message_id, group_generation);
638 return GNUNET_SYSERR;
641 if (GNUNET_OK != channel_key_store (plugin, channel_key))
642 return GNUNET_SYSERR;
644 struct GNUNET_PQ_QueryParam params_insert[] = {
645 GNUNET_PQ_query_param_auto_from_type (channel_key),
646 GNUNET_PQ_query_param_uint32 (&hop_counter),
647 GNUNET_PQ_query_param_auto_from_type (&msg->signature),
648 GNUNET_PQ_query_param_auto_from_type (&msg->purpose),
649 GNUNET_PQ_query_param_uint64 (&fragment_id),
650 GNUNET_PQ_query_param_uint64 (&fragment_offset),
651 GNUNET_PQ_query_param_uint64 (&message_id),
652 GNUNET_PQ_query_param_uint64 (&group_generation),
653 GNUNET_PQ_query_param_uint32 (&flags),
654 GNUNET_PQ_query_param_uint32 (&psycstore_flags),
655 GNUNET_PQ_query_param_fixed_size (&msg[1], ntohs (msg->header.size) - sizeof (*msg)),
656 GNUNET_PQ_query_param_end
659 if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
660 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_fragment", params_insert))
661 return GNUNET_SYSERR;
667 * Set additional flags for a given message.
669 * They are OR'd with any existing flags set.
671 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
674 message_add_flags (void *cls,
675 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
677 uint32_t psycstore_flags)
679 struct Plugin *plugin = cls;
681 struct GNUNET_PQ_QueryParam params_update[] = {
682 GNUNET_PQ_query_param_uint32 (&psycstore_flags),
683 GNUNET_PQ_query_param_auto_from_type (channel_key),
684 GNUNET_PQ_query_param_uint64 (&message_id),
685 GNUNET_PQ_query_param_end
688 if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
689 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "update_message_flags", params_update))
690 return GNUNET_SYSERR;
697 fragment_row (struct Plugin *plugin,
700 GNUNET_PSYCSTORE_FragmentCallback cb,
702 uint64_t *returned_fragments)
704 uint32_t hop_counter;
705 void *signature = NULL;
706 void *purpose = NULL;
707 size_t signature_size;
709 uint64_t fragment_id;
710 uint64_t fragment_offset;
712 uint64_t group_generation;
716 int ret = GNUNET_SYSERR;
717 struct GNUNET_MULTICAST_MessageHeader *mp;
719 struct GNUNET_PQ_ResultSpec results[] = {
720 GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
721 GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size),
722 GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size),
723 GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id),
724 GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
725 GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
726 GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
727 GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags),
728 GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags),
729 GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size),
730 GNUNET_PQ_result_spec_end
734 GNUNET_POSTGRES_check_result (plugin->dbh, res, PGRES_TUPLES_OK,
738 LOG (GNUNET_ERROR_TYPE_DEBUG,
739 "Failing fragment lookup (postgres error)\n");
740 return GNUNET_SYSERR;
743 int nrows = PQntuples (res);
744 for (int row = 0; row < nrows; row++)
746 if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
751 mp = GNUNET_malloc (sizeof (*mp) + buf_size);
753 mp->header.size = htons (sizeof (*mp) + buf_size);
754 mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
755 mp->hop_counter = htonl (hop_counter);
756 GNUNET_memcpy (&mp->signature,
759 GNUNET_memcpy (&mp->purpose,
762 mp->fragment_id = GNUNET_htonll (fragment_id);
763 mp->fragment_offset = GNUNET_htonll (fragment_offset);
764 mp->message_id = GNUNET_htonll (message_id);
765 mp->group_generation = GNUNET_htonll (group_generation);
766 mp->flags = htonl(msg_flags);
768 GNUNET_memcpy (&mp[1],
771 GNUNET_PQ_cleanup_result(results);
772 ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
773 if (NULL != returned_fragments)
774 (*returned_fragments)++;
782 fragment_select (struct Plugin *plugin,
784 struct GNUNET_PQ_QueryParam *params,
785 uint64_t *returned_fragments,
786 GNUNET_PSYCSTORE_FragmentCallback cb,
790 int ret = GNUNET_SYSERR;
792 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
794 GNUNET_POSTGRES_check_result (plugin->dbh,
797 "PQexecPrepared", stmt))
799 if (PQntuples (res) == 0)
803 ret = fragment_row (plugin, stmt, res, cb, cb_cls, returned_fragments);
812 * Retrieve a message fragment range by fragment ID.
814 * @see GNUNET_PSYCSTORE_fragment_get()
816 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
819 fragment_get (void *cls,
820 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
821 uint64_t first_fragment_id,
822 uint64_t last_fragment_id,
823 uint64_t *returned_fragments,
824 GNUNET_PSYCSTORE_FragmentCallback cb,
827 struct Plugin *plugin = cls;
828 struct GNUNET_PQ_QueryParam params_select[] = {
829 GNUNET_PQ_query_param_auto_from_type (channel_key),
830 GNUNET_PQ_query_param_uint64 (&first_fragment_id),
831 GNUNET_PQ_query_param_uint64 (&last_fragment_id),
832 GNUNET_PQ_query_param_end
835 *returned_fragments = 0;
836 return fragment_select (plugin,
845 * Retrieve a message fragment range by fragment ID.
847 * @see GNUNET_PSYCSTORE_fragment_get_latest()
849 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
852 fragment_get_latest (void *cls,
853 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
854 uint64_t fragment_limit,
855 uint64_t *returned_fragments,
856 GNUNET_PSYCSTORE_FragmentCallback cb,
859 struct Plugin *plugin = cls;
861 *returned_fragments = 0;
863 struct GNUNET_PQ_QueryParam params_select[] = {
864 GNUNET_PQ_query_param_auto_from_type (channel_key),
865 GNUNET_PQ_query_param_uint64 (&fragment_limit),
866 GNUNET_PQ_query_param_end
869 return fragment_select (plugin,
870 "select_latest_fragments",
878 * Retrieve all fragments of a message ID range.
880 * @see GNUNET_PSYCSTORE_message_get()
882 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
885 message_get (void *cls,
886 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
887 uint64_t first_message_id,
888 uint64_t last_message_id,
889 uint64_t fragment_limit,
890 uint64_t *returned_fragments,
891 GNUNET_PSYCSTORE_FragmentCallback cb,
894 struct Plugin *plugin = cls;
895 struct GNUNET_PQ_QueryParam params_select[] = {
896 GNUNET_PQ_query_param_auto_from_type (channel_key),
897 GNUNET_PQ_query_param_uint64 (&first_message_id),
898 GNUNET_PQ_query_param_uint64 (&last_message_id),
899 GNUNET_PQ_query_param_uint64 (&fragment_limit),
900 GNUNET_PQ_query_param_end
903 if (0 == fragment_limit)
904 fragment_limit = INT64_MAX;
905 *returned_fragments = 0;
906 return fragment_select (plugin,
915 * Retrieve all fragments of the latest messages.
917 * @see GNUNET_PSYCSTORE_message_get_latest()
919 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
922 message_get_latest (void *cls,
923 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
924 uint64_t message_limit,
925 uint64_t *returned_fragments,
926 GNUNET_PSYCSTORE_FragmentCallback cb,
929 struct Plugin *plugin = cls;
930 struct GNUNET_PQ_QueryParam params_select[] = {
931 GNUNET_PQ_query_param_auto_from_type (channel_key),
932 GNUNET_PQ_query_param_auto_from_type (channel_key),
933 GNUNET_PQ_query_param_uint64 (&message_limit),
934 GNUNET_PQ_query_param_end
937 *returned_fragments = 0;
938 return fragment_select (plugin,
939 "select_latest_messages",
947 * Retrieve a fragment of message specified by its message ID and fragment
950 * @see GNUNET_PSYCSTORE_message_get_fragment()
952 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
955 message_get_fragment (void *cls,
956 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
958 uint64_t fragment_offset,
959 GNUNET_PSYCSTORE_FragmentCallback cb,
963 struct Plugin *plugin = cls;
964 int ret = GNUNET_SYSERR;
965 const char *stmt = "select_message_fragment";
967 struct GNUNET_PQ_QueryParam params_select[] = {
968 GNUNET_PQ_query_param_auto_from_type (channel_key),
969 GNUNET_PQ_query_param_uint64 (&message_id),
970 GNUNET_PQ_query_param_uint64 (&fragment_offset),
971 GNUNET_PQ_query_param_end
974 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
975 if (GNUNET_OK == GNUNET_POSTGRES_check_result (plugin->dbh,
978 "PQexecPrepared", stmt))
980 if (PQntuples (res) == 0)
983 ret = fragment_row (plugin, stmt, res, cb, cb_cls, NULL);
992 * Retrieve the max. values of message counters for a channel.
994 * @see GNUNET_PSYCSTORE_counters_get()
996 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
999 counters_message_get (void *cls,
1000 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1001 uint64_t *max_fragment_id,
1002 uint64_t *max_message_id,
1003 uint64_t *max_group_generation)
1006 struct Plugin *plugin = cls;
1008 const char *stmt = "select_counters_message";
1010 struct GNUNET_PQ_QueryParam params_select[] = {
1011 GNUNET_PQ_query_param_auto_from_type (channel_key),
1012 GNUNET_PQ_query_param_end
1015 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1016 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1019 "PQexecPrepared", stmt))
1021 return GNUNET_SYSERR;
1024 struct GNUNET_PQ_ResultSpec results_select[] = {
1025 GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id),
1026 GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id),
1027 GNUNET_PQ_result_spec_uint64 ("group_generation", max_group_generation),
1028 GNUNET_PQ_result_spec_end
1031 if (GNUNET_OK != GNUNET_PQ_extract_result (res, results_select, 0))
1034 return GNUNET_SYSERR;
1037 GNUNET_PQ_cleanup_result(results_select);
1044 * Retrieve the max. values of state counters for a channel.
1046 * @see GNUNET_PSYCSTORE_counters_get()
1048 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1051 counters_state_get (void *cls,
1052 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1053 uint64_t *max_state_message_id)
1056 struct Plugin *plugin = cls;
1058 const char *stmt = "select_counters_state";
1060 int ret = GNUNET_SYSERR;
1062 struct GNUNET_PQ_QueryParam params_select[] = {
1063 GNUNET_PQ_query_param_auto_from_type (channel_key),
1064 GNUNET_PQ_query_param_end
1067 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1068 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1071 "PQexecPrepared", stmt))
1073 return GNUNET_SYSERR;
1076 struct GNUNET_PQ_ResultSpec results_select[] = {
1077 GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id),
1078 GNUNET_PQ_result_spec_end
1081 ret = GNUNET_PQ_extract_result (res, results_select, 0);
1083 if (GNUNET_OK != ret)
1086 return GNUNET_SYSERR;
1089 GNUNET_PQ_cleanup_result(results_select);
1097 * Assign a value to a state variable.
1099 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1102 state_assign (struct Plugin *plugin, const char *stmt,
1103 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1104 const char *name, const void *value, size_t value_size)
1106 struct GNUNET_PQ_QueryParam params[] = {
1107 GNUNET_PQ_query_param_auto_from_type (channel_key),
1108 GNUNET_PQ_query_param_string (name),
1109 GNUNET_PQ_query_param_fixed_size (value, value_size),
1110 GNUNET_PQ_query_param_end
1113 if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
1114 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
1115 return GNUNET_SYSERR;
1122 update_message_id (struct Plugin *plugin,
1124 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1125 uint64_t message_id)
1127 struct GNUNET_PQ_QueryParam params[] = {
1128 GNUNET_PQ_query_param_uint64 (&message_id),
1129 GNUNET_PQ_query_param_auto_from_type (channel_key),
1130 GNUNET_PQ_query_param_end
1133 if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS !=
1134 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
1135 return GNUNET_SYSERR;
1142 * Begin modifying current state.
1145 state_modify_begin (void *cls,
1146 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1147 uint64_t message_id, uint64_t state_delta)
1149 struct Plugin *plugin = cls;
1151 if (state_delta > 0)
1154 * We can only apply state modifiers in the current message if modifiers in
1155 * the previous stateful message (message_id - state_delta) were already
1159 uint64_t max_state_message_id = 0;
1160 int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1164 case GNUNET_NO: // no state yet
1172 if (max_state_message_id < message_id - state_delta)
1173 return GNUNET_NO; /* some stateful messages not yet applied */
1174 else if (message_id - state_delta < max_state_message_id)
1175 return GNUNET_NO; /* changes already applied */
1178 if (TRANSACTION_NONE != plugin->transaction)
1180 /** @todo FIXME: wait for other transaction to finish */
1181 return GNUNET_SYSERR;
1183 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1188 * Set the current value of state variable.
1190 * @see GNUNET_PSYCSTORE_state_modify()
1192 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1195 state_modify_op (void *cls,
1196 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1197 enum GNUNET_PSYC_Operator op,
1198 const char *name, const void *value, size_t value_size)
1200 struct Plugin *plugin = cls;
1201 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1205 case GNUNET_PSYC_OP_ASSIGN:
1206 return state_assign (plugin, "insert_state_current",
1207 channel_key, name, value, value_size);
1209 default: /** @todo implement more state operations */
1211 return GNUNET_SYSERR;
1217 * End modifying current state.
1220 state_modify_end (void *cls,
1221 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1222 uint64_t message_id)
1224 struct Plugin *plugin = cls;
1225 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1228 GNUNET_OK == exec_channel (plugin, "delete_state_empty", channel_key)
1229 && GNUNET_OK == update_message_id (plugin,
1230 "update_max_state_message_id",
1231 channel_key, message_id)
1232 && GNUNET_OK == transaction_commit (plugin)
1233 ? GNUNET_OK : GNUNET_SYSERR;
/**
 * Begin state synchronization.
 *
 * Runs the "delete_state_sync" statement for the channel.
 *
 * @param cls the `struct Plugin`
 * @param channel_key public key of the channel
 * @return result of exec_channel()
 */
static int
state_sync_begin (void *cls,
                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  struct Plugin *ctx = cls;
  int rc = exec_channel (ctx, "delete_state_sync", channel_key);

  return rc;
}
/**
 * Assign current value of a state variable.
 *
 * Stores the variable via the "insert_state_sync" statement.
 *
 * @see GNUNET_PSYCSTORE_state_modify()
 *
 * @param cls the `struct Plugin`
 * @param channel_key public key of the channel
 * @param name name of the state variable
 * @param value value to store
 * @param value_size number of bytes in @a value
 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
 */
static int
state_sync_assign (void *cls,
                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                   const char *name, const void *value, size_t value_size)
{
  struct Plugin *ctx = cls;
  int rc = state_assign (ctx, "insert_state_sync",
                         channel_key, name, value, value_size);

  return rc;
}
1268 * End modifying current state.
1271 state_sync_end (void *cls,
1272 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1273 uint64_t max_state_message_id,
1274 uint64_t state_hash_message_id)
1276 struct Plugin *plugin = cls;
1277 int ret = GNUNET_SYSERR;
1279 if (TRANSACTION_NONE != plugin->transaction)
1281 /** @todo FIXME: wait for other transaction to finish */
1282 return GNUNET_SYSERR;
1285 GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1286 && GNUNET_OK == exec_channel (plugin, "delete_state", channel_key)
1287 && GNUNET_OK == exec_channel (plugin, "insert_state_from_sync",
1289 && GNUNET_OK == exec_channel (plugin, "delete_state_sync",
1291 && GNUNET_OK == update_message_id (plugin,
1292 "update_state_hash_message_id",
1293 channel_key, state_hash_message_id)
1294 && GNUNET_OK == update_message_id (plugin,
1295 "update_max_state_message_id",
1296 channel_key, max_state_message_id)
1297 && GNUNET_OK == transaction_commit (plugin)
1299 : transaction_rollback (plugin);
/**
 * Delete the whole state.
 *
 * Runs the "delete_state" statement for the channel.
 *
 * @see GNUNET_PSYCSTORE_state_reset()
 *
 * @param cls the `struct Plugin`
 * @param channel_key public key of the channel
 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
 */
static int
state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  struct Plugin *ctx = cls;
  int rc = exec_channel (ctx, "delete_state", channel_key);

  return rc;
}
/**
 * Update signed values of state variables in the state store.
 *
 * Runs the "update_state_signed" statement for the channel.
 *
 * @see GNUNET_PSYCSTORE_state_hash_update()
 *
 * @param cls the `struct Plugin`
 * @param channel_key public key of the channel
 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
 */
static int
state_update_signed (void *cls,
                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  struct Plugin *ctx = cls;
  int rc = exec_channel (ctx, "update_state_signed", channel_key);

  return rc;
}
1336 * Retrieve a state variable by name.
1338 * @see GNUNET_PSYCSTORE_state_get()
1340 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1343 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1344 const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1348 struct Plugin *plugin = cls;
1349 int ret = GNUNET_SYSERR;
1351 const char *stmt = "select_state_one";
1353 struct GNUNET_PQ_QueryParam params_select[] = {
1354 GNUNET_PQ_query_param_auto_from_type (channel_key),
1355 GNUNET_PQ_query_param_string (name),
1356 GNUNET_PQ_query_param_end
1359 void *value_current = NULL;
1360 size_t value_size = 0;
1362 struct GNUNET_PQ_ResultSpec results[] = {
1363 GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1364 GNUNET_PQ_result_spec_end
1367 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1368 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1371 "PQexecPrepared", stmt))
1373 return GNUNET_SYSERR;
1376 if (PQntuples (res) == 0)
1382 ret = GNUNET_PQ_extract_result (res, results, 0);
1384 if (GNUNET_OK != ret)
1387 return GNUNET_SYSERR;
1390 ret = cb (cb_cls, name, value_current,
1393 GNUNET_PQ_cleanup_result(results);
1401 * Retrieve all state variables for a channel with the given prefix.
1403 * @see GNUNET_PSYCSTORE_state_get_prefix()
1405 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1408 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1409 const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1413 struct Plugin *plugin = cls;
1414 int ret = GNUNET_NO;
1416 const char *stmt = "select_state_prefix";
1418 uint32_t name_len = (uint32_t) strlen (name);
1420 struct GNUNET_PQ_QueryParam params_select[] = {
1421 GNUNET_PQ_query_param_auto_from_type (channel_key),
1422 GNUNET_PQ_query_param_string (name),
1423 GNUNET_PQ_query_param_uint32 (&name_len),
1424 GNUNET_PQ_query_param_string (name),
1425 GNUNET_PQ_query_param_end
1429 void *value_current = NULL;
1430 size_t value_size = 0;
1432 struct GNUNET_PQ_ResultSpec results[] = {
1433 GNUNET_PQ_result_spec_string ("name", &name2),
1434 GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1435 GNUNET_PQ_result_spec_end
1438 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1439 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1442 "PQexecPrepared", stmt))
1444 return GNUNET_SYSERR;
1447 int nrows = PQntuples (res);
1448 for (int row = 0; row < nrows; row++)
1450 if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
1455 ret = cb (cb_cls, (const char *) name2,
1458 GNUNET_PQ_cleanup_result(results);
1468 * Retrieve all signed state variables for a channel.
1470 * @see GNUNET_PSYCSTORE_state_get_signed()
1472 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1475 state_get_signed (void *cls,
1476 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1477 GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1480 struct Plugin *plugin = cls;
1481 int ret = GNUNET_NO;
1483 const char *stmt = "select_state_signed";
1485 struct GNUNET_PQ_QueryParam params_select[] = {
1486 GNUNET_PQ_query_param_auto_from_type (channel_key),
1487 GNUNET_PQ_query_param_end
1491 void *value_signed = NULL;
1492 size_t value_size = 0;
1494 struct GNUNET_PQ_ResultSpec results[] = {
1495 GNUNET_PQ_result_spec_string ("name", &name),
1496 GNUNET_PQ_result_spec_variable_size ("value_signed", &value_signed, &value_size),
1497 GNUNET_PQ_result_spec_end
1500 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1501 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1504 "PQexecPrepared", stmt))
1506 return GNUNET_SYSERR;
1509 int nrows = PQntuples (res);
1510 for (int row = 0; row < nrows; row++)
1512 if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
1517 ret = cb (cb_cls, (const char *) name,
1521 GNUNET_PQ_cleanup_result (results);
1531 * Entry point for the plugin.
1533 * @param cls The struct GNUNET_CONFIGURATION_Handle.
1534 * @return NULL on error, otherwise the plugin context
1537 libgnunet_plugin_psycstore_postgres_init (void *cls)
1539 static struct Plugin plugin;
1540 const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1541 struct GNUNET_PSYCSTORE_PluginFunctions *api;
1543 if (NULL != plugin.cfg)
1544 return NULL; /* can only initialize once! */
1545 memset (&plugin, 0, sizeof (struct Plugin));
1547 if (GNUNET_OK != database_setup (&plugin))
1549 database_shutdown (&plugin);
1552 api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1554 api->membership_store = &postgres_membership_store;
1555 api->membership_test = &membership_test;
1556 api->fragment_store = &fragment_store;
1557 api->message_add_flags = &message_add_flags;
1558 api->fragment_get = &fragment_get;
1559 api->fragment_get_latest = &fragment_get_latest;
1560 api->message_get = &message_get;
1561 api->message_get_latest = &message_get_latest;
1562 api->message_get_fragment = &message_get_fragment;
1563 api->counters_message_get = &counters_message_get;
1564 api->counters_state_get = &counters_state_get;
1565 api->state_modify_begin = &state_modify_begin;
1566 api->state_modify_op = &state_modify_op;
1567 api->state_modify_end = &state_modify_end;
1568 api->state_sync_begin = &state_sync_begin;
1569 api->state_sync_assign = &state_sync_assign;
1570 api->state_sync_end = &state_sync_end;
1571 api->state_reset = &state_reset;
1572 api->state_update_signed = &state_update_signed;
1573 api->state_get = &state_get;
1574 api->state_get_prefix = &state_get_prefix;
1575 api->state_get_signed = &state_get_signed;
1577 LOG (GNUNET_ERROR_TYPE_INFO, _("Postgres database running\n"));
1583 * Exit point from the plugin.
1585 * @param cls The plugin context (as returned by "init")
1586 * @return Always NULL
1589 libgnunet_plugin_psycstore_postgres_done (void *cls)
1591 struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1592 struct Plugin *plugin = api->cls;
1594 database_shutdown (plugin);
1597 LOG (GNUNET_ERROR_TYPE_DEBUG, "Postgres plugin has finished\n");
1601 /* end of plugin_psycstore_postgres.c */