2 This file is part of GNUnet
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file datastore/plugin_datastore_postgres.c
23 * @brief postgres-based datastore backend
24 * @author Christian Grothoff
28 #include "plugin_datastore.h"
29 #include <postgresql/libpq-fe.h>
31 #define DEBUG_POSTGRES GNUNET_NO
/* SQL text registered under the statement name "select_low_priority".
   Each parenthesised sub-select yields at most one row in ascending
   (prio, oid) order: the first among rows with prio = $1 and oid > $2, the
   second among rows with prio > $1 (and oid != $2).  The trailing
   ORDER BY / LIMIT 1 keeps a single row of the combined result.
   NOTE(review): the line that joins the two sub-selects is not visible in
   this view -- confirm against the full file. */
33 #define SELECT_IT_LOW_PRIORITY "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
34 "WHERE (prio = $1 AND oid > $2) " \
35 "ORDER BY prio ASC,oid ASC LIMIT 1) "\
37 "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
38 "WHERE (prio > $1 AND oid != $2)"\
39 "ORDER BY prio ASC,oid ASC LIMIT 1)"\
40 "ORDER BY prio ASC,oid ASC LIMIT 1"
/* SQL text registered under the statement name "select_non_anonymous".
   Only rows with anonLevel=0 are considered.  Each sub-select yields at most
   one row in descending (prio, oid) order: the first among rows with
   prio = $1 and oid < $2, the second among rows with prio < $1 (and
   oid != $2).  The trailing ORDER BY / LIMIT 1 keeps a single row.
   NOTE(review): the line that joins the two sub-selects is not visible in
   this view -- confirm against the full file. */
42 #define SELECT_IT_NON_ANONYMOUS "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
43 "WHERE (prio = $1 AND oid < $2)"\
44 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
46 "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
47 "WHERE (prio < $1 AND oid != $2)"\
48 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
49 "ORDER BY prio DESC,oid DESC LIMIT 1"
/* SQL text registered under the statement name "select_expiration_time".
   Each sub-select yields at most one row in ascending (expire, oid) order:
   the first among rows with expire = $1 and oid > $2, the second among rows
   with expire > $1 (and oid != $2).  The trailing ORDER BY / LIMIT 1 keeps a
   single row.
   NOTE(review): the line that joins the two sub-selects is not visible in
   this view -- confirm against the full file. */
51 #define SELECT_IT_EXPIRATION_TIME "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
52 "WHERE (expire = $1 AND oid > $2) "\
53 "ORDER BY expire ASC,oid ASC LIMIT 1) "\
55 "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
56 "WHERE (expire > $1 AND oid != $2) " \
57 "ORDER BY expire ASC,oid ASC LIMIT 1)"\
58 "ORDER BY expire ASC,oid ASC LIMIT 1"
/* SQL text registered under the statement name "select_migration_order".
   Rows must satisfy expire > $3 and type != 3.  Each sub-select yields at
   most one row in descending (expire, oid) order: the first among rows with
   expire = $1 and oid < $2, the second among rows with expire < $1 (and
   oid != $2).  The trailing ORDER BY / LIMIT 1 keeps a single row.
   postgres_iterate binds $3 to the closure's 'bnow' field.
   NOTE(review): the line that joins the two sub-selects is not visible in
   this view -- confirm against the full file. */
61 #define SELECT_IT_MIGRATION_ORDER "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
62 "WHERE (expire = $1 AND oid < $2)"\
63 " AND expire > $3 AND type!=3"\
64 " ORDER BY expire DESC,oid DESC LIMIT 1) "\
66 "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
67 "WHERE (expire < $1 AND oid != $2)" \
68 " AND expire > $3 AND type!=3"\
69 " ORDER BY expire DESC,oid DESC LIMIT 1)"\
70 "ORDER BY expire DESC,oid DESC LIMIT 1"
73 * After how many ms "busy" should a DB operation fail for good?
74 * A low value makes sure that we are more responsive to requests
75 * (especially PUTs). A high value guarantees a higher success
76 * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
78 * The default value of 1s should ensure that users do not experience
79 * huge latencies while at the same time allowing operations to succeed
80 * with reasonable probability.
82 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
/* Per-iteration state that is handed from one scheduled iteration step to
   the next.  NOTE(review): several members used elsewhere in this file
   (e.g. iter, iter_cls, pname, paramLengths, key, btype, bnow, blimit_off,
   off, blast_prio, end_it) are declared on lines not visible in this view. */
85 struct NextRequestClosure
/* Plugin this iteration belongs to. */
87 struct Plugin *plugin;
/* Pointers to the binary parameter values passed to the prepared
   statement; they point at other members of this structure. */
