7b04cc68a7e6d9ef17f223172f5df06e98651a82
[oweals/gnunet.git] / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include "gnunet_postgres_lib.h"
30 #include "gnunet_pq_lib.h"
31
32
33 /**
34  * After how many ms "busy" should a DB operation fail for good?
35  * A low value makes sure that we are more responsive to requests
36  * (especially PUTs).  A high value guarantees a higher success
37  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
38  *
39  * The default value of 1s should ensure that users do not experience
40  * huge latencies while at the same time allowing operations to succeed
41  * with reasonable probability.
42  */
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
44
45
46 /**
47  * Context for all functions in this plugin.
48  */
49 struct Plugin
50 {
51   /**
52    * Our execution environment.
53    */
54   struct GNUNET_DATASTORE_PluginEnvironment *env;
55
56   /**
57    * Native Postgres database handle.
58    */
59   PGconn *dbh;
60
61 };
62
63
64 /**
65  * @brief Get a database handle
66  *
67  * @param plugin global context
68  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
69  */
70 static int
71 init_connection (struct Plugin *plugin)
72 {
73   PGresult *ret;
74
75   plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres");
76   if (NULL == plugin->dbh)
77     return GNUNET_SYSERR;
78
79   /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
80    * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
81    * we do math or inequality tests, so we can't handle the entire range of uint32_t.
82    * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
83    */
84   ret =
85       PQexec (plugin->dbh,
86               "CREATE TABLE IF NOT EXISTS gn090 ("
87               "  repl INTEGER NOT NULL DEFAULT 0,"
88               "  type INTEGER NOT NULL DEFAULT 0,"
89               "  prio INTEGER NOT NULL DEFAULT 0,"
90               "  anonLevel INTEGER NOT NULL DEFAULT 0,"
91               "  expire BIGINT NOT NULL DEFAULT 0,"
92               "  rvalue BIGINT NOT NULL DEFAULT 0,"
93               "  hash BYTEA NOT NULL DEFAULT '',"
94               "  vhash BYTEA NOT NULL DEFAULT '',"
95               "  value BYTEA NOT NULL DEFAULT '')"
96               "WITH OIDS");
97   if ( (NULL == ret) ||
98        ((PQresultStatus (ret) != PGRES_COMMAND_OK) &&
99         (0 != strcmp ("42P07",    /* duplicate table */
100                       PQresultErrorField
101                       (ret,
102                        PG_DIAG_SQLSTATE)))))
103   {
104     (void) GNUNET_POSTGRES_check_result (plugin->dbh,
105                                          ret,
106                                          PGRES_COMMAND_OK,
107                                          "CREATE TABLE",
108                                          "gn090");
109     PQfinish (plugin->dbh);
110     plugin->dbh = NULL;
111     return GNUNET_SYSERR;
112   }
113
114   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
115   {
116     if ((GNUNET_OK !=
117          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)")) ||
118         (GNUNET_OK !=
119          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX IF NOT EXISTS idx_hash_vhash ON gn090 (hash,vhash)")) ||
120         (GNUNET_OK !=
121          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)")) ||
122         (GNUNET_OK !=
123          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)")) ||
124         (GNUNET_OK !=
125          GNUNET_POSTGRES_exec (plugin->dbh,
126                                "CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)")) ||
127         (GNUNET_OK !=
128          GNUNET_POSTGRES_exec (plugin->dbh,
129                                "CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
130         (GNUNET_OK !=
131          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
132         (GNUNET_OK !=
133          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)")))
134     {
135       PQclear (ret);
136       PQfinish (plugin->dbh);
137       plugin->dbh = NULL;
138       return GNUNET_SYSERR;
139     }
140   }
141   PQclear (ret);
142
143   ret =
144       PQexec (plugin->dbh,
145               "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
146   if (GNUNET_OK !=
147       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
148   {
149     PQfinish (plugin->dbh);
150     plugin->dbh = NULL;
151     return GNUNET_SYSERR;
152   }
153   PQclear (ret);
154   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
155   if (GNUNET_OK !=
156       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
157   {
158     PQfinish (plugin->dbh);
159     plugin->dbh = NULL;
160     return GNUNET_SYSERR;
161   }
162   PQclear (ret);
163   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
164   if (GNUNET_OK !=
165       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
166   {
167     PQfinish (plugin->dbh);
168     plugin->dbh = NULL;
169     return GNUNET_SYSERR;
170   }
171   PQclear (ret);
172   if ((GNUNET_OK !=
173        GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
174                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
175                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
176                    "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
177       (GNUNET_OK !=
178        GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
179                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
180                    "WHERE hash=$1 AND type=$2 "
181                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
182       (GNUNET_OK !=
183        GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
184                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
185                    "WHERE hash=$1 AND vhash=$2 "
186                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
187       (GNUNET_OK !=
188        GNUNET_POSTGRES_prepare (plugin->dbh, "get",
189                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
190                    "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) ||
191       (GNUNET_OK !=
192        GNUNET_POSTGRES_prepare (plugin->dbh, "count_getvt",
193                                 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3", 3)) ||
194       (GNUNET_OK !=
195        GNUNET_POSTGRES_prepare (plugin->dbh, "count_gett",
196                                 "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2", 2)) ||
197       (GNUNET_OK !=
198        GNUNET_POSTGRES_prepare (plugin->dbh, "count_getv",
199                                 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2", 2)) ||
200       (GNUNET_OK !=
201        GNUNET_POSTGRES_prepare (plugin->dbh, "count_get",
202                                 "SELECT count(*) FROM gn090 WHERE hash=$1", 1)) ||
203       (GNUNET_OK !=
204        GNUNET_POSTGRES_prepare (plugin->dbh, "put",
205                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
206                    "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) ||
207       (GNUNET_OK !=
208        GNUNET_POSTGRES_prepare (plugin->dbh, "update",
209                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
210                    "WHERE oid = $3", 3)) ||
211       (GNUNET_OK !=
212        GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
213                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
214                    "WHERE oid = $1", 1)) ||
215       (GNUNET_OK !=
216        GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
217                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
218                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
219                    1)) ||
220       (GNUNET_OK !=
221        GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
222                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
223                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION "
224                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
225                    "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1",
226                    1)) ||
227       (GNUNET_OK !=
228        GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
229                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
230                    "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
231       (GNUNET_OK !=
232        GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
233       (GNUNET_OK !=
234        GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
235   {
236     PQfinish (plugin->dbh);
237     plugin->dbh = NULL;
238     return GNUNET_SYSERR;
239   }
240   return GNUNET_OK;
241 }
242
243
244 /**
245  * Get an estimate of how much space the database is
246  * currently using.
247  *
248  * @param cls our `struct Plugin *`
249  * @return number of bytes used on disk
250  */
251 static void
252 postgres_plugin_estimate_size (void *cls, unsigned long long *estimate)
253 {
254   struct Plugin *plugin = cls;
255   unsigned long long total;
256   PGresult *ret;
257
258   if (NULL == estimate)
259     return;
260   ret =
261       PQexecParams (plugin->dbh,
262                     "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
263                     NULL, NULL, NULL, NULL, 1);
264   if (GNUNET_OK !=
265       GNUNET_POSTGRES_check_result (plugin->dbh,
266                                     ret,
267                                     PGRES_TUPLES_OK,
268                                     "PQexecParams",
269                                     "get_size"))
270   {
271     *estimate = 0;
272     return;
273   }
274   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
275   {
276     GNUNET_break (0);
277     PQclear (ret);
278     *estimate = 0;
279     return;
280   }
281   if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
282   {
283     GNUNET_break (0 == PQgetlength (ret, 0, 0));
284     PQclear (ret);
285     *estimate = 0;
286     return;
287   }
288   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
289   PQclear (ret);
290   *estimate = total;
291 }
292
293
294 /**
295  * Store an item in the datastore.
296  *
297  * @param cls closure with the `struct Plugin`
298  * @param key key for the item
299  * @param size number of bytes in data
300  * @param data content stored
301  * @param type type of the content
302  * @param priority priority of the content
303  * @param anonymity anonymity-level for the content
304  * @param replication replication-level for the content
305  * @param expiration expiration time for the content
306  * @param cont continuation called with success or failure status
307  * @param cont_cls continuation closure
308  */
309 static void
310 postgres_plugin_put (void *cls,
311                      const struct GNUNET_HashCode *key,
312                      uint32_t size,
313                      const void *data,
314                      enum GNUNET_BLOCK_Type type,
315                      uint32_t priority,
316                      uint32_t anonymity,
317                      uint32_t replication,
318                      struct GNUNET_TIME_Absolute expiration,
319                      PluginPutCont cont,
320                      void *cont_cls)
321 {
322   struct Plugin *plugin = cls;
323   uint32_t utype = type;
324   struct GNUNET_HashCode vhash;
325   PGresult *ret;
326   struct GNUNET_PQ_QueryParam params[] = {
327     GNUNET_PQ_query_param_uint32 (&replication),
328     GNUNET_PQ_query_param_uint32 (&utype),
329     GNUNET_PQ_query_param_uint32 (&priority),
330     GNUNET_PQ_query_param_uint32 (&anonymity),
331     GNUNET_PQ_query_param_absolute_time (&expiration),
332     GNUNET_PQ_query_param_auto_from_type (key),
333     GNUNET_PQ_query_param_auto_from_type (&vhash),
334     GNUNET_PQ_query_param_fixed_size (data, size),
335     GNUNET_PQ_query_param_end
336   };
337
338   GNUNET_CRYPTO_hash (data, size, &vhash);
339   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
340                                  "put",
341                                  params);
342   if (GNUNET_OK !=
343       GNUNET_POSTGRES_check_result (plugin->dbh,
344                                     ret,
345                                     PGRES_COMMAND_OK,
346                                     "PQexecPrepared", "put"))
347   {
348     cont (cont_cls, key, size,
349           GNUNET_SYSERR,
350           _("Postgress exec failure"));
351     return;
352   }
353   PQclear (ret);
354   plugin->env->duc (plugin->env->cls,
355                     size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
356   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
357                    "datastore-postgres",
358                    "Stored %u bytes in database\n",
359                    (unsigned int) size);
360   cont (cont_cls, key, size, GNUNET_OK, NULL);
361 }
362
363
364 /**
365  * Function invoked to process the result and call the processor.
366  *
367  * @param plugin global plugin data
368  * @param proc function to call the value (once only).
369  * @param proc_cls closure for proc
370  * @param res result from exec
371  * @param filename filename for error messages
372  * @param line line number for error messages
373  */
374 static void
375 process_result (struct Plugin *plugin,
376                 PluginDatumProcessor proc,
377                 void *proc_cls,
378                 PGresult * res,
379                 const char *filename, int line)
380 {
381   int iret;
382   uint32_t rowid;
383   uint32_t utype;
384   uint32_t anonymity;
385   uint32_t priority;
386   size_t size;
387   void *data;
388   struct GNUNET_TIME_Absolute expiration_time;
389   struct GNUNET_HashCode key;
390   struct GNUNET_PQ_ResultSpec rs[] = {
391     GNUNET_PQ_result_spec_uint32 ("type", &utype),
392     GNUNET_PQ_result_spec_uint32 ("prio", &priority),
393     GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
394     GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
395     GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
396     GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
397     GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
398     GNUNET_PQ_result_spec_end
399   };
400
401   if (GNUNET_OK !=
402       GNUNET_POSTGRES_check_result_ (plugin->dbh,
403                                      res,
404                                      PGRES_TUPLES_OK,
405                                      "PQexecPrepared",
406                                      "select",
407                                      filename, line))
408   {
409     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
410                      "datastore-postgres",
411                      "Ending iteration (postgres error)\n");
412     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
413           GNUNET_TIME_UNIT_ZERO_ABS, 0);
414     return;
415   }
416
417   if (0 == PQntuples (res))
418   {
419     /* no result */
420     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
421                      "datastore-postgres",
422                      "Ending iteration (no more results)\n");
423     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
424           GNUNET_TIME_UNIT_ZERO_ABS, 0);
425     PQclear (res);
426     return;
427   }
428   if (1 != PQntuples (res))
429   {
430     GNUNET_break (0);
431     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
432           GNUNET_TIME_UNIT_ZERO_ABS, 0);
433     PQclear (res);
434     return;
435   }
436   if (GNUNET_OK !=
437       GNUNET_PQ_extract_result (res,
438                                 rs,
439                                 0))
440   {
441     GNUNET_break (0);
442     PQclear (res);
443     GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
444                                      "delrow",
445                                      rowid);
446     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
447           GNUNET_TIME_UNIT_ZERO_ABS, 0);
448     return;
449   }
450
451   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
452                    "datastore-postgres",
453                    "Found result of size %u bytes and type %u in database\n",
454                    (unsigned int) size,
455                    (unsigned int) utype);
456   iret = proc (proc_cls,
457                &key,
458                size,
459                data,
460                (enum GNUNET_BLOCK_Type) utype,
461                priority,
462                anonymity,
463                expiration_time,
464                rowid);
465   PQclear (res);
466   if (iret == GNUNET_NO)
467   {
468     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
469                 "Processor asked for item %u to be removed.\n",
470                 (unsigned int) rowid);
471     if (GNUNET_OK ==
472         GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
473                                          "delrow",
474                                          rowid))
475     {
476       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
477                        "datastore-postgres",
478                        "Deleting %u bytes from database\n",
479                        (unsigned int) size);
480       plugin->env->duc (plugin->env->cls,
481                         - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
482       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
483                        "datastore-postgres",
484                        "Deleted %u bytes from database\n",
485                        (unsigned int) size);
486     }
487   }
488 }
489
490
491 /**
492  * Iterate over the results for a particular key
493  * in the datastore.
494  *
495  * @param cls closure with the 'struct Plugin'
496  * @param offset offset of the result (modulo num-results);
497  *        specific ordering does not matter for the offset
498  * @param key maybe NULL (to match all entries)
499  * @param vhash hash of the value, maybe NULL (to
500  *        match all values that have the right key).
501  *        Note that for DBlocks there is no difference
502  *        betwen key and vhash, but for other blocks
503  *        there may be!
504  * @param type entries of which type are relevant?
505  *     Use 0 for any type.
506  * @param proc function to call on the matching value;
507  *        will be called once with a NULL if no value matches
508  * @param proc_cls closure for iter
509  */
510 static void
511 postgres_plugin_get_key (void *cls,
512                          uint64_t offset,
513                          const struct GNUNET_HashCode *key,
514                          const struct GNUNET_HashCode *vhash,
515                          enum GNUNET_BLOCK_Type type,
516                          PluginDatumProcessor proc,
517                          void *proc_cls)
518 {
519   struct Plugin *plugin = cls;
520   uint32_t utype = type;
521   PGresult *ret;
522   uint64_t total;
523   uint64_t limit_off;
524
525   if (0 != type)
526   {
527     if (NULL != vhash)
528     {
529       struct GNUNET_PQ_QueryParam params[] = {
530         GNUNET_PQ_query_param_auto_from_type (key),
531         GNUNET_PQ_query_param_auto_from_type (vhash),
532         GNUNET_PQ_query_param_uint32 (&utype),
533         GNUNET_PQ_query_param_end
534       };
535       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
536                                      "count_getvt",
537                                      params);
538     }
539     else
540     {
541       struct GNUNET_PQ_QueryParam params[] = {
542         GNUNET_PQ_query_param_auto_from_type (key),
543         GNUNET_PQ_query_param_uint32 (&utype),
544         GNUNET_PQ_query_param_end
545       };
546       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
547                                      "count_gett",
548                                      params);
549     }
550   }
551   else
552   {
553     if (NULL != vhash)
554     {
555       struct GNUNET_PQ_QueryParam params[] = {
556         GNUNET_PQ_query_param_auto_from_type (key),
557         GNUNET_PQ_query_param_auto_from_type (vhash),
558         GNUNET_PQ_query_param_end
559       };
560       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
561                                      "count_getv",
562                                      params);
563     }
564     else
565     {
566       struct GNUNET_PQ_QueryParam params[] = {
567         GNUNET_PQ_query_param_auto_from_type (key),
568         GNUNET_PQ_query_param_end
569       };
570       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
571                                      "count_get",
572                                      params);
573     }
574   }
575
576   if (GNUNET_OK !=
577       GNUNET_POSTGRES_check_result (plugin->dbh,
578                                     ret,
579                                     PGRES_TUPLES_OK,
580                                     "PQexecParams",
581                                     "count"))
582   {
583     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
584           GNUNET_TIME_UNIT_ZERO_ABS, 0);
585     return;
586   }
587   if ( (PQntuples (ret) != 1) ||
588        (PQnfields (ret) != 1) ||
589        (PQgetlength (ret, 0, 0) != sizeof (uint64_t)))
590   {
591     GNUNET_break (0);
592     PQclear (ret);
593     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
594           GNUNET_TIME_UNIT_ZERO_ABS, 0);
595     return;
596   }
597   total = GNUNET_ntohll (*(const uint64_t *) PQgetvalue (ret, 0, 0));
598   PQclear (ret);
599   if (0 == total)
600   {
601     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
602           GNUNET_TIME_UNIT_ZERO_ABS, 0);
603     return;
604   }
605   limit_off = offset % total;
606
607   if (0 != type)
608   {
609     if (NULL != vhash)
610     {
611       struct GNUNET_PQ_QueryParam params[] = {
612         GNUNET_PQ_query_param_auto_from_type (key),
613         GNUNET_PQ_query_param_auto_from_type (vhash),
614         GNUNET_PQ_query_param_uint32 (&utype),
615         GNUNET_PQ_query_param_uint64 (&limit_off),
616         GNUNET_PQ_query_param_end
617       };
618       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
619                                      "getvt",
620                                      params);
621     }
622     else
623     {
624       struct GNUNET_PQ_QueryParam params[] = {
625         GNUNET_PQ_query_param_auto_from_type (key),
626         GNUNET_PQ_query_param_uint32 (&utype),
627         GNUNET_PQ_query_param_uint64 (&limit_off),
628         GNUNET_PQ_query_param_end
629       };
630       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
631                                      "gett",
632                                      params);
633     }
634   }
635   else
636   {
637     if (NULL != vhash)
638     {
639       struct GNUNET_PQ_QueryParam params[] = {
640         GNUNET_PQ_query_param_auto_from_type (key),
641         GNUNET_PQ_query_param_auto_from_type (vhash),
642         GNUNET_PQ_query_param_uint64 (&limit_off),
643         GNUNET_PQ_query_param_end
644       };
645       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
646                                      "getv",
647                                      params);
648     }
649     else
650     {
651       struct GNUNET_PQ_QueryParam params[] = {
652         GNUNET_PQ_query_param_auto_from_type (key),
653         GNUNET_PQ_query_param_uint64 (&limit_off),
654         GNUNET_PQ_query_param_end
655       };
656       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
657                                      "get",
658                                      params);
659     }
660   }
661   process_result (plugin,
662                   proc,
663                   proc_cls,
664                   ret,
665                   __FILE__, __LINE__);
666 }
667
668
669 /**
670  * Select a subset of the items in the datastore and call
671  * the given iterator for each of them.
672  *
673  * @param cls our `struct Plugin *`
674  * @param offset offset of the result (modulo num-results);
675  *        specific ordering does not matter for the offset
676  * @param type entries of which type should be considered?
677  *        Use 0 for any type.
678  * @param proc function to call on the matching value;
679  *        will be called with a NULL if no value matches
680  * @param proc_cls closure for @a proc
681  */
682 static void
683 postgres_plugin_get_zero_anonymity (void *cls,
684                                     uint64_t offset,
685                                     enum GNUNET_BLOCK_Type type,
686                                     PluginDatumProcessor proc,
687                                     void *proc_cls)
688 {
689   struct Plugin *plugin = cls;
690   uint32_t utype = type;
691   struct GNUNET_PQ_QueryParam params[] = {
692     GNUNET_PQ_query_param_uint32 (&utype),
693     GNUNET_PQ_query_param_uint64 (&offset),
694     GNUNET_PQ_query_param_end
695   };
696   PGresult *ret;
697
698   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
699                                  "select_non_anonymous",
700                                  params);
701
702   process_result (plugin,
703                   proc, proc_cls,
704                   ret,
705                   __FILE__, __LINE__);
706 }
707
708
709 /**
710  * Context for #repl_iter() function.
711  */
712 struct ReplCtx
713 {
714
715   /**
716    * Plugin handle.
717    */
718   struct Plugin *plugin;
719
720   /**
721    * Function to call for the result (or the NULL).
722    */
723   PluginDatumProcessor proc;
724
725   /**
726    * Closure for @e proc.
727    */
728   void *proc_cls;
729 };
730
731
732 /**
733  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
734  * Decrements the replication counter and calls the original
735  * iterator.
736  *
737  * @param cls closure with the `struct ReplCtx *`
738  * @param key key for the content
739  * @param size number of bytes in @a data
740  * @param data content stored
741  * @param type type of the content
742  * @param priority priority of the content
743  * @param anonymity anonymity-level for the content
744  * @param expiration expiration time for the content
745  * @param uid unique identifier for the datum;
746  *        maybe 0 if no unique identifier is available
747  * @return #GNUNET_SYSERR to abort the iteration,
748  *         #GNUNET_OK to continue
749  *         (continue on call to "next", of course),
750  *         #GNUNET_NO to delete the item and continue (if supported)
751  */
752 static int
753 repl_proc (void *cls,
754            const struct GNUNET_HashCode *key,
755            uint32_t size,
756            const void *data,
757            enum GNUNET_BLOCK_Type type,
758            uint32_t priority,
759            uint32_t anonymity,
760            struct GNUNET_TIME_Absolute expiration,
761            uint64_t uid)
762 {
763   struct ReplCtx *rc = cls;
764   struct Plugin *plugin = rc->plugin;
765   int ret;
766   uint32_t oid = (uint32_t) uid;
767   struct GNUNET_PQ_QueryParam params[] = {
768     GNUNET_PQ_query_param_uint32 (&oid),
769     GNUNET_PQ_query_param_end
770   };
771   PGresult *qret;
772
773   ret = rc->proc (rc->proc_cls,
774                   key,
775                   size, data,
776                   type,
777                   priority,
778                   anonymity,
779                   expiration, uid);
780   if (NULL == key)
781     return ret;
782   qret = GNUNET_PQ_exec_prepared (plugin->dbh,
783                                   "decrepl",
784                                   params);
785   if (GNUNET_OK !=
786       GNUNET_POSTGRES_check_result (plugin->dbh,
787                                     qret,
788                                     PGRES_COMMAND_OK,
789                                     "PQexecPrepared",
790                                     "decrepl"))
791     return GNUNET_SYSERR;
792   PQclear (qret);
793   return ret;
794 }
795
796
797 /**
798  * Get a random item for replication.  Returns a single, not expired,
799  * random item from those with the highest replication counters.  The
800  * item's replication counter is decremented by one IF it was positive
801  * before.  Call @a proc with all values ZERO or NULL if the datastore
802  * is empty.
803  *
804  * @param cls closure with the `struct Plugin`
805  * @param proc function to call the value (once only).
806  * @param proc_cls closure for @a proc
807  */
808 static void
809 postgres_plugin_get_replication (void *cls,
810                                  PluginDatumProcessor proc,
811                                  void *proc_cls)
812 {
813   struct Plugin *plugin = cls;
814   struct ReplCtx rc;
815   PGresult *ret;
816
817   rc.plugin = plugin;
818   rc.proc = proc;
819   rc.proc_cls = proc_cls;
820   ret = PQexecPrepared (plugin->dbh,
821                         "select_replication_order", 0, NULL, NULL,
822                         NULL, 1);
823   process_result (plugin,
824                   &repl_proc,
825                   &rc,
826                   ret,
827                   __FILE__, __LINE__);
828 }
829
830
831 /**
832  * Get a random item for expiration.  Call @a proc with all values
833  * ZERO or NULL if the datastore is empty.
834  *
835  * @param cls closure with the `struct Plugin`
836  * @param proc function to call the value (once only).
837  * @param proc_cls closure for @a proc
838  */
839 static void
840 postgres_plugin_get_expiration (void *cls,
841                                 PluginDatumProcessor proc,
842                                 void *proc_cls)
843 {
844   struct Plugin *plugin = cls;
845   struct GNUNET_TIME_Absolute now;
846   struct GNUNET_PQ_QueryParam params[] = {
847     GNUNET_PQ_query_param_absolute_time (&now),
848     GNUNET_PQ_query_param_end
849   };
850   PGresult *ret;
851
852   now = GNUNET_TIME_absolute_get ();
853   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
854                                  "select_expiration_order",
855                                  params);
856   process_result (plugin,
857                   proc, proc_cls,
858                   ret,
859                   __FILE__, __LINE__);
860 }
861
862
863 /**
864  * Update the priority for a particular key in the datastore.  If
865  * the expiration time in value is different than the time found in
866  * the datastore, the higher value should be kept.  For the
867  * anonymity level, the lower value is to be used.  The specified
868  * priority should be added to the existing priority, ignoring the
869  * priority in value.
870  *
871  * Note that it is possible for multiple values to match this put.
872  * In that case, all of the respective values are updated.
873  *
874  * @param cls our `struct Plugin *`
875  * @param uid unique identifier of the datum
876  * @param delta by how much should the priority
877  *     change?
878  * @param expire new expiration time should be the
879  *     MAX of any existing expiration time and
880  *     this value
881  * @param cont continuation called with success or failure status
882  * @param cons_cls continuation closure
883  */
884 static void
885 postgres_plugin_update (void *cls,
886                         uint64_t uid,
887                         uint32_t delta,
888                         struct GNUNET_TIME_Absolute expire,
889                         PluginUpdateCont cont,
890                         void *cont_cls)
891 {
892   struct Plugin *plugin = cls;
893   uint32_t oid = (uint32_t) uid;
894   struct GNUNET_PQ_QueryParam params[] = {
895     GNUNET_PQ_query_param_uint32 (&delta),
896     GNUNET_PQ_query_param_absolute_time (&expire),
897     GNUNET_PQ_query_param_uint32 (&oid),
898     GNUNET_PQ_query_param_end
899   };
900   PGresult *ret;
901
902   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
903                                  "update",
904                                  params);
905   if (GNUNET_OK !=
906       GNUNET_POSTGRES_check_result (plugin->dbh,
907                                     ret,
908                                     PGRES_COMMAND_OK,
909                                     "PQexecPrepared",
910                                     "update"))
911   {
912     cont (cont_cls,
913           GNUNET_SYSERR,
914           NULL);
915     return;
916   }
917   PQclear (ret);
918   cont (cont_cls,
919         GNUNET_OK,
920         NULL);
921 }
922
923
924 /**
925  * Get all of the keys in the datastore.
926  *
927  * @param cls closure with the `struct Plugin *`
928  * @param proc function to call on each key
929  * @param proc_cls closure for @a proc
930  */
931 static void
932 postgres_plugin_get_keys (void *cls,
933                           PluginKeyProcessor proc,
934                           void *proc_cls)
935 {
936   struct Plugin *plugin = cls;
937   int ret;
938   int i;
939   struct GNUNET_HashCode key;
940   PGresult * res;
941
942   res = PQexecPrepared (plugin->dbh,
943                         "get_keys",
944                         0, NULL, NULL, NULL, 1);
945   ret = PQntuples (res);
946   for (i=0;i<ret;i++)
947   {
948     if (sizeof (struct GNUNET_HashCode) !=
949         PQgetlength (res, i, 0))
950     {
951       GNUNET_memcpy (&key,
952               PQgetvalue (res, i, 0),
953               sizeof (struct GNUNET_HashCode));
954       proc (proc_cls, &key, 1);
955     }
956   }
957   PQclear (res);
958   proc (proc_cls, NULL, 0);
959 }
960
961
962 /**
963  * Drop database.
964  *
965  * @param cls closure with the `struct Plugin *`
966  */
967 static void
968 postgres_plugin_drop (void *cls)
969 {
970   struct Plugin *plugin = cls;
971
972   if (GNUNET_OK !=
973       GNUNET_POSTGRES_exec (plugin->dbh,
974                             "DROP TABLE gn090"))
975     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
976                      "postgres",
977                      _("Failed to drop table from database.\n"));
978 }
979
980
981 /**
982  * Entry point for the plugin.
983  *
984  * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
985  * @return our `struct Plugin *`
986  */
987 void *
988 libgnunet_plugin_datastore_postgres_init (void *cls)
989 {
990   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
991   struct GNUNET_DATASTORE_PluginFunctions *api;
992   struct Plugin *plugin;
993
994   plugin = GNUNET_new (struct Plugin);
995   plugin->env = env;
996   if (GNUNET_OK != init_connection (plugin))
997   {
998     GNUNET_free (plugin);
999     return NULL;
1000   }
1001   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
1002   api->cls = plugin;
1003   api->estimate_size = &postgres_plugin_estimate_size;
1004   api->put = &postgres_plugin_put;
1005   api->update = &postgres_plugin_update;
1006   api->get_key = &postgres_plugin_get_key;
1007   api->get_replication = &postgres_plugin_get_replication;
1008   api->get_expiration = &postgres_plugin_get_expiration;
1009   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
1010   api->get_keys = &postgres_plugin_get_keys;
1011   api->drop = &postgres_plugin_drop;
1012   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1013                    "datastore-postgres",
1014                    _("Postgres database running\n"));
1015   return api;
1016 }
1017
1018
1019 /**
1020  * Exit point from the plugin.
1021  *
1022  * @param cls our `struct Plugin *`
1023  * @return always NULL
1024  */
1025 void *
1026 libgnunet_plugin_datastore_postgres_done (void *cls)
1027 {
1028   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1029   struct Plugin *plugin = api->cls;
1030
1031   PQfinish (plugin->dbh);
1032   GNUNET_free (plugin);
1033   GNUNET_free (api);
1034   return NULL;
1035 }
1036
1037 /* end of plugin_datastore_postgres.c */