2 This file is part of GNUnet
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file datastore/plugin_datastore_postgres.c
23 * @brief postgres-based datastore backend
24 * @author Christian Grothoff
28 #include "plugin_datastore.h"
29 #include <postgresql/libpq-fe.h>
31 #define DEBUG_POSTGRES GNUNET_NO
33 #define SELECT_IT_LOW_PRIORITY "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
34 "WHERE (prio = $1 AND oid > $2) " \
35 "ORDER BY prio ASC,oid ASC LIMIT 1) "\
37 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
38 "WHERE (prio > $1 AND oid != $2)"\
39 "ORDER BY prio ASC,oid ASC LIMIT 1)"\
40 "ORDER BY prio ASC,oid ASC LIMIT 1"
42 #define SELECT_IT_NON_ANONYMOUS "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
43 "WHERE (prio = $1 AND oid < $2)"\
44 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
46 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
47 "WHERE (prio < $1 AND oid != $2)"\
48 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
49 "ORDER BY prio DESC,oid DESC LIMIT 1"
51 #define SELECT_IT_EXPIRATION_TIME "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
52 "WHERE (expire = $1 AND oid > $2) "\
53 "ORDER BY expire ASC,oid ASC LIMIT 1) "\
55 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
56 "WHERE (expire > $1 AND oid != $2) " \
57 "ORDER BY expire ASC,oid ASC LIMIT 1)"\
58 "ORDER BY expire ASC,oid ASC LIMIT 1"
61 #define SELECT_IT_MIGRATION_ORDER "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
62 "WHERE (expire = $1 AND oid < $2)"\
63 " AND expire > $3 AND type!=3"\
64 " ORDER BY expire DESC,oid DESC LIMIT 1) "\
66 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
67 "WHERE (expire < $1 AND oid != $2)" \
68 " AND expire > $3 AND type!=3"\
69 " ORDER BY expire DESC,oid DESC LIMIT 1)"\
70 "ORDER BY expire DESC,oid DESC LIMIT 1"
73 * After how many ms "busy" should a DB operation fail for good?
74 * A low value makes sure that we are more responsive to requests
75 * (especially PUTs). A high value guarantees a higher success
76 * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
78 * The default value of 1s should ensure that users do not experience
79 * huge latencies while at the same time allowing operations to succeed
80 * with reasonable probability.
82 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
86 * Closure for 'postgres_next_request_cont'.
88 struct NextRequestClosure
93 struct Plugin *plugin;
96 * Function to call for each matching entry.
101 * Closure for 'iter'.
106 * Parameters for the prepared statement.
108 const char *paramValues[5];
111 * Name of the prepared statement to run.
116 * Size of values pointed to by paramValues.
121 * Number of paramters in paramValues/paramLengths.
126 * Current time (possible parameter), big-endian.
131 * Key (possible parameter)
136 * Hash of value (possible parameter)
138 GNUNET_HashCode vhash;
141 * Number of entries found so far
146 * Offset this iteration starts at.
151 * Current offset to use in query, big-endian.
156 * Overall number of matching entries.
158 unsigned long long total;
161 * Expiration value of previous result (possible parameter), big-endian.
163 uint64_t blast_expire;
166 * Row ID of last result (possible paramter), big-endian.
168 uint32_t blast_rowid;
171 * Priority of last result (possible parameter), big-endian.
176 * Type of block (possible paramter), big-endian.
181 * Flag set to GNUNET_YES to stop iteration.
188 * Context for all functions in this plugin.
193 * Our execution environment.
195 struct GNUNET_DATASTORE_PluginEnvironment *env;
198 * Native Postgres database handle.
203 * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
205 struct NextRequestClosure *next_task_nc;
208 * Pending task with scheduler for running the next request.
210 GNUNET_SCHEDULER_TaskIdentifier next_task;
216 * Check if the result obtained from Postgres has
217 * the desired status code. If not, log an error, clear the
218 * result and return GNUNET_SYSERR.
220 * @param plugin global context
221 * @param ret result to check
222 * @param expected_status expected return value
223 * @param command name of SQL command that was run
224 * @param args arguments to SQL command
225 * @param line line number for error reporting
226 * @return GNUNET_OK if the result is acceptable
229 check_result (struct Plugin *plugin,
232 const char *command, const char *args, int line)
236 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
237 "datastore-postgres",
238 "Postgres failed to allocate result for `%s:%s' at %d\n",
239 command, args, line);
240 return GNUNET_SYSERR;
242 if (PQresultStatus (ret) != expected_status)
244 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
245 "datastore-postgres",
246 _("`%s:%s' failed at %s:%d with error: %s"),
247 command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
249 return GNUNET_SYSERR;
255 * Run simple SQL statement (without results).
257 * @param plugin global context
258 * @param sql statement to run
259 * @param line code line for error reporting
262 pq_exec (struct Plugin *plugin,
263 const char *sql, int line)
266 ret = PQexec (plugin->dbh, sql);
267 if (GNUNET_OK != check_result (plugin,
269 PGRES_COMMAND_OK, "PQexec", sql, line))
270 return GNUNET_SYSERR;
276 * Prepare SQL statement.
278 * @param plugin global context
279 * @param sql SQL code to prepare
280 * @param nparams number of parameters in sql
281 * @param line code line for error reporting
282 * @return GNUNET_OK on success
285 pq_prepare (struct Plugin *plugin,
286 const char *name, const char *sql, int nparms, int line)
289 ret = PQprepare (plugin->dbh, name, sql, nparms, NULL);
291 check_result (plugin,
292 ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
293 return GNUNET_SYSERR;
299 * @brief Get a database handle
301 * @param plugin global context
302 * @return GNUNET_OK on success, GNUNET_SYSERR on error
305 init_connection (struct Plugin *plugin)
310 /* Open database and precompile statements */
312 GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
313 "datastore-postgres",
316 plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
317 GNUNET_free_non_null (conninfo);
318 if (NULL == plugin->dbh)
320 /* FIXME: warn about out-of-memory? */
321 return GNUNET_SYSERR;
323 if (PQstatus (plugin->dbh) != CONNECTION_OK)
325 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
326 "datastore-postgres",
327 _("Unable to initialize Postgres: %s"),
328 PQerrorMessage (plugin->dbh));
329 PQfinish (plugin->dbh);
331 return GNUNET_SYSERR;
333 ret = PQexec (plugin->dbh,
334 "CREATE TABLE gn090 ("
335 " type INTEGER NOT NULL DEFAULT 0,"
336 " prio INTEGER NOT NULL DEFAULT 0,"
337 " anonLevel INTEGER NOT NULL DEFAULT 0,"
338 " expire BIGINT NOT NULL DEFAULT 0,"
339 " hash BYTEA NOT NULL DEFAULT '',"
340 " vhash BYTEA NOT NULL DEFAULT '',"
341 " value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
342 if ( (ret == NULL) ||
343 ( (PQresultStatus (ret) != PGRES_COMMAND_OK) &&
344 (0 != strcmp ("42P07", /* duplicate table */
347 PG_DIAG_SQLSTATE)))))
349 check_result (plugin,
350 ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090", __LINE__);
351 PQfinish (plugin->dbh);
353 return GNUNET_SYSERR;
355 if (PQresultStatus (ret) == PGRES_COMMAND_OK)
358 pq_exec (plugin, "CREATE INDEX idx_hash ON gn090 (hash)", __LINE__)) ||
360 pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)",
363 pq_exec (plugin, "CREATE INDEX idx_prio ON gn090 (prio)", __LINE__))
365 pq_exec (plugin, "CREATE INDEX idx_expire ON gn090 (expire)", __LINE__))
367 pq_exec (plugin, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)",
371 (plugin, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)",
374 pq_exec (plugin, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)",
378 PQfinish (plugin->dbh);
380 return GNUNET_SYSERR;
385 ret = PQexec (plugin->dbh,
386 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
388 check_result (plugin,
389 ret, PGRES_COMMAND_OK,
390 "ALTER TABLE", "gn090", __LINE__))
392 PQfinish (plugin->dbh);
394 return GNUNET_SYSERR;
397 ret = PQexec (plugin->dbh,
398 "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
400 check_result (plugin,
401 ret, PGRES_COMMAND_OK,
402 "ALTER TABLE", "gn090", __LINE__))
404 PQfinish (plugin->dbh);
406 return GNUNET_SYSERR;
409 ret = PQexec (plugin->dbh,
410 "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
412 check_result (plugin,
413 ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090", __LINE__))
415 PQfinish (plugin->dbh);
417 return GNUNET_SYSERR;
424 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
425 "WHERE hash=$1 AND vhash=$2 AND type=$3 "
426 "AND oid > $4 ORDER BY oid ASC LIMIT 1 OFFSET $5",
432 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
433 "WHERE hash=$1 AND type=$2"
434 "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
440 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
441 "WHERE hash=$1 AND vhash=$2"
442 "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
448 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
450 "AND oid > $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
456 "INSERT INTO gn090 (type, prio, anonLevel, expire, hash, vhash, value) "
457 "VALUES ($1, $2, $3, $4, $5, $6, $7)",
463 "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
469 "select_low_priority",
470 SELECT_IT_LOW_PRIORITY,
475 "select_non_anonymous",
476 SELECT_IT_NON_ANONYMOUS,
481 "select_expiration_time",
482 SELECT_IT_EXPIRATION_TIME,
487 "select_migration_order",
488 SELECT_IT_MIGRATION_ORDER,
494 "DELETE FROM gn090 " "WHERE oid=$1", 1, __LINE__)))
496 PQfinish (plugin->dbh);
498 return GNUNET_SYSERR;
505 * Delete the row identified by the given rowid (qid
508 * @param plugin global context
509 * @param rowid which row to delete
510 * @return GNUNET_OK on success
513 delete_by_rowid (struct Plugin *plugin,
516 const char *paramValues[] = { (const char *) &rowid };
517 int paramLengths[] = { sizeof (rowid) };
518 const int paramFormats[] = { 1 };
521 ret = PQexecPrepared (plugin->dbh,
523 1, paramValues, paramLengths, paramFormats, 1);
525 check_result (plugin,
526 ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
529 return GNUNET_SYSERR;
537 * Get an estimate of how much space the database is
540 * @param cls our "struct Plugin*"
541 * @return number of bytes used on disk
543 static unsigned long long
544 postgres_plugin_get_size (void *cls)
546 struct Plugin *plugin = cls;
547 unsigned long long total;
550 ret = PQexecParams (plugin->dbh,
551 "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090",
552 0, NULL, NULL, NULL, NULL, 1);
553 if (GNUNET_OK != check_result (plugin,
562 if ((PQntuples (ret) != 1) ||
563 (PQnfields (ret) != 1) ||
564 (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
570 total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
577 * Store an item in the datastore.
580 * @param key key for the item
581 * @param size number of bytes in data
582 * @param data content stored
583 * @param type type of the content
584 * @param priority priority of the content
585 * @param anonymity anonymity-level for the content
586 * @param expiration expiration time for the content
587 * @param msg set to error message
588 * @return GNUNET_OK on success
591 postgres_plugin_put (void *cls,
592 const GNUNET_HashCode * key,
595 enum GNUNET_BLOCK_Type type,
598 struct GNUNET_TIME_Absolute expiration,
601 struct Plugin *plugin = cls;
602 GNUNET_HashCode vhash;
604 uint32_t btype = htonl (type);
605 uint32_t bprio = htonl (priority);
606 uint32_t banon = htonl (anonymity);
607 uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).value__;
608 const char *paramValues[] = {
609 (const char *) &btype,
610 (const char *) &bprio,
611 (const char *) &banon,
612 (const char *) &bexpi,
614 (const char *) &vhash,
617 int paramLengths[] = {
622 sizeof (GNUNET_HashCode),
623 sizeof (GNUNET_HashCode),
626 const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1 };
628 GNUNET_CRYPTO_hash (data, size, &vhash);
629 ret = PQexecPrepared (plugin->dbh,
630 "put", 7, paramValues, paramLengths, paramFormats, 1);
631 if (GNUNET_OK != check_result (plugin, ret,
633 "PQexecPrepared", "put", __LINE__))
634 return GNUNET_SYSERR;
636 plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
638 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
639 "datastore-postgres",
640 "Stored %u bytes in database\n",
641 (unsigned int) size);
647 * Function invoked on behalf of a "PluginIterator"
648 * asking the database plugin to call the iterator
649 * with the next item.
651 * @param cls the 'struct NextRequestClosure'
652 * @param tc scheduler context
655 postgres_next_request_cont (void *next_cls,
656 const struct GNUNET_SCHEDULER_TaskContext *tc)
658 struct NextRequestClosure *nrc = next_cls;
659 struct Plugin *plugin = nrc->plugin;
660 const int paramFormats[] = { 1, 1, 1, 1, 1 };
663 enum GNUNET_BLOCK_Type type;
668 struct GNUNET_TIME_Absolute expiration_time;
671 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
672 plugin->next_task_nc = NULL;
673 if ( (GNUNET_YES == nrc->end_it) ||
674 (nrc->count == nrc->total) )
677 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
678 "datastore-postgres",
679 "Ending iteration (%s)\n",
680 (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set");
682 nrc->iter (nrc->iter_cls,
683 NULL, NULL, 0, NULL, 0, 0, 0,
684 GNUNET_TIME_UNIT_ZERO_ABS, 0);
690 nrc->blimit_off = GNUNET_htonll (nrc->off);
692 nrc->blimit_off = GNUNET_htonll (0);
693 if (nrc->count + nrc->off == nrc->total)
694 nrc->blast_rowid = htonl (0); /* back to start */
696 res = PQexecPrepared (plugin->dbh,
702 if (GNUNET_OK != check_result (plugin,
710 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
711 "datastore-postgres",
712 "Ending iteration (postgres error)\n");
714 nrc->iter (nrc->iter_cls,
715 NULL, NULL, 0, NULL, 0, 0, 0,
716 GNUNET_TIME_UNIT_ZERO_ABS, 0);
721 if (0 == PQntuples (res))
725 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
726 "datastore-postgres",
727 "Ending iteration (no more results)\n");
729 nrc->iter (nrc->iter_cls,
730 NULL, NULL, 0, NULL, 0, 0, 0,
731 GNUNET_TIME_UNIT_ZERO_ABS, 0);
736 if ((1 != PQntuples (res)) ||
737 (7 != PQnfields (res)) ||
738 (sizeof (uint32_t) != PQfsize (res, 0)) ||
739 (sizeof (uint32_t) != PQfsize (res, 6)))
742 nrc->iter (nrc->iter_cls,
743 NULL, NULL, 0, NULL, 0, 0, 0,
744 GNUNET_TIME_UNIT_ZERO_ABS, 0);
749 rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
750 if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
751 (sizeof (uint32_t) != PQfsize (res, 1)) ||
752 (sizeof (uint32_t) != PQfsize (res, 2)) ||
753 (sizeof (uint64_t) != PQfsize (res, 3)) ||
754 (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)) )
758 delete_by_rowid (plugin, rowid);
759 nrc->iter (nrc->iter_cls,
760 NULL, NULL, 0, NULL, 0, 0, 0,
761 GNUNET_TIME_UNIT_ZERO_ABS, 0);
766 type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
767 priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
768 anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
769 expiration_time.value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
770 memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode));
771 size = PQgetlength (res, 0, 5);
773 nrc->blast_prio = htonl (priority);
774 nrc->blast_expire = GNUNET_htonll (expiration_time.value);
775 nrc->blast_rowid = htonl (rowid);
779 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
780 "datastore-postgres",
781 "Found result of size %u bytes and type %u in database\n",
783 (unsigned int) type);
785 iret = nrc->iter (nrc->iter_cls,
789 PQgetvalue (res, 0, 5),
790 (enum GNUNET_BLOCK_Type) type,
796 if (iret == GNUNET_SYSERR)
799 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
800 "datastore-postgres",
801 "Ending iteration (client error)\n");
805 if (iret == GNUNET_NO)
807 if (GNUNET_OK == delete_by_rowid (plugin, rowid))
810 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
811 "datastore-postgres",
812 "Deleting %u bytes from database\n",
813 (unsigned int) size);
815 plugin->env->duc (plugin->env->cls,
816 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
818 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
819 "datastore-postgres",
820 "Deleted %u bytes from database\n",
821 (unsigned int) size);
829 * Function invoked on behalf of a "PluginIterator"
830 * asking the database plugin to call the iterator
831 * with the next item.
833 * @param next_cls whatever argument was given
834 * to the PluginIterator as "next_cls".
835 * @param end_it set to GNUNET_YES if we
836 * should terminate the iteration early
837 * (iterator should be still called once more
838 * to signal the end of the iteration).
841 postgres_plugin_next_request (void *next_cls,
844 struct NextRequestClosure *nrc = next_cls;
846 if (GNUNET_YES == end_it)
847 nrc->end_it = GNUNET_YES;
848 nrc->plugin->next_task_nc = nrc;
849 nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (nrc->plugin->env->sched,
850 &postgres_next_request_cont,
856 * Update the priority for a particular key in the datastore. If
857 * the expiration time in value is different than the time found in
858 * the datastore, the higher value should be kept. For the
859 * anonymity level, the lower value is to be used. The specified
860 * priority should be added to the existing priority, ignoring the
863 * Note that it is possible for multiple values to match this put.
864 * In that case, all of the respective values are updated.
866 * @param cls our "struct Plugin*"
867 * @param uid unique identifier of the datum
868 * @param delta by how much should the priority
869 * change? If priority + delta < 0 the
870 * priority should be set to 0 (never go
872 * @param expire new expiration time should be the
873 * MAX of any existing expiration time and
875 * @param msg set to error message
876 * @return GNUNET_OK on success
879 postgres_plugin_update (void *cls,
881 int delta, struct GNUNET_TIME_Absolute expire,
884 struct Plugin *plugin = cls;
886 int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
887 uint32_t boid = htonl ( (uint32_t) uid);
888 uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).value__;
889 const char *paramValues[] = {
890 (const char *) &bdelta,
891 (const char *) &bexpire,
892 (const char *) &boid,
894 int paramLengths[] = {
899 const int paramFormats[] = { 1, 1, 1 };
901 ret = PQexecPrepared (plugin->dbh,
903 3, paramValues, paramLengths, paramFormats, 1);
904 if (GNUNET_OK != check_result (plugin,
907 "PQexecPrepared", "update", __LINE__))
908 return GNUNET_SYSERR;
915 * Call a method for each key in the database and
916 * call the callback method on it.
918 * @param plugin global context
919 * @param type entries of which type should be considered?
920 * @param is_asc ascending or descending iteration?
921 * @param iter_select which SELECT method should be used?
922 * @param iter maybe NULL (to just count); iter
923 * should return GNUNET_SYSERR to abort the
924 * iteration, GNUNET_NO to delete the entry and
925 * continue and GNUNET_OK to continue iterating
926 * @param iter_cls closure for 'iter'
929 postgres_iterate (struct Plugin *plugin,
932 unsigned int iter_select,
933 PluginIterator iter, void *iter_cls)
935 struct NextRequestClosure *nrc;
937 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
938 nrc->count = UINT32_MAX;
939 nrc->plugin = plugin;
941 nrc->iter_cls = iter_cls;
944 nrc->blast_prio = htonl (0);
945 nrc->blast_rowid = htonl (0);
946 nrc->blast_expire = htonl (0);
950 nrc->blast_prio = htonl (0x7FFFFFFFL);
951 nrc->blast_rowid = htonl (0xFFFFFFFF);
952 nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
957 nrc->pname = "select_low_priority";
959 nrc->paramValues[0] = (const char *) &nrc->blast_prio;
960 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
961 nrc->paramLengths[0] = sizeof (nrc->blast_prio);
962 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
965 nrc->pname = "select_non_anonymous";
967 nrc->paramValues[0] = (const char *) &nrc->blast_prio;
968 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
969 nrc->paramLengths[0] = sizeof (nrc->blast_prio);
970 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
973 nrc->pname = "select_expiration_time";
975 nrc->paramValues[0] = (const char *) &nrc->blast_expire;
976 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
977 nrc->paramLengths[0] = sizeof (nrc->blast_expire);
978 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
981 nrc->pname = "select_migration_order";
983 nrc->paramValues[0] = (const char *) &nrc->blast_expire;
984 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
985 nrc->paramValues[2] = (const char *) &nrc->bnow;
986 nrc->paramLengths[0] = sizeof (nrc->blast_expire);
987 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
988 nrc->paramLengths[2] = sizeof (nrc->bnow);
993 NULL, NULL, 0, NULL, 0, 0, 0,
994 GNUNET_TIME_UNIT_ZERO_ABS, 0);
998 nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).value__;
999 postgres_plugin_next_request (nrc,
1005 * Select a subset of the items in the datastore and call
1006 * the given iterator for each of them.
1008 * @param cls our "struct Plugin*"
1009 * @param type entries of which type should be considered?
1010 * Use 0 for any type.
1011 * @param iter function to call on each matching value;
1012 * will be called once with a NULL value at the end
1013 * @param iter_cls closure for iter
1016 postgres_plugin_iter_low_priority (void *cls,
1017 enum GNUNET_BLOCK_Type type,
1018 PluginIterator iter,
1021 struct Plugin *plugin = cls;
1023 postgres_iterate (plugin,
1033 * Iterate over the results for a particular key
1036 * @param cls closure
1037 * @param key maybe NULL (to match all entries)
1038 * @param vhash hash of the value, maybe NULL (to
1039 * match all values that have the right key).
1040 * Note that for DBlocks there is no difference
1041 * betwen key and vhash, but for other blocks
1043 * @param type entries of which type are relevant?
1044 * Use 0 for any type.
1045 * @param iter function to call on each matching value;
1046 * will be called once with a NULL value at the end
1047 * @param iter_cls closure for iter
1050 postgres_plugin_get (void *cls,
1051 const GNUNET_HashCode * key,
1052 const GNUNET_HashCode * vhash,
1053 enum GNUNET_BLOCK_Type type,
1054 PluginIterator iter, void *iter_cls)
1056 struct Plugin *plugin = cls;
1057 struct NextRequestClosure *nrc;
1058 const int paramFormats[] = { 1, 1, 1, 1, 1 };
1063 postgres_plugin_iter_low_priority (plugin, type,
1067 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1068 nrc->plugin = plugin;
1070 nrc->iter_cls = iter_cls;
1073 nrc->vhash = *vhash;
1074 nrc->paramValues[0] = (const char*) &nrc->key;
1075 nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
1076 nrc->btype = htonl (type);
1081 nrc->paramValues[1] = (const char *) &nrc->vhash;
1082 nrc->paramLengths[1] = sizeof (nrc->vhash);
1083 nrc->paramValues[2] = (const char *) &nrc->btype;
1084 nrc->paramLengths[2] = sizeof (nrc->btype);
1085 nrc->paramValues[3] = (const char *) &nrc->blast_rowid;
1086 nrc->paramLengths[3] = sizeof (nrc->blast_rowid);
1087 nrc->paramValues[4] = (const char *) &nrc->blimit_off;
1088 nrc->paramLengths[4] = sizeof (nrc->blimit_off);
1090 nrc->pname = "getvt";
1091 ret = PQexecParams (plugin->dbh,
1092 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
1101 nrc->paramValues[1] = (const char *) &nrc->btype;
1102 nrc->paramLengths[1] = sizeof (nrc->btype);
1103 nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
1104 nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
1105 nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1106 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1108 nrc->pname = "gett";
1109 ret = PQexecParams (plugin->dbh,
1110 "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
1122 nrc->paramValues[1] = (const char *) &nrc->vhash;
1123 nrc->paramLengths[1] = sizeof (nrc->vhash);
1124 nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
1125 nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
1126 nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1127 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1129 nrc->pname = "getv";
1130 ret = PQexecParams (plugin->dbh,
1131 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
1140 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
1141 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
1142 nrc->paramValues[2] = (const char *) &nrc->blimit_off;
1143 nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1146 ret = PQexecParams (plugin->dbh,
1147 "SELECT count(*) FROM gn090 WHERE hash=$1",
1155 if (GNUNET_OK != check_result (plugin,
1163 NULL, NULL, 0, NULL, 0, 0, 0,
1164 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1168 if ((PQntuples (ret) != 1) ||
1169 (PQnfields (ret) != 1) ||
1170 (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
1175 NULL, NULL, 0, NULL, 0, 0, 0,
1176 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1180 nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
1182 if (nrc->total == 0)
1185 NULL, NULL, 0, NULL, 0, 0, 0,
1186 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1190 nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1192 postgres_plugin_next_request (nrc,
1198 * Select a subset of the items in the datastore and call
1199 * the given iterator for each of them.
1201 * @param cls our "struct Plugin*"
1202 * @param type entries of which type should be considered?
1203 * Use 0 for any type.
1204 * @param iter function to call on each matching value;
1205 * will be called once with a NULL value at the end
1206 * @param iter_cls closure for iter
1209 postgres_plugin_iter_zero_anonymity (void *cls,
1210 enum GNUNET_BLOCK_Type type,
1211 PluginIterator iter,
1214 struct Plugin *plugin = cls;
1216 postgres_iterate (plugin,
1223 * Select a subset of the items in the datastore and call
1224 * the given iterator for each of them.
1226 * @param cls our "struct Plugin*"
1227 * @param type entries of which type should be considered?
1228 * Use 0 for any type.
1229 * @param iter function to call on each matching value;
1230 * will be called once with a NULL value at the end
1231 * @param iter_cls closure for iter
1234 postgres_plugin_iter_ascending_expiration (void *cls,
1235 enum GNUNET_BLOCK_Type type,
1236 PluginIterator iter,
1239 struct Plugin *plugin = cls;
1241 postgres_iterate (plugin, type, GNUNET_YES, 2,
1247 * Select a subset of the items in the datastore and call
1248 * the given iterator for each of them.
1250 * @param cls our "struct Plugin*"
1251 * @param type entries of which type should be considered?
1252 * Use 0 for any type.
1253 * @param iter function to call on each matching value;
1254 * will be called once with a NULL value at the end
1255 * @param iter_cls closure for iter
1258 postgres_plugin_iter_migration_order (void *cls,
1259 enum GNUNET_BLOCK_Type type,
1260 PluginIterator iter,
1263 struct Plugin *plugin = cls;
1265 postgres_iterate (plugin, 0, GNUNET_NO, 3,
1271 * Select a subset of the items in the datastore and call
1272 * the given iterator for each of them.
1274 * @param cls our "struct Plugin*"
1275 * @param type entries of which type should be considered?
1276 * Use 0 for any type.
1277 * @param iter function to call on each matching value;
1278 * will be called once with a NULL value at the end
1279 * @param iter_cls closure for iter
1282 postgres_plugin_iter_all_now (void *cls,
1283 enum GNUNET_BLOCK_Type type,
1284 PluginIterator iter,
1287 struct Plugin *plugin = cls;
1289 postgres_iterate (plugin,
1299 postgres_plugin_drop (void *cls)
1301 struct Plugin *plugin = cls;
1303 pq_exec (plugin, "DROP TABLE gn090", __LINE__);
1308 * Entry point for the plugin.
1310 * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1311 * @return our "struct Plugin*"
1314 libgnunet_plugin_datastore_postgres_init (void *cls)
1316 struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1317 struct GNUNET_DATASTORE_PluginFunctions *api;
1318 struct Plugin *plugin;
1320 plugin = GNUNET_malloc (sizeof (struct Plugin));
1322 if (GNUNET_OK != init_connection (plugin))
1324 GNUNET_free (plugin);
1327 api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1329 api->get_size = &postgres_plugin_get_size;
1330 api->put = &postgres_plugin_put;
1331 api->next_request = &postgres_plugin_next_request;
1332 api->get = &postgres_plugin_get;
1333 api->update = &postgres_plugin_update;
1334 api->iter_low_priority = &postgres_plugin_iter_low_priority;
1335 api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity;
1336 api->iter_ascending_expiration = &postgres_plugin_iter_ascending_expiration;
1337 api->iter_migration_order = &postgres_plugin_iter_migration_order;
1338 api->iter_all_now = &postgres_plugin_iter_all_now;
1339 api->drop = &postgres_plugin_drop;
1340 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1341 "datastore-postgres",
1342 _("Postgres database running\n"));
1348 * Exit point from the plugin.
1349 * @param cls our "struct Plugin*"
1350 * @return always NULL
1353 libgnunet_plugin_datastore_postgres_done (void *cls)
1355 struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1356 struct Plugin *plugin = api->cls;
1358 if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1360 GNUNET_SCHEDULER_cancel (plugin->env->sched,
1362 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1363 GNUNET_free (plugin->next_task_nc);
1364 plugin->next_task_nc = NULL;
1366 PQfinish (plugin->dbh);
1367 GNUNET_free (plugin);
1372 /* end of plugin_datastore_postgres.c */