90 const char *paramValues[5];
/* Value hash that paramValues refers to for lookups restricted by vhash. */
96 GNUNET_HashCode vhash;
/* Row count obtained from the "SELECT count(*)" query in
   postgres_plugin_get. */
100 unsigned long long total;
/* Expiration time of the most recently returned row, stored in network
   byte order so it can be bound directly as a query parameter. */
101 uint64_t blast_expire;
/* Row id bound as query parameter (network byte order); after each row it
   is set to that row's id plus one. */
102 uint32_t blast_rowid;
/* Plugin-wide state shared by every entry point in this file.
   NOTE(review): the 'struct Plugin' header line and the database handle
   member ('dbh', used throughout the file) sit on lines not visible here. */
110 * Context for all functions in this plugin.
115 * Our execution environment.
117 struct GNUNET_DATASTORE_PluginEnvironment *env;
120 * Native Postgres database handle.
125 * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
127 struct NextRequestClosure *next_task_nc;
130 * Pending task with scheduler for running the next request.
132 GNUNET_SCHEDULER_TaskIdentifier next_task;
/* Running byte count of stored values: increased by 'size' on every
   successful put, decreased when the iterator asks for a row to be deleted,
   and reported by postgres_plugin_get_size. */
134 unsigned long long payload;
/* NOTE(review): no reader or writer of this member is visible in this
   view -- confirm whether it is still needed. */
