2 This file is part of GNUnet
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file datastore/plugin_datastore_postgres.c
23 * @brief postgres-based datastore backend
24 * @author Christian Grothoff
28 #include "plugin_datastore.h"
29 #include <postgresql/libpq-fe.h>
/* Debug switch for this plugin; set to GNUNET_NO here.
   NOTE(review): no use of this macro is visible in this extract, confirm
   where it is tested before changing it. */
31 #define DEBUG_POSTGRES GNUNET_NO
/* SQL text registered as the prepared statement "select_low_priority".
   Both sub-selects fetch one row of gn080 in ascending (prio, oid) order:
   the first stays within priority $1 and continues after row $2, the second
   moves on to any higher priority.  The outer ORDER BY ... LIMIT 1 keeps the
   first of the two candidates.  postgres_iterate() binds $1 to blast_prio
   and $2 to blast_rowid.
   NOTE(review): the set operator joining the two sub-selects is not visible
   in this extract (presumably UNION), confirm. */
33 #define SELECT_IT_LOW_PRIORITY "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
34 "WHERE (prio = $1 AND oid > $2) " \
35 "ORDER BY prio ASC,oid ASC LIMIT 1) "\
37 "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
38 "WHERE (prio > $1 AND oid != $2)"\
39 "ORDER BY prio ASC,oid ASC LIMIT 1)"\
40 "ORDER BY prio ASC,oid ASC LIMIT 1"
/* SQL text registered as the prepared statement "select_non_anonymous".
   Walks rows with anonLevel=0 in descending (prio, oid) order: the first
   sub-select stays within priority $1 below row $2, the second moves on to
   any lower priority.  postgres_iterate() binds $1 to blast_prio and $2 to
   blast_rowid.
   NOTE(review): the set operator joining the two sub-selects is not visible
   in this extract (presumably UNION), confirm. */
42 #define SELECT_IT_NON_ANONYMOUS "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
43 "WHERE (prio = $1 AND oid < $2)"\
44 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
46 "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
47 "WHERE (prio < $1 AND oid != $2)"\
48 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
49 "ORDER BY prio DESC,oid DESC LIMIT 1"
/* SQL text registered as the prepared statement "select_expiration_time".
   Walks rows in ascending (expire, oid) order: the first sub-select stays at
   expiration $1 after row $2, the second moves on to any later expiration.
   postgres_iterate() binds $1 to blast_expire and $2 to blast_rowid.
   NOTE(review): the set operator joining the two sub-selects is not visible
   in this extract (presumably UNION), confirm. */
51 #define SELECT_IT_EXPIRATION_TIME "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
52 "WHERE (expire = $1 AND oid > $2) "\
53 "ORDER BY expire ASC,oid ASC LIMIT 1) "\
55 "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
56 "WHERE (expire > $1 AND oid != $2) " \
57 "ORDER BY expire ASC,oid ASC LIMIT 1)"\
58 "ORDER BY expire ASC,oid ASC LIMIT 1"
/* SQL text registered as the prepared statement "select_migration_order".
   Walks rows in descending (expire, oid) order, restricted to rows whose
   expire is greater than $3 and whose type is not 3.  postgres_iterate()
   binds $1 to blast_expire, $2 to blast_rowid and $3 to bnow (the time at
   which the iteration was started).
   NOTE(review): the set operator joining the two sub-selects is not visible
   in this extract (presumably UNION), confirm. */
61 #define SELECT_IT_MIGRATION_ORDER "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
62 "WHERE (expire = $1 AND oid < $2)"\
63 " AND expire > $3 AND type!=3"\
64 " ORDER BY expire DESC,oid DESC LIMIT 1) "\
66 "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
67 "WHERE (expire < $1 AND oid != $2)" \
68 " AND expire > $3 AND type!=3"\
69 " ORDER BY expire DESC,oid DESC LIMIT 1)"\
70 "ORDER BY expire DESC,oid DESC LIMIT 1"
73 * After how many ms "busy" should a DB operation fail for good?
74 * A low value makes sure that we are more responsive to requests
75 * (especially PUTs). A high value guarantees a higher success
76 * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
78 * The default value of 1s should ensure that users do not experience
79 * huge latencies while at the same time allowing operations to succeed
80 * with reasonable probability.
/* NOTE(review): BUSY_TIMEOUT is not referenced anywhere in the visible part
   of this file, confirm whether it is still needed. */
82 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
/* State carried across the steps of one database iteration.  It is allocated
   by postgres_iterate() and postgres_plugin_get() and is tracked in
   plugin->next_task_nc while a continuation is scheduled.
   NOTE(review): other code in this file uses further members (iter,
   iter_cls, paramLengths, pname, key, btype, bnow, blast_prio, blimit_off,
   count, off, end_it) whose declarations are not visible in this extract. */
