we use CREATE INDEX IF NOT EXITS, this requires postgres>=9.6, bump dependency requir...
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include "gnunet_postgres_lib.h"
30 #include "gnunet_pq_lib.h"
31
32
33 /**
34  * After how many ms "busy" should a DB operation fail for good?
35  * A low value makes sure that we are more responsive to requests
36  * (especially PUTs).  A high value guarantees a higher success
37  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
38  *
39  * The default value of 1s should ensure that users do not experience
40  * huge latencies while at the same time allowing operations to succeed
41  * with reasonable probability.
42  */
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
44
45
46 /**
47  * Context for all functions in this plugin.
48  */
49 struct Plugin
50 {
51   /**
52    * Our execution environment.
53    */
54   struct GNUNET_DATASTORE_PluginEnvironment *env;
55
56   /**
57    * Native Postgres database handle.
58    */
59   PGconn *dbh;
60
61 };
62
63
64 /**
65  * @brief Get a database handle
66  *
67  * @param plugin global context
68  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
69  */
70 static int
71 init_connection (struct Plugin *plugin)
72 {
73   PGresult *ret;
74
75   plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres");
76   if (NULL == plugin->dbh)
77     return GNUNET_SYSERR;
78
79   /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
80    * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
81    * we do math or inequality tests, so we can't handle the entire range of uint32_t.
82    * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
83    */
84   ret =
85       PQexec (plugin->dbh,
86               "CREATE TABLE IF NOT EXISTS gn090 ("
87               "  repl INTEGER NOT NULL DEFAULT 0,"
88               "  type INTEGER NOT NULL DEFAULT 0,"
89               "  prio INTEGER NOT NULL DEFAULT 0,"
90               "  anonLevel INTEGER NOT NULL DEFAULT 0,"
91               "  expire BIGINT NOT NULL DEFAULT 0,"
92               "  rvalue BIGINT NOT NULL DEFAULT 0,"
93               "  hash BYTEA NOT NULL DEFAULT '',"
94               "  vhash BYTEA NOT NULL DEFAULT '',"
95               "  value BYTEA NOT NULL DEFAULT '')"
96               "WITH OIDS");
97   if ( (NULL == ret) ||
98        ((PQresultStatus (ret) != PGRES_COMMAND_OK) &&
99         (0 != strcmp ("42P07",    /* duplicate table */
100                       PQresultErrorField
101                       (ret,
102                        PG_DIAG_SQLSTATE)))))
103   {
104     (void) GNUNET_POSTGRES_check_result (plugin->dbh,
105                                          ret,
106                                          PGRES_COMMAND_OK,
107                                          "CREATE TABLE",
108                                          "gn090");
109     PQfinish (plugin->dbh);
110     plugin->dbh = NULL;
111     return GNUNET_SYSERR;
112   }
113
114   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
115   {
116     if ((GNUNET_OK !=
117          GNUNET_POSTGRES_exec (plugin->dbh,
118                                "CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)")) ||
119         (GNUNET_OK !=
120          GNUNET_POSTGRES_exec (plugin->dbh,
121                                "CREATE INDEX IF NOT EXISTS idx_hash_vhash ON gn090 (hash,vhash)")) ||
122         (GNUNET_OK !=
123          GNUNET_POSTGRES_exec (plugin->dbh,
124                                "CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)")) ||
125         (GNUNET_OK !=
126          GNUNET_POSTGRES_exec (plugin->dbh,
127                                "CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)")) ||
128         (GNUNET_OK !=
129          GNUNET_POSTGRES_exec (plugin->dbh,
130                                "CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)")) ||
131         (GNUNET_OK !=
132          GNUNET_POSTGRES_exec (plugin->dbh,
133                                "CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
134         (GNUNET_OK !=
135          GNUNET_POSTGRES_exec (plugin->dbh,
136                                "CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
137         (GNUNET_OK !=
138          GNUNET_POSTGRES_exec (plugin->dbh,
139                                "CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)")))
140     {
141       PQclear (ret);
142       PQfinish (plugin->dbh);
143       plugin->dbh = NULL;
144       return GNUNET_SYSERR;
145     }
146   }
147   PQclear (ret);
148
149   ret =
150       PQexec (plugin->dbh,
151               "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
152   if (GNUNET_OK !=
153       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
154   {
155     PQfinish (plugin->dbh);
156     plugin->dbh = NULL;
157     return GNUNET_SYSERR;
158   }
159   PQclear (ret);
160   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
161   if (GNUNET_OK !=
162       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
163   {
164     PQfinish (plugin->dbh);
165     plugin->dbh = NULL;
166     return GNUNET_SYSERR;
167   }
168   PQclear (ret);
169   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
170   if (GNUNET_OK !=
171       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
172   {
173     PQfinish (plugin->dbh);
174     plugin->dbh = NULL;
175     return GNUNET_SYSERR;
176   }
177   PQclear (ret);
178   if ((GNUNET_OK !=
179        GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
180                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
181                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
182                    "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
183       (GNUNET_OK !=
184        GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
185                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
186                    "WHERE hash=$1 AND type=$2 "
187                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
188       (GNUNET_OK !=
189        GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
190                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
191                    "WHERE hash=$1 AND vhash=$2 "
192                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
193       (GNUNET_OK !=
194        GNUNET_POSTGRES_prepare (plugin->dbh, "get",
195                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
196                    "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) ||
197       (GNUNET_OK !=
198        GNUNET_POSTGRES_prepare (plugin->dbh, "count_getvt",
199                                 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3", 3)) ||
200       (GNUNET_OK !=
201        GNUNET_POSTGRES_prepare (plugin->dbh, "count_gett",
202                                 "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2", 2)) ||
203       (GNUNET_OK !=
204        GNUNET_POSTGRES_prepare (plugin->dbh, "count_getv",
205                                 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2", 2)) ||
206       (GNUNET_OK !=
207        GNUNET_POSTGRES_prepare (plugin->dbh, "count_get",
208                                 "SELECT count(*) FROM gn090 WHERE hash=$1", 1)) ||
209       (GNUNET_OK !=
210        GNUNET_POSTGRES_prepare (plugin->dbh, "put",
211                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
212                    "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) ||
213       (GNUNET_OK !=
214        GNUNET_POSTGRES_prepare (plugin->dbh, "update",
215                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
216                    "WHERE oid = $3", 3)) ||
217       (GNUNET_OK !=
218        GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
219                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
220                    "WHERE oid = $1", 1)) ||
221       (GNUNET_OK !=
222        GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
223                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
224                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
225                    1)) ||
226       (GNUNET_OK !=
227        GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
228                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
229                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION "
230                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
231                    "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1",
232                    1)) ||
233       (GNUNET_OK !=
234        GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
235                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
236                    "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
237       (GNUNET_OK !=
238        GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
239       (GNUNET_OK !=
240        GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
241   {
242     PQfinish (plugin->dbh);
243     plugin->dbh = NULL;
244     return GNUNET_SYSERR;
245   }
246   return GNUNET_OK;
247 }
248
249
250 /**
251  * Get an estimate of how much space the database is
252  * currently using.
253  *
254  * @param cls our `struct Plugin *`
255  * @return number of bytes used on disk
256  */
257 static void
258 postgres_plugin_estimate_size (void *cls, unsigned long long *estimate)
259 {
260   struct Plugin *plugin = cls;
261   unsigned long long total;
262   PGresult *ret;
263
264   if (NULL == estimate)
265     return;
266   ret =
267       PQexecParams (plugin->dbh,
268                     "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
269                     NULL, NULL, NULL, NULL, 1);
270   if (GNUNET_OK !=
271       GNUNET_POSTGRES_check_result (plugin->dbh,
272                                     ret,
273                                     PGRES_TUPLES_OK,
274                                     "PQexecParams",
275                                     "get_size"))
276   {
277     *estimate = 0;
278     return;
279   }
280   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
281   {
282     GNUNET_break (0);
283     PQclear (ret);
284     *estimate = 0;
285     return;
286   }
287   if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
288   {
289     GNUNET_break (0 == PQgetlength (ret, 0, 0));
290     PQclear (ret);
291     *estimate = 0;
292     return;
293   }
294   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
295   PQclear (ret);
296   *estimate = total;
297 }
298
299
300 /**
301  * Store an item in the datastore.
302  *
303  * @param cls closure with the `struct Plugin`
304  * @param key key for the item
305  * @param size number of bytes in data
306  * @param data content stored
307  * @param type type of the content
308  * @param priority priority of the content
309  * @param anonymity anonymity-level for the content
310  * @param replication replication-level for the content
311  * @param expiration expiration time for the content
312  * @param cont continuation called with success or failure status
313  * @param cont_cls continuation closure
314  */
315 static void
316 postgres_plugin_put (void *cls,
317                      const struct GNUNET_HashCode *key,
318                      uint32_t size,
319                      const void *data,
320                      enum GNUNET_BLOCK_Type type,
321                      uint32_t priority,
322                      uint32_t anonymity,
323                      uint32_t replication,
324                      struct GNUNET_TIME_Absolute expiration,
325                      PluginPutCont cont,
326                      void *cont_cls)
327 {
328   struct Plugin *plugin = cls;
329   uint32_t utype = type;
330   struct GNUNET_HashCode vhash;
331   PGresult *ret;
332   struct GNUNET_PQ_QueryParam params[] = {
333     GNUNET_PQ_query_param_uint32 (&replication),
334     GNUNET_PQ_query_param_uint32 (&utype),
335     GNUNET_PQ_query_param_uint32 (&priority),
336     GNUNET_PQ_query_param_uint32 (&anonymity),
337     GNUNET_PQ_query_param_absolute_time (&expiration),
338     GNUNET_PQ_query_param_auto_from_type (key),
339     GNUNET_PQ_query_param_auto_from_type (&vhash),
340     GNUNET_PQ_query_param_fixed_size (data, size),
341     GNUNET_PQ_query_param_end
342   };
343
344   GNUNET_CRYPTO_hash (data, size, &vhash);
345   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
346                                  "put",
347                                  params);
348   if (GNUNET_OK !=
349       GNUNET_POSTGRES_check_result (plugin->dbh,
350                                     ret,
351                                     PGRES_COMMAND_OK,
352                                     "PQexecPrepared", "put"))
353   {
354     cont (cont_cls, key, size,
355           GNUNET_SYSERR,
356           _("Postgress exec failure"));
357     return;
358   }
359   PQclear (ret);
360   plugin->env->duc (plugin->env->cls,
361                     size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
362   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
363                    "datastore-postgres",
364                    "Stored %u bytes in database\n",
365                    (unsigned int) size);
366   cont (cont_cls, key, size, GNUNET_OK, NULL);
367 }
368
369
370 /**
371  * Function invoked to process the result and call the processor.
372  *
373  * @param plugin global plugin data
374  * @param proc function to call the value (once only).
375  * @param proc_cls closure for proc
376  * @param res result from exec
377  * @param filename filename for error messages
378  * @param line line number for error messages
379  */
380 static void
381 process_result (struct Plugin *plugin,
382                 PluginDatumProcessor proc,
383                 void *proc_cls,
384                 PGresult * res,
385                 const char *filename, int line)
386 {
387   int iret;
388   uint32_t rowid;
389   uint32_t utype;
390   uint32_t anonymity;
391   uint32_t priority;
392   size_t size;
393   void *data;
394   struct GNUNET_TIME_Absolute expiration_time;
395   struct GNUNET_HashCode key;
396   struct GNUNET_PQ_ResultSpec rs[] = {
397     GNUNET_PQ_result_spec_uint32 ("type", &utype),
398     GNUNET_PQ_result_spec_uint32 ("prio", &priority),
399     GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
400     GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
401     GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
402     GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
403     GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
404     GNUNET_PQ_result_spec_end
405   };
406
407   if (GNUNET_OK !=
408       GNUNET_POSTGRES_check_result_ (plugin->dbh,
409                                      res,
410                                      PGRES_TUPLES_OK,
411                                      "PQexecPrepared",
412                                      "select",
413                                      filename, line))
414   {
415     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
416                      "datastore-postgres",
417                      "Ending iteration (postgres error)\n");
418     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
419           GNUNET_TIME_UNIT_ZERO_ABS, 0);
420     return;
421   }
422
423   if (0 == PQntuples (res))
424   {
425     /* no result */
426     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
427                      "datastore-postgres",
428                      "Ending iteration (no more results)\n");
429     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
430           GNUNET_TIME_UNIT_ZERO_ABS, 0);
431     PQclear (res);
432     return;
433   }
434   if (1 != PQntuples (res))
435   {
436     GNUNET_break (0);
437     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
438           GNUNET_TIME_UNIT_ZERO_ABS, 0);
439     PQclear (res);
440     return;
441   }
442   if (GNUNET_OK !=
443       GNUNET_PQ_extract_result (res,
444                                 rs,
445                                 0))
446   {
447     GNUNET_break (0);
448     PQclear (res);
449     GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
450                                      "delrow",
451                                      rowid);
452     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
453           GNUNET_TIME_UNIT_ZERO_ABS, 0);
454     return;
455   }
456
457   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
458                    "datastore-postgres",
459                    "Found result of size %u bytes and type %u in database\n",
460                    (unsigned int) size,
461                    (unsigned int) utype);
462   iret = proc (proc_cls,
463                &key,
464                size,
465                data,
466                (enum GNUNET_BLOCK_Type) utype,
467                priority,
468                anonymity,
469                expiration_time,
470                rowid);
471   PQclear (res);
472   if (iret == GNUNET_NO)
473   {
474     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
475                 "Processor asked for item %u to be removed.\n",
476                 (unsigned int) rowid);
477     if (GNUNET_OK ==
478         GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
479                                          "delrow",
480                                          rowid))
481     {
482       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
483                        "datastore-postgres",
484                        "Deleting %u bytes from database\n",
485                        (unsigned int) size);
486       plugin->env->duc (plugin->env->cls,
487                         - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
488       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
489                        "datastore-postgres",
490                        "Deleted %u bytes from database\n",
491                        (unsigned int) size);
492     }
493   }
494 }
495
496
497 /**
498  * Iterate over the results for a particular key
499  * in the datastore.
500  *
501  * @param cls closure with the 'struct Plugin'
502  * @param offset offset of the result (modulo num-results);
503  *        specific ordering does not matter for the offset
504  * @param key maybe NULL (to match all entries)
505  * @param vhash hash of the value, maybe NULL (to
506  *        match all values that have the right key).
507  *        Note that for DBlocks there is no difference
508  *        betwen key and vhash, but for other blocks
509  *        there may be!
510  * @param type entries of which type are relevant?
511  *     Use 0 for any type.
512  * @param proc function to call on the matching value;
513  *        will be called once with a NULL if no value matches
514  * @param proc_cls closure for iter
515  */
516 static void
517 postgres_plugin_get_key (void *cls,
518                          uint64_t offset,
519                          const struct GNUNET_HashCode *key,
520                          const struct GNUNET_HashCode *vhash,
521                          enum GNUNET_BLOCK_Type type,
522                          PluginDatumProcessor proc,
523                          void *proc_cls)
524 {
525   struct Plugin *plugin = cls;
526   uint32_t utype = type;
527   PGresult *ret;
528   uint64_t total;
529   uint64_t limit_off;
530
531   if (0 != type)
532   {
533     if (NULL != vhash)
534     {
535       struct GNUNET_PQ_QueryParam params[] = {
536         GNUNET_PQ_query_param_auto_from_type (key),
537         GNUNET_PQ_query_param_auto_from_type (vhash),
538         GNUNET_PQ_query_param_uint32 (&utype),
539         GNUNET_PQ_query_param_end
540       };
541       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
542                                      "count_getvt",
543                                      params);
544     }
545     else
546     {
547       struct GNUNET_PQ_QueryParam params[] = {
548         GNUNET_PQ_query_param_auto_from_type (key),
549         GNUNET_PQ_query_param_uint32 (&utype),
550         GNUNET_PQ_query_param_end
551       };
552       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
553                                      "count_gett",
554                                      params);
555     }
556   }
557   else
558   {
559     if (NULL != vhash)
560     {
561       struct GNUNET_PQ_QueryParam params[] = {
562         GNUNET_PQ_query_param_auto_from_type (key),
563         GNUNET_PQ_query_param_auto_from_type (vhash),
564         GNUNET_PQ_query_param_end
565       };
566       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
567                                      "count_getv",
568                                      params);
569     }
570     else
571     {
572       struct GNUNET_PQ_QueryParam params[] = {
573         GNUNET_PQ_query_param_auto_from_type (key),
574         GNUNET_PQ_query_param_end
575       };
576       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
577                                      "count_get",
578                                      params);
579     }
580   }
581
582   if (GNUNET_OK !=
583       GNUNET_POSTGRES_check_result (plugin->dbh,
584                                     ret,
585                                     PGRES_TUPLES_OK,
586                                     "PQexecParams",
587                                     "count"))
588   {
589     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
590           GNUNET_TIME_UNIT_ZERO_ABS, 0);
591     return;
592   }
593   if ( (PQntuples (ret) != 1) ||
594        (PQnfields (ret) != 1) ||
595        (PQgetlength (ret, 0, 0) != sizeof (uint64_t)))
596   {
597     GNUNET_break (0);
598     PQclear (ret);
599     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
600           GNUNET_TIME_UNIT_ZERO_ABS, 0);
601     return;
602   }
603   total = GNUNET_ntohll (*(const uint64_t *) PQgetvalue (ret, 0, 0));
604   PQclear (ret);
605   if (0 == total)
606   {
607     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
608           GNUNET_TIME_UNIT_ZERO_ABS, 0);
609     return;
610   }
611   limit_off = offset % total;
612
613   if (0 != type)
614   {
615     if (NULL != vhash)
616     {
617       struct GNUNET_PQ_QueryParam params[] = {
618         GNUNET_PQ_query_param_auto_from_type (key),
619         GNUNET_PQ_query_param_auto_from_type (vhash),
620         GNUNET_PQ_query_param_uint32 (&utype),
621         GNUNET_PQ_query_param_uint64 (&limit_off),
622         GNUNET_PQ_query_param_end
623       };
624       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
625                                      "getvt",
626                                      params);
627     }
628     else
629     {
630       struct GNUNET_PQ_QueryParam params[] = {
631         GNUNET_PQ_query_param_auto_from_type (key),
632         GNUNET_PQ_query_param_uint32 (&utype),
633         GNUNET_PQ_query_param_uint64 (&limit_off),
634         GNUNET_PQ_query_param_end
635       };
636       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
637                                      "gett",
638                                      params);
639     }
640   }
641   else
642   {
643     if (NULL != vhash)
644     {
645       struct GNUNET_PQ_QueryParam params[] = {
646         GNUNET_PQ_query_param_auto_from_type (key),
647         GNUNET_PQ_query_param_auto_from_type (vhash),
648         GNUNET_PQ_query_param_uint64 (&limit_off),
649         GNUNET_PQ_query_param_end
650       };
651       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
652                                      "getv",
653                                      params);
654     }
655     else
656     {
657       struct GNUNET_PQ_QueryParam params[] = {
658         GNUNET_PQ_query_param_auto_from_type (key),
659         GNUNET_PQ_query_param_uint64 (&limit_off),
660         GNUNET_PQ_query_param_end
661       };
662       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
663                                      "get",
664                                      params);
665     }
666   }
667   process_result (plugin,
668                   proc,
669                   proc_cls,
670                   ret,
671                   __FILE__, __LINE__);
672 }
673
674
675 /**
676  * Select a subset of the items in the datastore and call
677  * the given iterator for each of them.
678  *
679  * @param cls our `struct Plugin *`
680  * @param offset offset of the result (modulo num-results);
681  *        specific ordering does not matter for the offset
682  * @param type entries of which type should be considered?
683  *        Use 0 for any type.
684  * @param proc function to call on the matching value;
685  *        will be called with a NULL if no value matches
686  * @param proc_cls closure for @a proc
687  */
688 static void
689 postgres_plugin_get_zero_anonymity (void *cls,
690                                     uint64_t offset,
691                                     enum GNUNET_BLOCK_Type type,
692                                     PluginDatumProcessor proc,
693                                     void *proc_cls)
694 {
695   struct Plugin *plugin = cls;
696   uint32_t utype = type;
697   struct GNUNET_PQ_QueryParam params[] = {
698     GNUNET_PQ_query_param_uint32 (&utype),
699     GNUNET_PQ_query_param_uint64 (&offset),
700     GNUNET_PQ_query_param_end
701   };
702   PGresult *ret;
703
704   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
705                                  "select_non_anonymous",
706                                  params);
707
708   process_result (plugin,
709                   proc, proc_cls,
710                   ret,
711                   __FILE__, __LINE__);
712 }
713
714
715 /**
716  * Context for #repl_iter() function.
717  */
718 struct ReplCtx
719 {
720
721   /**
722    * Plugin handle.
723    */
724   struct Plugin *plugin;
725
726   /**
727    * Function to call for the result (or the NULL).
728    */
729   PluginDatumProcessor proc;
730
731   /**
732    * Closure for @e proc.
733    */
734   void *proc_cls;
735 };
736
737
738 /**
739  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
740  * Decrements the replication counter and calls the original
741  * iterator.
742  *
743  * @param cls closure with the `struct ReplCtx *`
744  * @param key key for the content
745  * @param size number of bytes in @a data
746  * @param data content stored
747  * @param type type of the content
748  * @param priority priority of the content
749  * @param anonymity anonymity-level for the content
750  * @param expiration expiration time for the content
751  * @param uid unique identifier for the datum;
752  *        maybe 0 if no unique identifier is available
753  * @return #GNUNET_SYSERR to abort the iteration,
754  *         #GNUNET_OK to continue
755  *         (continue on call to "next", of course),
756  *         #GNUNET_NO to delete the item and continue (if supported)
757  */
758 static int
759 repl_proc (void *cls,
760            const struct GNUNET_HashCode *key,
761            uint32_t size,
762            const void *data,
763            enum GNUNET_BLOCK_Type type,
764            uint32_t priority,
765            uint32_t anonymity,
766            struct GNUNET_TIME_Absolute expiration,
767            uint64_t uid)
768 {
769   struct ReplCtx *rc = cls;
770   struct Plugin *plugin = rc->plugin;
771   int ret;
772   uint32_t oid = (uint32_t) uid;
773   struct GNUNET_PQ_QueryParam params[] = {
774     GNUNET_PQ_query_param_uint32 (&oid),
775     GNUNET_PQ_query_param_end
776   };
777   PGresult *qret;
778
779   ret = rc->proc (rc->proc_cls,
780                   key,
781                   size, data,
782                   type,
783                   priority,
784                   anonymity,
785                   expiration, uid);
786   if (NULL == key)
787     return ret;
788   qret = GNUNET_PQ_exec_prepared (plugin->dbh,
789                                   "decrepl",
790                                   params);
791   if (GNUNET_OK !=
792       GNUNET_POSTGRES_check_result (plugin->dbh,
793                                     qret,
794                                     PGRES_COMMAND_OK,
795                                     "PQexecPrepared",
796                                     "decrepl"))
797     return GNUNET_SYSERR;
798   PQclear (qret);
799   return ret;
800 }
801
802
803 /**
804  * Get a random item for replication.  Returns a single, not expired,
805  * random item from those with the highest replication counters.  The
806  * item's replication counter is decremented by one IF it was positive
807  * before.  Call @a proc with all values ZERO or NULL if the datastore
808  * is empty.
809  *
810  * @param cls closure with the `struct Plugin`
811  * @param proc function to call the value (once only).
812  * @param proc_cls closure for @a proc
813  */
814 static void
815 postgres_plugin_get_replication (void *cls,
816                                  PluginDatumProcessor proc,
817                                  void *proc_cls)
818 {
819   struct Plugin *plugin = cls;
820   struct ReplCtx rc;
821   PGresult *ret;
822
823   rc.plugin = plugin;
824   rc.proc = proc;
825   rc.proc_cls = proc_cls;
826   ret = PQexecPrepared (plugin->dbh,
827                         "select_replication_order", 0, NULL, NULL,
828                         NULL, 1);
829   process_result (plugin,
830                   &repl_proc,
831                   &rc,
832                   ret,
833                   __FILE__, __LINE__);
834 }
835
836
837 /**
838  * Get a random item for expiration.  Call @a proc with all values
839  * ZERO or NULL if the datastore is empty.
840  *
841  * @param cls closure with the `struct Plugin`
842  * @param proc function to call the value (once only).
843  * @param proc_cls closure for @a proc
844  */
845 static void
846 postgres_plugin_get_expiration (void *cls,
847                                 PluginDatumProcessor proc,
848                                 void *proc_cls)
849 {
850   struct Plugin *plugin = cls;
851   struct GNUNET_TIME_Absolute now;
852   struct GNUNET_PQ_QueryParam params[] = {
853     GNUNET_PQ_query_param_absolute_time (&now),
854     GNUNET_PQ_query_param_end
855   };
856   PGresult *ret;
857
858   now = GNUNET_TIME_absolute_get ();
859   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
860                                  "select_expiration_order",
861                                  params);
862   process_result (plugin,
863                   proc, proc_cls,
864                   ret,
865                   __FILE__, __LINE__);
866 }
867
868
869 /**
870  * Update the priority for a particular key in the datastore.  If
871  * the expiration time in value is different than the time found in
872  * the datastore, the higher value should be kept.  For the
873  * anonymity level, the lower value is to be used.  The specified
874  * priority should be added to the existing priority, ignoring the
875  * priority in value.
876  *
877  * Note that it is possible for multiple values to match this put.
878  * In that case, all of the respective values are updated.
879  *
880  * @param cls our `struct Plugin *`
881  * @param uid unique identifier of the datum
882  * @param delta by how much should the priority
883  *     change?
884  * @param expire new expiration time should be the
885  *     MAX of any existing expiration time and
886  *     this value
887  * @param cont continuation called with success or failure status
888  * @param cons_cls continuation closure
889  */
890 static void
891 postgres_plugin_update (void *cls,
892                         uint64_t uid,
893                         uint32_t delta,
894                         struct GNUNET_TIME_Absolute expire,
895                         PluginUpdateCont cont,
896                         void *cont_cls)
897 {
898   struct Plugin *plugin = cls;
899   uint32_t oid = (uint32_t) uid;
900   struct GNUNET_PQ_QueryParam params[] = {
901     GNUNET_PQ_query_param_uint32 (&delta),
902     GNUNET_PQ_query_param_absolute_time (&expire),
903     GNUNET_PQ_query_param_uint32 (&oid),
904     GNUNET_PQ_query_param_end
905   };
906   PGresult *ret;
907
908   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
909                                  "update",
910                                  params);
911   if (GNUNET_OK !=
912       GNUNET_POSTGRES_check_result (plugin->dbh,
913                                     ret,
914                                     PGRES_COMMAND_OK,
915                                     "PQexecPrepared",
916                                     "update"))
917   {
918     cont (cont_cls,
919           GNUNET_SYSERR,
920           NULL);
921     return;
922   }
923   PQclear (ret);
924   cont (cont_cls,
925         GNUNET_OK,
926         NULL);
927 }
928
929
930 /**
931  * Get all of the keys in the datastore.
932  *
933  * @param cls closure with the `struct Plugin *`
934  * @param proc function to call on each key
935  * @param proc_cls closure for @a proc
936  */
937 static void
938 postgres_plugin_get_keys (void *cls,
939                           PluginKeyProcessor proc,
940                           void *proc_cls)
941 {
942   struct Plugin *plugin = cls;
943   int ret;
944   int i;
945   struct GNUNET_HashCode key;
946   PGresult * res;
947
948   res = PQexecPrepared (plugin->dbh,
949                         "get_keys",
950                         0, NULL, NULL, NULL, 1);
951   ret = PQntuples (res);
952   for (i=0;i<ret;i++)
953   {
954     if (sizeof (struct GNUNET_HashCode) !=
955         PQgetlength (res, i, 0))
956     {
957       GNUNET_memcpy (&key,
958               PQgetvalue (res, i, 0),
959               sizeof (struct GNUNET_HashCode));
960       proc (proc_cls, &key, 1);
961     }
962   }
963   PQclear (res);
964   proc (proc_cls, NULL, 0);
965 }
966
967
968 /**
969  * Drop database.
970  *
971  * @param cls closure with the `struct Plugin *`
972  */
973 static void
974 postgres_plugin_drop (void *cls)
975 {
976   struct Plugin *plugin = cls;
977
978   if (GNUNET_OK !=
979       GNUNET_POSTGRES_exec (plugin->dbh,
980                             "DROP TABLE gn090"))
981     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
982                      "postgres",
983                      _("Failed to drop table from database.\n"));
984 }
985
986
987 /**
988  * Entry point for the plugin.
989  *
990  * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
991  * @return our `struct Plugin *`
992  */
993 void *
994 libgnunet_plugin_datastore_postgres_init (void *cls)
995 {
996   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
997   struct GNUNET_DATASTORE_PluginFunctions *api;
998   struct Plugin *plugin;
999
1000   plugin = GNUNET_new (struct Plugin);
1001   plugin->env = env;
1002   if (GNUNET_OK != init_connection (plugin))
1003   {
1004     GNUNET_free (plugin);
1005     return NULL;
1006   }
1007   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
1008   api->cls = plugin;
1009   api->estimate_size = &postgres_plugin_estimate_size;
1010   api->put = &postgres_plugin_put;
1011   api->update = &postgres_plugin_update;
1012   api->get_key = &postgres_plugin_get_key;
1013   api->get_replication = &postgres_plugin_get_replication;
1014   api->get_expiration = &postgres_plugin_get_expiration;
1015   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
1016   api->get_keys = &postgres_plugin_get_keys;
1017   api->drop = &postgres_plugin_drop;
1018   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1019                    "datastore-postgres",
1020                    _("Postgres database running\n"));
1021   return api;
1022 }
1023
1024
1025 /**
1026  * Exit point from the plugin.
1027  *
1028  * @param cls our `struct Plugin *`
1029  * @return always NULL
1030  */
1031 void *
1032 libgnunet_plugin_datastore_postgres_done (void *cls)
1033 {
1034   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1035   struct Plugin *plugin = api->cls;
1036
1037   PQfinish (plugin->dbh);
1038   GNUNET_free (plugin);
1039   GNUNET_free (api);
1040   return NULL;
1041 }
1042
1043 /* end of plugin_datastore_postgres.c */