Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include "gnunet_postgres_lib.h"
30 #include "gnunet_pq_lib.h"
31
32
33 /**
34  * After how many ms "busy" should a DB operation fail for good?
35  * A low value makes sure that we are more responsive to requests
36  * (especially PUTs).  A high value guarantees a higher success
37  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
38  *
39  * The default value of 1s should ensure that users do not experience
40  * huge latencies while at the same time allowing operations to succeed
41  * with reasonable probability.
42  */
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
44
45
46 /**
47  * Context for all functions in this plugin.
48  */
49 struct Plugin
50 {
51   /**
52    * Our execution environment.
53    */
54   struct GNUNET_DATASTORE_PluginEnvironment *env;
55
56   /**
57    * Native Postgres database handle.
58    */
59   PGconn *dbh;
60
61 };
62
63
64 /**
65  * @brief Get a database handle
66  *
67  * @param plugin global context
68  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
69  */
70 static int
71 init_connection (struct Plugin *plugin)
72 {
73   PGresult *ret;
74
75   plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres");
76   if (NULL == plugin->dbh)
77     return GNUNET_SYSERR;
78
79   /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
80    * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
81    * we do math or inequality tests, so we can't handle the entire range of uint32_t.
82    * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
83    * PostgreSQL also recommends against using WITH OIDS.
84    */
85   ret =
86       PQexec (plugin->dbh,
87               "CREATE TABLE IF NOT EXISTS gn090 ("
88               "  repl INTEGER NOT NULL DEFAULT 0,"
89               "  type INTEGER NOT NULL DEFAULT 0,"
90               "  prio INTEGER NOT NULL DEFAULT 0,"
91               "  anonLevel INTEGER NOT NULL DEFAULT 0,"
92               "  expire BIGINT NOT NULL DEFAULT 0,"
93               "  rvalue BIGINT NOT NULL DEFAULT 0,"
94               "  hash BYTEA NOT NULL DEFAULT '',"
95               "  vhash BYTEA NOT NULL DEFAULT '',"
96               "  value BYTEA NOT NULL DEFAULT '')"
97               "WITH OIDS");
98   if ( (NULL == ret) ||
99        ((PQresultStatus (ret) != PGRES_COMMAND_OK) &&
100         (0 != strcmp ("42P07",    /* duplicate table */
101                       PQresultErrorField
102                       (ret,
103                        PG_DIAG_SQLSTATE)))))
104   {
105     (void) GNUNET_POSTGRES_check_result (plugin->dbh,
106                                          ret,
107                                          PGRES_COMMAND_OK,
108                                          "CREATE TABLE",
109                                          "gn090");
110     PQfinish (plugin->dbh);
111     plugin->dbh = NULL;
112     return GNUNET_SYSERR;
113   }
114
115   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
116   {
117     if ((GNUNET_OK !=
118          GNUNET_POSTGRES_exec (plugin->dbh,
119                                "CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)")) ||
120         (GNUNET_OK !=
121          GNUNET_POSTGRES_exec (plugin->dbh,
122                                "CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)")) ||
123         (GNUNET_OK !=
124          GNUNET_POSTGRES_exec (plugin->dbh,
125                                "CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)")) ||
126         (GNUNET_OK !=
127          GNUNET_POSTGRES_exec (plugin->dbh,
128                                "CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)")) ||
129         (GNUNET_OK !=
130          GNUNET_POSTGRES_exec (plugin->dbh,
131                                "CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
132         (GNUNET_OK !=
133          GNUNET_POSTGRES_exec (plugin->dbh,
134                                "CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
135         (GNUNET_OK !=
136          GNUNET_POSTGRES_exec (plugin->dbh,
137                                "CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)")))
138     {
139       PQclear (ret);
140       PQfinish (plugin->dbh);
141       plugin->dbh = NULL;
142       return GNUNET_SYSERR;
143     }
144   }
145   PQclear (ret);
146
147   ret =
148       PQexec (plugin->dbh,
149               "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
150   if (GNUNET_OK !=
151       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
152   {
153     PQfinish (plugin->dbh);
154     plugin->dbh = NULL;
155     return GNUNET_SYSERR;
156   }
157   PQclear (ret);
158   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
159   if (GNUNET_OK !=
160       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
161   {
162     PQfinish (plugin->dbh);
163     plugin->dbh = NULL;
164     return GNUNET_SYSERR;
165   }
166   PQclear (ret);
167   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
168   if (GNUNET_OK !=
169       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
170   {
171     PQfinish (plugin->dbh);
172     plugin->dbh = NULL;
173     return GNUNET_SYSERR;
174   }
175   PQclear (ret);
176 #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
177   if ((GNUNET_OK !=
178        GNUNET_POSTGRES_prepare (plugin->dbh, "get",
179                    "SELECT " RESULT_COLUMNS " FROM gn090 "
180                    "WHERE oid >= $1::bigint AND "
181                    "(rvalue >= $2 OR 0 = $3::smallint) AND "
182                    "(hash = $4 OR 0 = $5::smallint) AND "
183                    "(type = $6 OR 0 = $7::smallint) "
184                    "ORDER BY oid ASC LIMIT 1", 7)) ||
185       (GNUNET_OK !=
186        GNUNET_POSTGRES_prepare (plugin->dbh, "put",
187                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
188                    "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", 9)) ||
189       (GNUNET_OK !=
190        GNUNET_POSTGRES_prepare (plugin->dbh, "update",
191                    "UPDATE gn090 "
192                    "SET prio = prio + $1, "
193                    "repl = repl + $2, "
194                    "expire = GREATEST(expire, $3) "
195                    "WHERE hash = $4 AND vhash = $5", 5)) ||
196       (GNUNET_OK !=
197        GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
198                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
199                    "WHERE oid = $1", 1)) ||
200       (GNUNET_OK !=
201        GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
202                    "SELECT " RESULT_COLUMNS " FROM gn090 "
203                    "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
204                    "ORDER BY oid ASC LIMIT 1",
205                    2)) ||
206       (GNUNET_OK !=
207        GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
208                    "(SELECT " RESULT_COLUMNS " FROM gn090 "
209                     "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
210                    "UNION "
211                    "(SELECT " RESULT_COLUMNS " FROM gn090 "
212                     "ORDER BY prio ASC LIMIT 1) "
213                    "ORDER BY expire ASC LIMIT 1",
214                    1)) ||
215       (GNUNET_OK !=
216        GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
217                    "SELECT " RESULT_COLUMNS " FROM gn090 "
218                    "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
219       (GNUNET_OK !=
220        GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
221       (GNUNET_OK !=
222        GNUNET_POSTGRES_prepare (plugin->dbh, "remove", "DELETE FROM gn090 "
223          "WHERE hash = $1 AND "
224          "value = $2", 2)) ||
225       (GNUNET_OK !=
226        GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
227   {
228     PQfinish (plugin->dbh);
229     plugin->dbh = NULL;
230     return GNUNET_SYSERR;
231   }
232   return GNUNET_OK;
233 }
234
235
236 /**
237  * Get an estimate of how much space the database is
238  * currently using.
239  *
240  * @param cls our `struct Plugin *`
241  * @return number of bytes used on disk
242  */
243 static void
244 postgres_plugin_estimate_size (void *cls, unsigned long long *estimate)
245 {
246   struct Plugin *plugin = cls;
247   unsigned long long total;
248   PGresult *ret;
249
250   if (NULL == estimate)
251     return;
252   ret =
253       PQexecParams (plugin->dbh,
254                     "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
255                     NULL, NULL, NULL, NULL, 1);
256   if (GNUNET_OK !=
257       GNUNET_POSTGRES_check_result (plugin->dbh,
258                                     ret,
259                                     PGRES_TUPLES_OK,
260                                     "PQexecParams",
261                                     "get_size"))
262   {
263     *estimate = 0;
264     return;
265   }
266   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
267   {
268     GNUNET_break (0);
269     PQclear (ret);
270     *estimate = 0;
271     return;
272   }
273   if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
274   {
275     GNUNET_break (0 == PQgetlength (ret, 0, 0));
276     PQclear (ret);
277     *estimate = 0;
278     return;
279   }
280   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
281   PQclear (ret);
282   *estimate = total;
283 }
284
285
286 /**
287  * Store an item in the datastore.
288  *
289  * @param cls closure with the `struct Plugin`
290  * @param key key for the item
291  * @param absent true if the key was not found in the bloom filter
292  * @param size number of bytes in data
293  * @param data content stored
294  * @param type type of the content
295  * @param priority priority of the content
296  * @param anonymity anonymity-level for the content
297  * @param replication replication-level for the content
298  * @param expiration expiration time for the content
299  * @param cont continuation called with success or failure status
300  * @param cont_cls continuation closure
301  */
302 static void
303 postgres_plugin_put (void *cls,
304                      const struct GNUNET_HashCode *key,
305                      bool absent,
306                      uint32_t size,
307                      const void *data,
308                      enum GNUNET_BLOCK_Type type,
309                      uint32_t priority,
310                      uint32_t anonymity,
311                      uint32_t replication,
312                      struct GNUNET_TIME_Absolute expiration,
313                      PluginPutCont cont,
314                      void *cont_cls)
315 {
316   struct Plugin *plugin = cls;
317   struct GNUNET_HashCode vhash;
318   PGresult *ret;
319
320   GNUNET_CRYPTO_hash (data,
321                       size,
322                       &vhash);
323
324   if (!absent)
325   {
326     struct GNUNET_PQ_QueryParam params[] = {
327       GNUNET_PQ_query_param_uint32 (&priority),
328       GNUNET_PQ_query_param_uint32 (&replication),
329       GNUNET_PQ_query_param_absolute_time (&expiration),
330       GNUNET_PQ_query_param_auto_from_type (key),
331       GNUNET_PQ_query_param_auto_from_type (&vhash),
332       GNUNET_PQ_query_param_end
333     };
334     ret = GNUNET_PQ_exec_prepared (plugin->dbh,
335                                    "update",
336                                    params);
337     if (GNUNET_OK !=
338         GNUNET_POSTGRES_check_result (plugin->dbh,
339                                       ret,
340                                       PGRES_COMMAND_OK,
341                                       "PQexecPrepared",
342                                       "update"))
343     {
344       cont (cont_cls,
345             key,
346             size,
347             GNUNET_SYSERR,
348             _("Postgress exec failure"));
349       return;
350     }
351     /* What an awful API, this function really does return a string */
352     bool affected = 0 != strcmp ("0", PQcmdTuples (ret));
353     PQclear (ret);
354     if (affected)
355     {
356       cont (cont_cls,
357             key,
358             size,
359             GNUNET_NO,
360             NULL);
361       return;
362     }
363   }
364
365   uint32_t utype = type;
366   uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
367                                               UINT64_MAX);
368   struct GNUNET_PQ_QueryParam params[] = {
369     GNUNET_PQ_query_param_uint32 (&replication),
370     GNUNET_PQ_query_param_uint32 (&utype),
371     GNUNET_PQ_query_param_uint32 (&priority),
372     GNUNET_PQ_query_param_uint32 (&anonymity),
373     GNUNET_PQ_query_param_absolute_time (&expiration),
374     GNUNET_PQ_query_param_uint64 (&rvalue),
375     GNUNET_PQ_query_param_auto_from_type (key),
376     GNUNET_PQ_query_param_auto_from_type (&vhash),
377     GNUNET_PQ_query_param_fixed_size (data, size),
378     GNUNET_PQ_query_param_end
379   };
380
381   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
382                                  "put",
383                                  params);
384   if (GNUNET_OK !=
385       GNUNET_POSTGRES_check_result (plugin->dbh,
386                                     ret,
387                                     PGRES_COMMAND_OK,
388                                     "PQexecPrepared", "put"))
389   {
390     cont (cont_cls, key, size,
391           GNUNET_SYSERR,
392           _("Postgress exec failure"));
393     return;
394   }
395   PQclear (ret);
396   plugin->env->duc (plugin->env->cls,
397                     size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
398   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
399                    "datastore-postgres",
400                    "Stored %u bytes in database\n",
401                    (unsigned int) size);
402   cont (cont_cls, key, size, GNUNET_OK, NULL);
403 }
404
405
406 /**
407  * Function invoked to process the result and call the processor.
408  *
409  * @param plugin global plugin data
410  * @param proc function to call the value (once only).
411  * @param proc_cls closure for proc
412  * @param res result from exec
413  * @param filename filename for error messages
414  * @param line line number for error messages
415  */
416 static void
417 process_result (struct Plugin *plugin,
418                 PluginDatumProcessor proc,
419                 void *proc_cls,
420                 PGresult * res,
421                 const char *filename, int line)
422 {
423   int iret;
424   uint32_t rowid;
425   uint32_t utype;
426   uint32_t anonymity;
427   uint32_t replication;
428   uint32_t priority;
429   size_t size;
430   void *data;
431   struct GNUNET_TIME_Absolute expiration_time;
432   struct GNUNET_HashCode key;
433   struct GNUNET_PQ_ResultSpec rs[] = {
434     GNUNET_PQ_result_spec_uint32 ("repl", &replication),
435     GNUNET_PQ_result_spec_uint32 ("type", &utype),
436     GNUNET_PQ_result_spec_uint32 ("prio", &priority),
437     GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
438     GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
439     GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
440     GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
441     GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
442     GNUNET_PQ_result_spec_end
443   };
444
445   if (GNUNET_OK !=
446       GNUNET_POSTGRES_check_result_ (plugin->dbh,
447                                      res,
448                                      PGRES_TUPLES_OK,
449                                      "PQexecPrepared",
450                                      "select",
451                                      filename, line))
452   {
453     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
454                      "datastore-postgres",
455                      "Ending iteration (postgres error)\n");
456     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
457     return;
458   }
459
460   if (0 == PQntuples (res))
461   {
462     /* no result */
463     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
464                      "datastore-postgres",
465                      "Ending iteration (no more results)\n");
466     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
467     PQclear (res);
468     return;
469   }
470   if (1 != PQntuples (res))
471   {
472     GNUNET_break (0);
473     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
474     PQclear (res);
475     return;
476   }
477   if (GNUNET_OK !=
478       GNUNET_PQ_extract_result (res,
479                                 rs,
480                                 0))
481   {
482     GNUNET_break (0);
483     PQclear (res);
484     GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
485                                      "delrow",
486                                      rowid);
487     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
488     return;
489   }
490
491   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
492                    "datastore-postgres",
493                    "Found result of size %u bytes and type %u in database\n",
494                    (unsigned int) size,
495                    (unsigned int) utype);
496   iret = proc (proc_cls,
497                &key,
498                size,
499                data,
500                (enum GNUNET_BLOCK_Type) utype,
501                priority,
502                anonymity,
503                replication,
504                expiration_time,
505                rowid);
506   PQclear (res);
507   if (iret == GNUNET_NO)
508   {
509     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
510                 "Processor asked for item %u to be removed.\n",
511                 (unsigned int) rowid);
512     if (GNUNET_OK ==
513         GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
514                                          "delrow",
515                                          rowid))
516     {
517       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
518                        "datastore-postgres",
519                        "Deleting %u bytes from database\n",
520                        (unsigned int) size);
521       plugin->env->duc (plugin->env->cls,
522                         - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
523       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
524                        "datastore-postgres",
525                        "Deleted %u bytes from database\n",
526                        (unsigned int) size);
527     }
528   }
529 }
530
531
532 /**
533  * Get one of the results for a particular key in the datastore.
534  *
535  * @param cls closure with the 'struct Plugin'
536  * @param next_uid return the result with lowest uid >= next_uid
537  * @param random if true, return a random result instead of using next_uid
538  * @param key maybe NULL (to match all entries)
539  * @param type entries of which type are relevant?
540  *     Use 0 for any type.
541  * @param proc function to call on the matching value;
542  *        will be called with NULL if nothing matches
543  * @param proc_cls closure for @a proc
544  */
545 static void
546 postgres_plugin_get_key (void *cls,
547                          uint64_t next_uid,
548                          bool random,
549                          const struct GNUNET_HashCode *key,
550                          enum GNUNET_BLOCK_Type type,
551                          PluginDatumProcessor proc,
552                          void *proc_cls)
553 {
554   struct Plugin *plugin = cls;
555   uint32_t utype = type;
556   uint16_t use_rvalue = random;
557   uint16_t use_key = NULL != key;
558   uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
559   uint64_t rvalue;
560   struct GNUNET_PQ_QueryParam params[] = {
561     GNUNET_PQ_query_param_uint64 (&next_uid),
562     GNUNET_PQ_query_param_uint64 (&rvalue),
563     GNUNET_PQ_query_param_uint16 (&use_rvalue),
564     GNUNET_PQ_query_param_auto_from_type (key),
565     GNUNET_PQ_query_param_uint16 (&use_key),
566     GNUNET_PQ_query_param_uint32 (&utype),
567     GNUNET_PQ_query_param_uint16 (&use_type),
568     GNUNET_PQ_query_param_end
569   };
570   PGresult *ret;
571
572   if (random)
573   {
574     rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
575                                        UINT64_MAX);
576     next_uid = 0;
577   }
578   else
579     rvalue = 0;
580
581   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
582                                  "get",
583                                  params);
584   process_result (plugin,
585                   proc,
586                   proc_cls,
587                   ret,
588                   __FILE__, __LINE__);
589 }
590
591
592 /**
593  * Select a subset of the items in the datastore and call
594  * the given iterator for each of them.
595  *
596  * @param cls our `struct Plugin *`
597  * @param next_uid return the result with lowest uid >= next_uid
598  * @param type entries of which type should be considered?
599  *        Must not be zero (ANY).
600  * @param proc function to call on the matching value;
601  *        will be called with NULL if no value matches
602  * @param proc_cls closure for @a proc
603  */
604 static void
605 postgres_plugin_get_zero_anonymity (void *cls,
606                                     uint64_t next_uid,
607                                     enum GNUNET_BLOCK_Type type,
608                                     PluginDatumProcessor proc,
609                                     void *proc_cls)
610 {
611   struct Plugin *plugin = cls;
612   uint32_t utype = type;
613   struct GNUNET_PQ_QueryParam params[] = {
614     GNUNET_PQ_query_param_uint32 (&utype),
615     GNUNET_PQ_query_param_uint64 (&next_uid),
616     GNUNET_PQ_query_param_end
617   };
618   PGresult *ret;
619
620   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
621                                  "select_non_anonymous",
622                                  params);
623
624   process_result (plugin,
625                   proc, proc_cls,
626                   ret,
627                   __FILE__, __LINE__);
628 }
629
630
631 /**
632  * Context for #repl_iter() function.
633  */
634 struct ReplCtx
635 {
636
637   /**
638    * Plugin handle.
639    */
640   struct Plugin *plugin;
641
642   /**
643    * Function to call for the result (or the NULL).
644    */
645   PluginDatumProcessor proc;
646
647   /**
648    * Closure for @e proc.
649    */
650   void *proc_cls;
651 };
652
653
654 /**
655  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
656  * Decrements the replication counter and calls the original
657  * iterator.
658  *
659  * @param cls closure with the `struct ReplCtx *`
660  * @param key key for the content
661  * @param size number of bytes in @a data
662  * @param data content stored
663  * @param type type of the content
664  * @param priority priority of the content
665  * @param anonymity anonymity-level for the content
666  * @param replication replication-level for the content
667  * @param expiration expiration time for the content
668  * @param uid unique identifier for the datum;
669  *        maybe 0 if no unique identifier is available
670  * @return #GNUNET_SYSERR to abort the iteration,
671  *         #GNUNET_OK to continue
672  *         (continue on call to "next", of course),
673  *         #GNUNET_NO to delete the item and continue (if supported)
674  */
675 static int
676 repl_proc (void *cls,
677            const struct GNUNET_HashCode *key,
678            uint32_t size,
679            const void *data,
680            enum GNUNET_BLOCK_Type type,
681            uint32_t priority,
682            uint32_t anonymity,
683            uint32_t replication,
684            struct GNUNET_TIME_Absolute expiration,
685            uint64_t uid)
686 {
687   struct ReplCtx *rc = cls;
688   struct Plugin *plugin = rc->plugin;
689   int ret;
690   uint32_t oid = (uint32_t) uid;
691   struct GNUNET_PQ_QueryParam params[] = {
692     GNUNET_PQ_query_param_uint32 (&oid),
693     GNUNET_PQ_query_param_end
694   };
695   PGresult *qret;
696
697   ret = rc->proc (rc->proc_cls,
698                   key,
699                   size,
700                   data,
701                   type,
702                   priority,
703                   anonymity,
704                   replication,
705                   expiration,
706                   uid);
707   if (NULL == key)
708     return ret;
709   qret = GNUNET_PQ_exec_prepared (plugin->dbh,
710                                   "decrepl",
711                                   params);
712   if (GNUNET_OK !=
713       GNUNET_POSTGRES_check_result (plugin->dbh,
714                                     qret,
715                                     PGRES_COMMAND_OK,
716                                     "PQexecPrepared",
717                                     "decrepl"))
718     return GNUNET_SYSERR;
719   PQclear (qret);
720   return ret;
721 }
722
723
724 /**
725  * Get a random item for replication.  Returns a single, not expired,
726  * random item from those with the highest replication counters.  The
727  * item's replication counter is decremented by one IF it was positive
728  * before.  Call @a proc with all values ZERO or NULL if the datastore
729  * is empty.
730  *
731  * @param cls closure with the `struct Plugin`
732  * @param proc function to call the value (once only).
733  * @param proc_cls closure for @a proc
734  */
735 static void
736 postgres_plugin_get_replication (void *cls,
737                                  PluginDatumProcessor proc,
738                                  void *proc_cls)
739 {
740   struct Plugin *plugin = cls;
741   struct ReplCtx rc;
742   PGresult *ret;
743
744   rc.plugin = plugin;
745   rc.proc = proc;
746   rc.proc_cls = proc_cls;
747   ret = PQexecPrepared (plugin->dbh,
748                         "select_replication_order", 0, NULL, NULL,
749                         NULL, 1);
750   process_result (plugin,
751                   &repl_proc,
752                   &rc,
753                   ret,
754                   __FILE__, __LINE__);
755 }
756
757
758 /**
759  * Get a random item for expiration.  Call @a proc with all values
760  * ZERO or NULL if the datastore is empty.
761  *
762  * @param cls closure with the `struct Plugin`
763  * @param proc function to call the value (once only).
764  * @param proc_cls closure for @a proc
765  */
766 static void
767 postgres_plugin_get_expiration (void *cls,
768                                 PluginDatumProcessor proc,
769                                 void *proc_cls)
770 {
771   struct Plugin *plugin = cls;
772   struct GNUNET_TIME_Absolute now;
773   struct GNUNET_PQ_QueryParam params[] = {
774     GNUNET_PQ_query_param_absolute_time (&now),
775     GNUNET_PQ_query_param_end
776   };
777   PGresult *ret;
778
779   now = GNUNET_TIME_absolute_get ();
780   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
781                                  "select_expiration_order",
782                                  params);
783   process_result (plugin,
784                   proc, proc_cls,
785                   ret,
786                   __FILE__, __LINE__);
787 }
788
789
790 /**
791  * Get all of the keys in the datastore.
792  *
793  * @param cls closure with the `struct Plugin *`
794  * @param proc function to call on each key
795  * @param proc_cls closure for @a proc
796  */
797 static void
798 postgres_plugin_get_keys (void *cls,
799                           PluginKeyProcessor proc,
800                           void *proc_cls)
801 {
802   struct Plugin *plugin = cls;
803   int ret;
804   int i;
805   struct GNUNET_HashCode key;
806   PGresult * res;
807
808   res = PQexecPrepared (plugin->dbh,
809                         "get_keys",
810                         0, NULL, NULL, NULL, 1);
811   ret = PQntuples (res);
812   for (i=0;i<ret;i++)
813   {
814     if (sizeof (struct GNUNET_HashCode) !=
815         PQgetlength (res, i, 0))
816     {
817       GNUNET_memcpy (&key,
818               PQgetvalue (res, i, 0),
819               sizeof (struct GNUNET_HashCode));
820       proc (proc_cls, &key, 1);
821     }
822   }
823   PQclear (res);
824   proc (proc_cls, NULL, 0);
825 }
826
827
828 /**
829  * Drop database.
830  *
831  * @param cls closure with the `struct Plugin *`
832  */
833 static void
834 postgres_plugin_drop (void *cls)
835 {
836   struct Plugin *plugin = cls;
837
838   if (GNUNET_OK !=
839       GNUNET_POSTGRES_exec (plugin->dbh,
840                             "DROP TABLE gn090"))
841     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
842                      "postgres",
843                      _("Failed to drop table from database.\n"));
844 }
845
846
847 /**
848  * Remove a particular key in the datastore.
849  *
850  * @param cls closure
851  * @param key key for the content
852  * @param size number of bytes in data
853  * @param data content stored
854  * @param cont continuation called with success or failure status
855  * @param cont_cls continuation closure for @a cont
856  */
857 static void
858 postgres_plugin_remove_key (void *cls,
859                             const struct GNUNET_HashCode *key,
860                             uint32_t size,
861                             const void *data,
862                             PluginRemoveCont cont,
863                             void *cont_cls)
864 {
865   struct Plugin *plugin = cls;
866   PGresult *ret;
867   struct GNUNET_PQ_QueryParam params[] = {
868     GNUNET_PQ_query_param_auto_from_type (key),
869     GNUNET_PQ_query_param_fixed_size (data, size),
870     GNUNET_PQ_query_param_end
871   };
872   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
873                                  "remove",
874                                  params);
875   if (GNUNET_OK !=
876       GNUNET_POSTGRES_check_result (plugin->dbh,
877                                     ret,
878                                     PGRES_COMMAND_OK,
879                                     "PQexecPrepared",
880                                     "remove"))
881   {
882     cont (cont_cls,
883           key,
884           size,
885           GNUNET_SYSERR,
886           _("Postgress exec failure"));
887     return;
888   }
889   /* What an awful API, this function really does return a string */
890   bool affected = 0 != strcmp ("0", PQcmdTuples (ret));
891   PQclear (ret);
892   if (!affected)
893   {
894     cont (cont_cls,
895           key,
896           size,
897           GNUNET_NO,
898           NULL);
899     return;
900   }
901   plugin->env->duc (plugin->env->cls,
902                     - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
903   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
904                    "datastore-postgres",
905                    "Deleted %u bytes from database\n",
906                    (unsigned int) size);
907   cont (cont_cls,
908         key,
909         size,
910         GNUNET_OK,
911         NULL);
912 }
913
914
915 /**
916  * Entry point for the plugin.
917  *
918  * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
919  * @return our `struct Plugin *`
920  */
921 void *
922 libgnunet_plugin_datastore_postgres_init (void *cls)
923 {
924   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
925   struct GNUNET_DATASTORE_PluginFunctions *api;
926   struct Plugin *plugin;
927
928   plugin = GNUNET_new (struct Plugin);
929   plugin->env = env;
930   if (GNUNET_OK != init_connection (plugin))
931   {
932     GNUNET_free (plugin);
933     return NULL;
934   }
935   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
936   api->cls = plugin;
937   api->estimate_size = &postgres_plugin_estimate_size;
938   api->put = &postgres_plugin_put;
939   api->get_key = &postgres_plugin_get_key;
940   api->get_replication = &postgres_plugin_get_replication;
941   api->get_expiration = &postgres_plugin_get_expiration;
942   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
943   api->get_keys = &postgres_plugin_get_keys;
944   api->drop = &postgres_plugin_drop;
945   api->remove_key = &postgres_plugin_remove_key;
946   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
947                    "datastore-postgres",
948                    _("Postgres database running\n"));
949   return api;
950 }
951
952
953 /**
954  * Exit point from the plugin.
955  *
956  * @param cls our `struct Plugin *`
957  * @return always NULL
958  */
959 void *
960 libgnunet_plugin_datastore_postgres_done (void *cls)
961 {
962   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
963   struct Plugin *plugin = api->cls;
964
965   PQfinish (plugin->dbh);
966   GNUNET_free (plugin);
967   GNUNET_free (api);
968   return NULL;
969 }
970
971 /* end of plugin_datastore_postgres.c */