85 struct NextRequestClosure
/* Plugin context this iteration belongs to. */
87 struct Plugin *plugin;
/* Pointers to the binary parameter values bound to the query. */
90 const char *paramValues[5];
/* Value hash; postgres_plugin_get() binds its address as a query parameter. */
96 GNUNET_HashCode vhash;
/* Number of matching rows, read from SELECT count(*) in postgres_plugin_get();
   compared with count to detect the end of the result set. */
100 unsigned long long total;
/* Expiration time of the last row handled, kept in network byte order. */
101 uint64_t blast_expire;
/* Row id (oid) of the last row handled, kept in network byte order. */
102 uint32_t blast_rowid;
110 * Context for all functions in this plugin.
115 * Our execution environment.
117 struct GNUNET_DATASTORE_PluginEnvironment *env;
120 * Native Postgres database handle.
125 * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
127 struct NextRequestClosure *next_task_nc;
130 * Pending task with scheduler for running the next request.
132 GNUNET_SCHEDULER_TaskIdentifier next_task;
/* Running byte total: increased by postgres_plugin_put(), decreased when a
   row is deleted during iteration, reported by postgres_plugin_get_size(). */
134 unsigned long long payload;
/* NOTE(review): lastSync is not referenced anywhere in the visible part of
   this file, confirm whether it is still needed. */
136 unsigned int lastSync;
142 * Check if the result obtained from Postgres has
143 * the desired status code. If not, log an error, clear the
144 * result and return GNUNET_SYSERR.
146 * @return GNUNET_OK if the result is acceptable
/* Validates a libpq result against an expected status.  `command` and `args`
   only label the log message and `line` is the caller's source line.
   NOTE(review): the declarations of `ret` and `expected_status`, the test
   guarding the first log call and the success return are not visible in
   this extract. */
149 check_result (struct Plugin *plugin,
152 const char *command, const char *args, int line)
/* First failure path: no result object could be allocated. */
156 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
157 "datastore-postgres",
158 "Postgres failed to allocate result for `%s:%s' at %d\n",
159 command, args, line);
160 return GNUNET_SYSERR;
/* Second failure path: the status differs from the expected one; the
   connection's error text is appended to the log message. */
162 if (PQresultStatus (ret) != expected_status)
164 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
165 "datastore-postgres",
166 _("`%s:%s' failed at %s:%d with error: %s"),
167 command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
169 return GNUNET_SYSERR;
175 * Run simple SQL statement (without results).
/* Executes one SQL statement with PQexec() on the plugin's connection and
   requires PGRES_COMMAND_OK; `line` is forwarded to check_result() for the
   error message.  Returns GNUNET_SYSERR when the check fails. */
178 pq_exec (struct Plugin *plugin,
179 const char *sql, int line)
182 ret = PQexec (plugin->dbh, sql);
183 if (GNUNET_OK != check_result (plugin,
185 PGRES_COMMAND_OK, "PQexec", sql, line))
186 return GNUNET_SYSERR;
192 * Prepare SQL statement.
/* Registers `sql` as the prepared statement `name` with `nparms` parameters.
   NULL is passed for the parameter types.  `line` is forwarded to
   check_result() for the error message; returns GNUNET_SYSERR when the
   check fails. */
