uncrustify as demanded.
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_datastore_plugin.h"
28 #include "gnunet_pq_lib.h"
29
30
31 /**
32  * After how many ms "busy" should a DB operation fail for good?
33  * A low value makes sure that we are more responsive to requests
34  * (especially PUTs).  A high value guarantees a higher success
35  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
36  *
37  * The default value of 1s should ensure that users do not experience
38  * huge latencies while at the same time allowing operations to succeed
39  * with reasonable probability.
40  */
41 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
42
43
44 /**
45  * Context for all functions in this plugin.
46  */
47 struct Plugin {
48   /**
49    * Our execution environment.
50    */
51   struct GNUNET_DATASTORE_PluginEnvironment *env;
52
53   /**
54    * Native Postgres database handle.
55    */
56   PGconn *dbh;
57 };
58
59
60 /**
61  * @brief Get a database handle
62  *
63  * @param plugin global context
64  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
65  */
66 static int
67 init_connection(struct Plugin *plugin)
68 {
69   struct GNUNET_PQ_ExecuteStatement es[] = {
70     /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
71      * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
72      * we do math or inequality tests, so we can't handle the entire range of uint32_t.
73      * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
74      * PostgreSQL also recommends against using WITH OIDS.
75      */
76     GNUNET_PQ_make_execute("CREATE TABLE IF NOT EXISTS gn090 ("
77                            "  repl INTEGER NOT NULL DEFAULT 0,"
78                            "  type INTEGER NOT NULL DEFAULT 0,"
79                            "  prio INTEGER NOT NULL DEFAULT 0,"
80                            "  anonLevel INTEGER NOT NULL DEFAULT 0,"
81                            "  expire BIGINT NOT NULL DEFAULT 0,"
82                            "  rvalue BIGINT NOT NULL DEFAULT 0,"
83                            "  hash BYTEA NOT NULL DEFAULT '',"
84                            "  vhash BYTEA NOT NULL DEFAULT '',"
85                            "  value BYTEA NOT NULL DEFAULT '')"
86                            "WITH OIDS"),
87     GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)"),
88     GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)"),
89     GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)"),
90     GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)"),
91     GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)"),
92     GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)"),
93     GNUNET_PQ_make_try_execute("CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"),
94     GNUNET_PQ_make_execute("ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"),
95     GNUNET_PQ_make_execute("ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN"),
96     GNUNET_PQ_make_execute("ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN"),
97     GNUNET_PQ_EXECUTE_STATEMENT_END
98   };
99
100 #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
101   struct GNUNET_PQ_PreparedStatement ps[] = {
102     GNUNET_PQ_make_prepare("get",
103                            "SELECT " RESULT_COLUMNS " FROM gn090"
104                            " WHERE oid >= $1::bigint AND"
105                            " (rvalue >= $2 OR 0 = $3::smallint) AND"
106                            " (hash = $4 OR 0 = $5::smallint) AND"
107                            " (type = $6 OR 0 = $7::smallint)"
108                            " ORDER BY oid ASC LIMIT 1",
109                            7),
110     GNUNET_PQ_make_prepare("put",
111                            "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
112                            "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
113                            9),
114     GNUNET_PQ_make_prepare("update",
115                            "UPDATE gn090"
116                            " SET prio = prio + $1,"
117                            " repl = repl + $2,"
118                            " expire = GREATEST(expire, $3)"
119                            " WHERE hash = $4 AND vhash = $5",
120                            5),
121     GNUNET_PQ_make_prepare("decrepl",
122                            "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
123                            "WHERE oid = $1",
124                            1),
125     GNUNET_PQ_make_prepare("select_non_anonymous",
126                            "SELECT " RESULT_COLUMNS " FROM gn090 "
127                            "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
128                            "ORDER BY oid ASC LIMIT 1",
129                            2),
130     GNUNET_PQ_make_prepare("select_expiration_order",
131                            "(SELECT " RESULT_COLUMNS " FROM gn090 "
132                            "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
133                            "UNION "
134                            "(SELECT " RESULT_COLUMNS " FROM gn090 "
135                            "ORDER BY prio ASC LIMIT 1) "
136                            "ORDER BY expire ASC LIMIT 1",
137                            1),
138     GNUNET_PQ_make_prepare("select_replication_order",
139                            "SELECT " RESULT_COLUMNS " FROM gn090 "
140                            "ORDER BY repl DESC,RANDOM() LIMIT 1",
141                            0),
142     GNUNET_PQ_make_prepare("delrow",
143                            "DELETE FROM gn090 "
144                            "WHERE oid=$1",
145                            1),
146     GNUNET_PQ_make_prepare("remove",
147                            "DELETE FROM gn090"
148                            " WHERE hash = $1 AND"
149                            " value = $2",
150                            2),
151     GNUNET_PQ_make_prepare("get_keys",
152                            "SELECT hash FROM gn090",
153                            0),
154     GNUNET_PQ_make_prepare("estimate_size",
155                            "SELECT CASE WHEN NOT EXISTS"
156                            "  (SELECT 1 FROM gn090)"
157                            "  THEN 0"
158                            "  ELSE (SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090)"
159                            "END AS total",
160                            0),
161     GNUNET_PQ_PREPARED_STATEMENT_END
162   };
163 #undef RESULT_COLUMNS
164
165   plugin->dbh = GNUNET_PQ_connect_with_cfg(plugin->env->cfg,
166                                            "datastore-postgres");
167   if (NULL == plugin->dbh)
168     return GNUNET_SYSERR;
169
170   if ((GNUNET_OK !=
171        GNUNET_PQ_exec_statements(plugin->dbh,
172                                  es)) ||
173       (GNUNET_OK !=
174        GNUNET_PQ_prepare_statements(plugin->dbh,
175                                     ps)))
176     {
177       PQfinish(plugin->dbh);
178       plugin->dbh = NULL;
179       return GNUNET_SYSERR;
180     }
181   return GNUNET_OK;
182 }
183
184
185 /**
186  * Get an estimate of how much space the database is
187  * currently using.
188  *
189  * @param cls our `struct Plugin *`
190  * @return number of bytes used on disk
191  */
192 static void
193 postgres_plugin_estimate_size(void *cls,
194                               unsigned long long *estimate)
195 {
196   struct Plugin *plugin = cls;
197   uint64_t total;
198   struct GNUNET_PQ_QueryParam params[] = {
199     GNUNET_PQ_query_param_end
200   };
201   struct GNUNET_PQ_ResultSpec rs[] = {
202     GNUNET_PQ_result_spec_uint64("total",
203                                  &total),
204     GNUNET_PQ_result_spec_end
205   };
206   enum GNUNET_DB_QueryStatus ret;
207
208   if (NULL == estimate)
209     return;
210   ret = GNUNET_PQ_eval_prepared_singleton_select(plugin->dbh,
211                                                  "estimate_size",
212                                                  params,
213                                                  rs);
214   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != ret)
215     {
216       *estimate = 0LL;
217       return;
218     }
219   *estimate = total;
220 }
221
222
223 /**
224  * Store an item in the datastore.
225  *
226  * @param cls closure with the `struct Plugin`
227  * @param key key for the item
228  * @param absent true if the key was not found in the bloom filter
229  * @param size number of bytes in data
230  * @param data content stored
231  * @param type type of the content
232  * @param priority priority of the content
233  * @param anonymity anonymity-level for the content
234  * @param replication replication-level for the content
235  * @param expiration expiration time for the content
236  * @param cont continuation called with success or failure status
237  * @param cont_cls continuation closure
238  */
239 static void
240 postgres_plugin_put(void *cls,
241                     const struct GNUNET_HashCode *key,
242                     bool absent,
243                     uint32_t size,
244                     const void *data,
245                     enum GNUNET_BLOCK_Type type,
246                     uint32_t priority,
247                     uint32_t anonymity,
248                     uint32_t replication,
249                     struct GNUNET_TIME_Absolute expiration,
250                     PluginPutCont cont,
251                     void *cont_cls)
252 {
253   struct Plugin *plugin = cls;
254   struct GNUNET_HashCode vhash;
255   enum GNUNET_DB_QueryStatus ret;
256
257   GNUNET_CRYPTO_hash(data,
258                      size,
259                      &vhash);
260   if (!absent)
261     {
262       struct GNUNET_PQ_QueryParam params[] = {
263         GNUNET_PQ_query_param_uint32(&priority),
264         GNUNET_PQ_query_param_uint32(&replication),
265         GNUNET_PQ_query_param_absolute_time(&expiration),
266         GNUNET_PQ_query_param_auto_from_type(key),
267         GNUNET_PQ_query_param_auto_from_type(&vhash),
268         GNUNET_PQ_query_param_end
269       };
270       ret = GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
271                                                "update",
272                                                params);
273       if (0 > ret)
274         {
275           cont(cont_cls,
276                key,
277                size,
278                GNUNET_SYSERR,
279                _("Postgress exec failure"));
280           return;
281         }
282       bool affected = (0 != ret);
283       if (affected)
284         {
285           cont(cont_cls,
286                key,
287                size,
288                GNUNET_NO,
289                NULL);
290           return;
291         }
292     }
293
294   {
295     uint32_t utype = (uint32_t)type;
296     uint64_t rvalue = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK,
297                                                UINT64_MAX);
298     struct GNUNET_PQ_QueryParam params[] = {
299       GNUNET_PQ_query_param_uint32(&replication),
300       GNUNET_PQ_query_param_uint32(&utype),
301       GNUNET_PQ_query_param_uint32(&priority),
302       GNUNET_PQ_query_param_uint32(&anonymity),
303       GNUNET_PQ_query_param_absolute_time(&expiration),
304       GNUNET_PQ_query_param_uint64(&rvalue),
305       GNUNET_PQ_query_param_auto_from_type(key),
306       GNUNET_PQ_query_param_auto_from_type(&vhash),
307       GNUNET_PQ_query_param_fixed_size(data, size),
308       GNUNET_PQ_query_param_end
309     };
310
311     ret = GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
312                                              "put",
313                                              params);
314     if (0 > ret)
315       {
316         cont(cont_cls,
317              key,
318              size,
319              GNUNET_SYSERR,
320              "Postgress exec failure");
321         return;
322       }
323   }
324   plugin->env->duc(plugin->env->cls,
325                    size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
326   GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
327                   "datastore-postgres",
328                   "Stored %u bytes in database\n",
329                   (unsigned int)size);
330   cont(cont_cls,
331        key,
332        size,
333        GNUNET_OK,
334        NULL);
335 }
336
337
338 /**
339  * Closure for #process_result.
340  */
341 struct ProcessResultContext {
342   /**
343    * The plugin handle.
344    */
345   struct Plugin *plugin;
346
347   /**
348    * Function to call on each result.
349    */
350   PluginDatumProcessor proc;
351
352   /**
353    * Closure for @e proc.
354    */
355   void *proc_cls;
356 };
357
358
359 /**
360  * Function invoked to process the result and call the processor of @a
361  * cls.
362  *
363  * @param cls our `struct ProcessResultContext`
364  * @param res result from exec
365  * @param num_results number of results in @a res
366  */
367 static void
368 process_result(void *cls,
369                PGresult *res,
370                unsigned int num_results)
371 {
372   struct ProcessResultContext *prc = cls;
373   struct Plugin *plugin = prc->plugin;
374
375   if (0 == num_results)
376     {
377       /* no result */
378       GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
379                       "datastore-postgres",
380                       "Ending iteration (no more results)\n");
381       prc->proc(prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
382                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
383       return;
384     }
385   if (1 != num_results)
386     {
387       GNUNET_break(0);
388       prc->proc(prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
389                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
390       return;
391     }
392   /* Technically we don't need the loop here, but nicer in case
393      we ever relax the condition above. */
394   for (unsigned int i = 0; i < num_results; i++)
395     {
396       int iret;
397       uint32_t rowid;
398       uint32_t utype;
399       uint32_t anonymity;
400       uint32_t replication;
401       uint32_t priority;
402       size_t size;
403       void *data;
404       struct GNUNET_TIME_Absolute expiration_time;
405       struct GNUNET_HashCode key;
406       struct GNUNET_PQ_ResultSpec rs[] = {
407         GNUNET_PQ_result_spec_uint32("repl", &replication),
408         GNUNET_PQ_result_spec_uint32("type", &utype),
409         GNUNET_PQ_result_spec_uint32("prio", &priority),
410         GNUNET_PQ_result_spec_uint32("anonLevel", &anonymity),
411         GNUNET_PQ_result_spec_absolute_time("expire", &expiration_time),
412         GNUNET_PQ_result_spec_auto_from_type("hash", &key),
413         GNUNET_PQ_result_spec_variable_size("value", &data, &size),
414         GNUNET_PQ_result_spec_uint32("oid", &rowid),
415         GNUNET_PQ_result_spec_end
416       };
417
418       if (GNUNET_OK !=
419           GNUNET_PQ_extract_result(res,
420                                    rs,
421                                    i))
422         {
423           GNUNET_break(0);
424           prc->proc(prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
425                     GNUNET_TIME_UNIT_ZERO_ABS, 0);
426           return;
427         }
428
429       GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
430                       "datastore-postgres",
431                       "Found result of size %u bytes and type %u in database\n",
432                       (unsigned int)size,
433                       (unsigned int)utype);
434       iret = prc->proc(prc->proc_cls,
435                        &key,
436                        size,
437                        data,
438                        (enum GNUNET_BLOCK_Type)utype,
439                        priority,
440                        anonymity,
441                        replication,
442                        expiration_time,
443                        rowid);
444       if (iret == GNUNET_NO)
445         {
446           struct GNUNET_PQ_QueryParam param[] = {
447             GNUNET_PQ_query_param_uint32(&rowid),
448             GNUNET_PQ_query_param_end
449           };
450
451           GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
452                      "Processor asked for item %u to be removed.\n",
453                      (unsigned int)rowid);
454           if (0 <
455               GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
456                                                  "delrow",
457                                                  param))
458             {
459               GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
460                               "datastore-postgres",
461                               "Deleting %u bytes from database\n",
462                               (unsigned int)size);
463               plugin->env->duc(plugin->env->cls,
464                                -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
465               GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
466                               "datastore-postgres",
467                               "Deleted %u bytes from database\n",
468                               (unsigned int)size);
469             }
470         }
471       GNUNET_PQ_cleanup_result(rs);
472     } /* for (i) */
473 }
474
475
476 /**
477  * Get one of the results for a particular key in the datastore.
478  *
479  * @param cls closure with the `struct Plugin`
480  * @param next_uid return the result with lowest uid >= next_uid
481  * @param random if true, return a random result instead of using next_uid
482  * @param key maybe NULL (to match all entries)
483  * @param type entries of which type are relevant?
484  *     Use 0 for any type.
485  * @param proc function to call on the matching value;
486  *        will be called with NULL if nothing matches
487  * @param proc_cls closure for @a proc
488  */
489 static void
490 postgres_plugin_get_key(void *cls,
491                         uint64_t next_uid,
492                         bool random,
493                         const struct GNUNET_HashCode *key,
494                         enum GNUNET_BLOCK_Type type,
495                         PluginDatumProcessor proc,
496                         void *proc_cls)
497 {
498   struct Plugin *plugin = cls;
499   uint32_t utype = type;
500   uint16_t use_rvalue = random;
501   uint16_t use_key = NULL != key;
502   uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
503   uint64_t rvalue;
504   struct GNUNET_PQ_QueryParam params[] = {
505     GNUNET_PQ_query_param_uint64(&next_uid),
506     GNUNET_PQ_query_param_uint64(&rvalue),
507     GNUNET_PQ_query_param_uint16(&use_rvalue),
508     GNUNET_PQ_query_param_auto_from_type(key),
509     GNUNET_PQ_query_param_uint16(&use_key),
510     GNUNET_PQ_query_param_uint32(&utype),
511     GNUNET_PQ_query_param_uint16(&use_type),
512     GNUNET_PQ_query_param_end
513   };
514   struct ProcessResultContext prc;
515   enum GNUNET_DB_QueryStatus res;
516
517   if (random)
518     {
519       rvalue = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK,
520                                         UINT64_MAX);
521       next_uid = 0;
522     }
523   else
524     {
525       rvalue = 0;
526     }
527   prc.plugin = plugin;
528   prc.proc = proc;
529   prc.proc_cls = proc_cls;
530
531   res = GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
532                                              "get",
533                                              params,
534                                              &process_result,
535                                              &prc);
536   if (0 > res)
537     proc(proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
538          GNUNET_TIME_UNIT_ZERO_ABS, 0);
539 }
540
541
542 /**
543  * Select a subset of the items in the datastore and call
544  * the given iterator for each of them.
545  *
546  * @param cls our `struct Plugin *`
547  * @param next_uid return the result with lowest uid >= next_uid
548  * @param type entries of which type should be considered?
549  *        Must not be zero (ANY).
550  * @param proc function to call on the matching value;
551  *        will be called with NULL if no value matches
552  * @param proc_cls closure for @a proc
553  */
554 static void
555 postgres_plugin_get_zero_anonymity(void *cls,
556                                    uint64_t next_uid,
557                                    enum GNUNET_BLOCK_Type type,
558                                    PluginDatumProcessor proc,
559                                    void *proc_cls)
560 {
561   struct Plugin *plugin = cls;
562   uint32_t utype = type;
563   struct GNUNET_PQ_QueryParam params[] = {
564     GNUNET_PQ_query_param_uint32(&utype),
565     GNUNET_PQ_query_param_uint64(&next_uid),
566     GNUNET_PQ_query_param_end
567   };
568   struct ProcessResultContext prc;
569   enum GNUNET_DB_QueryStatus res;
570
571   prc.plugin = plugin;
572   prc.proc = proc;
573   prc.proc_cls = proc_cls;
574   res = GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
575                                              "select_non_anonymous",
576                                              params,
577                                              &process_result,
578                                              &prc);
579   if (0 > res)
580     proc(proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
581          GNUNET_TIME_UNIT_ZERO_ABS, 0);
582 }
583
584
585 /**
586  * Context for #repl_iter() function.
587  */
588 struct ReplCtx {
589   /**
590    * Plugin handle.
591    */
592   struct Plugin *plugin;
593
594   /**
595    * Function to call for the result (or the NULL).
596    */
597   PluginDatumProcessor proc;
598
599   /**
600    * Closure for @e proc.
601    */
602   void *proc_cls;
603 };
604
605
606 /**
607  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
608  * Decrements the replication counter and calls the original
609  * iterator.
610  *
611  * @param cls closure with the `struct ReplCtx *`
612  * @param key key for the content
613  * @param size number of bytes in @a data
614  * @param data content stored
615  * @param type type of the content
616  * @param priority priority of the content
617  * @param anonymity anonymity-level for the content
618  * @param replication replication-level for the content
619  * @param expiration expiration time for the content
620  * @param uid unique identifier for the datum;
621  *        maybe 0 if no unique identifier is available
622  * @return #GNUNET_SYSERR to abort the iteration,
623  *         #GNUNET_OK to continue
624  *         (continue on call to "next", of course),
625  *         #GNUNET_NO to delete the item and continue (if supported)
626  */
627 static int
628 repl_proc(void *cls,
629           const struct GNUNET_HashCode *key,
630           uint32_t size,
631           const void *data,
632           enum GNUNET_BLOCK_Type type,
633           uint32_t priority,
634           uint32_t anonymity,
635           uint32_t replication,
636           struct GNUNET_TIME_Absolute expiration,
637           uint64_t uid)
638 {
639   struct ReplCtx *rc = cls;
640   struct Plugin *plugin = rc->plugin;
641   int ret;
642   uint32_t oid = (uint32_t)uid;
643   struct GNUNET_PQ_QueryParam params[] = {
644     GNUNET_PQ_query_param_uint32(&oid),
645     GNUNET_PQ_query_param_end
646   };
647   enum GNUNET_DB_QueryStatus qret;
648
649   ret = rc->proc(rc->proc_cls,
650                  key,
651                  size,
652                  data,
653                  type,
654                  priority,
655                  anonymity,
656                  replication,
657                  expiration,
658                  uid);
659   if (NULL == key)
660     return ret;
661   qret = GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
662                                             "decrepl",
663                                             params);
664   if (0 > qret)
665     return GNUNET_SYSERR;
666   return ret;
667 }
668
669
670 /**
671  * Get a random item for replication.  Returns a single, not expired,
672  * random item from those with the highest replication counters.  The
673  * item's replication counter is decremented by one IF it was positive
674  * before.  Call @a proc with all values ZERO or NULL if the datastore
675  * is empty.
676  *
677  * @param cls closure with the `struct Plugin`
678  * @param proc function to call the value (once only).
679  * @param proc_cls closure for @a proc
680  */
681 static void
682 postgres_plugin_get_replication(void *cls,
683                                 PluginDatumProcessor proc,
684                                 void *proc_cls)
685 {
686   struct Plugin *plugin = cls;
687   struct GNUNET_PQ_QueryParam params[] = {
688     GNUNET_PQ_query_param_end
689   };
690   struct ReplCtx rc;
691   struct ProcessResultContext prc;
692   enum GNUNET_DB_QueryStatus res;
693
694   rc.plugin = plugin;
695   rc.proc = proc;
696   rc.proc_cls = proc_cls;
697   prc.plugin = plugin;
698   prc.proc = &repl_proc;
699   prc.proc_cls = &rc;
700   res = GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
701                                              "select_replication_order",
702                                              params,
703                                              &process_result,
704                                              &prc);
705   if (0 > res)
706     proc(proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
707          GNUNET_TIME_UNIT_ZERO_ABS, 0);
708 }
709
710
711 /**
712  * Get a random item for expiration.  Call @a proc with all values
713  * ZERO or NULL if the datastore is empty.
714  *
715  * @param cls closure with the `struct Plugin`
716  * @param proc function to call the value (once only).
717  * @param proc_cls closure for @a proc
718  */
719 static void
720 postgres_plugin_get_expiration(void *cls,
721                                PluginDatumProcessor proc,
722                                void *proc_cls)
723 {
724   struct Plugin *plugin = cls;
725   struct GNUNET_TIME_Absolute now;
726   struct GNUNET_PQ_QueryParam params[] = {
727     GNUNET_PQ_query_param_absolute_time(&now),
728     GNUNET_PQ_query_param_end
729   };
730   struct ProcessResultContext prc;
731
732   now = GNUNET_TIME_absolute_get();
733   prc.plugin = plugin;
734   prc.proc = proc;
735   prc.proc_cls = proc_cls;
736   (void)GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
737                                              "select_expiration_order",
738                                              params,
739                                              &process_result,
740                                              &prc);
741 }
742
743
744 /**
745  * Closure for #process_keys.
746  */
747 struct ProcessKeysContext {
748   /**
749    * Function to call for each key.
750    */
751   PluginKeyProcessor proc;
752
753   /**
754    * Closure for @e proc.
755    */
756   void *proc_cls;
757 };
758
759
760 /**
761  * Function to be called with the results of a SELECT statement
762  * that has returned @a num_results results.
763  *
764  * @param cls closure with a `struct ProcessKeysContext`
765  * @param result the postgres result
766  * @param num_result the number of results in @a result
767  */
768 static void
769 process_keys(void *cls,
770              PGresult *result,
771              unsigned int num_results)
772 {
773   struct ProcessKeysContext *pkc = cls;
774
775   for (unsigned i = 0; i < num_results; i++)
776     {
777       struct GNUNET_HashCode key;
778       struct GNUNET_PQ_ResultSpec rs[] = {
779         GNUNET_PQ_result_spec_auto_from_type("hash",
780                                              &key),
781         GNUNET_PQ_result_spec_end
782       };
783
784       if (GNUNET_OK !=
785           GNUNET_PQ_extract_result(result,
786                                    rs,
787                                    i))
788         {
789           GNUNET_break(0);
790           continue;
791         }
792       pkc->proc(pkc->proc_cls,
793                 &key,
794                 1);
795       GNUNET_PQ_cleanup_result(rs);
796     }
797 }
798
799
800 /**
801  * Get all of the keys in the datastore.
802  *
803  * @param cls closure with the `struct Plugin *`
804  * @param proc function to call on each key
805  * @param proc_cls closure for @a proc
806  */
807 static void
808 postgres_plugin_get_keys(void *cls,
809                          PluginKeyProcessor proc,
810                          void *proc_cls)
811 {
812   struct Plugin *plugin = cls;
813   struct GNUNET_PQ_QueryParam params[] = {
814     GNUNET_PQ_query_param_end
815   };
816   struct ProcessKeysContext pkc;
817
818   pkc.proc = proc;
819   pkc.proc_cls = proc_cls;
820   (void)GNUNET_PQ_eval_prepared_multi_select(plugin->dbh,
821                                              "get_keys",
822                                              params,
823                                              &process_keys,
824                                              &pkc);
825   proc(proc_cls,
826        NULL,
827        0);
828 }
829
830
831 /**
832  * Drop database.
833  *
834  * @param cls closure with the `struct Plugin *`
835  */
836 static void
837 postgres_plugin_drop(void *cls)
838 {
839   struct Plugin *plugin = cls;
840   struct GNUNET_PQ_ExecuteStatement es[] = {
841     GNUNET_PQ_make_execute("DROP TABLE gn090"),
842     GNUNET_PQ_EXECUTE_STATEMENT_END
843   };
844
845   if (GNUNET_OK !=
846       GNUNET_PQ_exec_statements(plugin->dbh,
847                                 es))
848     GNUNET_log_from(GNUNET_ERROR_TYPE_WARNING,
849                     "postgres",
850                     _("Failed to drop table from database.\n"));
851 }
852
853
854 /**
855  * Remove a particular key in the datastore.
856  *
857  * @param cls closure
858  * @param key key for the content
859  * @param size number of bytes in data
860  * @param data content stored
861  * @param cont continuation called with success or failure status
862  * @param cont_cls continuation closure for @a cont
863  */
864 static void
865 postgres_plugin_remove_key(void *cls,
866                            const struct GNUNET_HashCode *key,
867                            uint32_t size,
868                            const void *data,
869                            PluginRemoveCont cont,
870                            void *cont_cls)
871 {
872   struct Plugin *plugin = cls;
873   enum GNUNET_DB_QueryStatus ret;
874   struct GNUNET_PQ_QueryParam params[] = {
875     GNUNET_PQ_query_param_auto_from_type(key),
876     GNUNET_PQ_query_param_fixed_size(data, size),
877     GNUNET_PQ_query_param_end
878   };
879
880   ret = GNUNET_PQ_eval_prepared_non_select(plugin->dbh,
881                                            "remove",
882                                            params);
883   if (0 > ret)
884     {
885       cont(cont_cls,
886            key,
887            size,
888            GNUNET_SYSERR,
889            _("Postgress exec failure"));
890       return;
891     }
892   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == ret)
893     {
894       cont(cont_cls,
895            key,
896            size,
897            GNUNET_NO,
898            NULL);
899       return;
900     }
901   plugin->env->duc(plugin->env->cls,
902                    -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
903   GNUNET_log_from(GNUNET_ERROR_TYPE_DEBUG,
904                   "datastore-postgres",
905                   "Deleted %u bytes from database\n",
906                   (unsigned int)size);
907   cont(cont_cls,
908        key,
909        size,
910        GNUNET_OK,
911        NULL);
912 }
913
914
915 /**
916  * Entry point for the plugin.
917  *
918  * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
919  * @return our `struct Plugin *`
920  */
921 void *
922 libgnunet_plugin_datastore_postgres_init(void *cls)
923 {
924   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
925   struct GNUNET_DATASTORE_PluginFunctions *api;
926   struct Plugin *plugin;
927
928   plugin = GNUNET_new(struct Plugin);
929   plugin->env = env;
930   if (GNUNET_OK != init_connection(plugin))
931     {
932       GNUNET_free(plugin);
933       return NULL;
934     }
935   api = GNUNET_new(struct GNUNET_DATASTORE_PluginFunctions);
936   api->cls = plugin;
937   api->estimate_size = &postgres_plugin_estimate_size;
938   api->put = &postgres_plugin_put;
939   api->get_key = &postgres_plugin_get_key;
940   api->get_replication = &postgres_plugin_get_replication;
941   api->get_expiration = &postgres_plugin_get_expiration;
942   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
943   api->get_keys = &postgres_plugin_get_keys;
944   api->drop = &postgres_plugin_drop;
945   api->remove_key = &postgres_plugin_remove_key;
946   GNUNET_log_from(GNUNET_ERROR_TYPE_INFO,
947                   "datastore-postgres",
948                   _("Postgres database running\n"));
949   return api;
950 }
951
952
953 /**
954  * Exit point from the plugin.
955  *
956  * @param cls our `struct Plugin *`
957  * @return always NULL
958  */
959 void *
960 libgnunet_plugin_datastore_postgres_done(void *cls)
961 {
962   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
963   struct Plugin *plugin = api->cls;
964
965   PQfinish(plugin->dbh);
966   GNUNET_free(plugin);
967   GNUNET_free(api);
968   return NULL;
969 }
970
971 /* end of plugin_datastore_postgres.c */