2 This file is part of GNUnet
3 Copyright (C) 2009-2017 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file datastore/plugin_datastore_postgres.c
23 * @brief postgres-based datastore backend
24 * @author Christian Grothoff
28 #include "gnunet_datastore_plugin.h"
29 #include "gnunet_postgres_lib.h"
30 #include "gnunet_pq_lib.h"
34 * After how many ms "busy" should a DB operation fail for good?
35 * A low value makes sure that we are more responsive to requests
36 * (especially PUTs). A high value guarantees a higher success
37 * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
39 * The default value of 1s should ensure that users do not experience
40 * huge latencies while at the same time allowing operations to succeed
41 * with reasonable probability.
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
47 * Context for all functions in this plugin.
52 * Our execution environment.
54 struct GNUNET_DATASTORE_PluginEnvironment *env;
57 * Native Postgres database handle.
65 * @brief Get a database handle
67 * @param plugin global context
68 * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
71 init_connection (struct Plugin *plugin)
73 struct GNUNET_PQ_ExecuteStatement es[] = {
74 /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
75 * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
76 * we do math or inequality tests, so we can't handle the entire range of uint32_t.
77 * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
78 * PostgreSQL also recommends against using WITH OIDS.
80 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS gn090 ("
81 " repl INTEGER NOT NULL DEFAULT 0,"
82 " type INTEGER NOT NULL DEFAULT 0,"
83 " prio INTEGER NOT NULL DEFAULT 0,"
84 " anonLevel INTEGER NOT NULL DEFAULT 0,"
85 " expire BIGINT NOT NULL DEFAULT 0,"
86 " rvalue BIGINT NOT NULL DEFAULT 0,"
87 " hash BYTEA NOT NULL DEFAULT '',"
88 " vhash BYTEA NOT NULL DEFAULT '',"
89 " value BYTEA NOT NULL DEFAULT '')"
91 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)"),
92 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)"),
93 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)"),
94 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)"),
95 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)"),
96 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)"),
97 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"),
98 GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"),
99 GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN"),
100 GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN"),
101 GNUNET_PQ_EXECUTE_STATEMENT_END
103 #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
104 struct GNUNET_PQ_PreparedStatement ps[] = {
105 GNUNET_PQ_make_prepare ("get",
106 "SELECT " RESULT_COLUMNS " FROM gn090 "
107 "WHERE oid >= $1::bigint AND "
108 "(rvalue >= $2 OR 0 = $3::smallint) AND "
109 "(hash = $4 OR 0 = $5::smallint) AND "
110 "(type = $6 OR 0 = $7::smallint) "
111 "ORDER BY oid ASC LIMIT 1",
113 GNUNET_PQ_make_prepare ("put",
114 "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
115 "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
117 GNUNET_PQ_make_prepare ("update",
119 "SET prio = prio + $1, "
121 "expire = GREATEST(expire, $3) "
122 "WHERE hash = $4 AND vhash = $5",
124 GNUNET_PQ_make_prepare ("decrepl",
125 "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
128 GNUNET_PQ_make_prepare ("select_non_anonymous",
129 "SELECT " RESULT_COLUMNS " FROM gn090 "
130 "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
131 "ORDER BY oid ASC LIMIT 1",
133 GNUNET_PQ_make_prepare ("select_expiration_order",
134 "(SELECT " RESULT_COLUMNS " FROM gn090 "
135 "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
137 "(SELECT " RESULT_COLUMNS " FROM gn090 "
138 "ORDER BY prio ASC LIMIT 1) "
139 "ORDER BY expire ASC LIMIT 1",
141 GNUNET_PQ_make_prepare ("select_replication_order",
142 "SELECT " RESULT_COLUMNS " FROM gn090 "
143 "ORDER BY repl DESC,RANDOM() LIMIT 1",
145 GNUNET_PQ_make_prepare ("delrow",
146 "DELETE FROM gn090 " "WHERE oid=$1",
148 GNUNET_PQ_make_prepare ("remove", "DELETE FROM gn090 "
149 "WHERE hash = $1 AND "
152 GNUNET_PQ_make_prepare ("get_keys",
153 "SELECT hash FROM gn090",
155 GNUNET_PQ_PREPARED_STATEMENT_END
157 #undef RESULT_COLUMNS
159 plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
160 "datastore-postgres");
161 if (NULL == plugin->dbh)
162 return GNUNET_SYSERR;
165 GNUNET_PQ_exec_statements (plugin->dbh,
168 GNUNET_PQ_prepare_statements (plugin->dbh,
171 PQfinish (plugin->dbh);
173 return GNUNET_SYSERR;
180 * Get an estimate of how much space the database is
183 * @param cls our `struct Plugin *`
184 * @return number of bytes used on disk
187 postgres_plugin_estimate_size (void *cls, unsigned long long *estimate)
189 struct Plugin *plugin = cls;
190 unsigned long long total;
193 if (NULL == estimate)
196 PQexecParams (plugin->dbh,
197 "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
198 NULL, NULL, NULL, NULL, 1);
200 GNUNET_POSTGRES_check_result (plugin->dbh,
209 if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
216 if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
218 GNUNET_break (0 == PQgetlength (ret, 0, 0));
223 total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
230 * Store an item in the datastore.
232 * @param cls closure with the `struct Plugin`
233 * @param key key for the item
234 * @param absent true if the key was not found in the bloom filter
235 * @param size number of bytes in data
236 * @param data content stored
237 * @param type type of the content
238 * @param priority priority of the content
239 * @param anonymity anonymity-level for the content
240 * @param replication replication-level for the content
241 * @param expiration expiration time for the content
242 * @param cont continuation called with success or failure status
243 * @param cont_cls continuation closure
246 postgres_plugin_put (void *cls,
247 const struct GNUNET_HashCode *key,
251 enum GNUNET_BLOCK_Type type,
254 uint32_t replication,
255 struct GNUNET_TIME_Absolute expiration,
259 struct Plugin *plugin = cls;
260 struct GNUNET_HashCode vhash;
261 enum GNUNET_PQ_QueryStatus ret;
263 GNUNET_CRYPTO_hash (data,
268 struct GNUNET_PQ_QueryParam params[] = {
269 GNUNET_PQ_query_param_uint32 (&priority),
270 GNUNET_PQ_query_param_uint32 (&replication),
271 GNUNET_PQ_query_param_absolute_time (&expiration),
272 GNUNET_PQ_query_param_auto_from_type (key),
273 GNUNET_PQ_query_param_auto_from_type (&vhash),
274 GNUNET_PQ_query_param_end
276 ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
285 _("Postgress exec failure"));
288 bool affected = (0 != ret);
301 uint32_t utype = (uint32_t) type;
302 uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
304 struct GNUNET_PQ_QueryParam params[] = {
305 GNUNET_PQ_query_param_uint32 (&replication),
306 GNUNET_PQ_query_param_uint32 (&utype),
307 GNUNET_PQ_query_param_uint32 (&priority),
308 GNUNET_PQ_query_param_uint32 (&anonymity),
309 GNUNET_PQ_query_param_absolute_time (&expiration),
310 GNUNET_PQ_query_param_uint64 (&rvalue),
311 GNUNET_PQ_query_param_auto_from_type (key),
312 GNUNET_PQ_query_param_auto_from_type (&vhash),
313 GNUNET_PQ_query_param_fixed_size (data, size),
314 GNUNET_PQ_query_param_end
317 ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
326 "Postgress exec failure");
330 plugin->env->duc (plugin->env->cls,
331 size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
332 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
333 "datastore-postgres",
334 "Stored %u bytes in database\n",
335 (unsigned int) size);
345 * Function invoked to process the result and call the processor.
347 * @param plugin global plugin data
348 * @param proc function to call with the value (once only).
349 * @param proc_cls closure for proc
350 * @param res result from exec
351 * @param filename filename for error messages
352 * @param line line number for error messages
355 process_result (struct Plugin *plugin,
356 PluginDatumProcessor proc,
359 const char *filename, int line)
365 uint32_t replication;
369 struct GNUNET_TIME_Absolute expiration_time;
370 struct GNUNET_HashCode key;
371 struct GNUNET_PQ_ResultSpec rs[] = {
372 GNUNET_PQ_result_spec_uint32 ("repl", &replication),
373 GNUNET_PQ_result_spec_uint32 ("type", &utype),
374 GNUNET_PQ_result_spec_uint32 ("prio", &priority),
375 GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
376 GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
377 GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
378 GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
379 GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
380 GNUNET_PQ_result_spec_end
384 GNUNET_POSTGRES_check_result_ (plugin->dbh,
391 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
392 "datastore-postgres",
393 "Ending iteration (postgres error)\n");
394 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
398 if (0 == PQntuples (res))
401 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
402 "datastore-postgres",
403 "Ending iteration (no more results)\n");
404 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
408 if (1 != PQntuples (res))
411 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
416 GNUNET_PQ_extract_result (res,
422 GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
425 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
429 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
430 "datastore-postgres",
431 "Found result of size %u bytes and type %u in database\n",
433 (unsigned int) utype);
434 iret = proc (proc_cls,
438 (enum GNUNET_BLOCK_Type) utype,
445 if (iret == GNUNET_NO)
447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
448 "Processor asked for item %u to be removed.\n",
449 (unsigned int) rowid);
451 GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
455 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
456 "datastore-postgres",
457 "Deleting %u bytes from database\n",
458 (unsigned int) size);
459 plugin->env->duc (plugin->env->cls,
460 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
461 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
462 "datastore-postgres",
463 "Deleted %u bytes from database\n",
464 (unsigned int) size);
471 * Get one of the results for a particular key in the datastore.
473 * @param cls closure with the 'struct Plugin'
474 * @param next_uid return the result with lowest uid >= next_uid
475 * @param random if true, return a random result instead of using next_uid
476 * @param key may be NULL (to match all entries)
477 * @param type entries of which type are relevant?
478 * Use 0 for any type.
479 * @param proc function to call on the matching value;
480 * will be called with NULL if nothing matches
481 * @param proc_cls closure for @a proc
484 postgres_plugin_get_key (void *cls,
487 const struct GNUNET_HashCode *key,
488 enum GNUNET_BLOCK_Type type,
489 PluginDatumProcessor proc,
492 struct Plugin *plugin = cls;
493 uint32_t utype = type;
494 uint16_t use_rvalue = random;
495 uint16_t use_key = NULL != key;
496 uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
498 struct GNUNET_PQ_QueryParam params[] = {
499 GNUNET_PQ_query_param_uint64 (&next_uid),
500 GNUNET_PQ_query_param_uint64 (&rvalue),
501 GNUNET_PQ_query_param_uint16 (&use_rvalue),
502 GNUNET_PQ_query_param_auto_from_type (key),
503 GNUNET_PQ_query_param_uint16 (&use_key),
504 GNUNET_PQ_query_param_uint32 (&utype),
505 GNUNET_PQ_query_param_uint16 (&use_type),
506 GNUNET_PQ_query_param_end
512 rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
519 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
522 process_result (plugin,
531 * Select a subset of the items in the datastore and call
532 * the given iterator for each of them.
534 * @param cls our `struct Plugin *`
535 * @param next_uid return the result with lowest uid >= next_uid
536 * @param type entries of which type should be considered?
537 * Must not be zero (ANY).
538 * @param proc function to call on the matching value;
539 * will be called with NULL if no value matches
540 * @param proc_cls closure for @a proc
543 postgres_plugin_get_zero_anonymity (void *cls,
545 enum GNUNET_BLOCK_Type type,
546 PluginDatumProcessor proc,
549 struct Plugin *plugin = cls;
550 uint32_t utype = type;
551 struct GNUNET_PQ_QueryParam params[] = {
552 GNUNET_PQ_query_param_uint32 (&utype),
553 GNUNET_PQ_query_param_uint64 (&next_uid),
554 GNUNET_PQ_query_param_end
558 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
559 "select_non_anonymous",
562 process_result (plugin,
570 * Context for the #repl_proc() function.
578 struct Plugin *plugin;
581 * Function to call for the result (or the NULL).
583 PluginDatumProcessor proc;
586 * Closure for @e proc.
593 * Wrapper for the iterator for #postgres_plugin_get_replication().
594 * Decrements the replication counter and calls the original
597 * @param cls closure with the `struct ReplCtx *`
598 * @param key key for the content
599 * @param size number of bytes in @a data
600 * @param data content stored
601 * @param type type of the content
602 * @param priority priority of the content
603 * @param anonymity anonymity-level for the content
604 * @param replication replication-level for the content
605 * @param expiration expiration time for the content
606 * @param uid unique identifier for the datum;
607 * may be 0 if no unique identifier is available
608 * @return #GNUNET_SYSERR to abort the iteration,
609 * #GNUNET_OK to continue
610 * (continue on call to "next", of course),
611 * #GNUNET_NO to delete the item and continue (if supported)
614 repl_proc (void *cls,
615 const struct GNUNET_HashCode *key,
618 enum GNUNET_BLOCK_Type type,
621 uint32_t replication,
622 struct GNUNET_TIME_Absolute expiration,
625 struct ReplCtx *rc = cls;
626 struct Plugin *plugin = rc->plugin;
628 uint32_t oid = (uint32_t) uid;
629 struct GNUNET_PQ_QueryParam params[] = {
630 GNUNET_PQ_query_param_uint32 (&oid),
631 GNUNET_PQ_query_param_end
635 ret = rc->proc (rc->proc_cls,
647 qret = GNUNET_PQ_exec_prepared (plugin->dbh,
651 GNUNET_POSTGRES_check_result (plugin->dbh,
656 return GNUNET_SYSERR;
663 * Get a random item for replication. Returns a single, not expired,
664 * random item from those with the highest replication counters. The
665 * item's replication counter is decremented by one IF it was positive
666 * before. Call @a proc with all values ZERO or NULL if the datastore
669 * @param cls closure with the `struct Plugin`
670 * @param proc function to call with the value (once only).
671 * @param proc_cls closure for @a proc
674 postgres_plugin_get_replication (void *cls,
675 PluginDatumProcessor proc,
678 struct Plugin *plugin = cls;
684 rc.proc_cls = proc_cls;
685 ret = PQexecPrepared (plugin->dbh,
686 "select_replication_order", 0, NULL, NULL,
688 process_result (plugin,
697 * Get a random item for expiration. Call @a proc with all values
698 * ZERO or NULL if the datastore is empty.
700 * @param cls closure with the `struct Plugin`
701 * @param proc function to call with the value (once only).
702 * @param proc_cls closure for @a proc
705 postgres_plugin_get_expiration (void *cls,
706 PluginDatumProcessor proc,
709 struct Plugin *plugin = cls;
710 struct GNUNET_TIME_Absolute now;
711 struct GNUNET_PQ_QueryParam params[] = {
712 GNUNET_PQ_query_param_absolute_time (&now),
713 GNUNET_PQ_query_param_end
717 now = GNUNET_TIME_absolute_get ();
718 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
719 "select_expiration_order",
721 process_result (plugin,
729 * Get all of the keys in the datastore.
731 * @param cls closure with the `struct Plugin *`
732 * @param proc function to call on each key
733 * @param proc_cls closure for @a proc
736 postgres_plugin_get_keys (void *cls,
737 PluginKeyProcessor proc,
740 struct Plugin *plugin = cls;
743 struct GNUNET_HashCode key;
746 res = PQexecPrepared (plugin->dbh,
748 0, NULL, NULL, NULL, 1);
749 ret = PQntuples (res);
752 if (sizeof (struct GNUNET_HashCode) !=
753 PQgetlength (res, i, 0))
756 PQgetvalue (res, i, 0),
757 sizeof (struct GNUNET_HashCode));
758 proc (proc_cls, &key, 1);
762 proc (proc_cls, NULL, 0);
769 * @param cls closure with the `struct Plugin *`
772 postgres_plugin_drop (void *cls)
774 struct Plugin *plugin = cls;
777 GNUNET_POSTGRES_exec (plugin->dbh,
779 GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
781 _("Failed to drop table from database.\n"));
786 * Remove a particular key in the datastore.
789 * @param key key for the content
790 * @param size number of bytes in data
791 * @param data content stored
792 * @param cont continuation called with success or failure status
793 * @param cont_cls continuation closure for @a cont
796 postgres_plugin_remove_key (void *cls,
797 const struct GNUNET_HashCode *key,
800 PluginRemoveCont cont,
803 struct Plugin *plugin = cls;
804 enum GNUNET_PQ_QueryStatus ret;
805 struct GNUNET_PQ_QueryParam params[] = {
806 GNUNET_PQ_query_param_auto_from_type (key),
807 GNUNET_PQ_query_param_fixed_size (data, size),
808 GNUNET_PQ_query_param_end
811 ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
820 _("Postgress exec failure"));
823 if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS == ret)
832 plugin->env->duc (plugin->env->cls,
833 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
834 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
835 "datastore-postgres",
836 "Deleted %u bytes from database\n",
837 (unsigned int) size);
847 * Entry point for the plugin.
849 * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
850 * @return our `struct Plugin *`
853 libgnunet_plugin_datastore_postgres_init (void *cls)
855 struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
856 struct GNUNET_DATASTORE_PluginFunctions *api;
857 struct Plugin *plugin;
859 plugin = GNUNET_new (struct Plugin);
861 if (GNUNET_OK != init_connection (plugin))
863 GNUNET_free (plugin);
866 api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
868 api->estimate_size = &postgres_plugin_estimate_size;
869 api->put = &postgres_plugin_put;
870 api->get_key = &postgres_plugin_get_key;
871 api->get_replication = &postgres_plugin_get_replication;
872 api->get_expiration = &postgres_plugin_get_expiration;
873 api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
874 api->get_keys = &postgres_plugin_get_keys;
875 api->drop = &postgres_plugin_drop;
876 api->remove_key = &postgres_plugin_remove_key;
877 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
878 "datastore-postgres",
879 _("Postgres database running\n"));
885 * Exit point from the plugin.
887 * @param cls our `struct Plugin *`
888 * @return always NULL
891 libgnunet_plugin_datastore_postgres_done (void *cls)
893 struct GNUNET_DATASTORE_PluginFunctions *api = cls;
894 struct Plugin *plugin = api->cls;
896 PQfinish (plugin->dbh);
897 GNUNET_free (plugin);
902 /* end of plugin_datastore_postgres.c */