tighten formatting rules
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_datastore_plugin.h"
28 #include "gnunet_pq_lib.h"
29
30
31 /**
32  * After how many ms "busy" should a DB operation fail for good?
33  * A low value makes sure that we are more responsive to requests
34  * (especially PUTs).  A high value guarantees a higher success
35  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
36  *
37  * The default value of 1s should ensure that users do not experience
38  * huge latencies while at the same time allowing operations to succeed
39  * with reasonable probability.
40  */
41 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
42
43
44 /**
45  * Context for all functions in this plugin.
46  */
47 struct Plugin
48 {
49   /**
50    * Our execution environment.
51    */
52   struct GNUNET_DATASTORE_PluginEnvironment *env;
53
54   /**
55    * Native Postgres database handle.
56    */
57   struct GNUNET_PQ_Context *dbh;
58 };
59
60
61 /**
62  * @brief Get a database handle
63  *
64  * @param plugin global context
65  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
66  */
67 static int
68 init_connection (struct Plugin *plugin)
69 {
70   struct GNUNET_PQ_ExecuteStatement es[] = {
71     /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
72      * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
73      * we do math or inequality tests, so we can't handle the entire range of uint32_t.
74      * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
75      * PostgreSQL also recommends against using WITH OIDS.
76      */GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS gn090 ("
77                             "  repl INTEGER NOT NULL DEFAULT 0,"
78                             "  type INTEGER NOT NULL DEFAULT 0,"
79                             "  prio INTEGER NOT NULL DEFAULT 0,"
80                             "  anonLevel INTEGER NOT NULL DEFAULT 0,"
81                             "  expire BIGINT NOT NULL DEFAULT 0,"
82                             "  rvalue BIGINT NOT NULL DEFAULT 0,"
83                             "  hash BYTEA NOT NULL DEFAULT '',"
84                             "  vhash BYTEA NOT NULL DEFAULT '',"
85                             "  value BYTEA NOT NULL DEFAULT '')"
86                             "WITH OIDS"),
87     GNUNET_PQ_make_try_execute (
88       "CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)"),
89     GNUNET_PQ_make_try_execute (
90       "CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)"),
91     GNUNET_PQ_make_try_execute (
92       "CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)"),
93     GNUNET_PQ_make_try_execute (
94       "CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)"),
95     GNUNET_PQ_make_try_execute (
96       "CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)"),
97     GNUNET_PQ_make_try_execute (
98       "CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)"),
99     GNUNET_PQ_make_try_execute (
100       "CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"),
101     GNUNET_PQ_make_execute (
102       "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"),
103     GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN"),
104     GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN"),
105     GNUNET_PQ_EXECUTE_STATEMENT_END
106   };
107
108 #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
109   struct GNUNET_PQ_PreparedStatement ps[] = {
110     GNUNET_PQ_make_prepare ("get",
111                             "SELECT " RESULT_COLUMNS " FROM gn090"
112                             " WHERE oid >= $1::bigint AND"
113                             " (rvalue >= $2 OR 0 = $3::smallint) AND"
114                             " (hash = $4 OR 0 = $5::smallint) AND"
115                             " (type = $6 OR 0 = $7::smallint)"
116                             " ORDER BY oid ASC LIMIT 1",
117                             7),
118     GNUNET_PQ_make_prepare ("put",
119                             "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
120                             "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
121                             9),
122     GNUNET_PQ_make_prepare ("update",
123                             "UPDATE gn090"
124                             " SET prio = prio + $1,"
125                             " repl = repl + $2,"
126                             " expire = GREATEST(expire, $3)"
127                             " WHERE hash = $4 AND vhash = $5",
128                             5),
129     GNUNET_PQ_make_prepare ("decrepl",
130                             "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
131                             "WHERE oid = $1",
132                             1),
133     GNUNET_PQ_make_prepare ("select_non_anonymous",
134                             "SELECT " RESULT_COLUMNS " FROM gn090 "
135                             "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
136                             "ORDER BY oid ASC LIMIT 1",
137                             2),
138     GNUNET_PQ_make_prepare ("select_expiration_order",
139                             "(SELECT " RESULT_COLUMNS " FROM gn090 "
140                             "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
141                             "UNION "
142                             "(SELECT " RESULT_COLUMNS " FROM gn090 "
143                             "ORDER BY prio ASC LIMIT 1) "
144                             "ORDER BY expire ASC LIMIT 1",
145                             1),
146     GNUNET_PQ_make_prepare ("select_replication_order",
147                             "SELECT " RESULT_COLUMNS " FROM gn090 "
148                             "ORDER BY repl DESC,RANDOM() LIMIT 1",
149                             0),
150     GNUNET_PQ_make_prepare ("delrow",
151                             "DELETE FROM gn090 "
152                             "WHERE oid=$1",
153                             1),
154     GNUNET_PQ_make_prepare ("remove",
155                             "DELETE FROM gn090"
156                             " WHERE hash = $1 AND"
157                             " value = $2",
158                             2),
159     GNUNET_PQ_make_prepare ("get_keys",
160                             "SELECT hash FROM gn090",
161                             0),
162     GNUNET_PQ_make_prepare ("estimate_size",
163                             "SELECT CASE WHEN NOT EXISTS"
164                             "  (SELECT 1 FROM gn090)"
165                             "  THEN 0"
166                             "  ELSE (SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090)"
167                             "END AS total",
168                             0),
169     GNUNET_PQ_PREPARED_STATEMENT_END
170   };
171 #undef RESULT_COLUMNS
172
173   plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
174                                             "datastore-postgres",
175                                             es,
176                                             ps);
177   if (NULL == plugin->dbh)
178     return GNUNET_SYSERR;
179   return GNUNET_OK;
180 }
181
182
183 /**
184  * Get an estimate of how much space the database is
185  * currently using.
186  *
187  * @param cls our `struct Plugin *`
188  * @return number of bytes used on disk
189  */
190 static void
191 postgres_plugin_estimate_size (void *cls,
192                                unsigned long long *estimate)
193 {
194   struct Plugin *plugin = cls;
195   uint64_t total;
196   struct GNUNET_PQ_QueryParam params[] = {
197     GNUNET_PQ_query_param_end
198   };
199   struct GNUNET_PQ_ResultSpec rs[] = {
200     GNUNET_PQ_result_spec_uint64 ("total",
201                                   &total),
202     GNUNET_PQ_result_spec_end
203   };
204   enum GNUNET_DB_QueryStatus ret;
205
206   if (NULL == estimate)
207     return;
208   ret = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
209                                                   "estimate_size",
210                                                   params,
211                                                   rs);
212   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != ret)
213   {
214     *estimate = 0LL;
215     return;
216   }
217   *estimate = total;
218 }
219
220
221 /**
222  * Store an item in the datastore.
223  *
224  * @param cls closure with the `struct Plugin`
225  * @param key key for the item
226  * @param absent true if the key was not found in the bloom filter
227  * @param size number of bytes in data
228  * @param data content stored
229  * @param type type of the content
230  * @param priority priority of the content
231  * @param anonymity anonymity-level for the content
232  * @param replication replication-level for the content
233  * @param expiration expiration time for the content
234  * @param cont continuation called with success or failure status
235  * @param cont_cls continuation closure
236  */
237 static void
238 postgres_plugin_put (void *cls,
239                      const struct GNUNET_HashCode *key,
240                      bool absent,
241                      uint32_t size,
242                      const void *data,
243                      enum GNUNET_BLOCK_Type type,
244                      uint32_t priority,
245                      uint32_t anonymity,
246                      uint32_t replication,
247                      struct GNUNET_TIME_Absolute expiration,
248                      PluginPutCont cont,
249                      void *cont_cls)
250 {
251   struct Plugin *plugin = cls;
252   struct GNUNET_HashCode vhash;
253   enum GNUNET_DB_QueryStatus ret;
254
255   GNUNET_CRYPTO_hash (data,
256                       size,
257                       &vhash);
258   if (! absent)
259   {
260     struct GNUNET_PQ_QueryParam params[] = {
261       GNUNET_PQ_query_param_uint32 (&priority),
262       GNUNET_PQ_query_param_uint32 (&replication),
263       GNUNET_PQ_query_param_absolute_time (&expiration),
264       GNUNET_PQ_query_param_auto_from_type (key),
265       GNUNET_PQ_query_param_auto_from_type (&vhash),
266       GNUNET_PQ_query_param_end
267     };
268     ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
269                                               "update",
270                                               params);
271     if (0 > ret)
272     {
273       cont (cont_cls,
274             key,
275             size,
276             GNUNET_SYSERR,
277             _ ("Postgress exec failure"));
278       return;
279     }
280     bool affected = (0 != ret);
281     if (affected)
282     {
283       cont (cont_cls,
284             key,
285             size,
286             GNUNET_NO,
287             NULL);
288       return;
289     }
290   }
291
292   {
293     uint32_t utype = (uint32_t) type;
294     uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
295                                                 UINT64_MAX);
296     struct GNUNET_PQ_QueryParam params[] = {
297       GNUNET_PQ_query_param_uint32 (&replication),
298       GNUNET_PQ_query_param_uint32 (&utype),
299       GNUNET_PQ_query_param_uint32 (&priority),
300       GNUNET_PQ_query_param_uint32 (&anonymity),
301       GNUNET_PQ_query_param_absolute_time (&expiration),
302       GNUNET_PQ_query_param_uint64 (&rvalue),
303       GNUNET_PQ_query_param_auto_from_type (key),
304       GNUNET_PQ_query_param_auto_from_type (&vhash),
305       GNUNET_PQ_query_param_fixed_size (data, size),
306       GNUNET_PQ_query_param_end
307     };
308
309     ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
310                                               "put",
311                                               params);
312     if (0 > ret)
313     {
314       cont (cont_cls,
315             key,
316             size,
317             GNUNET_SYSERR,
318             "Postgress exec failure");
319       return;
320     }
321   }
322   plugin->env->duc (plugin->env->cls,
323                     size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
324   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
325                    "datastore-postgres",
326                    "Stored %u bytes in database\n",
327                    (unsigned int) size);
328   cont (cont_cls,
329         key,
330         size,
331         GNUNET_OK,
332         NULL);
333 }
334
335
336 /**
337  * Closure for #process_result.
338  */
339 struct ProcessResultContext
340 {
341   /**
342    * The plugin handle.
343    */
344   struct Plugin *plugin;
345
346   /**
347    * Function to call on each result.
348    */
349   PluginDatumProcessor proc;
350
351   /**
352    * Closure for @e proc.
353    */
354   void *proc_cls;
355 };
356
357
358 /**
359  * Function invoked to process the result and call the processor of @a
360  * cls.
361  *
362  * @param cls our `struct ProcessResultContext`
363  * @param res result from exec
364  * @param num_results number of results in @a res
365  */
366 static void
367 process_result (void *cls,
368                 PGresult *res,
369                 unsigned int num_results)
370 {
371   struct ProcessResultContext *prc = cls;
372   struct Plugin *plugin = prc->plugin;
373
374   if (0 == num_results)
375   {
376     /* no result */
377     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
378                      "datastore-postgres",
379                      "Ending iteration (no more results)\n");
380     prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
381                GNUNET_TIME_UNIT_ZERO_ABS, 0);
382     return;
383   }
384   if (1 != num_results)
385   {
386     GNUNET_break (0);
387     prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
388                GNUNET_TIME_UNIT_ZERO_ABS, 0);
389     return;
390   }
391   /* Technically we don't need the loop here, but nicer in case
392      we ever relax the condition above. */
393   for (unsigned int i = 0; i < num_results; i++)
394   {
395     int iret;
396     uint32_t rowid;
397     uint32_t utype;
398     uint32_t anonymity;
399     uint32_t replication;
400     uint32_t priority;
401     size_t size;
402     void *data;
403     struct GNUNET_TIME_Absolute expiration_time;
404     struct GNUNET_HashCode key;
405     struct GNUNET_PQ_ResultSpec rs[] = {
406       GNUNET_PQ_result_spec_uint32 ("repl", &replication),
407       GNUNET_PQ_result_spec_uint32 ("type", &utype),
408       GNUNET_PQ_result_spec_uint32 ("prio", &priority),
409       GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
410       GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
411       GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
412       GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
413       GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
414       GNUNET_PQ_result_spec_end
415     };
416
417     if (GNUNET_OK !=
418         GNUNET_PQ_extract_result (res,
419                                   rs,
420                                   i))
421     {
422       GNUNET_break (0);
423       prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
424                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
425       return;
426     }
427
428     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
429                      "datastore-postgres",
430                      "Found result of size %u bytes and type %u in database\n",
431                      (unsigned int) size,
432                      (unsigned int) utype);
433     iret = prc->proc (prc->proc_cls,
434                       &key,
435                       size,
436                       data,
437                       (enum GNUNET_BLOCK_Type) utype,
438                       priority,
439                       anonymity,
440                       replication,
441                       expiration_time,
442                       rowid);
443     if (iret == GNUNET_NO)
444     {
445       struct GNUNET_PQ_QueryParam param[] = {
446         GNUNET_PQ_query_param_uint32 (&rowid),
447         GNUNET_PQ_query_param_end
448       };
449
450       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
451                   "Processor asked for item %u to be removed.\n",
452                   (unsigned int) rowid);
453       if (0 <
454           GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
455                                               "delrow",
456                                               param))
457       {
458         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
459                          "datastore-postgres",
460                          "Deleting %u bytes from database\n",
461                          (unsigned int) size);
462         plugin->env->duc (plugin->env->cls,
463                           -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
464         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
465                          "datastore-postgres",
466                          "Deleted %u bytes from database\n",
467                          (unsigned int) size);
468       }
469     }
470     GNUNET_PQ_cleanup_result (rs);
471   }   /* for (i) */
472 }
473
474
475 /**
476  * Get one of the results for a particular key in the datastore.
477  *
478  * @param cls closure with the `struct Plugin`
479  * @param next_uid return the result with lowest uid >= next_uid
480  * @param random if true, return a random result instead of using next_uid
481  * @param key maybe NULL (to match all entries)
482  * @param type entries of which type are relevant?
483  *     Use 0 for any type.
484  * @param proc function to call on the matching value;
485  *        will be called with NULL if nothing matches
486  * @param proc_cls closure for @a proc
487  */
488 static void
489 postgres_plugin_get_key (void *cls,
490                          uint64_t next_uid,
491                          bool random,
492                          const struct GNUNET_HashCode *key,
493                          enum GNUNET_BLOCK_Type type,
494                          PluginDatumProcessor proc,
495                          void *proc_cls)
496 {
497   struct Plugin *plugin = cls;
498   uint32_t utype = type;
499   uint16_t use_rvalue = random;
500   uint16_t use_key = NULL != key;
501   uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
502   uint64_t rvalue;
503   struct GNUNET_PQ_QueryParam params[] = {
504     GNUNET_PQ_query_param_uint64 (&next_uid),
505     GNUNET_PQ_query_param_uint64 (&rvalue),
506     GNUNET_PQ_query_param_uint16 (&use_rvalue),
507     GNUNET_PQ_query_param_auto_from_type (key),
508     GNUNET_PQ_query_param_uint16 (&use_key),
509     GNUNET_PQ_query_param_uint32 (&utype),
510     GNUNET_PQ_query_param_uint16 (&use_type),
511     GNUNET_PQ_query_param_end
512   };
513   struct ProcessResultContext prc;
514   enum GNUNET_DB_QueryStatus res;
515
516   if (random)
517   {
518     rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
519                                        UINT64_MAX);
520     next_uid = 0;
521   }
522   else
523   {
524     rvalue = 0;
525   }
526   prc.plugin = plugin;
527   prc.proc = proc;
528   prc.proc_cls = proc_cls;
529
530   res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
531                                               "get",
532                                               params,
533                                               &process_result,
534                                               &prc);
535   if (0 > res)
536     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
537           GNUNET_TIME_UNIT_ZERO_ABS, 0);
538 }
539
540
541 /**
542  * Select a subset of the items in the datastore and call
543  * the given iterator for each of them.
544  *
545  * @param cls our `struct Plugin *`
546  * @param next_uid return the result with lowest uid >= next_uid
547  * @param type entries of which type should be considered?
548  *        Must not be zero (ANY).
549  * @param proc function to call on the matching value;
550  *        will be called with NULL if no value matches
551  * @param proc_cls closure for @a proc
552  */
553 static void
554 postgres_plugin_get_zero_anonymity (void *cls,
555                                     uint64_t next_uid,
556                                     enum GNUNET_BLOCK_Type type,
557                                     PluginDatumProcessor proc,
558                                     void *proc_cls)
559 {
560   struct Plugin *plugin = cls;
561   uint32_t utype = type;
562   struct GNUNET_PQ_QueryParam params[] = {
563     GNUNET_PQ_query_param_uint32 (&utype),
564     GNUNET_PQ_query_param_uint64 (&next_uid),
565     GNUNET_PQ_query_param_end
566   };
567   struct ProcessResultContext prc;
568   enum GNUNET_DB_QueryStatus res;
569
570   prc.plugin = plugin;
571   prc.proc = proc;
572   prc.proc_cls = proc_cls;
573   res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
574                                               "select_non_anonymous",
575                                               params,
576                                               &process_result,
577                                               &prc);
578   if (0 > res)
579     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
580           GNUNET_TIME_UNIT_ZERO_ABS, 0);
581 }
582
583
584 /**
585  * Context for #repl_iter() function.
586  */
587 struct ReplCtx
588 {
589   /**
590    * Plugin handle.
591    */
592   struct Plugin *plugin;
593
594   /**
595    * Function to call for the result (or the NULL).
596    */
597   PluginDatumProcessor proc;
598
599   /**
600    * Closure for @e proc.
601    */
602   void *proc_cls;
603 };
604
605
606 /**
607  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
608  * Decrements the replication counter and calls the original
609  * iterator.
610  *
611  * @param cls closure with the `struct ReplCtx *`
612  * @param key key for the content
613  * @param size number of bytes in @a data
614  * @param data content stored
615  * @param type type of the content
616  * @param priority priority of the content
617  * @param anonymity anonymity-level for the content
618  * @param replication replication-level for the content
619  * @param expiration expiration time for the content
620  * @param uid unique identifier for the datum;
621  *        maybe 0 if no unique identifier is available
622  * @return #GNUNET_SYSERR to abort the iteration,
623  *         #GNUNET_OK to continue
624  *         (continue on call to "next", of course),
625  *         #GNUNET_NO to delete the item and continue (if supported)
626  */
627 static int
628 repl_proc (void *cls,
629            const struct GNUNET_HashCode *key,
630            uint32_t size,
631            const void *data,
632            enum GNUNET_BLOCK_Type type,
633            uint32_t priority,
634            uint32_t anonymity,
635            uint32_t replication,
636            struct GNUNET_TIME_Absolute expiration,
637            uint64_t uid)
638 {
639   struct ReplCtx *rc = cls;
640   struct Plugin *plugin = rc->plugin;
641   int ret;
642   uint32_t oid = (uint32_t) uid;
643   struct GNUNET_PQ_QueryParam params[] = {
644     GNUNET_PQ_query_param_uint32 (&oid),
645     GNUNET_PQ_query_param_end
646   };
647   enum GNUNET_DB_QueryStatus qret;
648
649   ret = rc->proc (rc->proc_cls,
650                   key,
651                   size,
652                   data,
653                   type,
654                   priority,
655                   anonymity,
656                   replication,
657                   expiration,
658                   uid);
659   if (NULL == key)
660     return ret;
661   qret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
662                                              "decrepl",
663                                              params);
664   if (0 > qret)
665     return GNUNET_SYSERR;
666   return ret;
667 }
668
669
670 /**
671  * Get a random item for replication.  Returns a single, not expired,
672  * random item from those with the highest replication counters.  The
673  * item's replication counter is decremented by one IF it was positive
674  * before.  Call @a proc with all values ZERO or NULL if the datastore
675  * is empty.
676  *
677  * @param cls closure with the `struct Plugin`
678  * @param proc function to call the value (once only).
679  * @param proc_cls closure for @a proc
680  */
681 static void
682 postgres_plugin_get_replication (void *cls,
683                                  PluginDatumProcessor proc,
684                                  void *proc_cls)
685 {
686   struct Plugin *plugin = cls;
687   struct GNUNET_PQ_QueryParam params[] = {
688     GNUNET_PQ_query_param_end
689   };
690   struct ReplCtx rc;
691   struct ProcessResultContext prc;
692   enum GNUNET_DB_QueryStatus res;
693
694   rc.plugin = plugin;
695   rc.proc = proc;
696   rc.proc_cls = proc_cls;
697   prc.plugin = plugin;
698   prc.proc = &repl_proc;
699   prc.proc_cls = &rc;
700   res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
701                                               "select_replication_order",
702                                               params,
703                                               &process_result,
704                                               &prc);
705   if (0 > res)
706     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
707           GNUNET_TIME_UNIT_ZERO_ABS, 0);
708 }
709
710
711 /**
712  * Get a random item for expiration.  Call @a proc with all values
713  * ZERO or NULL if the datastore is empty.
714  *
715  * @param cls closure with the `struct Plugin`
716  * @param proc function to call the value (once only).
717  * @param proc_cls closure for @a proc
718  */
719 static void
720 postgres_plugin_get_expiration (void *cls,
721                                 PluginDatumProcessor proc,
722                                 void *proc_cls)
723 {
724   struct Plugin *plugin = cls;
725   struct GNUNET_TIME_Absolute now;
726   struct GNUNET_PQ_QueryParam params[] = {
727     GNUNET_PQ_query_param_absolute_time (&now),
728     GNUNET_PQ_query_param_end
729   };
730   struct ProcessResultContext prc;
731
732   now = GNUNET_TIME_absolute_get ();
733   prc.plugin = plugin;
734   prc.proc = proc;
735   prc.proc_cls = proc_cls;
736   (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
737                                                "select_expiration_order",
738                                                params,
739                                                &process_result,
740                                                &prc);
741 }
742
743
744 /**
745  * Closure for #process_keys.
746  */
747 struct ProcessKeysContext
748 {
749   /**
750    * Function to call for each key.
751    */
752   PluginKeyProcessor proc;
753
754   /**
755    * Closure for @e proc.
756    */
757   void *proc_cls;
758 };
759
760
761 /**
762  * Function to be called with the results of a SELECT statement
763  * that has returned @a num_results results.
764  *
765  * @param cls closure with a `struct ProcessKeysContext`
766  * @param result the postgres result
767  * @param num_result the number of results in @a result
768  */
769 static void
770 process_keys (void *cls,
771               PGresult *result,
772               unsigned int num_results)
773 {
774   struct ProcessKeysContext *pkc = cls;
775
776   for (unsigned i = 0; i < num_results; i++)
777   {
778     struct GNUNET_HashCode key;
779     struct GNUNET_PQ_ResultSpec rs[] = {
780       GNUNET_PQ_result_spec_auto_from_type ("hash",
781                                             &key),
782       GNUNET_PQ_result_spec_end
783     };
784
785     if (GNUNET_OK !=
786         GNUNET_PQ_extract_result (result,
787                                   rs,
788                                   i))
789     {
790       GNUNET_break (0);
791       continue;
792     }
793     pkc->proc (pkc->proc_cls,
794                &key,
795                1);
796     GNUNET_PQ_cleanup_result (rs);
797   }
798 }
799
800
801 /**
802  * Get all of the keys in the datastore.
803  *
804  * @param cls closure with the `struct Plugin *`
805  * @param proc function to call on each key
806  * @param proc_cls closure for @a proc
807  */
808 static void
809 postgres_plugin_get_keys (void *cls,
810                           PluginKeyProcessor proc,
811                           void *proc_cls)
812 {
813   struct Plugin *plugin = cls;
814   struct GNUNET_PQ_QueryParam params[] = {
815     GNUNET_PQ_query_param_end
816   };
817   struct ProcessKeysContext pkc;
818
819   pkc.proc = proc;
820   pkc.proc_cls = proc_cls;
821   (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
822                                                "get_keys",
823                                                params,
824                                                &process_keys,
825                                                &pkc);
826   proc (proc_cls,
827         NULL,
828         0);
829 }
830
831
832 /**
833  * Drop database.
834  *
835  * @param cls closure with the `struct Plugin *`
836  */
837 static void
838 postgres_plugin_drop (void *cls)
839 {
840   struct Plugin *plugin = cls;
841   struct GNUNET_PQ_ExecuteStatement es[] = {
842     GNUNET_PQ_make_execute ("DROP TABLE gn090"),
843     GNUNET_PQ_EXECUTE_STATEMENT_END
844   };
845
846   if (GNUNET_OK !=
847       GNUNET_PQ_exec_statements (plugin->dbh,
848                                  es))
849     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
850                      "postgres",
851                      _ ("Failed to drop table from database.\n"));
852 }
853
854
855 /**
856  * Remove a particular key in the datastore.
857  *
858  * @param cls closure
859  * @param key key for the content
860  * @param size number of bytes in data
861  * @param data content stored
862  * @param cont continuation called with success or failure status
863  * @param cont_cls continuation closure for @a cont
864  */
865 static void
866 postgres_plugin_remove_key (void *cls,
867                             const struct GNUNET_HashCode *key,
868                             uint32_t size,
869                             const void *data,
870                             PluginRemoveCont cont,
871                             void *cont_cls)
872 {
873   struct Plugin *plugin = cls;
874   enum GNUNET_DB_QueryStatus ret;
875   struct GNUNET_PQ_QueryParam params[] = {
876     GNUNET_PQ_query_param_auto_from_type (key),
877     GNUNET_PQ_query_param_fixed_size (data, size),
878     GNUNET_PQ_query_param_end
879   };
880
881   ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
882                                             "remove",
883                                             params);
884   if (0 > ret)
885   {
886     cont (cont_cls,
887           key,
888           size,
889           GNUNET_SYSERR,
890           _ ("Postgress exec failure"));
891     return;
892   }
893   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == ret)
894   {
895     cont (cont_cls,
896           key,
897           size,
898           GNUNET_NO,
899           NULL);
900     return;
901   }
902   plugin->env->duc (plugin->env->cls,
903                     -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
904   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
905                    "datastore-postgres",
906                    "Deleted %u bytes from database\n",
907                    (unsigned int) size);
908   cont (cont_cls,
909         key,
910         size,
911         GNUNET_OK,
912         NULL);
913 }
914
915
916 /**
917  * Entry point for the plugin.
918  *
919  * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
920  * @return our `struct Plugin *`
921  */
922 void *
923 libgnunet_plugin_datastore_postgres_init (void *cls)
924 {
925   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
926   struct GNUNET_DATASTORE_PluginFunctions *api;
927   struct Plugin *plugin;
928
929   plugin = GNUNET_new (struct Plugin);
930   plugin->env = env;
931   if (GNUNET_OK != init_connection (plugin))
932   {
933     GNUNET_free (plugin);
934     return NULL;
935   }
936   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
937   api->cls = plugin;
938   api->estimate_size = &postgres_plugin_estimate_size;
939   api->put = &postgres_plugin_put;
940   api->get_key = &postgres_plugin_get_key;
941   api->get_replication = &postgres_plugin_get_replication;
942   api->get_expiration = &postgres_plugin_get_expiration;
943   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
944   api->get_keys = &postgres_plugin_get_keys;
945   api->drop = &postgres_plugin_drop;
946   api->remove_key = &postgres_plugin_remove_key;
947   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
948                    "datastore-postgres",
949                    _ ("Postgres database running\n"));
950   return api;
951 }
952
953
954 /**
955  * Exit point from the plugin.
956  *
957  * @param cls our `struct Plugin *`
958  * @return always NULL
959  */
960 void *
961 libgnunet_plugin_datastore_postgres_done (void *cls)
962 {
963   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
964   struct Plugin *plugin = api->cls;
965
966   GNUNET_PQ_disconnect (plugin->dbh);
967   GNUNET_free (plugin);
968   GNUNET_free (api);
969   return NULL;
970 }
971
972
973 /* end of plugin_datastore_postgres.c */