195 pq_prepare (struct Plugin *plugin,
196 const char *name, const char *sql, int nparms, int line)
199 ret = PQprepare (plugin->dbh, name, sql, nparms, NULL);
201 check_result (plugin,
202 ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
203 return GNUNET_SYSERR;
209 * @brief Get a database handle
210 * @return GNUNET_OK on success, GNUNET_SYSERR on error
/* Connects to Postgres, creates table gn080 (and, when the table was newly
   created, its indexes), adjusts column storage and registers the prepared
   statements used by this plugin.  Every visible failure path after a
   successful connect closes the connection with PQfinish() and returns
   GNUNET_SYSERR. */
213 init_connection (struct Plugin *plugin)
218 /* Open database and precompile statements */
220 GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
221 "datastore-postgres",
/* Without a configured connection string an empty one is passed to libpq. */
224 plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
225 GNUNET_free_non_null (conninfo);
226 if (NULL == plugin->dbh)
228 /* FIXME: warn about out-of-memory? */
229 return GNUNET_SYSERR;
231 if (PQstatus (plugin->dbh) != CONNECTION_OK)
233 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
235 _("Unable to initialize Postgres: %s"),
236 PQerrorMessage (plugin->dbh));
237 PQfinish (plugin->dbh);
239 return GNUNET_SYSERR;
241 ret = PQexec (plugin->dbh,
242 "CREATE TABLE gn080 ("
243 " size INTEGER NOT NULL DEFAULT 0,"
244 " type INTEGER NOT NULL DEFAULT 0,"
245 " prio INTEGER NOT NULL DEFAULT 0,"
246 " anonLevel INTEGER NOT NULL DEFAULT 0,"
247 " expire BIGINT NOT NULL DEFAULT 0,"
248 " hash BYTEA NOT NULL DEFAULT '',"
249 " vhash BYTEA NOT NULL DEFAULT '',"
250 " value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
/* A failure is fatal unless its SQLSTATE is 42P07, i.e. the table already
   exists from an earlier run. */
251 if ( (ret == NULL) ||
252 ( (PQresultStatus (ret) != PGRES_COMMAND_OK) &&
253 (0 != strcmp ("42P07", /* duplicate table */
256 PG_DIAG_SQLSTATE)))))
258 check_result (plugin,
259 ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn080", __LINE__);
260 PQfinish (plugin->dbh);
262 return GNUNET_SYSERR;
/* Indexes are created only when the table itself was just created. */
264 if (PQresultStatus (ret) == PGRES_COMMAND_OK)
267 pq_exec (plugin, "CREATE INDEX idx_hash ON gn080 (hash)", __LINE__)) ||
269 pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)",
272 pq_exec (plugin, "CREATE INDEX idx_prio ON gn080 (prio)", __LINE__))
274 pq_exec (plugin, "CREATE INDEX idx_expire ON gn080 (expire)", __LINE__))
276 pq_exec (plugin, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)",
280 (plugin, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
283 pq_exec (plugin, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)",
287 PQfinish (plugin->dbh);
289 return GNUNET_SYSERR;
294 ret = PQexec (plugin->dbh,
295 "ALTER TABLE gn080 ALTER value SET STORAGE EXTERNAL");
297 check_result (plugin,
298 ret, PGRES_COMMAND_OK,
299 "ALTER TABLE", "gn080", __LINE__))
301 PQfinish (plugin->dbh);
303 return GNUNET_SYSERR;
306 ret = PQexec (plugin->dbh,
307 "ALTER TABLE gn080 ALTER hash SET STORAGE PLAIN");
309 check_result (plugin,
310 ret, PGRES_COMMAND_OK,
311 "ALTER TABLE", "gn080", __LINE__))
313 PQfinish (plugin->dbh);
315 return GNUNET_SYSERR;
318 ret = PQexec (plugin->dbh,
319 "ALTER TABLE gn080 ALTER vhash SET STORAGE PLAIN");
321 check_result (plugin,
322 ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn080", __LINE__))
324 PQfinish (plugin->dbh);
326 return GNUNET_SYSERR;
/* Prepared statements.  The four lookups below differ only in which of
   vhash and type are constrained in addition to hash. */
333 "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
334 "WHERE hash=$1 AND vhash=$2 AND type=$3 "
335 "AND oid > $4 ORDER BY oid ASC LIMIT 1 OFFSET $5",
/* Adjacent literals are concatenated verbatim, so the blank after "$2" is
   required to keep the parameter and the following AND as separate tokens. */
341 "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
342 "WHERE hash=$1 AND type=$2 "
343 "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
349 "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
350 "WHERE hash=$1 AND vhash=$2 "
351 "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
357 "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
359 "AND oid > $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
365 "INSERT INTO gn080 (size, type, prio, anonLevel, expire, hash, vhash, value) "
366 "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
/* NOTE(review): this SET clause adds $1 to prio without clamping at 0,
   while the documentation of postgres_plugin_update() asks for a floor of
   0, confirm which is intended. */
372 "UPDATE gn080 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
378 "select_low_priority",
379 SELECT_IT_LOW_PRIORITY,
384 "select_non_anonymous",
385 SELECT_IT_NON_ANONYMOUS,
390 "select_expiration_time",
391 SELECT_IT_EXPIRATION_TIME,
396 "select_migration_order",
397 SELECT_IT_MIGRATION_ORDER,
403 "DELETE FROM gn080 " "WHERE oid=$1", 1, __LINE__)))
405 PQfinish (plugin->dbh);
407 return GNUNET_SYSERR;
414 * Delete the row identified by the given rowid (the oid of the row in gn080).
417 * @return GNUNET_OK on success
/* Deletes one row of gn080 by its oid using a prepared statement (labelled
   "delrow" in the error check below).  The oid is sent as a binary
   parameter, so it is converted to network byte order first, exactly as
   postgres_plugin_update() does for its oid; callers pass the host-order
   value they obtained with ntohl().  Returns GNUNET_SYSERR when the
   statement fails. */