136 unsigned int lastSync;
142 * Check if the result obtained from Postgres has
143 * the desired status code. If not, log an error, clear the
144 * result and return GNUNET_SYSERR.
146 * @return GNUNET_OK if the result is acceptable
149 check_result (struct Plugin *plugin,
152 const char *command, const char *args, int line)
156 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
157 "datastore-postgres",
158 "Postgres failed to allocate result for `%s:%s' at %d\n",
159 command, args, line);
160 return GNUNET_SYSERR;
162 if (PQresultStatus (ret) != expected_status)
164 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
165 "datastore-postgres",
166 _("`%s:%s' failed at %s:%d with error: %s"),
167 command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
169 return GNUNET_SYSERR;
175 * Run simple SQL statement (without results).
178 pq_exec (struct Plugin *plugin,
179 const char *sql, int line)
182 ret = PQexec (plugin->dbh, sql);
183 if (GNUNET_OK != check_result (plugin,
185 PGRES_COMMAND_OK, "PQexec", sql, line))
186 return GNUNET_SYSERR;
192 * Prepare SQL statement.
195 pq_prepare (struct Plugin *plugin,
196 const char *name, const char *sql, int nparms, int line)
199 ret = PQprepare (plugin->dbh, name, sql, nparms, NULL);
201 check_result (plugin,
202 ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
203 return GNUNET_SYSERR;
/* Opens the Postgres connection for this plugin and prepares the schema:
   reads a connection string from the "datastore-postgres" configuration
   section, connects, issues CREATE TABLE for gn080 (tolerating "table
   already exists"), creates the indexes when the table is new, adjusts the
   storage mode of the value/hash/vhash columns and registers the SQL
   statements used by the rest of the file.  Every failure path visible
   here closes the connection with PQfinish() and returns GNUNET_SYSERR.
   NOTE(review): local declarations, the configuration option name, the
   statement names/parameter counts of most registrations and the success
   return sit on lines not visible in this view. */
209 * @brief Get a database handle
210 * @return GNUNET_OK on success, GNUNET_SYSERR on error
213 init_connection (struct Plugin *plugin)
218 /* Open database and precompile statements */
220 GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
221 "datastore-postgres",
/* An unset connection string falls back to "", i.e. libpq defaults. */
224 plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
225 GNUNET_free_non_null (conninfo);
226 if (NULL == plugin->dbh)
228 /* FIXME: warn about out-of-memory? */
229 return GNUNET_SYSERR;
231 if (PQstatus (plugin->dbh) != CONNECTION_OK)
233 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
235 _("Unable to initialize Postgres: %s"),
236 PQerrorMessage (plugin->dbh));
237 PQfinish (plugin->dbh);
239 return GNUNET_SYSERR;
241 ret = PQexec (plugin->dbh,
242 "CREATE TABLE gn080 ("
243 " size INTEGER NOT NULL DEFAULT 0,"
244 " type INTEGER NOT NULL DEFAULT 0,"
245 " prio INTEGER NOT NULL DEFAULT 0,"
246 " anonLevel INTEGER NOT NULL DEFAULT 0,"
247 " expire BIGINT NOT NULL DEFAULT 0,"
248 " hash BYTEA NOT NULL DEFAULT '',"
249 " vhash BYTEA NOT NULL DEFAULT '',"
250 " value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
/* A failed CREATE TABLE is only fatal when the SQLSTATE differs from
   42P07, i.e. when the reason is something other than "table exists". */
251 if ( (ret == NULL) ||
252 ( (PQresultStatus (ret) != PGRES_COMMAND_OK) &&
253 (0 != strcmp ("42P07", /* duplicate table */
256 PG_DIAG_SQLSTATE)))))
258 check_result (plugin,
259 ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn080", __LINE__);
260 PQfinish (plugin->dbh);
262 return GNUNET_SYSERR;
/* Indexes are created only when CREATE TABLE itself succeeded, i.e. the
   table did not exist before this call. */
264 if (PQresultStatus (ret) == PGRES_COMMAND_OK)
267 pq_exec (plugin, "CREATE INDEX idx_hash ON gn080 (hash)", __LINE__)) ||
269 pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)",
272 pq_exec (plugin, "CREATE INDEX idx_prio ON gn080 (prio)", __LINE__))
274 pq_exec (plugin, "CREATE INDEX idx_expire ON gn080 (expire)", __LINE__))
276 pq_exec (plugin, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)",
280 (plugin, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
283 pq_exec (plugin, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)",
287 PQfinish (plugin->dbh);
289 return GNUNET_SYSERR;
/* Column storage tuning: 'value' is stored EXTERNAL, the two hash columns
   PLAIN. */
294 ret = PQexec (plugin->dbh,
295 "ALTER TABLE gn080 ALTER value SET STORAGE EXTERNAL");
297 check_result (plugin,
298 ret, PGRES_COMMAND_OK,
299 "ALTER TABLE", "gn080", __LINE__))
301 PQfinish (plugin->dbh);
303 return GNUNET_SYSERR;
306 ret = PQexec (plugin->dbh,
307 "ALTER TABLE gn080 ALTER hash SET STORAGE PLAIN");
309 check_result (plugin,
310 ret, PGRES_COMMAND_OK,
311 "ALTER TABLE", "gn080", __LINE__))
313 PQfinish (plugin->dbh);
315 return GNUNET_SYSERR;
318 ret = PQexec (plugin->dbh,
319 "ALTER TABLE gn080 ALTER vhash SET STORAGE PLAIN");
321 check_result (plugin,
322 ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn080", __LINE__))
324 PQfinish (plugin->dbh);
326 return GNUNET_SYSERR;
/* Lookup restricted by key, value hash and type; $4 is the lowest
   acceptable oid and $5 the row offset. */
333 "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
334 "WHERE hash=$1 AND vhash=$2 AND type=$3 "
335 "AND oid >= $4 ORDER BY oid ASC LIMIT 1 OFFSET $5",
341 "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
342 "WHERE hash=$1 AND type=$2"
/* NOTE(review): the literal above ends without a trailing space, so the
   concatenated SQL reads "type=$2AND oid >= $3".  Newer PostgreSQL
   releases reject a parameter marker that is directly followed by a
   letter; a space should be added at the end of the previous literal. */
343 "AND oid >= $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
349 "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
350 "WHERE hash=$1 AND vhash=$2"
/* NOTE(review): same missing space as above -- the concatenated SQL reads
   "vhash=$2AND oid >= $3". */
351 "AND oid >= $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
357 "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
359 "AND oid >= $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
/* Insert statement: eight parameters in column order. */
365 "INSERT INTO gn080 (size, type, prio, anonLevel, expire, hash, vhash, value) "
366 "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
/* Update statement: adds $1 to the priority and raises the expiration
   time to $2 when $2 is later than the stored value. */
372 "UPDATE gn080 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
378 "select_low_priority",
379 SELECT_IT_LOW_PRIORITY,
384 "select_non_anonymous",
385 SELECT_IT_NON_ANONYMOUS,
390 "select_expiration_time",
391 SELECT_IT_EXPIRATION_TIME,
396 "select_migration_order",
397 SELECT_IT_MIGRATION_ORDER,
403 "DELETE FROM gn080 " "WHERE oid=$1", 1, __LINE__)))
405 PQfinish (plugin->dbh);
407 return GNUNET_SYSERR;
414 * Delete the row identified by the given rowid (qid
417 * @return GNUNET_OK on success
420 delete_by_rowid (struct Plugin *plugin,
423 const char *paramValues[] = { (const char *) &rowid };
424 int paramLengths[] = { sizeof (rowid) };
425 const int paramFormats[] = { 1 };
428 ret = PQexecPrepared (plugin->dbh,
430 1, paramValues, paramLengths, paramFormats, 1);
432 check_result (plugin,
433 ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
436 return GNUNET_SYSERR;
444 * Get an estimate of how much space the database is
447 * @param cls our "struct Plugin*"
448 * @return number of bytes used on disk
450 static unsigned long long
451 postgres_plugin_get_size (void *cls)
453 struct Plugin *plugin = cls;
456 ret = plugin->payload;
457 return (unsigned long long) (ret * 1.00);
458 /* benchmarking shows XX% overhead */
/* Inserts one item through the prepared "put" statement and, on success,
   adds its size to the plugin's payload counter.
   NOTE(review): some parameters, two entries of paramValues, most entries
   of paramLengths and the success return sit on lines not visible here. */
463 * Store an item in the datastore.
466 * @param key key for the item
467 * @param size number of bytes in data
468 * @param data content stored
469 * @param type type of the content
470 * @param priority priority of the content
471 * @param anonymity anonymity-level for the content
472 * @param expiration expiration time for the content
473 * @param msg set to error message
474 * @return GNUNET_OK on success
477 postgres_plugin_put (void *cls,
478 const GNUNET_HashCode * key,
481 enum GNUNET_BLOCK_Type type,
484 struct GNUNET_TIME_Absolute expiration,
487 struct Plugin *plugin = cls;
488 GNUNET_HashCode vhash;
/* All integer parameters are passed in binary format, hence they are
   converted to network byte order first. */
490 uint32_t bsize = htonl (size);
491 uint32_t btype = htonl (type);
492 uint32_t bprio = htonl (priority);
493 uint32_t banon = htonl (anonymity);
494 uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).value__;
/* Parameter order follows the column list of the "put" statement. */
495 const char *paramValues[] = {
496 (const char *) &bsize,
497 (const char *) &btype,
498 (const char *) &bprio,
499 (const char *) &banon,
500 (const char *) &bexpi,
502 (const char *) &vhash,
505 int paramLengths[] = {
511 sizeof (GNUNET_HashCode),
512 sizeof (GNUNET_HashCode),
/* 1 = binary format for each of the eight parameters. */
515 const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
/* vhash is the hash over the stored data; paramValues already points at
   it, so it only has to be filled in before the statement is executed. */
517 GNUNET_CRYPTO_hash (data, size, &vhash);
518 ret = PQexecPrepared (plugin->dbh,
519 "put", 8, paramValues, paramLengths, paramFormats, 1);
520 if (GNUNET_OK != check_result (plugin, ret,
522 "PQexecPrepared", "put", __LINE__))
523 return GNUNET_SYSERR;
/* Account for the newly stored bytes. */
525 plugin->payload += size;
/* Scheduler task that performs one iteration step: it runs the closure's
   prepared statement, validates the shape of the single result row, hands
   the row to the iterator callback and acts on the callback's return
   value.  Whenever the iteration ends (early termination requested, query
   failure, no more rows, malformed result) the callback is invoked once
   with NULL arguments.
   NOTE(review): the existing parameter documentation calls the first
   parameter 'cls' while the code names it 'next_cls'.  Several statements
   (local declarations, cleanup, returns) sit on lines not visible here. */
530 * Function invoked on behalf of a "PluginIterator"
531 * asking the database plugin to call the iterator
532 * with the next item.
534 * @param cls the 'struct NextRequestClosure'
535 * @param tc scheduler context
538 postgres_next_request_cont (void *next_cls,
539 const struct GNUNET_SCHEDULER_TaskContext *tc)
541 struct NextRequestClosure *nrc = next_cls;
542 struct Plugin *plugin = nrc->plugin;
543 const int paramFormats[] = { 1, 1, 1, 1, 1 };
546 enum GNUNET_BLOCK_Type type;
551 struct GNUNET_TIME_Absolute expiration_time;
/* This task is running now, so it is no longer pending. */
554 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
555 plugin->next_task_nc = NULL;
/* Early termination was requested: signal the end to the iterator. */
556 if (GNUNET_YES == nrc->end_it)
558 nrc->iter (nrc->iter_cls,
559 NULL, NULL, 0, NULL, 0, 0, 0,
560 GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* Row offset for the query, in network byte order; the condition that
   chooses between the two assignments is on a line not visible here. */
567 nrc->blimit_off = GNUNET_htonll (nrc->off);
569 nrc->blimit_off = GNUNET_htonll (0);
571 res = PQexecPrepared (plugin->dbh,
/* Query failure ends the iteration. */
577 if (GNUNET_OK != check_result (plugin,
584 nrc->iter (nrc->iter_cls,
585 NULL, NULL, 0, NULL, 0, 0, 0,
586 GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* No further row: the iteration is complete. */
591 if (0 == PQntuples (res))
594 nrc->iter (nrc->iter_cls,
595 NULL, NULL, 0, NULL, 0, 0, 0,
596 GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* Expect exactly one row with eight columns whose first (size) and last
   (oid) columns are 4 bytes wide. */
601 if ((1 != PQntuples (res)) ||
602 (8 != PQnfields (res)) ||
603 (sizeof (uint32_t) != PQfsize (res, 0)) ||
604 (sizeof (uint32_t) != PQfsize (res, 7)))
607 nrc->iter (nrc->iter_cls,
608 NULL, NULL, 0, NULL, 0, 0, 0,
609 GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* Binary results arrive in network byte order. */
614 rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 7));
615 size = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
/* Validate the remaining columns; the stored 'size' must match the actual
   length of the value column.  A row failing this check is deleted and
   the iteration ends. */
616 if ((sizeof (uint32_t) != PQfsize (res, 1)) ||
617 (sizeof (uint32_t) != PQfsize (res, 2)) ||
618 (sizeof (uint32_t) != PQfsize (res, 3)) ||
619 (sizeof (uint64_t) != PQfsize (res, 4)) ||
620 (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 5)) ||
621 (size != PQgetlength (res, 0, 6)))
625 delete_by_rowid (plugin, rowid);
626 nrc->iter (nrc->iter_cls,
627 NULL, NULL, 0, NULL, 0, 0, 0,
628 GNUNET_TIME_UNIT_ZERO_ABS, 0);
633 type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
634 priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
635 anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 3));
636 expiration_time.value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 4));
637 size = PQgetlength (res, 0, 6);
638 memcpy (&key, PQgetvalue (res, 0, 5), sizeof (GNUNET_HashCode));
/* Remember where this row sits so the next query continues after it; the
   values are kept in network byte order because they are bound directly
   as binary parameters. */
640 nrc->blast_prio = htonl (priority);
641 nrc->blast_expire = GNUNET_htonll (expiration_time.value);
642 nrc->blast_rowid = htonl (rowid + 1);
644 iret = nrc->iter (nrc->iter_cls,
648 PQgetvalue (res, 0, 6),
649 (enum GNUNET_BLOCK_Type) type,
/* GNUNET_NO from the iterator: remove the row and stop counting its
   bytes.  The handling of GNUNET_SYSERR is on a line not visible here. */
655 if (iret == GNUNET_SYSERR)
657 if (iret == GNUNET_NO)
659 plugin->payload -= size;
660 delete_by_rowid (plugin, rowid);
666 * Function invoked on behalf of a "PluginIterator"
667 * asking the database plugin to call the iterator
668 * with the next item.
670 * @param next_cls whatever argument was given
671 * to the PluginIterator as "next_cls".
672 * @param end_it set to GNUNET_YES if we
673 * should terminate the iteration early
674 * (iterator should be still called once more
675 * to signal the end of the iteration).
678 postgres_plugin_next_request (void *next_cls,
681 struct NextRequestClosure *nrc = next_cls;
683 if (GNUNET_YES == end_it)
684 nrc->end_it = GNUNET_YES;
685 nrc->plugin->next_task_nc = nrc;
686 nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (nrc->plugin->env->sched,
687 &postgres_next_request_cont,
/* Executes the prepared "update" statement with three binary parameters:
   priority delta, new expiration time and the oid derived from 'uid'.
   NOTE(review): the 'uid' and 'msg' parameter declarations, the contents
   of paramLengths and the success return sit on lines not visible here. */
693 * Update the priority for a particular key in the datastore. If
694 * the expiration time in value is different than the time found in
695 * the datastore, the higher value should be kept. For the
696 * anonymity level, the lower value is to be used. The specified
697 * priority should be added to the existing priority, ignoring the
700 * Note that it is possible for multiple values to match this put.
701 * In that case, all of the respective values are updated.
703 * @param cls our "struct Plugin*"
704 * @param uid unique identifier of the datum
705 * @param delta by how much should the priority
706 * change? If priority + delta < 0 the
707 * priority should be set to 0 (never go
709 * @param expire new expiration time should be the
710 * MAX of any existing expiration time and
712 * @param msg set to error message
713 * @return GNUNET_OK on success
716 postgres_plugin_update (void *cls,
718 int delta, struct GNUNET_TIME_Absolute expire,
721 struct Plugin *plugin = cls;
/* Binary parameters are sent in network byte order; 'uid' is narrowed to
   the 32-bit oid before conversion. */
723 int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
724 uint32_t boid = htonl ( (uint32_t) uid);
725 uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).value__;
726 const char *paramValues[] = {
727 (const char *) &bdelta,
728 (const char *) &bexpire,
729 (const char *) &boid,
731 int paramLengths[] = {
/* 1 = binary format for each of the three parameters. */
736 const int paramFormats[] = { 1, 1, 1 };
738 ret = PQexecPrepared (plugin->dbh,
740 3, paramValues, paramLengths, paramFormats, 1);
741 if (GNUNET_OK != check_result (plugin,
744 "PQexecPrepared", "update", __LINE__))
745 return GNUNET_SYSERR;
/* Allocates the closure for a new iteration, initialises the "last seen"
   cursor values, selects the prepared statement and its parameters
   according to 'iter_select', and schedules the first step through
   postgres_plugin_next_request.
   NOTE(review): two parameters, the ascending/descending condition, the
   switch labels and several assignments sit on lines not visible here. */
752 * Call a method for each key in the database and
753 * call the callback method on it.
755 * @param type entries of which type should be considered?
756 * @param iter maybe NULL (to just count); iter
757 * should return GNUNET_SYSERR to abort the
758 * iteration, GNUNET_NO to delete the entry and
759 * continue and GNUNET_OK to continue iterating
762 postgres_iterate (struct Plugin *plugin,
765 unsigned int iter_select,
766 PluginIterator iter, void *iter_cls)
768 struct NextRequestClosure *nrc;
770 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
771 nrc->plugin = plugin;
773 nrc->iter_cls = iter_cls;
/* Cursor start values at the low end, stored in network byte order.
   NOTE(review): blast_expire is a 64-bit field but is initialised with the
   32-bit htonl(); harmless for the value 0, yet GNUNET_htonll (0) would be
   consistent with the assignment a few lines below. */
776 nrc->blast_prio = htonl (0);
777 nrc->blast_rowid = htonl (0);
778 nrc->blast_expire = htonl (0);
/* Cursor start values at the high end. */
782 nrc->blast_prio = htonl (0x7FFFFFFFL);
783 nrc->blast_rowid = htonl (0xFFFFFFFF);
784 nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
/* Statement selection: each variant binds the cursor fields of the closure
   itself, so updating those fields between steps advances the query. */
789 nrc->pname = "select_low_priority";
791 nrc->paramValues[0] = (const char *) &nrc->blast_prio;
792 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
793 nrc->paramLengths[0] = sizeof (nrc->blast_prio);
794 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
797 nrc->pname = "select_non_anonymous";
799 nrc->paramValues[0] = (const char *) &nrc->blast_prio;
800 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
801 nrc->paramLengths[0] = sizeof (nrc->blast_prio);
802 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
805 nrc->pname = "select_expiration_time";
807 nrc->paramValues[0] = (const char *) &nrc->blast_expire;
808 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
809 nrc->paramLengths[0] = sizeof (nrc->blast_expire);
810 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
813 nrc->pname = "select_migration_order";
815 nrc->paramValues[0] = (const char *) &nrc->blast_expire;
816 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
817 nrc->paramValues[2] = (const char *) &nrc->bnow;
818 nrc->paramLengths[0] = sizeof (nrc->blast_expire);
819 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
820 nrc->paramLengths[2] = sizeof (nrc->bnow);
825 NULL, NULL, 0, NULL, 0, 0, 0,
826 GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* 'bnow' is filled in after the parameter table was set up; that works
   because paramValues stores a pointer to the field, not its value. */
829 nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).value__;
830 postgres_plugin_next_request (nrc,
/* Plugin API entry point that starts an iteration by delegating to
   postgres_iterate.
   NOTE(review): the arguments passed to postgres_iterate after 'plugin'
   sit on lines not visible here, so the selected order cannot be
   confirmed from this view. */
836 * Select a subset of the items in the datastore and call
837 * the given iterator for each of them.
839 * @param cls our "struct Plugin*"
840 * @param type entries of which type should be considered?
841 * Use 0 for any type.
842 * @param iter function to call on each matching value;
843 * will be called once with a NULL value at the end
844 * @param iter_cls closure for iter
847 postgres_plugin_iter_low_priority (void *cls,
848 enum GNUNET_BLOCK_Type type,
852 struct Plugin *plugin = cls;
854 postgres_iterate (plugin,
/* Starts an iteration over the rows matching a key, optionally narrowed by
   value hash and/or type.  Four parameter layouts are prepared, one per
   combination of vhash and type constraints; for each a matching
   "SELECT count(*)" query determines the number of candidate rows.  With
   zero matches (or a failed/malformed count result) the iterator is told
   the iteration is over; otherwise a random start offset is drawn and the
   first step is scheduled.
   NOTE(review): the conditions selecting between the four layouts, three
   of the four statement names, the remaining PQexecParams arguments and
   all cleanup code sit on lines not visible here. */
864 * Iterate over the results for a particular key
868 * @param key maybe NULL (to match all entries)
869 * @param vhash hash of the value, maybe NULL (to
870 * match all values that have the right key).
871 * Note that for DBlocks there is no difference
872 * betwen key and vhash, but for other blocks
874 * @param type entries of which type are relevant?
875 * Use 0 for any type.
876 * @param iter function to call on each matching value;
877 * will be called once with a NULL value at the end
878 * @param iter_cls closure for iter
881 postgres_plugin_get (void *cls,
882 const GNUNET_HashCode * key,
883 const GNUNET_HashCode * vhash,
884 enum GNUNET_BLOCK_Type type,
885 PluginIterator iter, void *iter_cls)
887 struct Plugin *plugin = cls;
888 struct NextRequestClosure *nrc;
889 const int paramFormats[] = { 1, 1, 1, 1, 1 };
/* Presumably the key == NULL case ("match all entries"): fall back to the
   low-priority iteration -- confirm against the hidden condition. */
894 postgres_plugin_iter_low_priority (plugin, type,
898 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
899 nrc->plugin = plugin;
901 nrc->iter_cls = iter_cls;
/* Parameter 1 is always the key; the type is kept in network byte order
   because it is bound as a binary parameter. */
905 nrc->paramValues[0] = (const char*) &nrc->key;
906 nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
907 nrc->btype = htonl (type);
/* Layout with value hash and type: (key, vhash, type, rowid, offset). */
912 nrc->paramValues[1] = (const char *) &nrc->vhash;
913 nrc->paramLengths[1] = sizeof (nrc->vhash);
914 nrc->paramValues[2] = (const char *) &nrc->btype;
915 nrc->paramLengths[2] = sizeof (nrc->btype);
916 nrc->paramValues[3] = (const char *) &nrc->blast_rowid;
917 nrc->paramLengths[3] = sizeof (nrc->blast_rowid);
918 nrc->paramValues[4] = (const char *) &nrc->blimit_off;
919 nrc->paramLengths[4] = sizeof (nrc->blimit_off);
921 nrc->pname = "getvt";
922 ret = PQexecParams (plugin->dbh,
923 "SELECT count(*) FROM gn080 WHERE hash=$1 AND vhash=$2 AND type=$3",
/* Layout with type only: (key, type, rowid, offset). */
932 nrc->paramValues[1] = (const char *) &nrc->btype;
933 nrc->paramLengths[1] = sizeof (nrc->btype);
934 nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
935 nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
936 nrc->paramValues[3] = (const char *) &nrc->blimit_off;
937 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
940 ret = PQexecParams (plugin->dbh,
941 "SELECT count(*) FROM gn080 WHERE hash=$1 AND type=$2",
/* Layout with value hash only: (key, vhash, rowid, offset). */
953 nrc->paramValues[1] = (const char *) &nrc->vhash;
954 nrc->paramLengths[1] = sizeof (nrc->vhash);
955 nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
956 nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
957 nrc->paramValues[3] = (const char *) &nrc->blimit_off;
958 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
961 ret = PQexecParams (plugin->dbh,
962 "SELECT count(*) FROM gn080 WHERE hash=$1 AND vhash=$2",
/* Layout with key only: (key, rowid, offset). */
971 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
972 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
973 nrc->paramValues[2] = (const char *) &nrc->blimit_off;
974 nrc->paramLengths[2] = sizeof (nrc->blimit_off);
977 ret = PQexecParams (plugin->dbh,
978 "SELECT count(*) FROM gn080 WHERE hash=$1",
/* A failed count query ends the iteration immediately. */
986 if (GNUNET_OK != check_result (plugin,
994 NULL, NULL, 0, NULL, 0, 0, 0,
995 GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* The count must come back as a single 8-byte binary value. */
998 if ((PQntuples (ret) != 1) ||
999 (PQnfields (ret) != 1) ||
1000 (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
1005 NULL, NULL, 0, NULL, 0, 0, 0,
1006 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1009 nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
/* Nothing matches: report the end of the iteration right away. */
1011 if (nrc->total == 0)
1014 NULL, NULL, 0, NULL, 0, 0, 0,
1015 GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* Random row offset for the first query; the upper bound argument is on a
   line not visible here (presumably nrc->total -- confirm). */
1018 nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1020 postgres_plugin_next_request (nrc,
/* Plugin API entry point that starts an iteration by delegating to
   postgres_iterate.
   NOTE(review): the arguments passed to postgres_iterate after 'plugin'
   sit on lines not visible here, so the selected order cannot be
   confirmed from this view. */
1026 * Select a subset of the items in the datastore and call
1027 * the given iterator for each of them.
1029 * @param cls our "struct Plugin*"
1030 * @param type entries of which type should be considered?
1031 * Use 0 for any type.
1032 * @param iter function to call on each matching value;
1033 * will be called once with a NULL value at the end
1034 * @param iter_cls closure for iter
1037 postgres_plugin_iter_zero_anonymity (void *cls,
1038 enum GNUNET_BLOCK_Type type,
1039 PluginIterator iter,
1042 struct Plugin *plugin = cls;
1044 postgres_iterate (plugin,
1051 * Select a subset of the items in the datastore and call
1052 * the given iterator for each of them.
1054 * @param cls our "struct Plugin*"
1055 * @param type entries of which type should be considered?
1056 * Use 0 for any type.
1057 * @param iter function to call on each matching value;
1058 * will be called once with a NULL value at the end
1059 * @param iter_cls closure for iter
1062 postgres_plugin_iter_ascending_expiration (void *cls,
1063 enum GNUNET_BLOCK_Type type,
1064 PluginIterator iter,
1067 struct Plugin *plugin = cls;
1069 postgres_iterate (plugin, type, GNUNET_YES, 2,
1076 * Select a subset of the items in the datastore and call
1077 * the given iterator for each of them.
1079 * @param cls our "struct Plugin*"
1080 * @param type entries of which type should be considered?
1081 * Use 0 for any type.
1082 * @param iter function to call on each matching value;
1083 * will be called once with a NULL value at the end
1084 * @param iter_cls closure for iter
1087 postgres_plugin_iter_migration_order (void *cls,
1088 enum GNUNET_BLOCK_Type type,
1089 PluginIterator iter,
1092 struct Plugin *plugin = cls;
1094 postgres_iterate (plugin, 0, GNUNET_NO, 3,
/* Plugin API entry point that starts an iteration by delegating to
   postgres_iterate.
   NOTE(review): the arguments passed to postgres_iterate after 'plugin'
   sit on lines not visible here, so the selected order cannot be
   confirmed from this view. */
1101 * Select a subset of the items in the datastore and call
1102 * the given iterator for each of them.
1104 * @param cls our "struct Plugin*"
1105 * @param type entries of which type should be considered?
1106 * Use 0 for any type.
1107 * @param iter function to call on each matching value;
1108 * will be called once with a NULL value at the end
1109 * @param iter_cls closure for iter
1112 postgres_plugin_iter_all_now (void *cls,
1113 enum GNUNET_BLOCK_Type type,
1114 PluginIterator iter,
1117 struct Plugin *plugin = cls;
1119 postgres_iterate (plugin,
/**
 * Drop the gn080 table.  The outcome of the DROP statement is not
 * reported to the caller; failures are only logged by pq_exec.
 *
 * @param cls our "struct Plugin*"
 */
static void
postgres_plugin_drop (void *cls)
{
  pq_exec ((struct Plugin *) cls, "DROP TABLE gn080", __LINE__);
}
1138 * Entry point for the plugin.
1140 * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1141 * @return our "struct Plugin*"
1144 libgnunet_plugin_datastore_postgres_init (void *cls)
1146 struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1147 struct GNUNET_DATASTORE_PluginFunctions *api;
1148 struct Plugin *plugin;
1150 plugin = GNUNET_malloc (sizeof (struct Plugin));
1152 if (GNUNET_OK != init_connection (plugin))
1154 GNUNET_free (plugin);
1157 api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1159 api->get_size = &postgres_plugin_get_size;
1160 api->put = &postgres_plugin_put;
1161 api->next_request = &postgres_plugin_next_request;
1162 api->get = &postgres_plugin_get;
1163 api->update = &postgres_plugin_update;
1164 api->iter_low_priority = &postgres_plugin_iter_low_priority;
1165 api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity;
1166 api->iter_ascending_expiration = &postgres_plugin_iter_ascending_expiration;
1167 api->iter_migration_order = &postgres_plugin_iter_migration_order;
1168 api->iter_all_now = &postgres_plugin_iter_all_now;
1169 api->drop = &postgres_plugin_drop;
1170 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1171 "postgres", _("Postgres database running\n"));
1177 * Exit point from the plugin.
1178 * @param cls our "struct Plugin*"
1179 * @return always NULL
1182 libgnunet_plugin_datastore_postgres_done (void *cls)
1184 struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1185 struct Plugin *plugin = api->cls;
1187 if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1189 GNUNET_SCHEDULER_cancel (plugin->env->sched,
1191 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1192 GNUNET_free (plugin->next_task_nc);
1193 plugin->next_task_nc = NULL;
1195 PQfinish (plugin->dbh);
1196 GNUNET_free (plugin);
1201 /* end of plugin_datastore_postgres.c */