2 * This file is part of GNUnet
3 * Copyright (C) 2016 GNUnet e.V.
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psycstore/plugin_psycstore_postgres.c
23 * @brief PostgreSQL-based psycstore backend
24 * @author Daniel Golle
25 * @author Gabor X Toth
26 * @author Christian Grothoff
27 * @author Christophe Genevey
31 #include "gnunet_psycstore_plugin.h"
32 #include "gnunet_psycstore_service.h"
33 #include "gnunet_multicast_service.h"
34 #include "gnunet_crypto_lib.h"
35 #include "gnunet_psyc_util_lib.h"
36 #include "psycstore.h"
37 #include "gnunet_postgres_lib.h"
38 #include "gnunet_pq_lib.h"
41 * After how many ms "busy" should a DB operation fail for good? A
42 * low value makes sure that we are more responsive to requests
43 * (especially PUTs). A high value guarantees a higher success rate
44 * (SELECTs in iterate can take several seconds despite LIMIT=1).
46 * The default value of 1s should ensure that users do not experience
47 * huge latencies while at the same time allowing operations to
48 * succeed with reasonable probability.
50 #define BUSY_TIMEOUT_MS 1000
52 #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
54 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-postgres", __VA_ARGS__)
58 TRANSACTION_STATE_MODIFY,
59 TRANSACTION_STATE_SYNC,
63 * Context for all functions in this plugin.
68 const struct GNUNET_CONFIGURATION_Handle *cfg;
71 * Native Postgres database handle.
75 enum Transactions transaction;
82 * Initialize the database connections and associated
83 * data structures (create tables and indices
86 * @param plugin the plugin context (state for this module)
87 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
90 database_setup (struct Plugin *plugin)
92 /* Open database and precompile statements */
93 plugin->dbh = GNUNET_POSTGRES_connect (plugin->cfg,
94 "psycstore-postgres");
95 if (NULL == plugin->dbh)
100 GNUNET_POSTGRES_exec(plugin->dbh,
101 "CREATE TABLE IF NOT EXISTS channels (\n"
103 " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
104 " max_state_message_id BIGINT,\n"
105 " state_hash_message_id BIGINT,\n"
110 GNUNET_POSTGRES_exec(plugin->dbh,
111 "CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n"
112 " ON channels (pub_key)")) ||
115 GNUNET_POSTGRES_exec(plugin->dbh,
116 "CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n"
117 " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
118 "RETURNS NULL ON NULL INPUT")) ||
121 GNUNET_POSTGRES_exec(plugin->dbh,
122 "CREATE TABLE IF NOT EXISTS slaves (\n"
124 " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
129 GNUNET_POSTGRES_exec(plugin->dbh,
130 "CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n"
131 " ON slaves (pub_key)")) ||
134 GNUNET_POSTGRES_exec(plugin->dbh,
135 "CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n"
136 " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
137 "RETURNS NULL ON NULL INPUT")) ||
140 GNUNET_POSTGRES_exec(plugin->dbh,
141 "CREATE TABLE IF NOT EXISTS membership (\n"
142 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
143 " slave_id BIGINT NOT NULL REFERENCES slaves(id),\n"
144 " did_join INT NOT NULL,\n"
145 " announced_at BIGINT NOT NULL,\n"
146 " effective_since BIGINT NOT NULL,\n"
147 " group_generation BIGINT NOT NULL\n"
151 GNUNET_POSTGRES_exec(plugin->dbh,
152 "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
153 "ON membership (channel_id, slave_id)")) ||
155 /** @todo messages table: add method_name column */
157 GNUNET_POSTGRES_exec(plugin->dbh,
158 "CREATE TABLE IF NOT EXISTS messages (\n"
159 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
160 " hop_counter INT NOT NULL,\n"
161 " signature BYTEA CHECK (LENGTH(signature)=64),\n"
162 " purpose BYTEA CHECK (LENGTH(purpose)=8),\n"
163 " fragment_id BIGINT NOT NULL,\n"
164 " fragment_offset BIGINT NOT NULL,\n"
165 " message_id BIGINT NOT NULL,\n"
166 " group_generation BIGINT NOT NULL,\n"
167 " multicast_flags INT NOT NULL,\n"
168 " psycstore_flags INT NOT NULL,\n"
170 " PRIMARY KEY (channel_id, fragment_id),\n"
171 " UNIQUE (channel_id, message_id, fragment_offset)\n"
175 GNUNET_POSTGRES_exec(plugin->dbh,
176 "CREATE TABLE IF NOT EXISTS state (\n"
177 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
178 " name TEXT NOT NULL,\n"
179 " value_current BYTEA,\n"
180 " value_signed BYTEA,\n"
181 " PRIMARY KEY (channel_id, name)\n"
184 GNUNET_POSTGRES_exec(plugin->dbh,
185 "CREATE TABLE IF NOT EXISTS state_sync (\n"
186 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
187 " name TEXT NOT NULL,\n"
189 " PRIMARY KEY (channel_id, name)\n"
192 PQfinish (plugin->dbh);
194 return GNUNET_SYSERR;
198 /* Prepare statements */
199 if ((GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
203 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
204 "transaction_commit",
207 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
208 "transaction_rollback",
211 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
212 "insert_channel_key",
213 "INSERT INTO channels (pub_key) VALUES ($1)"
214 " ON CONFLICT DO NOTHING", 1)) ||
216 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
218 "INSERT INTO slaves (pub_key) VALUES ($1)"
219 " ON CONFLICT DO NOTHING", 1)) ||
221 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
223 "INSERT INTO membership\n"
224 " (channel_id, slave_id, did_join, announced_at,\n"
225 " effective_since, group_generation)\n"
226 "VALUES (get_chan_id($1),\n"
227 " get_slave_id($2),\n"
228 " $3, $4, $5, $6)", 6)) ||
230 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
232 "SELECT did_join FROM membership\n"
233 "WHERE channel_id = get_chan_id($1)\n"
234 " AND slave_id = get_slave_id($2)\n"
235 " AND effective_since <= $3 AND did_join = 1\n"
236 "ORDER BY announced_at DESC LIMIT 1", 3)) ||
238 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
240 "INSERT INTO messages\n"
241 " (channel_id, hop_counter, signature, purpose,\n"
242 " fragment_id, fragment_offset, message_id,\n"
243 " group_generation, multicast_flags, psycstore_flags, data)\n"
244 "VALUES (get_chan_id($1),\n"
245 " $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
246 "ON CONFLICT DO NOTHING", 11)) ||
248 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
249 "update_message_flags",
251 "SET psycstore_flags = psycstore_flags | $1\n"
252 "WHERE channel_id = get_chan_id($2) \n"
253 " AND message_id = $3 AND fragment_offset = 0", 3)) ||
255 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
257 "SELECT hop_counter, signature, purpose, fragment_id,\n"
258 " fragment_offset, message_id, group_generation,\n"
259 " multicast_flags, psycstore_flags, data\n"
261 "WHERE channel_id = get_chan_id($1) \n"
262 " AND $2 <= fragment_id AND fragment_id <= $3", 3)) ||
264 /** @todo select_messages: add method_prefix filter */
265 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
267 "SELECT hop_counter, signature, purpose, fragment_id,\n"
268 " fragment_offset, message_id, group_generation,\n"
269 " multicast_flags, psycstore_flags, data\n"
271 "WHERE channel_id = get_chan_id($1) \n"
272 " AND $2 <= message_id AND message_id <= $3\n"
275 /** @todo select_latest_messages: add method_prefix filter */
276 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
277 "select_latest_fragments",
278 "SELECT rev.hop_counter AS hop_counter,\n"
279 " rev.signature AS signature,\n"
280 " rev.purpose AS purpose,\n"
281 " rev.fragment_id AS fragment_id,\n"
282 " rev.fragment_offset AS fragment_offset,\n"
283 " rev.message_id AS message_id,\n"
284 " rev.group_generation AS group_generation,\n"
285 " rev.multicast_flags AS multicast_flags,\n"
286 " rev.psycstore_flags AS psycstore_flags,\n"
287 " rev.data AS data\n"
289 " (SELECT hop_counter, signature, purpose, fragment_id,\n"
290 " fragment_offset, message_id, group_generation,\n"
291 " multicast_flags, psycstore_flags, data \n"
293 " WHERE channel_id = get_chan_id($1) \n"
294 " ORDER BY fragment_id DESC\n"
295 " LIMIT $2) AS rev\n"
296 " ORDER BY rev.fragment_id;", 2)) ||
298 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
299 "select_latest_messages",
300 "SELECT hop_counter, signature, purpose, fragment_id,\n"
301 " fragment_offset, message_id, group_generation,\n"
302 " multicast_flags, psycstore_flags, data\n"
304 "WHERE channel_id = get_chan_id($1)\n"
305 " AND message_id IN\n"
306 " (SELECT message_id\n"
308 " WHERE channel_id = get_chan_id($2) \n"
309 " GROUP BY message_id\n"
310 " ORDER BY message_id\n"
312 "ORDER BY fragment_id", 3)) ||
314 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
315 "select_message_fragment",
316 "SELECT hop_counter, signature, purpose, fragment_id,\n"
317 " fragment_offset, message_id, group_generation,\n"
318 " multicast_flags, psycstore_flags, data\n"
320 "WHERE channel_id = get_chan_id($1) \n"
321 " AND message_id = $2 AND fragment_offset = $3", 3)) ||
323 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
324 "select_counters_message",
325 "SELECT fragment_id, message_id, group_generation\n"
327 "WHERE channel_id = get_chan_id($1)\n"
328 "ORDER BY fragment_id DESC LIMIT 1", 1)) ||
330 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
331 "select_counters_state",
332 "SELECT max_state_message_id\n"
334 "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1)) ||
336 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
337 "update_max_state_message_id",
339 "SET max_state_message_id = $1\n"
340 "WHERE pub_key = $2", 2)) ||
342 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
343 "update_state_hash_message_id",
345 "SET state_hash_message_id = $1\n"
346 "WHERE pub_key = $2", 2)) ||
348 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
349 "insert_state_current",
350 "INSERT INTO state\n"
351 " (channel_id, name, value_current, value_signed)\n"
352 "SELECT new.channel_id, new.name,\n"
353 " new.value_current, old.value_signed\n"
354 "FROM (SELECT get_chan_id($1) AS channel_id,\n"
355 " $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n"
356 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
357 " FROM state) AS old\n"
358 "ON new.channel_id = old.channel_id AND new.name = old.name\n"
359 "ON CONFLICT (channel_id, name)\n"
360 " DO UPDATE SET value_current = EXCLUDED.value_current,\n"
361 " value_signed = EXCLUDED.value_signed", 3)) ||
363 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
364 "delete_state_empty",
365 "DELETE FROM state\n"
366 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n"
367 " AND (value_current IS NULL OR length(value_current) = 0)\n"
368 " AND (value_signed IS NULL OR length(value_signed) = 0)", 1)) ||
370 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
371 "update_state_signed",
373 "SET value_signed = value_current\n"
374 "WHERE channel_id = get_chan_id($1) ", 1)) ||
376 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
378 "DELETE FROM state\n"
379 "WHERE channel_id = get_chan_id($1) ", 1)) ||
381 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
383 "INSERT INTO state_sync (channel_id, name, value)\n"
384 "VALUES (get_chan_id($1), $2, $3)", 3)) ||
386 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
387 "insert_state_from_sync",
388 "INSERT INTO state\n"
389 " (channel_id, name, value_current, value_signed)\n"
390 "SELECT channel_id, name, value, value\n"
392 "WHERE channel_id = get_chan_id($1)", 1)) ||
394 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
396 "DELETE FROM state_sync\n"
397 "WHERE channel_id = get_chan_id($1)", 1)) ||
399 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
401 "SELECT value_current\n"
403 "WHERE channel_id = get_chan_id($1)\n"
404 " AND name = $2", 2)) ||
406 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
407 "select_state_prefix",
408 "SELECT name, value_current\n"
410 "WHERE channel_id = get_chan_id($1)\n"
411 " AND (name = $2 OR substr(name, 1, $3) = $4)", 4)) ||
413 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
414 "select_state_signed",
415 "SELECT name, value_signed\n"
417 "WHERE channel_id = get_chan_id($1)\n"
418 " AND value_signed IS NOT NULL", 1)))
420 PQfinish (plugin->dbh);
422 return GNUNET_SYSERR;
430 * Shut down the database connection and associated data
432 * @param plugin the plugin context (state for this module)
435 database_shutdown (struct Plugin *plugin)
437 PQfinish (plugin->dbh);
443 * Execute a prepared statement with a @a channel_key argument.
445 * @param plugin Plugin handle.
446 * @param stmt Statement to execute.
447 * @param channel_key Public key of the channel.
449 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
452 exec_channel (struct Plugin *plugin, const char *stmt,
453 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
456 struct GNUNET_PQ_QueryParam params[] = {
457 GNUNET_PQ_query_param_auto_from_type (channel_key),
458 GNUNET_PQ_query_param_end
461 ret = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
463 GNUNET_POSTGRES_check_result (plugin->dbh,
466 "PQexecPrepared", stmt))
467 return GNUNET_SYSERR;
476 * Begin a transaction.
479 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
482 struct GNUNET_PQ_QueryParam params[] = {
483 GNUNET_PQ_query_param_end
486 ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_begin", params);
488 GNUNET_POSTGRES_check_result (plugin->dbh,
491 "PQexecPrepared", "transaction_begin"))
493 return GNUNET_SYSERR;
496 plugin->transaction = transaction;
503 * Commit current transaction.
506 transaction_commit (struct Plugin *plugin)
510 struct GNUNET_PQ_QueryParam params[] = {
511 GNUNET_PQ_query_param_end
514 ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_commit", params);
516 GNUNET_POSTGRES_check_result (plugin->dbh,
519 "PQexecPrepared", "transaction_commit"))
521 return GNUNET_SYSERR;
525 plugin->transaction = TRANSACTION_NONE;
531 * Roll back current transaction.
534 transaction_rollback (struct Plugin *plugin)
538 struct GNUNET_PQ_QueryParam params[] = {
539 GNUNET_PQ_query_param_end
542 ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_rollback", params);
544 GNUNET_POSTGRES_check_result (plugin->dbh,
547 "PQexecPrepared", "transaction_rollback"))
549 return GNUNET_SYSERR;
553 plugin->transaction = TRANSACTION_NONE;
559 channel_key_store (struct Plugin *plugin,
560 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
564 struct GNUNET_PQ_QueryParam params[] = {
565 GNUNET_PQ_query_param_auto_from_type (channel_key),
566 GNUNET_PQ_query_param_end
569 ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_channel_key", params);
571 GNUNET_POSTGRES_check_result (plugin->dbh,
574 "PQexecPrepared", "insert_channel_key"))
576 return GNUNET_SYSERR;
585 slave_key_store (struct Plugin *plugin,
586 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
590 struct GNUNET_PQ_QueryParam params[] = {
591 GNUNET_PQ_query_param_auto_from_type (slave_key),
592 GNUNET_PQ_query_param_end
595 ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_slave_key", params);
597 GNUNET_POSTGRES_check_result (plugin->dbh,
600 "PQexecPrepared", "insert_slave_key"))
602 return GNUNET_SYSERR;
611 * Store join/leave events for a PSYC channel in order to be able to answer
612 * membership test queries later.
614 * @see GNUNET_PSYCSTORE_membership_store()
616 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
619 postgres_membership_store (void *cls,
620 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
621 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
623 uint64_t announced_at,
624 uint64_t effective_since,
625 uint64_t group_generation)
628 struct Plugin *plugin = cls;
630 uint32_t idid_join = (uint32_t)did_join;
632 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
634 if (announced_at > INT64_MAX ||
635 effective_since > INT64_MAX ||
636 group_generation > INT64_MAX)
639 return GNUNET_SYSERR;
642 if (GNUNET_OK != channel_key_store (plugin, channel_key)
643 || GNUNET_OK != slave_key_store (plugin, slave_key))
644 return GNUNET_SYSERR;
646 struct GNUNET_PQ_QueryParam params[] = {
647 GNUNET_PQ_query_param_auto_from_type (channel_key),
648 GNUNET_PQ_query_param_auto_from_type (slave_key),
649 GNUNET_PQ_query_param_uint32 (&idid_join),
650 GNUNET_PQ_query_param_uint64 (&announced_at),
651 GNUNET_PQ_query_param_uint64 (&effective_since),
652 GNUNET_PQ_query_param_uint64 (&group_generation),
653 GNUNET_PQ_query_param_end
656 ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_membership", params);
658 GNUNET_POSTGRES_check_result (plugin->dbh,
661 "PQexecPrepared", "insert_membership"))
663 return GNUNET_SYSERR;
671 * Test if a member was admitted to the channel at the given message ID.
673 * @see GNUNET_PSYCSTORE_membership_test()
675 * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
676 * #GNUNET_SYSERR if there was an error.
679 membership_test (void *cls,
680 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
681 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
685 struct Plugin *plugin = cls;
687 uint32_t did_join = 0;
689 int ret = GNUNET_SYSERR;
691 struct GNUNET_PQ_QueryParam params_select[] = {
692 GNUNET_PQ_query_param_auto_from_type (channel_key),
693 GNUNET_PQ_query_param_auto_from_type (slave_key),
694 GNUNET_PQ_query_param_uint64 (&message_id),
695 GNUNET_PQ_query_param_end
698 res = GNUNET_PQ_exec_prepared (plugin->dbh, "select_membership", params_select);
700 GNUNET_POSTGRES_check_result (plugin->dbh,
703 "PQexecPrepared", "select_membership"))
705 return GNUNET_SYSERR;
708 struct GNUNET_PQ_ResultSpec results_select[] = {
709 GNUNET_PQ_result_spec_uint32 ("did_join", &did_join),
710 GNUNET_PQ_result_spec_end
713 switch (GNUNET_PQ_extract_result (res, results_select, 0))
730 * Store a message fragment sent to a channel.
732 * @see GNUNET_PSYCSTORE_fragment_store()
734 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
737 fragment_store (void *cls,
738 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
739 const struct GNUNET_MULTICAST_MessageHeader *msg,
740 uint32_t psycstore_flags)
743 struct Plugin *plugin = cls;
745 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
747 uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
749 uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
750 uint64_t message_id = GNUNET_ntohll (msg->message_id);
751 uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
753 uint32_t hop_counter = ntohl(msg->hop_counter);
754 uint32_t flags = ntohl(msg->flags);
756 if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
757 message_id > INT64_MAX || group_generation > INT64_MAX)
759 LOG(GNUNET_ERROR_TYPE_ERROR,
760 "Tried to store fragment with a field > INT64_MAX: "
761 "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
762 message_id, group_generation);
764 return GNUNET_SYSERR;
767 if (GNUNET_OK != channel_key_store (plugin, channel_key))
768 return GNUNET_SYSERR;
770 struct GNUNET_PQ_QueryParam params_insert[] = {
771 GNUNET_PQ_query_param_auto_from_type (channel_key),
772 GNUNET_PQ_query_param_uint32 (&hop_counter),
773 GNUNET_PQ_query_param_auto_from_type (&msg->signature),
774 GNUNET_PQ_query_param_auto_from_type (&msg->purpose),
775 GNUNET_PQ_query_param_uint64 (&fragment_id),
776 GNUNET_PQ_query_param_uint64 (&fragment_offset),
777 GNUNET_PQ_query_param_uint64 (&message_id),
778 GNUNET_PQ_query_param_uint64 (&group_generation),
779 GNUNET_PQ_query_param_uint32 (&flags),
780 GNUNET_PQ_query_param_uint32 (&psycstore_flags),
781 GNUNET_PQ_query_param_fixed_size (&msg[1], ntohs (msg->header.size) - sizeof (*msg)),
782 GNUNET_PQ_query_param_end
785 res = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_fragment", params_insert);
787 GNUNET_POSTGRES_check_result (plugin->dbh,
790 "PQexecPrepared", "insert_fragment"))
791 return GNUNET_SYSERR;
798 * Set additional flags for a given message.
800 * They are OR'd with any existing flags set.
802 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
805 message_add_flags (void *cls,
806 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
808 uint32_t psycstore_flags)
811 struct Plugin *plugin = cls;
813 struct GNUNET_PQ_QueryParam params_update[] = {
814 GNUNET_PQ_query_param_uint32 (&psycstore_flags),
815 GNUNET_PQ_query_param_auto_from_type (channel_key),
816 GNUNET_PQ_query_param_uint64 (&message_id),
817 GNUNET_PQ_query_param_end
820 res = GNUNET_PQ_exec_prepared (plugin->dbh, "update_message_flags", params_update);
821 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
824 "PQexecPrepared","update_message_flags"))
825 return GNUNET_SYSERR;
833 fragment_row (struct Plugin *plugin,
836 GNUNET_PSYCSTORE_FragmentCallback cb,
838 uint64_t *returned_fragments)
840 uint32_t hop_counter;
841 void *signature = NULL;
842 void *purpose = NULL;
843 size_t signature_size;
846 uint64_t fragment_id;
847 uint64_t fragment_offset;
849 uint64_t group_generation;
853 int ret = GNUNET_SYSERR;
854 struct GNUNET_MULTICAST_MessageHeader *mp;
858 struct GNUNET_PQ_ResultSpec results[] = {
859 GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
860 GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size),
861 GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size),
862 GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id),
863 GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
864 GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
865 GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
866 GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags),
867 GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags),
868 GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size),
869 GNUNET_PQ_result_spec_end
873 GNUNET_POSTGRES_check_result (plugin->dbh, res, PGRES_TUPLES_OK,
877 LOG (GNUNET_ERROR_TYPE_DEBUG,
878 "Failing fragment lookup (postgres error)\n");
879 return GNUNET_SYSERR;
882 int nrows = PQntuples (res);
883 for (int row = 0; row < nrows; row++)
885 if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
890 mp = GNUNET_malloc (sizeof (*mp) + buf_size);
892 mp->header.size = htons (sizeof (*mp) + buf_size);
893 mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
894 mp->hop_counter = htonl (hop_counter);
895 GNUNET_memcpy (&mp->signature,
898 GNUNET_memcpy (&mp->purpose,
901 mp->fragment_id = GNUNET_htonll (fragment_id);
902 mp->fragment_offset = GNUNET_htonll (fragment_offset);
903 mp->message_id = GNUNET_htonll (message_id);
904 mp->group_generation = GNUNET_htonll (group_generation);
905 mp->flags = htonl(msg_flags);
907 GNUNET_memcpy (&mp[1],
910 GNUNET_PQ_cleanup_result(results);
911 ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
912 if (NULL != returned_fragments)
913 (*returned_fragments)++;
921 fragment_select (struct Plugin *plugin,
923 struct GNUNET_PQ_QueryParam *params,
924 uint64_t *returned_fragments,
925 GNUNET_PSYCSTORE_FragmentCallback cb,
929 int ret = GNUNET_SYSERR;
931 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
933 GNUNET_POSTGRES_check_result (plugin->dbh,
936 "PQexecPrepared", stmt))
938 if (PQntuples (res) == 0)
942 ret = fragment_row (plugin, stmt, res, cb, cb_cls, returned_fragments);
951 * Retrieve a message fragment range by fragment ID.
953 * @see GNUNET_PSYCSTORE_fragment_get()
955 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
958 fragment_get (void *cls,
959 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
960 uint64_t first_fragment_id,
961 uint64_t last_fragment_id,
962 uint64_t *returned_fragments,
963 GNUNET_PSYCSTORE_FragmentCallback cb,
966 struct Plugin *plugin = cls;
967 *returned_fragments = 0;
969 struct GNUNET_PQ_QueryParam params_select[] = {
970 GNUNET_PQ_query_param_auto_from_type (channel_key),
971 GNUNET_PQ_query_param_uint64 (&first_fragment_id),
972 GNUNET_PQ_query_param_uint64 (&last_fragment_id),
973 GNUNET_PQ_query_param_end
976 return fragment_select (plugin, "select_fragments", params_select, returned_fragments, cb, cb_cls);
981 * Retrieve the latest message fragments, up to a given limit.
983 * @see GNUNET_PSYCSTORE_fragment_get_latest()
985 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
988 fragment_get_latest (void *cls,
989 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
990 uint64_t fragment_limit,
991 uint64_t *returned_fragments,
992 GNUNET_PSYCSTORE_FragmentCallback cb,
995 struct Plugin *plugin = cls;
997 *returned_fragments = 0;
999 struct GNUNET_PQ_QueryParam params_select[] = {
1000 GNUNET_PQ_query_param_auto_from_type (channel_key),
1001 GNUNET_PQ_query_param_uint64 (&fragment_limit),
1002 GNUNET_PQ_query_param_end
1005 return fragment_select (plugin, "select_latest_fragments", params_select, returned_fragments, cb, cb_cls);
1010 * Retrieve all fragments of a message ID range.
1012 * @see GNUNET_PSYCSTORE_message_get()
1014 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1017 message_get (void *cls,
1018 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1019 uint64_t first_message_id,
1020 uint64_t last_message_id,
1021 uint64_t fragment_limit,
1022 uint64_t *returned_fragments,
1023 GNUNET_PSYCSTORE_FragmentCallback cb,
1026 struct Plugin *plugin = cls;
1027 *returned_fragments = 0;
1029 if (0 == fragment_limit)
1030 fragment_limit = INT64_MAX;
1032 struct GNUNET_PQ_QueryParam params_select[] = {
1033 GNUNET_PQ_query_param_auto_from_type (channel_key),
1034 GNUNET_PQ_query_param_uint64 (&first_message_id),
1035 GNUNET_PQ_query_param_uint64 (&last_message_id),
1036 GNUNET_PQ_query_param_uint64 (&fragment_limit),
1037 GNUNET_PQ_query_param_end
1040 return fragment_select (plugin, "select_messages", params_select, returned_fragments, cb, cb_cls);
1045 * Retrieve all fragments of the latest messages.
1047 * @see GNUNET_PSYCSTORE_message_get_latest()
1049 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1052 message_get_latest (void *cls,
1053 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1054 uint64_t message_limit,
1055 uint64_t *returned_fragments,
1056 GNUNET_PSYCSTORE_FragmentCallback cb,
1059 struct Plugin *plugin = cls;
1060 *returned_fragments = 0;
1062 struct GNUNET_PQ_QueryParam params_select[] = {
1063 GNUNET_PQ_query_param_auto_from_type (channel_key),
1064 GNUNET_PQ_query_param_auto_from_type (channel_key),
1065 GNUNET_PQ_query_param_uint64 (&message_limit),
1066 GNUNET_PQ_query_param_end
1069 return fragment_select (plugin, "select_latest_messages", params_select, returned_fragments, cb, cb_cls);
1074 * Retrieve a fragment of a message specified by its message ID and fragment
1077 * @see GNUNET_PSYCSTORE_message_get_fragment()
1079 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1082 message_get_fragment (void *cls,
1083 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1084 uint64_t message_id,
1085 uint64_t fragment_offset,
1086 GNUNET_PSYCSTORE_FragmentCallback cb,
1090 struct Plugin *plugin = cls;
1091 int ret = GNUNET_SYSERR;
1092 const char *stmt = "select_message_fragment";
1094 struct GNUNET_PQ_QueryParam params_select[] = {
1095 GNUNET_PQ_query_param_auto_from_type (channel_key),
1096 GNUNET_PQ_query_param_uint64 (&message_id),
1097 GNUNET_PQ_query_param_uint64 (&fragment_offset),
1098 GNUNET_PQ_query_param_end
1101 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1102 if (GNUNET_OK == GNUNET_POSTGRES_check_result (plugin->dbh,
1105 "PQexecPrepared", stmt))
1107 if (PQntuples (res) == 0)
1110 ret = fragment_row (plugin, stmt, res, cb, cb_cls, NULL);
1119 * Retrieve the max. values of message counters for a channel.
1121 * @see GNUNET_PSYCSTORE_counters_get()
1123 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1126 counters_message_get (void *cls,
1127 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1128 uint64_t *max_fragment_id,
1129 uint64_t *max_message_id,
1130 uint64_t *max_group_generation)
1133 struct Plugin *plugin = cls;
1135 const char *stmt = "select_counters_message";
1137 struct GNUNET_PQ_QueryParam params_select[] = {
1138 GNUNET_PQ_query_param_auto_from_type (channel_key),
1139 GNUNET_PQ_query_param_end
1142 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1143 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1146 "PQexecPrepared", stmt))
1148 return GNUNET_SYSERR;
1151 struct GNUNET_PQ_ResultSpec results_select[] = {
1152 GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id),
1153 GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id),
1154 GNUNET_PQ_result_spec_uint64 ("group_generation", max_group_generation),
1155 GNUNET_PQ_result_spec_end
1158 if (GNUNET_OK != GNUNET_PQ_extract_result (res, results_select, 0))
1161 return GNUNET_SYSERR;
1164 GNUNET_PQ_cleanup_result(results_select);
1171 * Retrieve the max. values of state counters for a channel.
1173 * @see GNUNET_PSYCSTORE_counters_get()
1175 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1178 counters_state_get (void *cls,
1179 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1180 uint64_t *max_state_message_id)
1183 struct Plugin *plugin = cls;
1185 const char *stmt = "select_counters_state";
1187 int ret = GNUNET_SYSERR;
1189 struct GNUNET_PQ_QueryParam params_select[] = {
1190 GNUNET_PQ_query_param_auto_from_type (channel_key),
1191 GNUNET_PQ_query_param_end
1194 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1195 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1198 "PQexecPrepared", stmt))
1200 return GNUNET_SYSERR;
1203 struct GNUNET_PQ_ResultSpec results_select[] = {
1204 GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id),
1205 GNUNET_PQ_result_spec_end
1208 ret = GNUNET_PQ_extract_result (res, results_select, 0);
1210 if (GNUNET_OK != ret)
1213 return GNUNET_SYSERR;
1216 GNUNET_PQ_cleanup_result(results_select);
1224 * Assign a value to a state variable.
1226 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1229 state_assign (struct Plugin *plugin, const char *stmt,
1230 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1231 const char *name, const void *value, size_t value_size)
1235 struct GNUNET_PQ_QueryParam params[] = {
1236 GNUNET_PQ_query_param_auto_from_type (channel_key),
1237 GNUNET_PQ_query_param_string (name),
1238 GNUNET_PQ_query_param_fixed_size (value, value_size),
1239 GNUNET_PQ_query_param_end
1242 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
1243 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1246 "PQexecPrepared", stmt))
1248 return GNUNET_SYSERR;
1258 update_message_id (struct Plugin *plugin, const char *stmt,
1259 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1260 uint64_t message_id)
1264 struct GNUNET_PQ_QueryParam params[] = {
1265 GNUNET_PQ_query_param_uint64 (&message_id),
1266 GNUNET_PQ_query_param_auto_from_type (channel_key),
1267 GNUNET_PQ_query_param_end
1270 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
1271 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1274 "PQexecPrepared", stmt))
1276 return GNUNET_SYSERR;
1286 * Begin modifying current state.
1289 state_modify_begin (void *cls,
1290 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1291 uint64_t message_id, uint64_t state_delta)
1293 struct Plugin *plugin = cls;
1295 if (state_delta > 0)
1298 * We can only apply state modifiers in the current message if modifiers in
1299 * the previous stateful message (message_id - state_delta) were already
1303 uint64_t max_state_message_id = 0;
1304 int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1308 case GNUNET_NO: // no state yet
1316 if (max_state_message_id < message_id - state_delta)
1317 return GNUNET_NO; /* some stateful messages not yet applied */
1318 else if (message_id - state_delta < max_state_message_id)
1319 return GNUNET_NO; /* changes already applied */
1322 if (TRANSACTION_NONE != plugin->transaction)
1324 /** @todo FIXME: wait for other transaction to finish */
1325 return GNUNET_SYSERR;
1327 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1332 * Set the current value of state variable.
1334 * @see GNUNET_PSYCSTORE_state_modify()
1336 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1339 state_modify_op (void *cls,
1340 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1341 enum GNUNET_PSYC_Operator op,
1342 const char *name, const void *value, size_t value_size)
1344 struct Plugin *plugin = cls;
1345 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1349 case GNUNET_PSYC_OP_ASSIGN:
1350 return state_assign (plugin, "insert_state_current",
1351 channel_key, name, value, value_size);
1353 default: /** @todo implement more state operations */
1355 return GNUNET_SYSERR;
1361 * End modifying current state.
1364 state_modify_end (void *cls,
1365 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1366 uint64_t message_id)
1368 struct Plugin *plugin = cls;
1369 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1372 GNUNET_OK == exec_channel (plugin, "delete_state_empty", channel_key)
1373 && GNUNET_OK == update_message_id (plugin,
1374 "update_max_state_message_id",
1375 channel_key, message_id)
1376 && GNUNET_OK == transaction_commit (plugin)
1377 ? GNUNET_OK : GNUNET_SYSERR;
/**
 * Begin state synchronization.
 *
 * Executes the "delete_state_sync" prepared statement for the channel.
 *
 * @param cls the plugin context (a `struct Plugin *`)
 * @param channel_key public key of the channel
 * @return the result of exec_channel()
 */
static int
state_sync_begin (void *cls,
                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  struct Plugin *ctx = cls;

  return exec_channel (ctx, "delete_state_sync", channel_key);
}
/**
 * Assign current value of a state variable.
 *
 * Forwards to state_assign() using the "insert_state_sync" statement.
 *
 * @see GNUNET_PSYCSTORE_state_modify()
 *
 * @param cls the plugin context (a `struct Plugin *`)
 * @param channel_key public key of the channel
 * @param name name of the state variable
 * @param value buffer holding the value
 * @param value_size number of bytes in @a value
 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
 */
static int
state_sync_assign (void *cls,
                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                   const char *name, const void *value, size_t value_size)
{
  struct Plugin *ctx = cls;

  return state_assign (ctx, "insert_state_sync",
                       channel_key, name, value, value_size);
}
1412 * End modifying current state.
1415 state_sync_end (void *cls,
1416 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1417 uint64_t max_state_message_id,
1418 uint64_t state_hash_message_id)
1420 struct Plugin *plugin = cls;
1421 int ret = GNUNET_SYSERR;
1423 if (TRANSACTION_NONE != plugin->transaction)
1425 /** @todo FIXME: wait for other transaction to finish */
1426 return GNUNET_SYSERR;
1429 GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1430 && GNUNET_OK == exec_channel (plugin, "delete_state", channel_key)
1431 && GNUNET_OK == exec_channel (plugin, "insert_state_from_sync",
1433 && GNUNET_OK == exec_channel (plugin, "delete_state_sync",
1435 && GNUNET_OK == update_message_id (plugin,
1436 "update_state_hash_message_id",
1437 channel_key, state_hash_message_id)
1438 && GNUNET_OK == update_message_id (plugin,
1439 "update_max_state_message_id",
1440 channel_key, max_state_message_id)
1441 && GNUNET_OK == transaction_commit (plugin)
1443 : transaction_rollback (plugin);
/**
 * Delete the whole state.
 *
 * Executes the "delete_state" prepared statement for the channel.
 *
 * @see GNUNET_PSYCSTORE_state_reset()
 *
 * @param cls the plugin context (a `struct Plugin *`)
 * @param channel_key public key of the channel
 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
 */
static int
state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  struct Plugin *ctx = cls;

  return exec_channel (ctx, "delete_state", channel_key);
}
/**
 * Update signed values of state variables in the state store.
 *
 * Executes the "update_state_signed" prepared statement for the channel.
 *
 * @see GNUNET_PSYCSTORE_state_hash_update()
 *
 * @param cls the plugin context (a `struct Plugin *`)
 * @param channel_key public key of the channel
 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
 */
static int
state_update_signed (void *cls,
                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
{
  struct Plugin *ctx = cls;

  return exec_channel (ctx, "update_state_signed", channel_key);
}
1480 * Retrieve a state variable by name.
1482 * @see GNUNET_PSYCSTORE_state_get()
1484 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1487 state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1488 const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1492 struct Plugin *plugin = cls;
1493 int ret = GNUNET_SYSERR;
1495 const char *stmt = "select_state_one";
1497 struct GNUNET_PQ_QueryParam params_select[] = {
1498 GNUNET_PQ_query_param_auto_from_type (channel_key),
1499 GNUNET_PQ_query_param_string (name),
1500 GNUNET_PQ_query_param_end
1503 void *value_current = NULL;
1504 size_t value_size = 0;
1506 struct GNUNET_PQ_ResultSpec results[] = {
1507 GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1508 GNUNET_PQ_result_spec_end
1511 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1512 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1515 "PQexecPrepared", stmt))
1517 return GNUNET_SYSERR;
1520 if (PQntuples (res) == 0)
1526 ret = GNUNET_PQ_extract_result (res, results, 0);
1528 if (GNUNET_OK != ret)
1531 return GNUNET_SYSERR;
1534 ret = cb (cb_cls, name, value_current,
1537 GNUNET_PQ_cleanup_result(results);
1545 * Retrieve all state variables for a channel with the given prefix.
1547 * @see GNUNET_PSYCSTORE_state_get_prefix()
1549 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1552 state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1553 const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1557 struct Plugin *plugin = cls;
1558 int ret = GNUNET_NO;
1560 const char *stmt = "select_state_prefix";
1562 uint32_t name_len = (uint32_t) strlen (name);
1564 struct GNUNET_PQ_QueryParam params_select[] = {
1565 GNUNET_PQ_query_param_auto_from_type (channel_key),
1566 GNUNET_PQ_query_param_string (name),
1567 GNUNET_PQ_query_param_uint32 (&name_len),
1568 GNUNET_PQ_query_param_string (name),
1569 GNUNET_PQ_query_param_end
1573 void *value_current = NULL;
1574 size_t value_size = 0;
1576 struct GNUNET_PQ_ResultSpec results[] = {
1577 GNUNET_PQ_result_spec_string ("name", &name2),
1578 GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1579 GNUNET_PQ_result_spec_end
1582 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1583 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1586 "PQexecPrepared", stmt))
1588 return GNUNET_SYSERR;
1591 int nrows = PQntuples (res);
1592 for (int row = 0; row < nrows; row++)
1594 if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
1599 ret = cb (cb_cls, (const char *) name2,
1602 GNUNET_PQ_cleanup_result(results);
1612 * Retrieve all signed state variables for a channel.
1614 * @see GNUNET_PSYCSTORE_state_get_signed()
1616 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1619 state_get_signed (void *cls,
1620 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1621 GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1624 struct Plugin *plugin = cls;
1625 int ret = GNUNET_NO;
1627 const char *stmt = "select_state_signed";
1629 struct GNUNET_PQ_QueryParam params_select[] = {
1630 GNUNET_PQ_query_param_auto_from_type (channel_key),
1631 GNUNET_PQ_query_param_end
1635 void *value_signed = NULL;
1636 size_t value_size = 0;
1638 struct GNUNET_PQ_ResultSpec results[] = {
1639 GNUNET_PQ_result_spec_string ("name", &name),
1640 GNUNET_PQ_result_spec_variable_size ("value_signed", &value_signed, &value_size),
1641 GNUNET_PQ_result_spec_end
1644 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1645 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1648 "PQexecPrepared", stmt))
1650 return GNUNET_SYSERR;
1653 int nrows = PQntuples (res);
1654 for (int row = 0; row < nrows; row++)
1656 if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
1661 ret = cb (cb_cls, (const char *) name,
1665 GNUNET_PQ_cleanup_result (results);
1675 * Entry point for the plugin.
1677 * @param cls The struct GNUNET_CONFIGURATION_Handle.
1678 * @return NULL on error, otherwise the plugin context
1681 libgnunet_plugin_psycstore_postgres_init (void *cls)
1683 static struct Plugin plugin;
1684 const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1685 struct GNUNET_PSYCSTORE_PluginFunctions *api;
1687 if (NULL != plugin.cfg)
1688 return NULL; /* can only initialize once! */
1689 memset (&plugin, 0, sizeof (struct Plugin));
1691 if (GNUNET_OK != database_setup (&plugin))
1693 database_shutdown (&plugin);
1696 api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1698 api->membership_store = &postgres_membership_store;
1699 api->membership_test = &membership_test;
1700 api->fragment_store = &fragment_store;
1701 api->message_add_flags = &message_add_flags;
1702 api->fragment_get = &fragment_get;
1703 api->fragment_get_latest = &fragment_get_latest;
1704 api->message_get = &message_get;
1705 api->message_get_latest = &message_get_latest;
1706 api->message_get_fragment = &message_get_fragment;
1707 api->counters_message_get = &counters_message_get;
1708 api->counters_state_get = &counters_state_get;
1709 api->state_modify_begin = &state_modify_begin;
1710 api->state_modify_op = &state_modify_op;
1711 api->state_modify_end = &state_modify_end;
1712 api->state_sync_begin = &state_sync_begin;
1713 api->state_sync_assign = &state_sync_assign;
1714 api->state_sync_end = &state_sync_end;
1715 api->state_reset = &state_reset;
1716 api->state_update_signed = &state_update_signed;
1717 api->state_get = &state_get;
1718 api->state_get_prefix = &state_get_prefix;
1719 api->state_get_signed = &state_get_signed;
1721 LOG (GNUNET_ERROR_TYPE_INFO, _("Postgres database running\n"));
1727 * Exit point from the plugin.
1729 * @param cls The plugin context (as returned by "init")
1730 * @return Always NULL
1733 libgnunet_plugin_psycstore_postgres_done (void *cls)
1735 struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1736 struct Plugin *plugin = api->cls;
1738 database_shutdown (plugin);
1741 LOG (GNUNET_ERROR_TYPE_DEBUG, "Postgres plugin has finished\n");
/* end of plugin_psycstore_postgres.c */