420 delete_by_rowid (struct Plugin *plugin,
/* Network-byte-order copy of the row id for the binary parameter. */
uint32_t browid = htonl ((uint32_t) rowid);
423 const char *paramValues[] = { (const char *) &browid };
424 int paramLengths[] = { sizeof (browid) };
425 const int paramFormats[] = { 1 };
428 ret = PQexecPrepared (plugin->dbh,
430 1, paramValues, paramLengths, paramFormats, 1);
432 check_result (plugin,
433 ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
436 return GNUNET_SYSERR;
444 * Get an estimate of how much space the database is using.
447 * @param cls our "struct Plugin*"
448 * @return number of bytes used on disk
/* Reports the byte count tracked in plugin->payload as the size estimate;
   no database query is issued in the visible lines. */
450 static unsigned long long
451 postgres_plugin_get_size (void *cls)
453 struct Plugin *plugin = cls;
456 ret = plugin->payload;
/* The factor 1.00 adds no storage overhead to the estimate. */
457 return (unsigned long long) (ret * 1.00);
458 /* benchmarking shows XX% overhead */
463 * Store an item in the datastore.
466 * @param key key for the item
467 * @param size number of bytes in data
468 * @param data content stored
469 * @param type type of the content
470 * @param priority priority of the content
471 * @param anonymity anonymity-level for the content
472 * @param expiration expiration time for the content
473 * @param msg set to error message
474 * @return GNUNET_OK on success
/* Inserts one item through the prepared statement "put" (eight binary
   parameters) and, on success, adds `size` to plugin->payload.  Returns
   GNUNET_SYSERR when the statement fails. */
477 postgres_plugin_put (void *cls,
478 const GNUNET_HashCode * key,
481 enum GNUNET_BLOCK_Type type,
484 struct GNUNET_TIME_Absolute expiration,
487 struct Plugin *plugin = cls;
488 GNUNET_HashCode vhash;
/* Integer parameters are sent in binary format and therefore converted to
   network byte order up front. */
490 uint32_t bsize = htonl (size);
491 uint32_t btype = htonl (type);
492 uint32_t bprio = htonl (priority);
493 uint32_t banon = htonl (anonymity);
494 uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).value__;
495 const char *paramValues[] = {
496 (const char *) &bsize,
497 (const char *) &btype,
498 (const char *) &bprio,
499 (const char *) &banon,
500 (const char *) &bexpi,
502 (const char *) &vhash,
505 int paramLengths[] = {
511 sizeof (GNUNET_HashCode),
512 sizeof (GNUNET_HashCode),
515 const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
/* vhash is the hash of the stored data itself. */
517 GNUNET_CRYPTO_hash (data, size, &vhash);
518 ret = PQexecPrepared (plugin->dbh,
519 "put", 8, paramValues, paramLengths, paramFormats, 1);
520 if (GNUNET_OK != check_result (plugin, ret,
522 "PQexecPrepared", "put", __LINE__))
523 return GNUNET_SYSERR;
525 plugin->payload += size;
526 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
528 "Stored %u bytes in database, new payload is %llu\n",
530 (unsigned long long) plugin->payload);
535 * Function invoked on behalf of a "PluginIterator"
536 * asking the database plugin to call the iterator
537 * with the next item.
539 * @param next_cls the 'struct NextRequestClosure'
540 * @param tc scheduler context
/* Scheduler task for one iteration step: runs a prepared statement via
   PQexecPrepared(), validates the returned row and passes it to nrc->iter.
   The end of the iteration is signalled by calling nrc->iter with NULL key
   and data.
   NOTE(review): cleanup of `res` and `nrc` and the scheduling of the next
   step are not visible in this extract. */
543 postgres_next_request_cont (void *next_cls,
544 const struct GNUNET_SCHEDULER_TaskContext *tc)
546 struct NextRequestClosure *nrc = next_cls;
547 struct Plugin *plugin = nrc->plugin;
548 const int paramFormats[] = { 1, 1, 1, 1, 1 };
551 enum GNUNET_BLOCK_Type type;
556 struct GNUNET_TIME_Absolute expiration_time;
/* This task is now running, so it is no longer pending in the plugin. */
559 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
560 plugin->next_task_nc = NULL;
/* Stop when the client asked for it or all counted rows were delivered. */
561 if ( (GNUNET_YES == nrc->end_it) ||
562 (nrc->count == nrc->total) )
564 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
566 "Ending iteration (%s)\n",
567 (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set");
568 nrc->iter (nrc->iter_cls,
569 NULL, NULL, 0, NULL, 0, 0, 0,
570 GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* blimit_off holds an offset in network byte order.
   NOTE(review): the condition choosing between nrc->off and 0 is not
   visible in this extract. */
576 nrc->blimit_off = GNUNET_htonll (nrc->off);
578 nrc->blimit_off = GNUNET_htonll (0);
579 if (nrc->count + nrc->off == nrc->total)
580 nrc->blast_rowid = htonl (0); /* back to start */
582 res = PQexecPrepared (plugin->dbh,
588 if (GNUNET_OK != check_result (plugin,
595 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
597 "Ending iteration (postgres error)\n");
598 nrc->iter (nrc->iter_cls,
599 NULL, NULL, 0, NULL, 0, 0, 0,
600 GNUNET_TIME_UNIT_ZERO_ABS, 0);
605 if (0 == PQntuples (res))
608 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
610 "Ending iteration (no more results)\n");
611 nrc->iter (nrc->iter_cls,
612 NULL, NULL, 0, NULL, 0, 0, 0,
613 GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* Exactly one row with eight columns is expected; columns 0 (size) and 7
   (oid) must be 4 bytes wide before they are read. */
618 if ((1 != PQntuples (res)) ||
619 (8 != PQnfields (res)) ||
620 (sizeof (uint32_t) != PQfsize (res, 0)) ||
621 (sizeof (uint32_t) != PQfsize (res, 7)))
624 nrc->iter (nrc->iter_cls,
625 NULL, NULL, 0, NULL, 0, 0, 0,
626 GNUNET_TIME_UNIT_ZERO_ABS, 0);
631 rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 7));
632 size = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
/* A row whose column widths are wrong, or whose stored size disagrees with
   the actual value length, is deleted and the iteration is ended. */
633 if ((sizeof (uint32_t) != PQfsize (res, 1)) ||
634 (sizeof (uint32_t) != PQfsize (res, 2)) ||
635 (sizeof (uint32_t) != PQfsize (res, 3)) ||
636 (sizeof (uint64_t) != PQfsize (res, 4)) ||
637 (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 5)) ||
638 (size != PQgetlength (res, 0, 6)))
642 delete_by_rowid (plugin, rowid);
643 nrc->iter (nrc->iter_cls,
644 NULL, NULL, 0, NULL, 0, 0, 0,
645 GNUNET_TIME_UNIT_ZERO_ABS, 0);
650 type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
651 priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
652 anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 3));
653 expiration_time.value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 4));
654 size = PQgetlength (res, 0, 6);
655 memcpy (&key, PQgetvalue (res, 0, 5), sizeof (GNUNET_HashCode));
/* Remember this row's position (network byte order) as the starting point
   of the next query. */
