fix #4546
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include "gnunet_postgres_lib.h"
30 #include "gnunet_pq_lib.h"
31
32
33 /**
34  * After how many ms "busy" should a DB operation fail for good?
35  * A low value makes sure that we are more responsive to requests
36  * (especially PUTs).  A high value guarantees a higher success
37  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
38  *
39  * The default value of 1s should ensure that users do not experience
40  * huge latencies while at the same time allowing operations to succeed
41  * with reasonable probability.
42  */
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
44
45
46 /**
47  * Context for all functions in this plugin.
48  */
49 struct Plugin
50 {
51   /**
52    * Our execution environment.
53    */
54   struct GNUNET_DATASTORE_PluginEnvironment *env;
55
56   /**
57    * Native Postgres database handle.
58    */
59   PGconn *dbh;
60
61 };
62
63
64 /**
65  * @brief Get a database handle
66  *
67  * @param plugin global context
68  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
69  */
70 static int
71 init_connection (struct Plugin *plugin)
72 {
73   PGresult *ret;
74
75   plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres");
76   if (NULL == plugin->dbh)
77     return GNUNET_SYSERR;
78
79   ret =
80       PQexec (plugin->dbh,
81               "CREATE TABLE gn090 ("
82               "  repl INTEGER NOT NULL DEFAULT 0,"
83               "  type INTEGER NOT NULL DEFAULT 0,"
84               "  prio INTEGER NOT NULL DEFAULT 0,"
85               "  anonLevel INTEGER NOT NULL DEFAULT 0,"
86               "  expire BIGINT NOT NULL DEFAULT 0,"
87               "  rvalue BIGINT NOT NULL DEFAULT 0,"
88               "  hash BYTEA NOT NULL DEFAULT '',"
89               "  vhash BYTEA NOT NULL DEFAULT '',"
90               "  value BYTEA NOT NULL DEFAULT '')"
91               "WITH OIDS");
92   if ( (NULL == ret) ||
93        ((PQresultStatus (ret) != PGRES_COMMAND_OK) &&
94         (0 != strcmp ("42P07",    /* duplicate table */
95                       PQresultErrorField
96                       (ret,
97                        PG_DIAG_SQLSTATE)))))
98   {
99     (void) GNUNET_POSTGRES_check_result (plugin->dbh,
100                                          ret,
101                                          PGRES_COMMAND_OK,
102                                          "CREATE TABLE",
103                                          "gn090");
104     PQfinish (plugin->dbh);
105     plugin->dbh = NULL;
106     return GNUNET_SYSERR;
107   }
108
109   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
110   {
111     if ((GNUNET_OK !=
112          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash ON gn090 (hash)")) ||
113         (GNUNET_OK !=
114          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)")) ||
115         (GNUNET_OK !=
116          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_prio ON gn090 (prio)")) ||
117         (GNUNET_OK !=
118          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire ON gn090 (expire)")) ||
119         (GNUNET_OK !=
120          GNUNET_POSTGRES_exec (plugin->dbh,
121                                "CREATE INDEX idx_prio_anon ON gn090 (prio,anonLevel)")) ||
122         (GNUNET_OK !=
123          GNUNET_POSTGRES_exec (plugin->dbh,
124                                "CREATE INDEX idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
125         (GNUNET_OK !=
126          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
127         (GNUNET_OK !=
128          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire_hash ON gn090 (expire,hash)")))
129     {
130       PQclear (ret);
131       PQfinish (plugin->dbh);
132       plugin->dbh = NULL;
133       return GNUNET_SYSERR;
134     }
135   }
136   PQclear (ret);
137
138   ret =
139       PQexec (plugin->dbh,
140               "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
141   if (GNUNET_OK !=
142       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
143   {
144     PQfinish (plugin->dbh);
145     plugin->dbh = NULL;
146     return GNUNET_SYSERR;
147   }
148   PQclear (ret);
149   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
150   if (GNUNET_OK !=
151       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
152   {
153     PQfinish (plugin->dbh);
154     plugin->dbh = NULL;
155     return GNUNET_SYSERR;
156   }
157   PQclear (ret);
158   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
159   if (GNUNET_OK !=
160       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
161   {
162     PQfinish (plugin->dbh);
163     plugin->dbh = NULL;
164     return GNUNET_SYSERR;
165   }
166   PQclear (ret);
167   if ((GNUNET_OK !=
168        GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
169                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
170                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
171                    "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
172       (GNUNET_OK !=
173        GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
174                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
175                    "WHERE hash=$1 AND type=$2 "
176                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
177       (GNUNET_OK !=
178        GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
179                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
180                    "WHERE hash=$1 AND vhash=$2 "
181                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
182       (GNUNET_OK !=
183        GNUNET_POSTGRES_prepare (plugin->dbh, "get",
184                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
185                    "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) ||
186       (GNUNET_OK !=
187        GNUNET_POSTGRES_prepare (plugin->dbh, "count_getvt",
188                                 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3", 3)) ||
189       (GNUNET_OK !=
190        GNUNET_POSTGRES_prepare (plugin->dbh, "count_gett",
191                                 "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2", 2)) ||
192       (GNUNET_OK !=
193        GNUNET_POSTGRES_prepare (plugin->dbh, "count_getv",
194                                 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2", 2)) ||
195       (GNUNET_OK !=
196        GNUNET_POSTGRES_prepare (plugin->dbh, "count_get",
197                                 "SELECT count(*) FROM gn090 WHERE hash=$1", 1)) ||
198       (GNUNET_OK !=
199        GNUNET_POSTGRES_prepare (plugin->dbh, "put",
200                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
201                    "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) ||
202       (GNUNET_OK !=
203        GNUNET_POSTGRES_prepare (plugin->dbh, "update",
204                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
205                    "WHERE oid = $3", 3)) ||
206       (GNUNET_OK !=
207        GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
208                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
209                    "WHERE oid = $1", 1)) ||
210       (GNUNET_OK !=
211        GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
212                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
213                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
214                    1)) ||
215       (GNUNET_OK !=
216        GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
217                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
218                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION "
219                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
220                    "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1",
221                    1)) ||
222       (GNUNET_OK !=
223        GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
224                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
225                    "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
226       (GNUNET_OK !=
227        GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
228       (GNUNET_OK !=
229        GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
230   {
231     PQfinish (plugin->dbh);
232     plugin->dbh = NULL;
233     return GNUNET_SYSERR;
234   }
235   return GNUNET_OK;
236 }
237
238
239 /**
240  * Get an estimate of how much space the database is
241  * currently using.
242  *
243  * @param cls our `struct Plugin *`
244  * @return number of bytes used on disk
245  */
246 static void
247 postgres_plugin_estimate_size (void *cls, unsigned long long *estimate)
248 {
249   struct Plugin *plugin = cls;
250   unsigned long long total;
251   PGresult *ret;
252
253   if (NULL == estimate)
254     return;
255   ret =
256       PQexecParams (plugin->dbh,
257                     "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
258                     NULL, NULL, NULL, NULL, 1);
259   if (GNUNET_OK !=
260       GNUNET_POSTGRES_check_result (plugin->dbh,
261                                     ret,
262                                     PGRES_TUPLES_OK,
263                                     "PQexecParams",
264                                     "get_size"))
265   {
266     *estimate = 0;
267     return;
268   }
269   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
270   {
271     GNUNET_break (0);
272     PQclear (ret);
273     *estimate = 0;
274     return;
275   }
276   if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
277   {
278     GNUNET_break (0 == PQgetlength (ret, 0, 0));
279     PQclear (ret);
280     *estimate = 0;
281     return;
282   }
283   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
284   PQclear (ret);
285   *estimate = total;
286 }
287
288
289 /**
290  * Store an item in the datastore.
291  *
292  * @param cls closure with the `struct Plugin`
293  * @param key key for the item
294  * @param size number of bytes in data
295  * @param data content stored
296  * @param type type of the content
297  * @param priority priority of the content
298  * @param anonymity anonymity-level for the content
299  * @param replication replication-level for the content
300  * @param expiration expiration time for the content
301  * @param cont continuation called with success or failure status
302  * @param cont_cls continuation closure
303  */
304 static void
305 postgres_plugin_put (void *cls,
306                      const struct GNUNET_HashCode *key,
307                      uint32_t size,
308                      const void *data,
309                      enum GNUNET_BLOCK_Type type,
310                      uint32_t priority,
311                      uint32_t anonymity,
312                      uint32_t replication,
313                      struct GNUNET_TIME_Absolute expiration,
314                      PluginPutCont cont,
315                      void *cont_cls)
316 {
317   struct Plugin *plugin = cls;
318   uint32_t utype = type;
319   struct GNUNET_HashCode vhash;
320   PGresult *ret;
321   struct GNUNET_PQ_QueryParam params[] = {
322     GNUNET_PQ_query_param_uint32 (&replication),
323     GNUNET_PQ_query_param_uint32 (&utype),
324     GNUNET_PQ_query_param_uint32 (&priority),
325     GNUNET_PQ_query_param_uint32 (&anonymity),
326     GNUNET_PQ_query_param_absolute_time (&expiration),
327     GNUNET_PQ_query_param_auto_from_type (key),
328     GNUNET_PQ_query_param_auto_from_type (&vhash),
329     GNUNET_PQ_query_param_fixed_size (data, size),
330     GNUNET_PQ_query_param_end
331   };
332
333   GNUNET_CRYPTO_hash (data, size, &vhash);
334   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
335                                  "put",
336                                  params);
337   if (GNUNET_OK !=
338       GNUNET_POSTGRES_check_result (plugin->dbh,
339                                     ret,
340                                     PGRES_COMMAND_OK,
341                                     "PQexecPrepared", "put"))
342   {
343     cont (cont_cls, key, size,
344           GNUNET_SYSERR,
345           _("Postgress exec failure"));
346     return;
347   }
348   PQclear (ret);
349   plugin->env->duc (plugin->env->cls,
350                     size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
351   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
352                    "datastore-postgres",
353                    "Stored %u bytes in database\n",
354                    (unsigned int) size);
355   cont (cont_cls, key, size, GNUNET_OK, NULL);
356 }
357
358
359 /**
360  * Function invoked to process the result and call the processor.
361  *
362  * @param plugin global plugin data
363  * @param proc function to call the value (once only).
364  * @param proc_cls closure for proc
365  * @param res result from exec
366  * @param filename filename for error messages
367  * @param line line number for error messages
368  */
369 static void
370 process_result (struct Plugin *plugin,
371                 PluginDatumProcessor proc,
372                 void *proc_cls,
373                 PGresult * res,
374                 const char *filename, int line)
375 {
376   int iret;
377   uint32_t rowid;
378   uint32_t utype;
379   uint32_t anonymity;
380   uint32_t priority;
381   size_t size;
382   void *data;
383   struct GNUNET_TIME_Absolute expiration_time;
384   struct GNUNET_HashCode key;
385   struct GNUNET_PQ_ResultSpec rs[] = {
386     GNUNET_PQ_result_spec_uint32 ("type", &utype),
387     GNUNET_PQ_result_spec_uint32 ("prio", &priority),
388     GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
389     GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
390     GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
391     GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
392     GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
393     GNUNET_PQ_result_spec_end
394   };
395
396   if (GNUNET_OK !=
397       GNUNET_POSTGRES_check_result_ (plugin->dbh,
398                                      res,
399                                      PGRES_TUPLES_OK,
400                                      "PQexecPrepared",
401                                      "select",
402                                      filename, line))
403   {
404     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
405                      "datastore-postgres",
406                      "Ending iteration (postgres error)\n");
407     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
408           GNUNET_TIME_UNIT_ZERO_ABS, 0);
409     return;
410   }
411
412   if (0 == PQntuples (res))
413   {
414     /* no result */
415     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
416                      "datastore-postgres",
417                      "Ending iteration (no more results)\n");
418     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
419           GNUNET_TIME_UNIT_ZERO_ABS, 0);
420     PQclear (res);
421     return;
422   }
423   if (1 != PQntuples (res))
424   {
425     GNUNET_break (0);
426     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
427           GNUNET_TIME_UNIT_ZERO_ABS, 0);
428     PQclear (res);
429     return;
430   }
431   if (GNUNET_OK !=
432       GNUNET_PQ_extract_result (res,
433                                 rs,
434                                 0))
435   {
436     GNUNET_break (0);
437     PQclear (res);
438     GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
439                                      "delrow",
440                                      rowid);
441     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
442           GNUNET_TIME_UNIT_ZERO_ABS, 0);
443     return;
444   }
445
446   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
447                    "datastore-postgres",
448                    "Found result of size %u bytes and type %u in database\n",
449                    (unsigned int) size,
450                    (unsigned int) utype);
451   iret = proc (proc_cls,
452                &key,
453                size,
454                data,
455                (enum GNUNET_BLOCK_Type) utype,
456                priority,
457                anonymity,
458                expiration_time,
459                rowid);
460   PQclear (res);
461   if (iret == GNUNET_NO)
462   {
463     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
464                 "Processor asked for item %u to be removed.\n",
465                 (unsigned int) rowid);
466     if (GNUNET_OK ==
467         GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
468                                          "delrow",
469                                          rowid))
470     {
471       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
472                        "datastore-postgres",
473                        "Deleting %u bytes from database\n",
474                        (unsigned int) size);
475       plugin->env->duc (plugin->env->cls,
476                         - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
477       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
478                        "datastore-postgres",
479                        "Deleted %u bytes from database\n",
480                        (unsigned int) size);
481     }
482   }
483 }
484
485
486 /**
487  * Iterate over the results for a particular key
488  * in the datastore.
489  *
490  * @param cls closure with the 'struct Plugin'
491  * @param offset offset of the result (modulo num-results);
492  *        specific ordering does not matter for the offset
493  * @param key maybe NULL (to match all entries)
494  * @param vhash hash of the value, maybe NULL (to
495  *        match all values that have the right key).
496  *        Note that for DBlocks there is no difference
497  *        betwen key and vhash, but for other blocks
498  *        there may be!
499  * @param type entries of which type are relevant?
500  *     Use 0 for any type.
501  * @param proc function to call on the matching value;
502  *        will be called once with a NULL if no value matches
503  * @param proc_cls closure for iter
504  */
505 static void
506 postgres_plugin_get_key (void *cls,
507                          uint64_t offset,
508                          const struct GNUNET_HashCode *key,
509                          const struct GNUNET_HashCode *vhash,
510                          enum GNUNET_BLOCK_Type type,
511                          PluginDatumProcessor proc,
512                          void *proc_cls)
513 {
514   struct Plugin *plugin = cls;
515   uint32_t utype = type;
516   PGresult *ret;
517   uint64_t total;
518   uint64_t limit_off;
519
520   if (0 != type)
521   {
522     if (NULL != vhash)
523     {
524       struct GNUNET_PQ_QueryParam params[] = {
525         GNUNET_PQ_query_param_auto_from_type (key),
526         GNUNET_PQ_query_param_auto_from_type (vhash),
527         GNUNET_PQ_query_param_uint32 (&utype),
528         GNUNET_PQ_query_param_end
529       };
530       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
531                                      "count_getvt",
532                                      params);
533     }
534     else
535     {
536       struct GNUNET_PQ_QueryParam params[] = {
537         GNUNET_PQ_query_param_auto_from_type (key),
538         GNUNET_PQ_query_param_uint32 (&utype),
539         GNUNET_PQ_query_param_end
540       };
541       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
542                                      "count_gett",
543                                      params);
544     }
545   }
546   else
547   {
548     if (NULL != vhash)
549     {
550       struct GNUNET_PQ_QueryParam params[] = {
551         GNUNET_PQ_query_param_auto_from_type (key),
552         GNUNET_PQ_query_param_auto_from_type (vhash),
553         GNUNET_PQ_query_param_end
554       };
555       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
556                                      "count_getv",
557                                      params);
558     }
559     else
560     {
561       struct GNUNET_PQ_QueryParam params[] = {
562         GNUNET_PQ_query_param_auto_from_type (key),
563         GNUNET_PQ_query_param_end
564       };
565       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
566                                      "count_get",
567                                      params);
568     }
569   }
570
571   if (GNUNET_OK !=
572       GNUNET_POSTGRES_check_result (plugin->dbh,
573                                     ret,
574                                     PGRES_TUPLES_OK,
575                                     "PQexecParams",
576                                     "count"))
577   {
578     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
579           GNUNET_TIME_UNIT_ZERO_ABS, 0);
580     return;
581   }
582   if ( (PQntuples (ret) != 1) ||
583        (PQnfields (ret) != 1) ||
584        (PQgetlength (ret, 0, 0) != sizeof (uint64_t)))
585   {
586     GNUNET_break (0);
587     PQclear (ret);
588     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
589           GNUNET_TIME_UNIT_ZERO_ABS, 0);
590     return;
591   }
592   total = GNUNET_ntohll (*(const uint64_t *) PQgetvalue (ret, 0, 0));
593   PQclear (ret);
594   if (0 == total)
595   {
596     proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
597           GNUNET_TIME_UNIT_ZERO_ABS, 0);
598     return;
599   }
600   limit_off = offset % total;
601
602   if (0 != type)
603   {
604     if (NULL != vhash)
605     {
606       struct GNUNET_PQ_QueryParam params[] = {
607         GNUNET_PQ_query_param_auto_from_type (key),
608         GNUNET_PQ_query_param_auto_from_type (vhash),
609         GNUNET_PQ_query_param_uint32 (&utype),
610         GNUNET_PQ_query_param_uint64 (&limit_off),
611         GNUNET_PQ_query_param_end
612       };
613       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
614                                      "getvt",
615                                      params);
616     }
617     else
618     {
619       struct GNUNET_PQ_QueryParam params[] = {
620         GNUNET_PQ_query_param_auto_from_type (key),
621         GNUNET_PQ_query_param_uint32 (&utype),
622         GNUNET_PQ_query_param_uint64 (&limit_off),
623         GNUNET_PQ_query_param_end
624       };
625       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
626                                      "gett",
627                                      params);
628     }
629   }
630   else
631   {
632     if (NULL != vhash)
633     {
634       struct GNUNET_PQ_QueryParam params[] = {
635         GNUNET_PQ_query_param_auto_from_type (key),
636         GNUNET_PQ_query_param_auto_from_type (vhash),
637         GNUNET_PQ_query_param_uint64 (&limit_off),
638         GNUNET_PQ_query_param_end
639       };
640       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
641                                      "getv",
642                                      params);
643     }
644     else
645     {
646       struct GNUNET_PQ_QueryParam params[] = {
647         GNUNET_PQ_query_param_auto_from_type (key),
648         GNUNET_PQ_query_param_uint64 (&limit_off),
649         GNUNET_PQ_query_param_end
650       };
651       ret = GNUNET_PQ_exec_prepared (plugin->dbh,
652                                      "get",
653                                      params);
654     }
655   }
656   process_result (plugin,
657                   proc,
658                   proc_cls,
659                   ret,
660                   __FILE__, __LINE__);
661 }
662
663
664 /**
665  * Select a subset of the items in the datastore and call
666  * the given iterator for each of them.
667  *
668  * @param cls our `struct Plugin *`
669  * @param offset offset of the result (modulo num-results);
670  *        specific ordering does not matter for the offset
671  * @param type entries of which type should be considered?
672  *        Use 0 for any type.
673  * @param proc function to call on the matching value;
674  *        will be called with a NULL if no value matches
675  * @param proc_cls closure for @a proc
676  */
677 static void
678 postgres_plugin_get_zero_anonymity (void *cls,
679                                     uint64_t offset,
680                                     enum GNUNET_BLOCK_Type type,
681                                     PluginDatumProcessor proc,
682                                     void *proc_cls)
683 {
684   struct Plugin *plugin = cls;
685   uint32_t utype = type;
686   struct GNUNET_PQ_QueryParam params[] = {
687     GNUNET_PQ_query_param_uint32 (&utype),
688     GNUNET_PQ_query_param_uint64 (&offset),
689     GNUNET_PQ_query_param_end
690   };
691   PGresult *ret;
692
693   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
694                                  "select_non_anonymous",
695                                  params);
696
697   process_result (plugin,
698                   proc, proc_cls,
699                   ret,
700                   __FILE__, __LINE__);
701 }
702
703
704 /**
705  * Context for #repl_iter() function.
706  */
707 struct ReplCtx
708 {
709
710   /**
711    * Plugin handle.
712    */
713   struct Plugin *plugin;
714
715   /**
716    * Function to call for the result (or the NULL).
717    */
718   PluginDatumProcessor proc;
719
720   /**
721    * Closure for @e proc.
722    */
723   void *proc_cls;
724 };
725
726
727 /**
728  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
729  * Decrements the replication counter and calls the original
730  * iterator.
731  *
732  * @param cls closure with the `struct ReplCtx *`
733  * @param key key for the content
734  * @param size number of bytes in @a data
735  * @param data content stored
736  * @param type type of the content
737  * @param priority priority of the content
738  * @param anonymity anonymity-level for the content
739  * @param expiration expiration time for the content
740  * @param uid unique identifier for the datum;
741  *        maybe 0 if no unique identifier is available
742  * @return #GNUNET_SYSERR to abort the iteration,
743  *         #GNUNET_OK to continue
744  *         (continue on call to "next", of course),
745  *         #GNUNET_NO to delete the item and continue (if supported)
746  */
747 static int
748 repl_proc (void *cls,
749            const struct GNUNET_HashCode *key,
750            uint32_t size,
751            const void *data,
752            enum GNUNET_BLOCK_Type type,
753            uint32_t priority,
754            uint32_t anonymity,
755            struct GNUNET_TIME_Absolute expiration,
756            uint64_t uid)
757 {
758   struct ReplCtx *rc = cls;
759   struct Plugin *plugin = rc->plugin;
760   int ret;
761   uint32_t oid = (uint32_t) uid;
762   struct GNUNET_PQ_QueryParam params[] = {
763     GNUNET_PQ_query_param_uint32 (&oid),
764     GNUNET_PQ_query_param_end
765   };
766   PGresult *qret;
767
768   ret = rc->proc (rc->proc_cls,
769                   key,
770                   size, data,
771                   type,
772                   priority,
773                   anonymity,
774                   expiration, uid);
775   if (NULL == key)
776     return ret;
777   qret = GNUNET_PQ_exec_prepared (plugin->dbh,
778                                   "decrepl",
779                                   params);
780   if (GNUNET_OK !=
781       GNUNET_POSTGRES_check_result (plugin->dbh,
782                                     qret,
783                                     PGRES_COMMAND_OK,
784                                     "PQexecPrepared",
785                                     "decrepl"))
786     return GNUNET_SYSERR;
787   PQclear (qret);
788   return ret;
789 }
790
791
792 /**
793  * Get a random item for replication.  Returns a single, not expired,
794  * random item from those with the highest replication counters.  The
795  * item's replication counter is decremented by one IF it was positive
796  * before.  Call @a proc with all values ZERO or NULL if the datastore
797  * is empty.
798  *
799  * @param cls closure with the `struct Plugin`
800  * @param proc function to call the value (once only).
801  * @param proc_cls closure for @a proc
802  */
803 static void
804 postgres_plugin_get_replication (void *cls,
805                                  PluginDatumProcessor proc,
806                                  void *proc_cls)
807 {
808   struct Plugin *plugin = cls;
809   struct ReplCtx rc;
810   PGresult *ret;
811
812   rc.plugin = plugin;
813   rc.proc = proc;
814   rc.proc_cls = proc_cls;
815   ret = PQexecPrepared (plugin->dbh,
816                         "select_replication_order", 0, NULL, NULL,
817                         NULL, 1);
818   process_result (plugin,
819                   &repl_proc,
820                   &rc,
821                   ret,
822                   __FILE__, __LINE__);
823 }
824
825
826 /**
827  * Get a random item for expiration.  Call @a proc with all values
828  * ZERO or NULL if the datastore is empty.
829  *
830  * @param cls closure with the `struct Plugin`
831  * @param proc function to call the value (once only).
832  * @param proc_cls closure for @a proc
833  */
834 static void
835 postgres_plugin_get_expiration (void *cls,
836                                 PluginDatumProcessor proc,
837                                 void *proc_cls)
838 {
839   struct Plugin *plugin = cls;
840   struct GNUNET_TIME_Absolute now;
841   struct GNUNET_PQ_QueryParam params[] = {
842     GNUNET_PQ_query_param_absolute_time (&now),
843     GNUNET_PQ_query_param_end
844   };
845   PGresult *ret;
846
847   now = GNUNET_TIME_absolute_get ();
848   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
849                                  "select_expiration_order",
850                                  params);
851   process_result (plugin,
852                   proc, proc_cls,
853                   ret,
854                   __FILE__, __LINE__);
855 }
856
857
858 /**
859  * Update the priority for a particular key in the datastore.  If
860  * the expiration time in value is different than the time found in
861  * the datastore, the higher value should be kept.  For the
862  * anonymity level, the lower value is to be used.  The specified
863  * priority should be added to the existing priority, ignoring the
864  * priority in value.
865  *
866  * Note that it is possible for multiple values to match this put.
867  * In that case, all of the respective values are updated.
868  *
869  * @param cls our `struct Plugin *`
870  * @param uid unique identifier of the datum
871  * @param delta by how much should the priority
872  *     change?  If priority + delta < 0 the
873  *     priority should be set to 0 (never go
874  *     negative).
875  * @param expire new expiration time should be the
876  *     MAX of any existing expiration time and
877  *     this value
878  * @param cont continuation called with success or failure status
879  * @param cons_cls continuation closure
880  */
881 static void
882 postgres_plugin_update (void *cls,
883                         uint64_t uid,
884                         int delta,
885                         struct GNUNET_TIME_Absolute expire,
886                         PluginUpdateCont cont,
887                         void *cont_cls)
888 {
889   struct Plugin *plugin = cls;
890   uint32_t idelta = delta;
891   uint32_t oid = (uint32_t) uid;
892   struct GNUNET_PQ_QueryParam params[] = {
893     GNUNET_PQ_query_param_uint32 (&idelta),
894     GNUNET_PQ_query_param_absolute_time (&expire),
895     GNUNET_PQ_query_param_uint32 (&oid),
896     GNUNET_PQ_query_param_end
897   };
898   PGresult *ret;
899
900   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
901                                  "update",
902                                  params);
903   if (GNUNET_OK !=
904       GNUNET_POSTGRES_check_result (plugin->dbh,
905                                     ret,
906                                     PGRES_COMMAND_OK,
907                                     "PQexecPrepared",
908                                     "update"))
909   {
910     cont (cont_cls,
911           GNUNET_SYSERR,
912           NULL);
913     return;
914   }
915   PQclear (ret);
916   cont (cont_cls,
917         GNUNET_OK,
918         NULL);
919 }
920
921
922 /**
923  * Get all of the keys in the datastore.
924  *
925  * @param cls closure with the `struct Plugin *`
926  * @param proc function to call on each key
927  * @param proc_cls closure for @a proc
928  */
929 static void
930 postgres_plugin_get_keys (void *cls,
931                           PluginKeyProcessor proc,
932                           void *proc_cls)
933 {
934   struct Plugin *plugin = cls;
935   int ret;
936   int i;
937   struct GNUNET_HashCode key;
938   PGresult * res;
939
940   res = PQexecPrepared (plugin->dbh,
941                         "get_keys",
942                         0, NULL, NULL, NULL, 1);
943   ret = PQntuples (res);
944   for (i=0;i<ret;i++)
945   {
946     if (sizeof (struct GNUNET_HashCode) !=
947         PQgetlength (res, i, 0))
948     {
949       memcpy (&key,
950               PQgetvalue (res, i, 0),
951               sizeof (struct GNUNET_HashCode));
952       proc (proc_cls, &key, 1);
953     }
954   }
955   PQclear (res);
956   proc (proc_cls, NULL, 0);
957 }
958
959
960 /**
961  * Drop database.
962  *
963  * @param cls closure with the `struct Plugin *`
964  */
965 static void
966 postgres_plugin_drop (void *cls)
967 {
968   struct Plugin *plugin = cls;
969
970   if (GNUNET_OK !=
971       GNUNET_POSTGRES_exec (plugin->dbh,
972                             "DROP TABLE gn090"))
973     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
974                      "postgres",
975                      _("Failed to drop table from database.\n"));
976 }
977
978
979 /**
980  * Entry point for the plugin.
981  *
982  * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
983  * @return our `struct Plugin *`
984  */
985 void *
986 libgnunet_plugin_datastore_postgres_init (void *cls)
987 {
988   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
989   struct GNUNET_DATASTORE_PluginFunctions *api;
990   struct Plugin *plugin;
991
992   plugin = GNUNET_new (struct Plugin);
993   plugin->env = env;
994   if (GNUNET_OK != init_connection (plugin))
995   {
996     GNUNET_free (plugin);
997     return NULL;
998   }
999   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
1000   api->cls = plugin;
1001   api->estimate_size = &postgres_plugin_estimate_size;
1002   api->put = &postgres_plugin_put;
1003   api->update = &postgres_plugin_update;
1004   api->get_key = &postgres_plugin_get_key;
1005   api->get_replication = &postgres_plugin_get_replication;
1006   api->get_expiration = &postgres_plugin_get_expiration;
1007   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
1008   api->get_keys = &postgres_plugin_get_keys;
1009   api->drop = &postgres_plugin_drop;
1010   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1011                    "datastore-postgres",
1012                    _("Postgres database running\n"));
1013   return api;
1014 }
1015
1016
1017 /**
1018  * Exit point from the plugin.
1019  *
1020  * @param cls our `struct Plugin *`
1021  * @return always NULL
1022  */
1023 void *
1024 libgnunet_plugin_datastore_postgres_done (void *cls)
1025 {
1026   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1027   struct Plugin *plugin = api->cls;
1028
1029   PQfinish (plugin->dbh);
1030   GNUNET_free (plugin);
1031   GNUNET_free (api);
1032   return NULL;
1033 }
1034
1035 /* end of plugin_datastore_postgres.c */