2 This file is part of GNUnet
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file datastore/plugin_datastore_postgres.c
23 * @brief postgres-based datastore backend
24 * @author Christian Grothoff
28 #include "gnunet_datastore_plugin.h"
29 #include <postgresql/libpq-fe.h>
31 #define DEBUG_POSTGRES GNUNET_NO
33 #define SELECT_IT_LOW_PRIORITY "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
34 "WHERE (prio = $1 AND oid > $2) " \
35 "ORDER BY prio ASC,oid ASC LIMIT 1) "\
37 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
38 "WHERE (prio > $1 AND oid != $2)"\
39 "ORDER BY prio ASC,oid ASC LIMIT 1)"\
40 "ORDER BY prio ASC,oid ASC LIMIT 1"
42 #define SELECT_IT_NON_ANONYMOUS "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
43 "WHERE (prio = $1 AND oid < $2)"\
44 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
46 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
47 "WHERE (prio < $1 AND oid != $2)"\
48 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
49 "ORDER BY prio DESC,oid DESC LIMIT 1"
51 #define SELECT_IT_EXPIRATION_TIME "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
52 "WHERE (expire = $1 AND oid > $2) "\
53 "ORDER BY expire ASC,oid ASC LIMIT 1) "\
55 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
56 "WHERE (expire > $1 AND oid != $2) " \
57 "ORDER BY expire ASC,oid ASC LIMIT 1)"\
58 "ORDER BY expire ASC,oid ASC LIMIT 1"
61 #define SELECT_IT_MIGRATION_ORDER "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
62 "WHERE (expire = $1 AND oid < $2)"\
63 " AND expire > $3 AND type!=3"\
64 " ORDER BY expire DESC,oid DESC LIMIT 1) "\
66 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
67 "WHERE (expire < $1 AND oid != $2)" \
68 " AND expire > $3 AND type!=3"\
69 " ORDER BY expire DESC,oid DESC LIMIT 1)"\
70 "ORDER BY expire DESC,oid DESC LIMIT 1"
73 * After how many ms "busy" should a DB operation fail for good?
74 * A low value makes sure that we are more responsive to requests
75 * (especially PUTs). A high value guarantees a higher success
76 * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
78 * The default value of 1s should ensure that users do not experience
79 * huge latencies while at the same time allowing operations to succeed
80 * with reasonable probability.
82 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
86 * Closure for 'postgres_next_request_cont'.
88 struct NextRequestClosure
93 struct Plugin *plugin;
96 * Function to call for each matching entry.
101 * Closure for 'iter'.
106 * Parameters for the prepared statement.
108 const char *paramValues[5];
111 * Name of the prepared statement to run.
116 * Size of values pointed to by paramValues.
121 * Number of parameters in paramValues/paramLengths.
126 * Current time (possible parameter), big-endian.
131 * Key (possible parameter)
136 * Hash of value (possible parameter)
138 GNUNET_HashCode vhash;
141 * Number of entries found so far
146 * Offset this iteration starts at.
151 * Current offset to use in query, big-endian.
156 * Overall number of matching entries.
158 unsigned long long total;
161 * Expiration value of previous result (possible parameter), big-endian.
163 uint64_t blast_expire;
166 * Row ID of last result (possible parameter), big-endian.
168 uint32_t blast_rowid;
171 * Priority of last result (possible parameter), big-endian.
176 * Type of block (possible parameter), big-endian.
181 * Flag set to GNUNET_YES to stop iteration.
188 * Context for all functions in this plugin.
193 * Our execution environment.
195 struct GNUNET_DATASTORE_PluginEnvironment *env;
198 * Native Postgres database handle.
203 * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
205 struct NextRequestClosure *next_task_nc;
208 * Pending task with scheduler for running the next request.
210 GNUNET_SCHEDULER_TaskIdentifier next_task;
216 * Check if the result obtained from Postgres has
217 * the desired status code. If not, log an error, clear the
218 * result and return GNUNET_SYSERR.
220 * @param plugin global context
221 * @param ret result to check
222 * @param expected_status expected return value
223 * @param command name of SQL command that was run
224 * @param args arguments to SQL command
225 * @param line line number for error reporting
226 * @return GNUNET_OK if the result is acceptable
229 check_result (struct Plugin *plugin,
232 const char *command, const char *args, int line)
236 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
237 "datastore-postgres",
238 "Postgres failed to allocate result for `%s:%s' at %d\n",
239 command, args, line);
240 return GNUNET_SYSERR;
242 if (PQresultStatus (ret) != expected_status)
244 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
245 "datastore-postgres",
246 _("`%s:%s' failed at %s:%d with error: %s"),
247 command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
249 return GNUNET_SYSERR;
255 * Run simple SQL statement (without results).
257 * @param plugin global context
258 * @param sql statement to run
259 * @param line code line for error reporting
262 pq_exec (struct Plugin *plugin,
263 const char *sql, int line)
266 ret = PQexec (plugin->dbh, sql);
267 if (GNUNET_OK != check_result (plugin,
269 PGRES_COMMAND_OK, "PQexec", sql, line))
270 return GNUNET_SYSERR;
276 * Prepare SQL statement.
278 * @param plugin global context
279 * @param name name for the prepared SQL statement
280 * @param sql SQL code to prepare
281 * @param nparams number of parameters in sql
282 * @param line code line for error reporting
283 * @return GNUNET_OK on success
286 pq_prepare (struct Plugin *plugin,
287 const char *name, const char *sql, int nparams, int line)
290 ret = PQprepare (plugin->dbh, name, sql, nparams, NULL);
292 check_result (plugin,
293 ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
294 return GNUNET_SYSERR;
300 * @brief Get a database handle
302 * @param plugin global context
303 * @return GNUNET_OK on success, GNUNET_SYSERR on error
306 init_connection (struct Plugin *plugin)
311 /* Open database and precompile statements */
313 GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
314 "datastore-postgres",
317 plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
318 if (NULL == plugin->dbh)
320 /* FIXME: warn about out-of-memory? */
321 GNUNET_free_non_null (conninfo);
322 return GNUNET_SYSERR;
324 if (PQstatus (plugin->dbh) != CONNECTION_OK)
326 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
327 "datastore-postgres",
328 _("Unable to initialize Postgres with configuration `%s': %s"),
330 PQerrorMessage (plugin->dbh));
331 PQfinish (plugin->dbh);
333 GNUNET_free_non_null (conninfo);
334 return GNUNET_SYSERR;
336 GNUNET_free_non_null (conninfo);
337 ret = PQexec (plugin->dbh,
338 "CREATE TABLE gn090 ("
339 " type INTEGER NOT NULL DEFAULT 0,"
340 " prio INTEGER NOT NULL DEFAULT 0,"
341 " anonLevel INTEGER NOT NULL DEFAULT 0,"
342 " expire BIGINT NOT NULL DEFAULT 0,"
343 " hash BYTEA NOT NULL DEFAULT '',"
344 " vhash BYTEA NOT NULL DEFAULT '',"
345 " value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
346 if ( (ret == NULL) ||
347 ( (PQresultStatus (ret) != PGRES_COMMAND_OK) &&
348 (0 != strcmp ("42P07", /* duplicate table */
351 PG_DIAG_SQLSTATE)))))
353 check_result (plugin,
354 ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090", __LINE__);
355 PQfinish (plugin->dbh);
357 return GNUNET_SYSERR;
359 if (PQresultStatus (ret) == PGRES_COMMAND_OK)
362 pq_exec (plugin, "CREATE INDEX idx_hash ON gn090 (hash)", __LINE__)) ||
364 pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)",
367 pq_exec (plugin, "CREATE INDEX idx_prio ON gn090 (prio)", __LINE__))
369 pq_exec (plugin, "CREATE INDEX idx_expire ON gn090 (expire)", __LINE__))
371 pq_exec (plugin, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)",
375 (plugin, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)",
378 pq_exec (plugin, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)",
382 PQfinish (plugin->dbh);
384 return GNUNET_SYSERR;
389 ret = PQexec (plugin->dbh,
390 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
392 check_result (plugin,
393 ret, PGRES_COMMAND_OK,
394 "ALTER TABLE", "gn090", __LINE__))
396 PQfinish (plugin->dbh);
398 return GNUNET_SYSERR;
401 ret = PQexec (plugin->dbh,
402 "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
404 check_result (plugin,
405 ret, PGRES_COMMAND_OK,
406 "ALTER TABLE", "gn090", __LINE__))
408 PQfinish (plugin->dbh);
410 return GNUNET_SYSERR;
413 ret = PQexec (plugin->dbh,
414 "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
416 check_result (plugin,
417 ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090", __LINE__))
419 PQfinish (plugin->dbh);
421 return GNUNET_SYSERR;
428 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
429 "WHERE hash=$1 AND vhash=$2 AND type=$3 "
430 "AND oid > $4 ORDER BY oid ASC LIMIT 1 OFFSET $5",
436 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
437 "WHERE hash=$1 AND type=$2"
438 "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
444 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
445 "WHERE hash=$1 AND vhash=$2"
446 "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
452 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
454 "AND oid > $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
460 "INSERT INTO gn090 (type, prio, anonLevel, expire, hash, vhash, value) "
461 "VALUES ($1, $2, $3, $4, $5, $6, $7)",
467 "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
473 "select_low_priority",
474 SELECT_IT_LOW_PRIORITY,
479 "select_non_anonymous",
480 SELECT_IT_NON_ANONYMOUS,
485 "select_expiration_time",
486 SELECT_IT_EXPIRATION_TIME,
491 "select_migration_order",
492 SELECT_IT_MIGRATION_ORDER,
498 "DELETE FROM gn090 " "WHERE oid=$1", 1, __LINE__)))
500 PQfinish (plugin->dbh);
502 return GNUNET_SYSERR;
509 * Delete the row identified by the given rowid (qid
512 * @param plugin global context
513 * @param rowid which row to delete
514 * @return GNUNET_OK on success
517 delete_by_rowid (struct Plugin *plugin,
520 const char *paramValues[] = { (const char *) &rowid };
521 int paramLengths[] = { sizeof (rowid) };
522 const int paramFormats[] = { 1 };
525 ret = PQexecPrepared (plugin->dbh,
527 1, paramValues, paramLengths, paramFormats, 1);
529 check_result (plugin,
530 ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
533 return GNUNET_SYSERR;
541 * Get an estimate of how much space the database is
544 * @param cls our "struct Plugin*"
545 * @return number of bytes used on disk
547 static unsigned long long
548 postgres_plugin_get_size (void *cls)
550 struct Plugin *plugin = cls;
551 unsigned long long total;
554 ret = PQexecParams (plugin->dbh,
555 "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090",
556 0, NULL, NULL, NULL, NULL, 1);
557 if (GNUNET_OK != check_result (plugin,
566 if ((PQntuples (ret) != 1) ||
567 (PQnfields (ret) != 1) ||
568 (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
574 total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
581 * Store an item in the datastore.
584 * @param key key for the item
585 * @param size number of bytes in data
586 * @param data content stored
587 * @param type type of the content
588 * @param priority priority of the content
589 * @param anonymity anonymity-level for the content
590 * @param expiration expiration time for the content
591 * @param msg set to error message
592 * @return GNUNET_OK on success
595 postgres_plugin_put (void *cls,
596 const GNUNET_HashCode * key,
599 enum GNUNET_BLOCK_Type type,
602 struct GNUNET_TIME_Absolute expiration,
605 struct Plugin *plugin = cls;
606 GNUNET_HashCode vhash;
608 uint32_t btype = htonl (type);
609 uint32_t bprio = htonl (priority);
610 uint32_t banon = htonl (anonymity);
611 uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
612 const char *paramValues[] = {
613 (const char *) &btype,
614 (const char *) &bprio,
615 (const char *) &banon,
616 (const char *) &bexpi,
618 (const char *) &vhash,
621 int paramLengths[] = {
626 sizeof (GNUNET_HashCode),
627 sizeof (GNUNET_HashCode),
630 const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1 };
632 GNUNET_CRYPTO_hash (data, size, &vhash);
633 ret = PQexecPrepared (plugin->dbh,
634 "put", 7, paramValues, paramLengths, paramFormats, 1);
635 if (GNUNET_OK != check_result (plugin, ret,
637 "PQexecPrepared", "put", __LINE__))
638 return GNUNET_SYSERR;
640 plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
642 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
643 "datastore-postgres",
644 "Stored %u bytes in database\n",
645 (unsigned int) size);
651 * Function invoked on behalf of a "PluginIterator"
652 * asking the database plugin to call the iterator
653 * with the next item.
655 * @param next_cls the 'struct NextRequestClosure'
656 * @param tc scheduler context
659 postgres_next_request_cont (void *next_cls,
660 const struct GNUNET_SCHEDULER_TaskContext *tc)
662 struct NextRequestClosure *nrc = next_cls;
663 struct Plugin *plugin = nrc->plugin;
664 const int paramFormats[] = { 1, 1, 1, 1, 1 };
667 enum GNUNET_BLOCK_Type type;
672 struct GNUNET_TIME_Absolute expiration_time;
675 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
676 plugin->next_task_nc = NULL;
677 if ( (GNUNET_YES == nrc->end_it) ||
678 (nrc->count == nrc->total) )
681 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
682 "datastore-postgres",
683 "Ending iteration (%s)\n",
684 (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set");
686 nrc->iter (nrc->iter_cls,
687 NULL, NULL, 0, NULL, 0, 0, 0,
688 GNUNET_TIME_UNIT_ZERO_ABS, 0);
694 nrc->blimit_off = GNUNET_htonll (nrc->off);
696 nrc->blimit_off = GNUNET_htonll (0);
697 if (nrc->count + nrc->off == nrc->total)
698 nrc->blast_rowid = htonl (0); /* back to start */
700 res = PQexecPrepared (plugin->dbh,
706 if (GNUNET_OK != check_result (plugin,
714 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
715 "datastore-postgres",
716 "Ending iteration (postgres error)\n");
718 nrc->iter (nrc->iter_cls,
719 NULL, NULL, 0, NULL, 0, 0, 0,
720 GNUNET_TIME_UNIT_ZERO_ABS, 0);
725 if (0 == PQntuples (res))
729 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
730 "datastore-postgres",
731 "Ending iteration (no more results)\n");
733 nrc->iter (nrc->iter_cls,
734 NULL, NULL, 0, NULL, 0, 0, 0,
735 GNUNET_TIME_UNIT_ZERO_ABS, 0);
740 if ((1 != PQntuples (res)) ||
741 (7 != PQnfields (res)) ||
742 (sizeof (uint32_t) != PQfsize (res, 0)) ||
743 (sizeof (uint32_t) != PQfsize (res, 6)))
746 nrc->iter (nrc->iter_cls,
747 NULL, NULL, 0, NULL, 0, 0, 0,
748 GNUNET_TIME_UNIT_ZERO_ABS, 0);
753 rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
754 if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
755 (sizeof (uint32_t) != PQfsize (res, 1)) ||
756 (sizeof (uint32_t) != PQfsize (res, 2)) ||
757 (sizeof (uint64_t) != PQfsize (res, 3)) ||
758 (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)) )
762 delete_by_rowid (plugin, rowid);
763 nrc->iter (nrc->iter_cls,
764 NULL, NULL, 0, NULL, 0, 0, 0,
765 GNUNET_TIME_UNIT_ZERO_ABS, 0);
770 type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
771 priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
772 anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
773 expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
774 memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode));
775 size = PQgetlength (res, 0, 5);
777 nrc->blast_prio = htonl (priority);
778 nrc->blast_expire = GNUNET_htonll (expiration_time.abs_value);
779 nrc->blast_rowid = htonl (rowid);
783 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
784 "datastore-postgres",
785 "Found result of size %u bytes and type %u in database\n",
787 (unsigned int) type);
789 iret = nrc->iter (nrc->iter_cls,
793 PQgetvalue (res, 0, 5),
794 (enum GNUNET_BLOCK_Type) type,
800 if (iret == GNUNET_SYSERR)
803 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
804 "datastore-postgres",
805 "Ending iteration (client error)\n");
809 if (iret == GNUNET_NO)
811 if (GNUNET_OK == delete_by_rowid (plugin, rowid))
814 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
815 "datastore-postgres",
816 "Deleting %u bytes from database\n",
817 (unsigned int) size);
819 plugin->env->duc (plugin->env->cls,
820 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
822 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
823 "datastore-postgres",
824 "Deleted %u bytes from database\n",
825 (unsigned int) size);
833 * Function invoked on behalf of a "PluginIterator"
834 * asking the database plugin to call the iterator
835 * with the next item.
837 * @param next_cls whatever argument was given
838 * to the PluginIterator as "next_cls".
839 * @param end_it set to GNUNET_YES if we
840 * should terminate the iteration early
841 * (iterator should be still called once more
842 * to signal the end of the iteration).
845 postgres_plugin_next_request (void *next_cls,
848 struct NextRequestClosure *nrc = next_cls;
850 if (GNUNET_YES == end_it)
851 nrc->end_it = GNUNET_YES;
852 nrc->plugin->next_task_nc = nrc;
853 nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&postgres_next_request_cont,
859 * Update the priority for a particular key in the datastore. If
860 * the expiration time in value is different than the time found in
861 * the datastore, the higher value should be kept. For the
862 * anonymity level, the lower value is to be used. The specified
863 * priority should be added to the existing priority, ignoring the
866 * Note that it is possible for multiple values to match this put.
867 * In that case, all of the respective values are updated.
869 * @param cls our "struct Plugin*"
870 * @param uid unique identifier of the datum
871 * @param delta by how much should the priority
872 * change? If priority + delta < 0 the
873 * priority should be set to 0 (never go
875 * @param expire new expiration time should be the
876 * MAX of any existing expiration time and
878 * @param msg set to error message
879 * @return GNUNET_OK on success
882 postgres_plugin_update (void *cls,
884 int delta, struct GNUNET_TIME_Absolute expire,
887 struct Plugin *plugin = cls;
889 int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
890 uint32_t boid = htonl ( (uint32_t) uid);
891 uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
892 const char *paramValues[] = {
893 (const char *) &bdelta,
894 (const char *) &bexpire,
895 (const char *) &boid,
897 int paramLengths[] = {
902 const int paramFormats[] = { 1, 1, 1 };
904 ret = PQexecPrepared (plugin->dbh,
906 3, paramValues, paramLengths, paramFormats, 1);
907 if (GNUNET_OK != check_result (plugin,
910 "PQexecPrepared", "update", __LINE__))
911 return GNUNET_SYSERR;
918 * Call a method for each key in the database and
919 * call the callback method on it.
921 * @param plugin global context
922 * @param type entries of which type should be considered?
923 * @param is_asc ascending or descending iteration?
924 * @param iter_select which SELECT method should be used?
925 * @param iter maybe NULL (to just count); iter
926 * should return GNUNET_SYSERR to abort the
927 * iteration, GNUNET_NO to delete the entry and
928 * continue and GNUNET_OK to continue iterating
929 * @param iter_cls closure for 'iter'
932 postgres_iterate (struct Plugin *plugin,
935 unsigned int iter_select,
936 PluginIterator iter, void *iter_cls)
938 struct NextRequestClosure *nrc;
940 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
941 nrc->count = UINT32_MAX;
942 nrc->plugin = plugin;
944 nrc->iter_cls = iter_cls;
947 nrc->blast_prio = htonl (0);
948 nrc->blast_rowid = htonl (0);
949 nrc->blast_expire = htonl (0);
953 nrc->blast_prio = htonl (0x7FFFFFFFL);
954 nrc->blast_rowid = htonl (0xFFFFFFFF);
955 nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
960 nrc->pname = "select_low_priority";
962 nrc->paramValues[0] = (const char *) &nrc->blast_prio;
963 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
964 nrc->paramLengths[0] = sizeof (nrc->blast_prio);
965 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
968 nrc->pname = "select_non_anonymous";
970 nrc->paramValues[0] = (const char *) &nrc->blast_prio;
971 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
972 nrc->paramLengths[0] = sizeof (nrc->blast_prio);
973 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
976 nrc->pname = "select_expiration_time";
978 nrc->paramValues[0] = (const char *) &nrc->blast_expire;
979 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
980 nrc->paramLengths[0] = sizeof (nrc->blast_expire);
981 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
984 nrc->pname = "select_migration_order";
986 nrc->paramValues[0] = (const char *) &nrc->blast_expire;
987 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
988 nrc->paramValues[2] = (const char *) &nrc->bnow;
989 nrc->paramLengths[0] = sizeof (nrc->blast_expire);
990 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
991 nrc->paramLengths[2] = sizeof (nrc->bnow);
996 NULL, NULL, 0, NULL, 0, 0, 0,
997 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1001 nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).abs_value__;
1002 postgres_plugin_next_request (nrc,
1008 * Select a subset of the items in the datastore and call
1009 * the given iterator for each of them.
1011 * @param cls our "struct Plugin*"
1012 * @param type entries of which type should be considered?
1013 * Use 0 for any type.
1014 * @param iter function to call on each matching value;
1015 * will be called once with a NULL value at the end
1016 * @param iter_cls closure for iter
1019 postgres_plugin_iter_low_priority (void *cls,
1020 enum GNUNET_BLOCK_Type type,
1021 PluginIterator iter,
1024 struct Plugin *plugin = cls;
1026 postgres_iterate (plugin,
1036 * Iterate over the results for a particular key
1039 * @param cls closure
1040 * @param key maybe NULL (to match all entries)
1041 * @param vhash hash of the value, maybe NULL (to
1042 * match all values that have the right key).
1043 * Note that for DBlocks there is no difference
1044 * between key and vhash, but for other blocks
1046 * @param type entries of which type are relevant?
1047 * Use 0 for any type.
1048 * @param iter function to call on each matching value;
1049 * will be called once with a NULL value at the end
1050 * @param iter_cls closure for iter
1053 postgres_plugin_get (void *cls,
1054 const GNUNET_HashCode * key,
1055 const GNUNET_HashCode * vhash,
1056 enum GNUNET_BLOCK_Type type,
1057 PluginIterator iter, void *iter_cls)
1059 struct Plugin *plugin = cls;
1060 struct NextRequestClosure *nrc;
1061 const int paramFormats[] = { 1, 1, 1, 1, 1 };
1066 postgres_plugin_iter_low_priority (plugin, type,
1070 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1071 nrc->plugin = plugin;
1073 nrc->iter_cls = iter_cls;
1076 nrc->vhash = *vhash;
1077 nrc->paramValues[0] = (const char*) &nrc->key;
1078 nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
1079 nrc->btype = htonl (type);
1084 nrc->paramValues[1] = (const char *) &nrc->vhash;
1085 nrc->paramLengths[1] = sizeof (nrc->vhash);
1086 nrc->paramValues[2] = (const char *) &nrc->btype;
1087 nrc->paramLengths[2] = sizeof (nrc->btype);
1088 nrc->paramValues[3] = (const char *) &nrc->blast_rowid;
1089 nrc->paramLengths[3] = sizeof (nrc->blast_rowid);
1090 nrc->paramValues[4] = (const char *) &nrc->blimit_off;
1091 nrc->paramLengths[4] = sizeof (nrc->blimit_off);
1093 nrc->pname = "getvt";
1094 ret = PQexecParams (plugin->dbh,
1095 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
1104 nrc->paramValues[1] = (const char *) &nrc->btype;
1105 nrc->paramLengths[1] = sizeof (nrc->btype);
1106 nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
1107 nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
1108 nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1109 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1111 nrc->pname = "gett";
1112 ret = PQexecParams (plugin->dbh,
1113 "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
1125 nrc->paramValues[1] = (const char *) &nrc->vhash;
1126 nrc->paramLengths[1] = sizeof (nrc->vhash);
1127 nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
1128 nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
1129 nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1130 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1132 nrc->pname = "getv";
1133 ret = PQexecParams (plugin->dbh,
1134 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
1143 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
1144 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
1145 nrc->paramValues[2] = (const char *) &nrc->blimit_off;
1146 nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1149 ret = PQexecParams (plugin->dbh,
1150 "SELECT count(*) FROM gn090 WHERE hash=$1",
1158 if (GNUNET_OK != check_result (plugin,
1166 NULL, NULL, 0, NULL, 0, 0, 0,
1167 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1171 if ((PQntuples (ret) != 1) ||
1172 (PQnfields (ret) != 1) ||
1173 (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
1178 NULL, NULL, 0, NULL, 0, 0, 0,
1179 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1183 nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
1185 if (nrc->total == 0)
1188 NULL, NULL, 0, NULL, 0, 0, 0,
1189 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1193 nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1195 postgres_plugin_next_request (nrc,
1201 * Select a subset of the items in the datastore and call
1202 * the given iterator for each of them.
1204 * @param cls our "struct Plugin*"
1205 * @param type entries of which type should be considered?
1206 * Use 0 for any type.
1207 * @param iter function to call on each matching value;
1208 * will be called once with a NULL value at the end
1209 * @param iter_cls closure for iter
1212 postgres_plugin_iter_zero_anonymity (void *cls,
1213 enum GNUNET_BLOCK_Type type,
1214 PluginIterator iter,
1217 struct Plugin *plugin = cls;
1219 postgres_iterate (plugin,
1226 * Select a subset of the items in the datastore and call
1227 * the given iterator for each of them.
1229 * @param cls our "struct Plugin*"
1230 * @param type entries of which type should be considered?
1231 * Use 0 for any type.
1232 * @param iter function to call on each matching value;
1233 * will be called once with a NULL value at the end
1234 * @param iter_cls closure for iter
1237 postgres_plugin_iter_ascending_expiration (void *cls,
1238 enum GNUNET_BLOCK_Type type,
1239 PluginIterator iter,
1242 struct Plugin *plugin = cls;
1244 postgres_iterate (plugin, type, GNUNET_YES, 2,
1250 * Select a subset of the items in the datastore and call
1251 * the given iterator for each of them.
1253 * @param cls our "struct Plugin*"
1254 * @param type entries of which type should be considered?
1255 * Use 0 for any type. NOTE: currently ignored; 0 (any type) is always passed to postgres_iterate.
1256 * @param iter function to call on each matching value;
1257 * will be called once with a NULL value at the end
1258 * @param iter_cls closure for iter
1261 postgres_plugin_iter_migration_order (void *cls,
1262 enum GNUNET_BLOCK_Type type,
1263 PluginIterator iter,
1266 struct Plugin *plugin = cls;
1268 postgres_iterate (plugin, 0, GNUNET_NO, 3,
1274 * Select a subset of the items in the datastore and call
1275 * the given iterator for each of them.
1277 * @param cls our "struct Plugin*"
1278 * @param type entries of which type should be considered?
1279 * Use 0 for any type.
1280 * @param iter function to call on each matching value;
1281 * will be called once with a NULL value at the end
1282 * @param iter_cls closure for iter
1285 postgres_plugin_iter_all_now (void *cls,
1286 enum GNUNET_BLOCK_Type type,
1287 PluginIterator iter,
1290 struct Plugin *plugin = cls;
1292 postgres_iterate (plugin,
1302 postgres_plugin_drop (void *cls)
1304 struct Plugin *plugin = cls;
1306 pq_exec (plugin, "DROP TABLE gn090", __LINE__);
1311 * Entry point for the plugin.
1313 * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1314 * @return our "struct Plugin*"
1317 libgnunet_plugin_datastore_postgres_init (void *cls)
1319 struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1320 struct GNUNET_DATASTORE_PluginFunctions *api;
1321 struct Plugin *plugin;
1323 plugin = GNUNET_malloc (sizeof (struct Plugin));
1325 if (GNUNET_OK != init_connection (plugin))
1327 GNUNET_free (plugin);
1330 api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1332 api->get_size = &postgres_plugin_get_size;
1333 api->put = &postgres_plugin_put;
1334 api->next_request = &postgres_plugin_next_request;
1335 api->get = &postgres_plugin_get;
1336 api->update = &postgres_plugin_update;
1337 api->iter_low_priority = &postgres_plugin_iter_low_priority;
1338 api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity;
1339 api->iter_ascending_expiration = &postgres_plugin_iter_ascending_expiration;
1340 api->iter_migration_order = &postgres_plugin_iter_migration_order;
1341 api->iter_all_now = &postgres_plugin_iter_all_now;
1342 api->drop = &postgres_plugin_drop;
1343 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1344 "datastore-postgres",
1345 _("Postgres database running\n"));
1351 * Exit point from the plugin.
1352 * @param cls our "struct Plugin*"
1353 * @return always NULL
1356 libgnunet_plugin_datastore_postgres_done (void *cls)
1358 struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1359 struct Plugin *plugin = api->cls;
1361 if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1363 GNUNET_SCHEDULER_cancel (plugin->next_task);
1364 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1365 GNUNET_free (plugin->next_task_nc);
1366 plugin->next_task_nc = NULL;
1368 PQfinish (plugin->dbh);
1369 GNUNET_free (plugin);
1374 /* end of plugin_datastore_postgres.c */