error handling
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_datastore_plugin.h"
28 #include "gnunet_pq_lib.h"
29
30
31 /**
32  * After how many ms "busy" should a DB operation fail for good?
33  * A low value makes sure that we are more responsive to requests
34  * (especially PUTs).  A high value guarantees a higher success
35  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
36  *
37  * The default value of 1s should ensure that users do not experience
38  * huge latencies while at the same time allowing operations to succeed
39  * with reasonable probability.
40  */
41 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
42
43
44 /**
45  * Context for all functions in this plugin.
46  */
47 struct Plugin
48 {
49   /**
50    * Our execution environment.
51    */
52   struct GNUNET_DATASTORE_PluginEnvironment *env;
53
54   /**
55    * Native Postgres database handle.
56    */
57   struct GNUNET_PQ_Context *dbh;
58 };
59
60
61 /**
62  * @brief Get a database handle
63  *
64  * @param plugin global context
65  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
66  */
67 static int
68 init_connection (struct Plugin *plugin)
69 {
70   struct GNUNET_PQ_ExecuteStatement es[] = {
71     /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
72      * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
73      * we do math or inequality tests, so we can't handle the entire range of uint32_t.
74      * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
75      * PostgreSQL also recommends against using WITH OIDS.
76      */GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS gn090 ("
77                             "  repl INTEGER NOT NULL DEFAULT 0,"
78                             "  type INTEGER NOT NULL DEFAULT 0,"
79                             "  prio INTEGER NOT NULL DEFAULT 0,"
80                             "  anonLevel INTEGER NOT NULL DEFAULT 0,"
81                             "  expire BIGINT NOT NULL DEFAULT 0,"
82                             "  rvalue BIGINT NOT NULL DEFAULT 0,"
83                             "  hash BYTEA NOT NULL DEFAULT '',"
84                             "  vhash BYTEA NOT NULL DEFAULT '',"
85                             "  value BYTEA NOT NULL DEFAULT '')"
86                             "WITH OIDS"),
87     GNUNET_PQ_make_try_execute (
88       "CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)"),
89     GNUNET_PQ_make_try_execute (
90       "CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)"),
91     GNUNET_PQ_make_try_execute (
92       "CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)"),
93     GNUNET_PQ_make_try_execute (
94       "CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)"),
95     GNUNET_PQ_make_try_execute (
96       "CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)"),
97     GNUNET_PQ_make_try_execute (
98       "CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)"),
99     GNUNET_PQ_make_try_execute (
100       "CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"),
101     GNUNET_PQ_make_execute (
102       "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"),
103     GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN"),
104     GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN"),
105     GNUNET_PQ_EXECUTE_STATEMENT_END
106   };
107
108 #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
109   struct GNUNET_PQ_PreparedStatement ps[] = {
110     GNUNET_PQ_make_prepare ("get",
111                             "SELECT " RESULT_COLUMNS " FROM gn090"
112                             " WHERE oid >= $1::bigint AND"
113                             " (rvalue >= $2 OR 0 = $3::smallint) AND"
114                             " (hash = $4 OR 0 = $5::smallint) AND"
115                             " (type = $6 OR 0 = $7::smallint)"
116                             " ORDER BY oid ASC LIMIT 1",
117                             7),
118     GNUNET_PQ_make_prepare ("put",
119                             "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
120                             "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
121                             9),
122     GNUNET_PQ_make_prepare ("update",
123                             "UPDATE gn090"
124                             " SET prio = prio + $1,"
125                             " repl = repl + $2,"
126                             " expire = GREATEST(expire, $3)"
127                             " WHERE hash = $4 AND vhash = $5",
128                             5),
129     GNUNET_PQ_make_prepare ("decrepl",
130                             "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
131                             "WHERE oid = $1",
132                             1),
133     GNUNET_PQ_make_prepare ("select_non_anonymous",
134                             "SELECT " RESULT_COLUMNS " FROM gn090 "
135                             "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
136                             "ORDER BY oid ASC LIMIT 1",
137                             2),
138     GNUNET_PQ_make_prepare ("select_expiration_order",
139                             "(SELECT " RESULT_COLUMNS " FROM gn090 "
140                             "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
141                             "UNION "
142                             "(SELECT " RESULT_COLUMNS " FROM gn090 "
143                             "ORDER BY prio ASC LIMIT 1) "
144                             "ORDER BY expire ASC LIMIT 1",
145                             1),
146     GNUNET_PQ_make_prepare ("select_replication_order",
147                             "SELECT " RESULT_COLUMNS " FROM gn090 "
148                             "ORDER BY repl DESC,RANDOM() LIMIT 1",
149                             0),
150     GNUNET_PQ_make_prepare ("delrow",
151                             "DELETE FROM gn090 "
152                             "WHERE oid=$1",
153                             1),
154     GNUNET_PQ_make_prepare ("remove",
155                             "DELETE FROM gn090"
156                             " WHERE hash = $1 AND"
157                             " value = $2",
158                             2),
159     GNUNET_PQ_make_prepare ("get_keys",
160                             "SELECT hash FROM gn090",
161                             0),
162     GNUNET_PQ_make_prepare ("estimate_size",
163                             "SELECT CASE WHEN NOT EXISTS"
164                             "  (SELECT 1 FROM gn090)"
165                             "  THEN 0"
166                             "  ELSE (SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090)"
167                             "END AS total",
168                             0),
169     GNUNET_PQ_PREPARED_STATEMENT_END
170   };
171 #undef RESULT_COLUMNS
172
173   plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
174                                             "datastore-postgres",
175                                             NULL,
176                                             es,
177                                             ps);
178   if (NULL == plugin->dbh)
179     return GNUNET_SYSERR;
180   return GNUNET_OK;
181 }
182
183
184 /**
185  * Get an estimate of how much space the database is
186  * currently using.
187  *
188  * @param cls our `struct Plugin *`
189  * @return number of bytes used on disk
190  */
191 static void
192 postgres_plugin_estimate_size (void *cls,
193                                unsigned long long *estimate)
194 {
195   struct Plugin *plugin = cls;
196   uint64_t total;
197   struct GNUNET_PQ_QueryParam params[] = {
198     GNUNET_PQ_query_param_end
199   };
200   struct GNUNET_PQ_ResultSpec rs[] = {
201     GNUNET_PQ_result_spec_uint64 ("total",
202                                   &total),
203     GNUNET_PQ_result_spec_end
204   };
205   enum GNUNET_DB_QueryStatus ret;
206
207   if (NULL == estimate)
208     return;
209   ret = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
210                                                   "estimate_size",
211                                                   params,
212                                                   rs);
213   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != ret)
214   {
215     *estimate = 0LL;
216     return;
217   }
218   *estimate = total;
219 }
220
221
222 /**
223  * Store an item in the datastore.
224  *
225  * @param cls closure with the `struct Plugin`
226  * @param key key for the item
227  * @param absent true if the key was not found in the bloom filter
228  * @param size number of bytes in data
229  * @param data content stored
230  * @param type type of the content
231  * @param priority priority of the content
232  * @param anonymity anonymity-level for the content
233  * @param replication replication-level for the content
234  * @param expiration expiration time for the content
235  * @param cont continuation called with success or failure status
236  * @param cont_cls continuation closure
237  */
238 static void
239 postgres_plugin_put (void *cls,
240                      const struct GNUNET_HashCode *key,
241                      bool absent,
242                      uint32_t size,
243                      const void *data,
244                      enum GNUNET_BLOCK_Type type,
245                      uint32_t priority,
246                      uint32_t anonymity,
247                      uint32_t replication,
248                      struct GNUNET_TIME_Absolute expiration,
249                      PluginPutCont cont,
250                      void *cont_cls)
251 {
252   struct Plugin *plugin = cls;
253   struct GNUNET_HashCode vhash;
254   enum GNUNET_DB_QueryStatus ret;
255
256   GNUNET_CRYPTO_hash (data,
257                       size,
258                       &vhash);
259   if (! absent)
260   {
261     struct GNUNET_PQ_QueryParam params[] = {
262       GNUNET_PQ_query_param_uint32 (&priority),
263       GNUNET_PQ_query_param_uint32 (&replication),
264       GNUNET_PQ_query_param_absolute_time (&expiration),
265       GNUNET_PQ_query_param_auto_from_type (key),
266       GNUNET_PQ_query_param_auto_from_type (&vhash),
267       GNUNET_PQ_query_param_end
268     };
269     ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
270                                               "update",
271                                               params);
272     if (0 > ret)
273     {
274       cont (cont_cls,
275             key,
276             size,
277             GNUNET_SYSERR,
278             _ ("Postgress exec failure"));
279       return;
280     }
281     bool affected = (0 != ret);
282     if (affected)
283     {
284       cont (cont_cls,
285             key,
286             size,
287             GNUNET_NO,
288             NULL);
289       return;
290     }
291   }
292
293   {
294     uint32_t utype = (uint32_t) type;
295     uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
296                                                 UINT64_MAX);
297     struct GNUNET_PQ_QueryParam params[] = {
298       GNUNET_PQ_query_param_uint32 (&replication),
299       GNUNET_PQ_query_param_uint32 (&utype),
300       GNUNET_PQ_query_param_uint32 (&priority),
301       GNUNET_PQ_query_param_uint32 (&anonymity),
302       GNUNET_PQ_query_param_absolute_time (&expiration),
303       GNUNET_PQ_query_param_uint64 (&rvalue),
304       GNUNET_PQ_query_param_auto_from_type (key),
305       GNUNET_PQ_query_param_auto_from_type (&vhash),
306       GNUNET_PQ_query_param_fixed_size (data, size),
307       GNUNET_PQ_query_param_end
308     };
309
310     ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
311                                               "put",
312                                               params);
313     if (0 > ret)
314     {
315       cont (cont_cls,
316             key,
317             size,
318             GNUNET_SYSERR,
319             "Postgress exec failure");
320       return;
321     }
322   }
323   plugin->env->duc (plugin->env->cls,
324                     size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
325   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
326                    "datastore-postgres",
327                    "Stored %u bytes in database\n",
328                    (unsigned int) size);
329   cont (cont_cls,
330         key,
331         size,
332         GNUNET_OK,
333         NULL);
334 }
335
336
337 /**
338  * Closure for #process_result.
339  */
340 struct ProcessResultContext
341 {
342   /**
343    * The plugin handle.
344    */
345   struct Plugin *plugin;
346
347   /**
348    * Function to call on each result.
349    */
350   PluginDatumProcessor proc;
351
352   /**
353    * Closure for @e proc.
354    */
355   void *proc_cls;
356 };
357
358
359 /**
360  * Function invoked to process the result and call the processor of @a
361  * cls.
362  *
363  * @param cls our `struct ProcessResultContext`
364  * @param res result from exec
365  * @param num_results number of results in @a res
366  */
367 static void
368 process_result (void *cls,
369                 PGresult *res,
370                 unsigned int num_results)
371 {
372   struct ProcessResultContext *prc = cls;
373   struct Plugin *plugin = prc->plugin;
374
375   if (0 == num_results)
376   {
377     /* no result */
378     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
379                      "datastore-postgres",
380                      "Ending iteration (no more results)\n");
381     prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
382                GNUNET_TIME_UNIT_ZERO_ABS, 0);
383     return;
384   }
385   if (1 != num_results)
386   {
387     GNUNET_break (0);
388     prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
389                GNUNET_TIME_UNIT_ZERO_ABS, 0);
390     return;
391   }
392   /* Technically we don't need the loop here, but nicer in case
393      we ever relax the condition above. */
394   for (unsigned int i = 0; i < num_results; i++)
395   {
396     int iret;
397     uint32_t rowid;
398     uint32_t utype;
399     uint32_t anonymity;
400     uint32_t replication;
401     uint32_t priority;
402     size_t size;
403     void *data;
404     struct GNUNET_TIME_Absolute expiration_time;
405     struct GNUNET_HashCode key;
406     struct GNUNET_PQ_ResultSpec rs[] = {
407       GNUNET_PQ_result_spec_uint32 ("repl", &replication),
408       GNUNET_PQ_result_spec_uint32 ("type", &utype),
409       GNUNET_PQ_result_spec_uint32 ("prio", &priority),
410       GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
411       GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
412       GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
413       GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
414       GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
415       GNUNET_PQ_result_spec_end
416     };
417
418     if (GNUNET_OK !=
419         GNUNET_PQ_extract_result (res,
420                                   rs,
421                                   i))
422     {
423       GNUNET_break (0);
424       prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
425                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
426       return;
427     }
428
429     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
430                      "datastore-postgres",
431                      "Found result of size %u bytes and type %u in database\n",
432                      (unsigned int) size,
433                      (unsigned int) utype);
434     iret = prc->proc (prc->proc_cls,
435                       &key,
436                       size,
437                       data,
438                       (enum GNUNET_BLOCK_Type) utype,
439                       priority,
440                       anonymity,
441                       replication,
442                       expiration_time,
443                       rowid);
444     if (iret == GNUNET_NO)
445     {
446       struct GNUNET_PQ_QueryParam param[] = {
447         GNUNET_PQ_query_param_uint32 (&rowid),
448         GNUNET_PQ_query_param_end
449       };
450
451       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
452                   "Processor asked for item %u to be removed.\n",
453                   (unsigned int) rowid);
454       if (0 <
455           GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
456                                               "delrow",
457                                               param))
458       {
459         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
460                          "datastore-postgres",
461                          "Deleting %u bytes from database\n",
462                          (unsigned int) size);
463         plugin->env->duc (plugin->env->cls,
464                           -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
465         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
466                          "datastore-postgres",
467                          "Deleted %u bytes from database\n",
468                          (unsigned int) size);
469       }
470     }
471     GNUNET_PQ_cleanup_result (rs);
472   }   /* for (i) */
473 }
474
475
476 /**
477  * Get one of the results for a particular key in the datastore.
478  *
479  * @param cls closure with the `struct Plugin`
480  * @param next_uid return the result with lowest uid >= next_uid
481  * @param random if true, return a random result instead of using next_uid
482  * @param key maybe NULL (to match all entries)
483  * @param type entries of which type are relevant?
484  *     Use 0 for any type.
485  * @param proc function to call on the matching value;
486  *        will be called with NULL if nothing matches
487  * @param proc_cls closure for @a proc
488  */
489 static void
490 postgres_plugin_get_key (void *cls,
491                          uint64_t next_uid,
492                          bool random,
493                          const struct GNUNET_HashCode *key,
494                          enum GNUNET_BLOCK_Type type,
495                          PluginDatumProcessor proc,
496                          void *proc_cls)
497 {
498   struct Plugin *plugin = cls;
499   uint32_t utype = type;
500   uint16_t use_rvalue = random;
501   uint16_t use_key = NULL != key;
502   uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
503   uint64_t rvalue;
504   struct GNUNET_PQ_QueryParam params[] = {
505     GNUNET_PQ_query_param_uint64 (&next_uid),
506     GNUNET_PQ_query_param_uint64 (&rvalue),
507     GNUNET_PQ_query_param_uint16 (&use_rvalue),
508     GNUNET_PQ_query_param_auto_from_type (key),
509     GNUNET_PQ_query_param_uint16 (&use_key),
510     GNUNET_PQ_query_param_uint32 (&utype),
511     GNUNET_PQ_query_param_uint16 (&use_type),
512     GNUNET_PQ_query_param_end
513   };
514   struct ProcessResultContext prc;
515   enum GNUNET_DB_QueryStatus res;
516
517   if (random)
518   {
519     rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
520                                        UINT64_MAX);
521     next_uid = 0;
522   }
523   else
524   {
525     rvalue = 0;
526   }
527   prc.plugin = plugin;
528   prc.proc = proc;
529   prc.proc_cls = proc_cls;
530
531   res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
532                                               "get",
533                                               params,
534                                               &process_result,
535                                               &prc);
536   if (0 > res)
537     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
538           GNUNET_TIME_UNIT_ZERO_ABS, 0);
539 }
540
541
542 /**
543  * Select a subset of the items in the datastore and call
544  * the given iterator for each of them.
545  *
546  * @param cls our `struct Plugin *`
547  * @param next_uid return the result with lowest uid >= next_uid
548  * @param type entries of which type should be considered?
549  *        Must not be zero (ANY).
550  * @param proc function to call on the matching value;
551  *        will be called with NULL if no value matches
552  * @param proc_cls closure for @a proc
553  */
554 static void
555 postgres_plugin_get_zero_anonymity (void *cls,
556                                     uint64_t next_uid,
557                                     enum GNUNET_BLOCK_Type type,
558                                     PluginDatumProcessor proc,
559                                     void *proc_cls)
560 {
561   struct Plugin *plugin = cls;
562   uint32_t utype = type;
563   struct GNUNET_PQ_QueryParam params[] = {
564     GNUNET_PQ_query_param_uint32 (&utype),
565     GNUNET_PQ_query_param_uint64 (&next_uid),
566     GNUNET_PQ_query_param_end
567   };
568   struct ProcessResultContext prc;
569   enum GNUNET_DB_QueryStatus res;
570
571   prc.plugin = plugin;
572   prc.proc = proc;
573   prc.proc_cls = proc_cls;
574   res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
575                                               "select_non_anonymous",
576                                               params,
577                                               &process_result,
578                                               &prc);
579   if (0 > res)
580     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
581           GNUNET_TIME_UNIT_ZERO_ABS, 0);
582 }
583
584
585 /**
586  * Context for #repl_iter() function.
587  */
588 struct ReplCtx
589 {
590   /**
591    * Plugin handle.
592    */
593   struct Plugin *plugin;
594
595   /**
596    * Function to call for the result (or the NULL).
597    */
598   PluginDatumProcessor proc;
599
600   /**
601    * Closure for @e proc.
602    */
603   void *proc_cls;
604 };
605
606
607 /**
608  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
609  * Decrements the replication counter and calls the original
610  * iterator.
611  *
612  * @param cls closure with the `struct ReplCtx *`
613  * @param key key for the content
614  * @param size number of bytes in @a data
615  * @param data content stored
616  * @param type type of the content
617  * @param priority priority of the content
618  * @param anonymity anonymity-level for the content
619  * @param replication replication-level for the content
620  * @param expiration expiration time for the content
621  * @param uid unique identifier for the datum;
622  *        maybe 0 if no unique identifier is available
623  * @return #GNUNET_SYSERR to abort the iteration,
624  *         #GNUNET_OK to continue
625  *         (continue on call to "next", of course),
626  *         #GNUNET_NO to delete the item and continue (if supported)
627  */
628 static int
629 repl_proc (void *cls,
630            const struct GNUNET_HashCode *key,
631            uint32_t size,
632            const void *data,
633            enum GNUNET_BLOCK_Type type,
634            uint32_t priority,
635            uint32_t anonymity,
636            uint32_t replication,
637            struct GNUNET_TIME_Absolute expiration,
638            uint64_t uid)
639 {
640   struct ReplCtx *rc = cls;
641   struct Plugin *plugin = rc->plugin;
642   int ret;
643   uint32_t oid = (uint32_t) uid;
644   struct GNUNET_PQ_QueryParam params[] = {
645     GNUNET_PQ_query_param_uint32 (&oid),
646     GNUNET_PQ_query_param_end
647   };
648   enum GNUNET_DB_QueryStatus qret;
649
650   ret = rc->proc (rc->proc_cls,
651                   key,
652                   size,
653                   data,
654                   type,
655                   priority,
656                   anonymity,
657                   replication,
658                   expiration,
659                   uid);
660   if (NULL == key)
661     return ret;
662   qret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
663                                              "decrepl",
664                                              params);
665   if (0 > qret)
666     return GNUNET_SYSERR;
667   return ret;
668 }
669
670
671 /**
672  * Get a random item for replication.  Returns a single, not expired,
673  * random item from those with the highest replication counters.  The
674  * item's replication counter is decremented by one IF it was positive
675  * before.  Call @a proc with all values ZERO or NULL if the datastore
676  * is empty.
677  *
678  * @param cls closure with the `struct Plugin`
679  * @param proc function to call the value (once only).
680  * @param proc_cls closure for @a proc
681  */
682 static void
683 postgres_plugin_get_replication (void *cls,
684                                  PluginDatumProcessor proc,
685                                  void *proc_cls)
686 {
687   struct Plugin *plugin = cls;
688   struct GNUNET_PQ_QueryParam params[] = {
689     GNUNET_PQ_query_param_end
690   };
691   struct ReplCtx rc;
692   struct ProcessResultContext prc;
693   enum GNUNET_DB_QueryStatus res;
694
695   rc.plugin = plugin;
696   rc.proc = proc;
697   rc.proc_cls = proc_cls;
698   prc.plugin = plugin;
699   prc.proc = &repl_proc;
700   prc.proc_cls = &rc;
701   res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
702                                               "select_replication_order",
703                                               params,
704                                               &process_result,
705                                               &prc);
706   if (0 > res)
707     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
708           GNUNET_TIME_UNIT_ZERO_ABS, 0);
709 }
710
711
712 /**
713  * Get a random item for expiration.  Call @a proc with all values
714  * ZERO or NULL if the datastore is empty.
715  *
716  * @param cls closure with the `struct Plugin`
717  * @param proc function to call the value (once only).
718  * @param proc_cls closure for @a proc
719  */
720 static void
721 postgres_plugin_get_expiration (void *cls,
722                                 PluginDatumProcessor proc,
723                                 void *proc_cls)
724 {
725   struct Plugin *plugin = cls;
726   struct GNUNET_TIME_Absolute now;
727   struct GNUNET_PQ_QueryParam params[] = {
728     GNUNET_PQ_query_param_absolute_time (&now),
729     GNUNET_PQ_query_param_end
730   };
731   struct ProcessResultContext prc;
732
733   now = GNUNET_TIME_absolute_get ();
734   prc.plugin = plugin;
735   prc.proc = proc;
736   prc.proc_cls = proc_cls;
737   (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
738                                                "select_expiration_order",
739                                                params,
740                                                &process_result,
741                                                &prc);
742 }
743
744
745 /**
746  * Closure for #process_keys.
747  */
748 struct ProcessKeysContext
749 {
750   /**
751    * Function to call for each key.
752    */
753   PluginKeyProcessor proc;
754
755   /**
756    * Closure for @e proc.
757    */
758   void *proc_cls;
759 };
760
761
762 /**
763  * Function to be called with the results of a SELECT statement
764  * that has returned @a num_results results.
765  *
766  * @param cls closure with a `struct ProcessKeysContext`
767  * @param result the postgres result
768  * @param num_result the number of results in @a result
769  */
770 static void
771 process_keys (void *cls,
772               PGresult *result,
773               unsigned int num_results)
774 {
775   struct ProcessKeysContext *pkc = cls;
776
777   for (unsigned i = 0; i < num_results; i++)
778   {
779     struct GNUNET_HashCode key;
780     struct GNUNET_PQ_ResultSpec rs[] = {
781       GNUNET_PQ_result_spec_auto_from_type ("hash",
782                                             &key),
783       GNUNET_PQ_result_spec_end
784     };
785
786     if (GNUNET_OK !=
787         GNUNET_PQ_extract_result (result,
788                                   rs,
789                                   i))
790     {
791       GNUNET_break (0);
792       continue;
793     }
794     pkc->proc (pkc->proc_cls,
795                &key,
796                1);
797     GNUNET_PQ_cleanup_result (rs);
798   }
799 }
800
801
802 /**
803  * Get all of the keys in the datastore.
804  *
805  * @param cls closure with the `struct Plugin *`
806  * @param proc function to call on each key
807  * @param proc_cls closure for @a proc
808  */
809 static void
810 postgres_plugin_get_keys (void *cls,
811                           PluginKeyProcessor proc,
812                           void *proc_cls)
813 {
814   struct Plugin *plugin = cls;
815   struct GNUNET_PQ_QueryParam params[] = {
816     GNUNET_PQ_query_param_end
817   };
818   struct ProcessKeysContext pkc;
819
820   pkc.proc = proc;
821   pkc.proc_cls = proc_cls;
822   (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
823                                                "get_keys",
824                                                params,
825                                                &process_keys,
826                                                &pkc);
827   proc (proc_cls,
828         NULL,
829         0);
830 }
831
832
833 /**
834  * Drop database.
835  *
836  * @param cls closure with the `struct Plugin *`
837  */
838 static void
839 postgres_plugin_drop (void *cls)
840 {
841   struct Plugin *plugin = cls;
842   struct GNUNET_PQ_ExecuteStatement es[] = {
843     GNUNET_PQ_make_execute ("DROP TABLE gn090"),
844     GNUNET_PQ_EXECUTE_STATEMENT_END
845   };
846
847   if (GNUNET_OK !=
848       GNUNET_PQ_exec_statements (plugin->dbh,
849                                  es))
850     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
851                      "postgres",
852                      _ ("Failed to drop table from database.\n"));
853 }
854
855
856 /**
857  * Remove a particular key in the datastore.
858  *
859  * @param cls closure
860  * @param key key for the content
861  * @param size number of bytes in data
862  * @param data content stored
863  * @param cont continuation called with success or failure status
864  * @param cont_cls continuation closure for @a cont
865  */
866 static void
867 postgres_plugin_remove_key (void *cls,
868                             const struct GNUNET_HashCode *key,
869                             uint32_t size,
870                             const void *data,
871                             PluginRemoveCont cont,
872                             void *cont_cls)
873 {
874   struct Plugin *plugin = cls;
875   enum GNUNET_DB_QueryStatus ret;
876   struct GNUNET_PQ_QueryParam params[] = {
877     GNUNET_PQ_query_param_auto_from_type (key),
878     GNUNET_PQ_query_param_fixed_size (data, size),
879     GNUNET_PQ_query_param_end
880   };
881
882   ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
883                                             "remove",
884                                             params);
885   if (0 > ret)
886   {
887     cont (cont_cls,
888           key,
889           size,
890           GNUNET_SYSERR,
891           _ ("Postgress exec failure"));
892     return;
893   }
894   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == ret)
895   {
896     cont (cont_cls,
897           key,
898           size,
899           GNUNET_NO,
900           NULL);
901     return;
902   }
903   plugin->env->duc (plugin->env->cls,
904                     -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
905   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
906                    "datastore-postgres",
907                    "Deleted %u bytes from database\n",
908                    (unsigned int) size);
909   cont (cont_cls,
910         key,
911         size,
912         GNUNET_OK,
913         NULL);
914 }
915
916
917 /**
918  * Entry point for the plugin.
919  *
920  * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
921  * @return our `struct Plugin *`
922  */
923 void *
924 libgnunet_plugin_datastore_postgres_init (void *cls)
925 {
926   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
927   struct GNUNET_DATASTORE_PluginFunctions *api;
928   struct Plugin *plugin;
929
930   plugin = GNUNET_new (struct Plugin);
931   plugin->env = env;
932   if (GNUNET_OK != init_connection (plugin))
933   {
934     GNUNET_free (plugin);
935     return NULL;
936   }
937   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
938   api->cls = plugin;
939   api->estimate_size = &postgres_plugin_estimate_size;
940   api->put = &postgres_plugin_put;
941   api->get_key = &postgres_plugin_get_key;
942   api->get_replication = &postgres_plugin_get_replication;
943   api->get_expiration = &postgres_plugin_get_expiration;
944   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
945   api->get_keys = &postgres_plugin_get_keys;
946   api->drop = &postgres_plugin_drop;
947   api->remove_key = &postgres_plugin_remove_key;
948   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
949                    "datastore-postgres",
950                    _ ("Postgres database running\n"));
951   return api;
952 }
953
954
955 /**
956  * Exit point from the plugin.
957  *
958  * @param cls our `struct Plugin *`
959  * @return always NULL
960  */
961 void *
962 libgnunet_plugin_datastore_postgres_done (void *cls)
963 {
964   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
965   struct Plugin *plugin = api->cls;
966
967   GNUNET_PQ_disconnect (plugin->dbh);
968   GNUNET_free (plugin);
969   GNUNET_free (api);
970   return NULL;
971 }
972
973
974 /* end of plugin_datastore_postgres.c */