657 nrc->blast_prio = htonl (priority);
658 nrc->blast_expire = GNUNET_htonll (expiration_time.value);
659 nrc->blast_rowid = htonl (rowid);
662 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
664 "Found result of size %u bytes and type %u in database\n",
666 (unsigned int) type);
667 iret = nrc->iter (nrc->iter_cls,
671 PQgetvalue (res, 0, 6),
672 (enum GNUNET_BLOCK_Type) type,
678 if (iret == GNUNET_SYSERR)
680 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
682 "Ending iteration (client error)\n");
/* GNUNET_NO from the iterator requests deletion of the row; on success the
   tracked payload is reduced by the row's size. */
685 if (iret == GNUNET_NO)
687 if (GNUNET_OK == delete_by_rowid (plugin, rowid))
689 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
691 "Deleting %u bytes from database, current payload is %llu\n",
693 (unsigned long long) plugin->payload);
694 GNUNET_assert (plugin->payload >= size);
695 plugin->payload -= size;
696 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
698 "Deleted %u bytes from database, new payload is %llu\n",
700 (unsigned long long) plugin->payload);
707 * Function invoked on behalf of a "PluginIterator"
708 * asking the database plugin to call the iterator
709 * with the next item.
711 * @param next_cls whatever argument was given
712 * to the PluginIterator as "next_cls".
713 * @param end_it set to GNUNET_YES if we
714 * should terminate the iteration early
715 * (iterator should be still called once more
716 * to signal the end of the iteration).
/* Records an early-termination request if one was made, then schedules
   postgres_next_request_cont() to run from the scheduler.  The closure is
   stored in plugin->next_task_nc together with the task id in
   plugin->next_task. */
719 postgres_plugin_next_request (void *next_cls,
722 struct NextRequestClosure *nrc = next_cls;
/* end_it is only ever raised here, never reset. */
724 if (GNUNET_YES == end_it)
725 nrc->end_it = GNUNET_YES;
726 nrc->plugin->next_task_nc = nrc;
727 nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (nrc->plugin->env->sched,
728 &postgres_next_request_cont,
734 * Update the priority for a particular key in the datastore. If
735 * the expiration time in value is different than the time found in
736 * the datastore, the higher value should be kept. For the
737 * anonymity level, the lower value is to be used. The specified
738 * priority should be added to the existing priority, ignoring the
741 * Note that it is possible for multiple values to match this put.
742 * In that case, all of the respective values are updated.
744 * @param cls our "struct Plugin*"
745 * @param uid unique identifier of the datum
746 * @param delta by how much should the priority
747 * change? If priority + delta < 0 the
748 * priority should be set to 0 (never go negative)
750 * @param expire new expiration time should be the
751 * MAX of any existing expiration time and this value
753 * @param msg set to error message
754 * @return GNUNET_OK on success
/* Runs a prepared statement (labelled "update" in the error check) with
   three binary parameters: the priority delta, the new expiration time and
   the oid of the row.  Returns GNUNET_SYSERR when the statement fails. */
757 postgres_plugin_update (void *cls,
759 int delta, struct GNUNET_TIME_Absolute expire,
762 struct Plugin *plugin = cls;
/* All three values are converted to network byte order for the binary
   parameter format; the signed delta keeps its bit pattern. */
764 int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
765 uint32_t boid = htonl ( (uint32_t) uid);
766 uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).value__;
767 const char *paramValues[] = {
768 (const char *) &bdelta,
769 (const char *) &bexpire,
770 (const char *) &boid,
772 int paramLengths[] = {
777 const int paramFormats[] = { 1, 1, 1 };
779 ret = PQexecPrepared (plugin->dbh,
781 3, paramValues, paramLengths, paramFormats, 1);
782 if (GNUNET_OK != check_result (plugin,
785 "PQexecPrepared", "update", __LINE__))
786 return GNUNET_SYSERR;
793 * Call a method for each key in the database and
794 * call the callback method on it.
796 * @param type entries of which type should be considered?
797 * @param iter maybe NULL (to just count); iter
798 * should return GNUNET_SYSERR to abort the
799 * iteration, GNUNET_NO to delete the entry and
800 * continue and GNUNET_OK to continue iterating
/* Sets up a NextRequestClosure for one of the four ordered scans and starts
   the iteration through postgres_plugin_next_request().  `iter_select`
   picks the prepared statement and its parameters.
   NOTE(review): the branch and case labels that separate the groups of
   assignments below are not visible in this extract. */
803 postgres_iterate (struct Plugin *plugin,
806 unsigned int iter_select,
807 PluginIterator iter, void *iter_cls)
809 struct NextRequestClosure *nrc;
811 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
812 nrc->plugin = plugin;
814 nrc->iter_cls = iter_cls;
/* Start position at the lowest values; the 64-bit blast_expire uses the
   64-bit conversion helper like its counterpart below. */
817 nrc->blast_prio = htonl (0);
818 nrc->blast_rowid = htonl (0);
819 nrc->blast_expire = GNUNET_htonll (0);
/* Start position at the highest values. */
823 nrc->blast_prio = htonl (0x7FFFFFFFL);
824 nrc->blast_rowid = htonl (0xFFFFFFFF);
825 nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
/* Priority-ordered scans bind (blast_prio, blast_rowid). */
830 nrc->pname = "select_low_priority";
832 nrc->paramValues[0] = (const char *) &nrc->blast_prio;
833 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
834 nrc->paramLengths[0] = sizeof (nrc->blast_prio);
835 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
838 nrc->pname = "select_non_anonymous";
840 nrc->paramValues[0] = (const char *) &nrc->blast_prio;
841 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
842 nrc->paramLengths[0] = sizeof (nrc->blast_prio);
843 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
/* Expiration-ordered scans bind (blast_expire, blast_rowid). */
846 nrc->pname = "select_expiration_time";
848 nrc->paramValues[0] = (const char *) &nrc->blast_expire;
849 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
850 nrc->paramLengths[0] = sizeof (nrc->blast_expire);
851 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
/* The migration scan additionally binds the current time as a third
   parameter. */
854 nrc->pname = "select_migration_order";
856 nrc->paramValues[0] = (const char *) &nrc->blast_expire;
857 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
858 nrc->paramValues[2] = (const char *) &nrc->bnow;
859 nrc->paramLengths[0] = sizeof (nrc->blast_expire);
860 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
861 nrc->paramLengths[2] = sizeof (nrc->bnow);
866 NULL, NULL, 0, NULL, 0, 0, 0,
867 GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* bnow is filled in after the parameter table was set up; the table only
   stores its address. */
870 nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).value__;
871 postgres_plugin_next_request (nrc,
877 * Select a subset of the items in the datastore and call
878 * the given iterator for each of them.
880 * @param cls our "struct Plugin*"
881 * @param type entries of which type should be considered?
882 * Use 0 for any type.
883 * @param iter function to call on each matching value;
884 * will be called once with a NULL value at the end
885 * @param iter_cls closure for iter
/* Thin wrapper that starts an iteration through postgres_iterate().
   NOTE(review): the arguments passed to postgres_iterate() are not visible
   in this extract; the name suggests the "select_low_priority" scan. */
888 postgres_plugin_iter_low_priority (void *cls,
889 enum GNUNET_BLOCK_Type type,
893 struct Plugin *plugin = cls;
895 postgres_iterate (plugin,
905 * Iterate over the results for a particular key
909 * @param key maybe NULL (to match all entries)
910 * @param vhash hash of the value, maybe NULL (to
911 * match all values that have the right key).
912 * Note that for DBlocks there is no difference
913 * between key and vhash, but for other blocks there may be
915 * @param type entries of which type are relevant?
916 * Use 0 for any type.
917 * @param iter function to call on each matching value;
918 * will be called once with a NULL value at the end
919 * @param iter_cls closure for iter
/* Starts an iteration over the rows matching `key`, optionally narrowed by
   `vhash` and `type`.  A SELECT count(*) first determines how many rows
   match; the count is stored in nrc->total, a random value is stored in
   nrc->off, and the iteration is handed to postgres_plugin_next_request().
   When the count query fails, returns an unexpected shape or finds no rows,
   `iter` is called once with NULL key and data.
   NOTE(review): the conditions selecting between the four query variants
   and the cleanup of `ret` and `nrc` are not visible in this extract. */
922 postgres_plugin_get (void *cls,
923 const GNUNET_HashCode * key,
924 const GNUNET_HashCode * vhash,
925 enum GNUNET_BLOCK_Type type,
926 PluginIterator iter, void *iter_cls)
928 struct Plugin *plugin = cls;
929 struct NextRequestClosure *nrc;
930 const int paramFormats[] = { 1, 1, 1, 1, 1 };
935 postgres_plugin_iter_low_priority (plugin, type,
939 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
940 nrc->plugin = plugin;
942 nrc->iter_cls = iter_cls;
/* Parameter 0 is always the key hash; the remaining slots depend on which
   of vhash and type take part in the query. */
946 nrc->paramValues[0] = (const char*) &nrc->key;
947 nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
948 nrc->btype = htonl (type);
/* Variant with vhash and type. */
953 nrc->paramValues[1] = (const char *) &nrc->vhash;
954 nrc->paramLengths[1] = sizeof (nrc->vhash);
955 nrc->paramValues[2] = (const char *) &nrc->btype;
956 nrc->paramLengths[2] = sizeof (nrc->btype);
957 nrc->paramValues[3] = (const char *) &nrc->blast_rowid;
958 nrc->paramLengths[3] = sizeof (nrc->blast_rowid);
959 nrc->paramValues[4] = (const char *) &nrc->blimit_off;
960 nrc->paramLengths[4] = sizeof (nrc->blimit_off);
962 nrc->pname = "getvt";
963 ret = PQexecParams (plugin->dbh,
964 "SELECT count(*) FROM gn080 WHERE hash=$1 AND vhash=$2 AND type=$3",
/* Variant with type only. */
973 nrc->paramValues[1] = (const char *) &nrc->btype;
974 nrc->paramLengths[1] = sizeof (nrc->btype);
975 nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
976 nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
977 nrc->paramValues[3] = (const char *) &nrc->blimit_off;
978 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
981 ret = PQexecParams (plugin->dbh,
982 "SELECT count(*) FROM gn080 WHERE hash=$1 AND type=$2",
/* Variant with vhash only. */
994 nrc->paramValues[1] = (const char *) &nrc->vhash;
995 nrc->paramLengths[1] = sizeof (nrc->vhash);
996 nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
997 nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
998 nrc->paramValues[3] = (const char *) &nrc->blimit_off;
999 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1001 nrc->pname = "getv";
1002 ret = PQexecParams (plugin->dbh,
1003 "SELECT count(*) FROM gn080 WHERE hash=$1 AND vhash=$2",
/* Variant with the key only. */
1012 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
1013 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
1014 nrc->paramValues[2] = (const char *) &nrc->blimit_off;
1015 nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1018 ret = PQexecParams (plugin->dbh,
1019 "SELECT count(*) FROM gn080 WHERE hash=$1",
1027 if (GNUNET_OK != check_result (plugin,
1035 NULL, NULL, 0, NULL, 0, 0, 0,
1036 GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* The count must come back as a single 8-byte field. */
1039 if ((PQntuples (ret) != 1) ||
1040 (PQnfields (ret) != 1) ||
1041 (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
1046 NULL, NULL, 0, NULL, 0, 0, 0,
1047 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1050 nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
/* Diagnostic goes through the project logger at DEBUG level instead of
   being written to stderr unconditionally. */
1051 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
1052 "Total number of results: %llu\n", (unsigned long long) nrc->total);
1054 if (nrc->total == 0)
1057 NULL, NULL, 0, NULL, 0, 0, 0,
1058 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1061 nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1063 postgres_plugin_next_request (nrc,
1069 * Select a subset of the items in the datastore and call
1070 * the given iterator for each of them.
1072 * @param cls our "struct Plugin*"
1073 * @param type entries of which type should be considered?
1074 * Use 0 for any type.
1075 * @param iter function to call on each matching value;
1076 * will be called once with a NULL value at the end
1077 * @param iter_cls closure for iter
/* Thin wrapper that starts an iteration through postgres_iterate().
   NOTE(review): the arguments passed to postgres_iterate() are not visible
   in this extract; the name suggests the "select_non_anonymous" scan. */
1080 postgres_plugin_iter_zero_anonymity (void *cls,
1081 enum GNUNET_BLOCK_Type type,
1082 PluginIterator iter,
1085 struct Plugin *plugin = cls;
1087 postgres_iterate (plugin,
1094 * Select a subset of the items in the datastore and call
1095 * the given iterator for each of them.
1097 * @param cls our "struct Plugin*"
1098 * @param type entries of which type should be considered?
1099 * Use 0 for any type.
1100 * @param iter function to call on each matching value;
1101 * will be called once with a NULL value at the end
1102 * @param iter_cls closure for iter
/* Thin wrapper around postgres_iterate(): forwards `type`, passes
   GNUNET_YES as the third argument and 2 as iter_select.
   NOTE(review): iter_select 2 presumably selects "select_expiration_time"
   and GNUNET_YES ascending order; the switch labels in postgres_iterate()
   are not visible in this extract, confirm. */
1105 postgres_plugin_iter_ascending_expiration (void *cls,
1106 enum GNUNET_BLOCK_Type type,
1107 PluginIterator iter,
1110 struct Plugin *plugin = cls;
1112 postgres_iterate (plugin, type, GNUNET_YES, 2,
1119 * Select a subset of the items in the datastore and call
1120 * the given iterator for each of them.
1122 * @param cls our "struct Plugin*"
1123 * @param type entries of which type should be considered?
1124 * Use 0 for any type.
1125 * @param iter function to call on each matching value;
1126 * will be called once with a NULL value at the end
1127 * @param iter_cls closure for iter
/* Thin wrapper around postgres_iterate(): passes 0 as the type (the
   caller-supplied `type` is not forwarded), GNUNET_NO as the third argument
   and 3 as iter_select.
   NOTE(review): iter_select 3 presumably selects "select_migration_order";
   the switch labels in postgres_iterate() are not visible in this extract,
   confirm. */
1130 postgres_plugin_iter_migration_order (void *cls,
1131 enum GNUNET_BLOCK_Type type,
1132 PluginIterator iter,
1135 struct Plugin *plugin = cls;
1137 postgres_iterate (plugin, 0, GNUNET_NO, 3,
1144 * Select a subset of the items in the datastore and call
1145 * the given iterator for each of them.
1147 * @param cls our "struct Plugin*"
1148 * @param type entries of which type should be considered?
1149 * Use 0 for any type.
1150 * @param iter function to call on each matching value;
1151 * will be called once with a NULL value at the end
1152 * @param iter_cls closure for iter
/* Thin wrapper that starts an iteration through postgres_iterate().
   NOTE(review): the arguments passed to postgres_iterate() are not visible
   in this extract, so the scan order used here cannot be stated. */
1155 postgres_plugin_iter_all_now (void *cls,
1156 enum GNUNET_BLOCK_Type type,
1157 PluginIterator iter,
1160 struct Plugin *plugin = cls;
1162 postgres_iterate (plugin,
/* Drops table gn080.  The result of pq_exec() is not checked here; a failure
   is only logged by check_result(). */
1172 postgres_plugin_drop (void *cls)
1174 struct Plugin *plugin = cls;
1176 pq_exec (plugin, "DROP TABLE gn080", __LINE__);
1181 * Entry point for the plugin.
1183 * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1184 * @return our "struct Plugin*"
/* Plugin entry point: allocates the Plugin context, opens the database with
   init_connection() and fills a GNUNET_DATASTORE_PluginFunctions table with
   this file's callbacks.
   NOTE(review): the assignments of plugin->env and api->cls and the return
   statements are not visible in this extract. */
1187 libgnunet_plugin_datastore_postgres_init (void *cls)
1189 struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1190 struct GNUNET_DATASTORE_PluginFunctions *api;
1191 struct Plugin *plugin;
1193 plugin = GNUNET_malloc (sizeof (struct Plugin));
/* Without a usable connection the context is released again. */
1195 if (GNUNET_OK != init_connection (plugin))
1197 GNUNET_free (plugin);
1200 api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1202 api->get_size = &postgres_plugin_get_size;
1203 api->put = &postgres_plugin_put;
1204 api->next_request = &postgres_plugin_next_request;
1205 api->get = &postgres_plugin_get;
1206 api->update = &postgres_plugin_update;
1207 api->iter_low_priority = &postgres_plugin_iter_low_priority;
1208 api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity;
1209 api->iter_ascending_expiration = &postgres_plugin_iter_ascending_expiration;
1210 api->iter_migration_order = &postgres_plugin_iter_migration_order;
1211 api->iter_all_now = &postgres_plugin_iter_all_now;
1212 api->drop = &postgres_plugin_drop;
/* NOTE(review): this message uses the log component "postgres" while the
   other visible log calls in this file use "datastore-postgres", confirm
   whether that is intended. */
1213 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1214 "postgres", _("Postgres database running\n"));
1220 * Exit point from the plugin.
1221 * @param cls our "struct Plugin*"
1222 * @return always NULL
/* Plugin exit point: cancels a still-pending iteration task and frees its
   closure, closes the database connection and releases the Plugin context.
   NOTE(review): the release of `api` and the return statement are not
   visible in this extract. */
1225 libgnunet_plugin_datastore_postgres_done (void *cls)
1227 struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1228 struct Plugin *plugin = api->cls;
/* A cancelled task never runs, so its closure has to be freed here. */
1230 if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1232 GNUNET_SCHEDULER_cancel (plugin->env->sched,
1234 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1235 GNUNET_free (plugin->next_task_nc);
1236 plugin->next_task_nc = NULL;
1238 PQfinish (plugin->dbh);
1239 GNUNET_free (plugin);
1244 /* end of plugin_datastore_postgres.c */