src: for every AGPL3.0 file, add SPDX identifier.
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_datastore_plugin.h"
28 #include "gnunet_pq_lib.h"
29
30
31 /**
32  * After how many ms "busy" should a DB operation fail for good?
33  * A low value makes sure that we are more responsive to requests
34  * (especially PUTs).  A high value guarantees a higher success
35  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
36  *
37  * The default value of 1s should ensure that users do not experience
38  * huge latencies while at the same time allowing operations to succeed
39  * with reasonable probability.
40  */
41 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
42
43
44 /**
45  * Context for all functions in this plugin.
46  */
47 struct Plugin
48 {
49   /**
50    * Our execution environment.
51    */
52   struct GNUNET_DATASTORE_PluginEnvironment *env;
53
54   /**
55    * Native Postgres database handle.
56    */
57   PGconn *dbh;
58
59 };
60
61
62 /**
63  * @brief Get a database handle
64  *
65  * @param plugin global context
66  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
67  */
68 static int
69 init_connection (struct Plugin *plugin)
70 {
71   struct GNUNET_PQ_ExecuteStatement es[] = {
72     /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
73      * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
74      * we do math or inequality tests, so we can't handle the entire range of uint32_t.
75      * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
76      * PostgreSQL also recommends against using WITH OIDS.
77      */
78     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS gn090 ("
79                             "  repl INTEGER NOT NULL DEFAULT 0,"
80                             "  type INTEGER NOT NULL DEFAULT 0,"
81                             "  prio INTEGER NOT NULL DEFAULT 0,"
82                             "  anonLevel INTEGER NOT NULL DEFAULT 0,"
83                             "  expire BIGINT NOT NULL DEFAULT 0,"
84                             "  rvalue BIGINT NOT NULL DEFAULT 0,"
85                             "  hash BYTEA NOT NULL DEFAULT '',"
86                             "  vhash BYTEA NOT NULL DEFAULT '',"
87                             "  value BYTEA NOT NULL DEFAULT '')"
88                             "WITH OIDS"),
89     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)"),
90     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)"),
91     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)"),
92     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)"),
93     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)"),
94     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)"),
95     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"),
96     GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"),
97     GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN"),
98     GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN"),
99     GNUNET_PQ_EXECUTE_STATEMENT_END
100   };
101 #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
102   struct GNUNET_PQ_PreparedStatement ps[] = {
103     GNUNET_PQ_make_prepare ("get",
104                             "SELECT " RESULT_COLUMNS " FROM gn090"
105                             " WHERE oid >= $1::bigint AND"
106                             " (rvalue >= $2 OR 0 = $3::smallint) AND"
107                             " (hash = $4 OR 0 = $5::smallint) AND"
108                             " (type = $6 OR 0 = $7::smallint)"
109                             " ORDER BY oid ASC LIMIT 1",
110                             7),
111     GNUNET_PQ_make_prepare ("put",
112                             "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
113                             "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
114                             9),
115     GNUNET_PQ_make_prepare ("update",
116                             "UPDATE gn090"
117                             " SET prio = prio + $1,"
118                             " repl = repl + $2,"
119                             " expire = GREATEST(expire, $3)"
120                             " WHERE hash = $4 AND vhash = $5",
121                             5),
122     GNUNET_PQ_make_prepare ("decrepl",
123                             "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
124                             "WHERE oid = $1",
125                             1),
126     GNUNET_PQ_make_prepare ("select_non_anonymous",
127                             "SELECT " RESULT_COLUMNS " FROM gn090 "
128                             "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
129                             "ORDER BY oid ASC LIMIT 1",
130                             2),
131     GNUNET_PQ_make_prepare ("select_expiration_order",
132                             "(SELECT " RESULT_COLUMNS " FROM gn090 "
133                             "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
134                             "UNION "
135                             "(SELECT " RESULT_COLUMNS " FROM gn090 "
136                             "ORDER BY prio ASC LIMIT 1) "
137                             "ORDER BY expire ASC LIMIT 1",
138                             1),
139     GNUNET_PQ_make_prepare ("select_replication_order",
140                             "SELECT " RESULT_COLUMNS " FROM gn090 "
141                             "ORDER BY repl DESC,RANDOM() LIMIT 1",
142                             0),
143     GNUNET_PQ_make_prepare ("delrow",
144                             "DELETE FROM gn090 "
145                             "WHERE oid=$1",
146                             1),
147     GNUNET_PQ_make_prepare ("remove",
148                             "DELETE FROM gn090"
149                             " WHERE hash = $1 AND"
150                             " value = $2",
151                             2),
152     GNUNET_PQ_make_prepare ("get_keys",
153                             "SELECT hash FROM gn090",
154                             0),
155     GNUNET_PQ_make_prepare ("estimate_size",
156                             "SELECT SUM(LENGTH(value))+256*COUNT(*) AS total FROM gn090",
157                             0),
158     GNUNET_PQ_PREPARED_STATEMENT_END
159   };
160 #undef RESULT_COLUMNS
161
162   plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
163                                             "datastore-postgres");
164   if (NULL == plugin->dbh)
165     return GNUNET_SYSERR;
166
167   if ( (GNUNET_OK !=
168         GNUNET_PQ_exec_statements (plugin->dbh,
169                                    es)) ||
170        (GNUNET_OK !=
171         GNUNET_PQ_prepare_statements (plugin->dbh,
172                                       ps)) )
173   {
174     PQfinish (plugin->dbh);
175     plugin->dbh = NULL;
176     return GNUNET_SYSERR;
177   }
178   return GNUNET_OK;
179 }
180
181
182 /**
183  * Get an estimate of how much space the database is
184  * currently using.
185  *
186  * @param cls our `struct Plugin *`
187  * @return number of bytes used on disk
188  */
189 static void
190 postgres_plugin_estimate_size (void *cls,
191                                unsigned long long *estimate)
192 {
193   struct Plugin *plugin = cls;
194   uint64_t total;
195   struct GNUNET_PQ_QueryParam params[] = {
196     GNUNET_PQ_query_param_end
197   };
198   struct GNUNET_PQ_ResultSpec rs[] = {
199     GNUNET_PQ_result_spec_uint64 ("total",
200                                   &total),
201     GNUNET_PQ_result_spec_end
202   };
203   enum GNUNET_DB_QueryStatus ret;
204
205   if (NULL == estimate)
206     return;
207   ret = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
208                                                   "estimate_size",
209                                                   params,
210                                                   rs);
211   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != ret)
212   {
213     *estimate = 0LL;
214     return;
215   }
216   *estimate = total;
217 }
218
219
220 /**
221  * Store an item in the datastore.
222  *
223  * @param cls closure with the `struct Plugin`
224  * @param key key for the item
225  * @param absent true if the key was not found in the bloom filter
226  * @param size number of bytes in data
227  * @param data content stored
228  * @param type type of the content
229  * @param priority priority of the content
230  * @param anonymity anonymity-level for the content
231  * @param replication replication-level for the content
232  * @param expiration expiration time for the content
233  * @param cont continuation called with success or failure status
234  * @param cont_cls continuation closure
235  */
236 static void
237 postgres_plugin_put (void *cls,
238                      const struct GNUNET_HashCode *key,
239                      bool absent,
240                      uint32_t size,
241                      const void *data,
242                      enum GNUNET_BLOCK_Type type,
243                      uint32_t priority,
244                      uint32_t anonymity,
245                      uint32_t replication,
246                      struct GNUNET_TIME_Absolute expiration,
247                      PluginPutCont cont,
248                      void *cont_cls)
249 {
250   struct Plugin *plugin = cls;
251   struct GNUNET_HashCode vhash;
252   enum GNUNET_DB_QueryStatus ret;
253
254   GNUNET_CRYPTO_hash (data,
255                       size,
256                       &vhash);
257   if (! absent)
258   {
259     struct GNUNET_PQ_QueryParam params[] = {
260       GNUNET_PQ_query_param_uint32 (&priority),
261       GNUNET_PQ_query_param_uint32 (&replication),
262       GNUNET_PQ_query_param_absolute_time (&expiration),
263       GNUNET_PQ_query_param_auto_from_type (key),
264       GNUNET_PQ_query_param_auto_from_type (&vhash),
265       GNUNET_PQ_query_param_end
266     };
267     ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
268                                               "update",
269                                               params);
270     if (0 > ret)
271     {
272       cont (cont_cls,
273             key,
274             size,
275             GNUNET_SYSERR,
276             _("Postgress exec failure"));
277       return;
278     }
279     bool affected = (0 != ret);
280     if (affected)
281     {
282       cont (cont_cls,
283             key,
284             size,
285             GNUNET_NO,
286             NULL);
287       return;
288     }
289   }
290
291   {
292     uint32_t utype = (uint32_t) type;
293     uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
294                                                 UINT64_MAX);
295     struct GNUNET_PQ_QueryParam params[] = {
296       GNUNET_PQ_query_param_uint32 (&replication),
297       GNUNET_PQ_query_param_uint32 (&utype),
298       GNUNET_PQ_query_param_uint32 (&priority),
299       GNUNET_PQ_query_param_uint32 (&anonymity),
300       GNUNET_PQ_query_param_absolute_time (&expiration),
301       GNUNET_PQ_query_param_uint64 (&rvalue),
302       GNUNET_PQ_query_param_auto_from_type (key),
303       GNUNET_PQ_query_param_auto_from_type (&vhash),
304       GNUNET_PQ_query_param_fixed_size (data, size),
305       GNUNET_PQ_query_param_end
306     };
307
308     ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
309                                               "put",
310                                               params);
311     if (0 > ret)
312     {
313       cont (cont_cls,
314             key,
315             size,
316             GNUNET_SYSERR,
317             "Postgress exec failure");
318       return;
319     }
320   }
321   plugin->env->duc (plugin->env->cls,
322                     size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
323   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
324                    "datastore-postgres",
325                    "Stored %u bytes in database\n",
326                    (unsigned int) size);
327   cont (cont_cls,
328         key,
329         size,
330         GNUNET_OK,
331         NULL);
332 }
333
334
335 /**
336  * Closure for #process_result.
337  */
338 struct ProcessResultContext
339 {
340
341   /**
342    * The plugin handle.
343    */
344   struct Plugin *plugin;
345
346   /**
347    * Function to call on each result.
348    */
349   PluginDatumProcessor proc;
350
351   /**
352    * Closure for @e proc.
353    */
354   void *proc_cls;
355
356 };
357
358
359 /**
360  * Function invoked to process the result and call the processor of @a
361  * cls.
362  *
363  * @param cls our `struct ProcessResultContext`
364  * @param res result from exec
365  * @param num_results number of results in @a res
366  */
367 static void
368 process_result (void *cls,
369                 PGresult *res,
370                 unsigned int num_results)
371 {
372   struct ProcessResultContext *prc = cls;
373   struct Plugin *plugin = prc->plugin;
374
375   if (0 == num_results)
376   {
377     /* no result */
378     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
379                      "datastore-postgres",
380                      "Ending iteration (no more results)\n");
381     prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
382                GNUNET_TIME_UNIT_ZERO_ABS, 0);
383     return;
384   }
385   if (1 != num_results)
386   {
387     GNUNET_break (0);
388     prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
389                GNUNET_TIME_UNIT_ZERO_ABS, 0);
390     return;
391   }
392   /* Technically we don't need the loop here, but nicer in case
393      we ever relax the condition above. */
394   for (unsigned int i=0;i<num_results;i++)
395   {
396     int iret;
397     uint32_t rowid;
398     uint32_t utype;
399     uint32_t anonymity;
400     uint32_t replication;
401     uint32_t priority;
402     size_t size;
403     void *data;
404     struct GNUNET_TIME_Absolute expiration_time;
405     struct GNUNET_HashCode key;
406     struct GNUNET_PQ_ResultSpec rs[] = {
407       GNUNET_PQ_result_spec_uint32 ("repl", &replication),
408       GNUNET_PQ_result_spec_uint32 ("type", &utype),
409       GNUNET_PQ_result_spec_uint32 ("prio", &priority),
410       GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
411       GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
412       GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
413       GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
414       GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
415       GNUNET_PQ_result_spec_end
416     };
417
418     if (GNUNET_OK !=
419         GNUNET_PQ_extract_result (res,
420                                   rs,
421                                   i))
422     {
423       GNUNET_break (0);
424       prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
425                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
426       return;
427     }
428
429     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
430                      "datastore-postgres",
431                      "Found result of size %u bytes and type %u in database\n",
432                      (unsigned int) size,
433                      (unsigned int) utype);
434     iret = prc->proc (prc->proc_cls,
435                       &key,
436                       size,
437                       data,
438                       (enum GNUNET_BLOCK_Type) utype,
439                       priority,
440                       anonymity,
441                       replication,
442                       expiration_time,
443                       rowid);
444     if (iret == GNUNET_NO)
445     {
446       struct GNUNET_PQ_QueryParam param[] = {
447         GNUNET_PQ_query_param_uint32 (&rowid),
448         GNUNET_PQ_query_param_end
449       };
450
451       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
452                   "Processor asked for item %u to be removed.\n",
453                   (unsigned int) rowid);
454       if (0 <
455           GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
456                                               "delrow",
457                                               param))
458       {
459         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
460                          "datastore-postgres",
461                          "Deleting %u bytes from database\n",
462                          (unsigned int) size);
463         plugin->env->duc (plugin->env->cls,
464                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
465         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
466                          "datastore-postgres",
467                          "Deleted %u bytes from database\n",
468                          (unsigned int) size);
469       }
470     }
471     GNUNET_PQ_cleanup_result (rs);
472   } /* for (i) */
473 }
474
475
476 /**
477  * Get one of the results for a particular key in the datastore.
478  *
479  * @param cls closure with the `struct Plugin`
480  * @param next_uid return the result with lowest uid >= next_uid
481  * @param random if true, return a random result instead of using next_uid
482  * @param key maybe NULL (to match all entries)
483  * @param type entries of which type are relevant?
484  *     Use 0 for any type.
485  * @param proc function to call on the matching value;
486  *        will be called with NULL if nothing matches
487  * @param proc_cls closure for @a proc
488  */
489 static void
490 postgres_plugin_get_key (void *cls,
491                          uint64_t next_uid,
492                          bool random,
493                          const struct GNUNET_HashCode *key,
494                          enum GNUNET_BLOCK_Type type,
495                          PluginDatumProcessor proc,
496                          void *proc_cls)
497 {
498   struct Plugin *plugin = cls;
499   uint32_t utype = type;
500   uint16_t use_rvalue = random;
501   uint16_t use_key = NULL != key;
502   uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
503   uint64_t rvalue;
504   struct GNUNET_PQ_QueryParam params[] = {
505     GNUNET_PQ_query_param_uint64 (&next_uid),
506     GNUNET_PQ_query_param_uint64 (&rvalue),
507     GNUNET_PQ_query_param_uint16 (&use_rvalue),
508     GNUNET_PQ_query_param_auto_from_type (key),
509     GNUNET_PQ_query_param_uint16 (&use_key),
510     GNUNET_PQ_query_param_uint32 (&utype),
511     GNUNET_PQ_query_param_uint16 (&use_type),
512     GNUNET_PQ_query_param_end
513   };
514   struct ProcessResultContext prc;
515   enum GNUNET_DB_QueryStatus res;
516
517   if (random)
518   {
519     rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
520                                        UINT64_MAX);
521     next_uid = 0;
522   }
523   else
524   {
525     rvalue = 0;
526   }
527   prc.plugin = plugin;
528   prc.proc = proc;
529   prc.proc_cls = proc_cls;
530
531   res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
532                                               "get",
533                                               params,
534                                               &process_result,
535                                               &prc);
536   if (0 > res)
537     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
538           GNUNET_TIME_UNIT_ZERO_ABS, 0);
539 }
540
541
542 /**
543  * Select a subset of the items in the datastore and call
544  * the given iterator for each of them.
545  *
546  * @param cls our `struct Plugin *`
547  * @param next_uid return the result with lowest uid >= next_uid
548  * @param type entries of which type should be considered?
549  *        Must not be zero (ANY).
550  * @param proc function to call on the matching value;
551  *        will be called with NULL if no value matches
552  * @param proc_cls closure for @a proc
553  */
554 static void
555 postgres_plugin_get_zero_anonymity (void *cls,
556                                     uint64_t next_uid,
557                                     enum GNUNET_BLOCK_Type type,
558                                     PluginDatumProcessor proc,
559                                     void *proc_cls)
560 {
561   struct Plugin *plugin = cls;
562   uint32_t utype = type;
563   struct GNUNET_PQ_QueryParam params[] = {
564     GNUNET_PQ_query_param_uint32 (&utype),
565     GNUNET_PQ_query_param_uint64 (&next_uid),
566     GNUNET_PQ_query_param_end
567   };
568   struct ProcessResultContext prc;
569   enum GNUNET_DB_QueryStatus res;
570
571   prc.plugin = plugin;
572   prc.proc = proc;
573   prc.proc_cls = proc_cls;
574   res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
575                                               "select_non_anonymous",
576                                               params,
577                                               &process_result,
578                                               &prc);
579   if (0 > res)
580     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
581           GNUNET_TIME_UNIT_ZERO_ABS, 0);
582 }
583
584
585 /**
586  * Context for #repl_iter() function.
587  */
588 struct ReplCtx
589 {
590
591   /**
592    * Plugin handle.
593    */
594   struct Plugin *plugin;
595
596   /**
597    * Function to call for the result (or the NULL).
598    */
599   PluginDatumProcessor proc;
600
601   /**
602    * Closure for @e proc.
603    */
604   void *proc_cls;
605 };
606
607
608 /**
609  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
610  * Decrements the replication counter and calls the original
611  * iterator.
612  *
613  * @param cls closure with the `struct ReplCtx *`
614  * @param key key for the content
615  * @param size number of bytes in @a data
616  * @param data content stored
617  * @param type type of the content
618  * @param priority priority of the content
619  * @param anonymity anonymity-level for the content
620  * @param replication replication-level for the content
621  * @param expiration expiration time for the content
622  * @param uid unique identifier for the datum;
623  *        maybe 0 if no unique identifier is available
624  * @return #GNUNET_SYSERR to abort the iteration,
625  *         #GNUNET_OK to continue
626  *         (continue on call to "next", of course),
627  *         #GNUNET_NO to delete the item and continue (if supported)
628  */
629 static int
630 repl_proc (void *cls,
631            const struct GNUNET_HashCode *key,
632            uint32_t size,
633            const void *data,
634            enum GNUNET_BLOCK_Type type,
635            uint32_t priority,
636            uint32_t anonymity,
637            uint32_t replication,
638            struct GNUNET_TIME_Absolute expiration,
639            uint64_t uid)
640 {
641   struct ReplCtx *rc = cls;
642   struct Plugin *plugin = rc->plugin;
643   int ret;
644   uint32_t oid = (uint32_t) uid;
645   struct GNUNET_PQ_QueryParam params[] = {
646     GNUNET_PQ_query_param_uint32 (&oid),
647     GNUNET_PQ_query_param_end
648   };
649   enum GNUNET_DB_QueryStatus qret;
650
651   ret = rc->proc (rc->proc_cls,
652                   key,
653                   size,
654                   data,
655                   type,
656                   priority,
657                   anonymity,
658                   replication,
659                   expiration,
660                   uid);
661   if (NULL == key)
662     return ret;
663   qret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
664                                              "decrepl",
665                                              params);
666   if (0 > qret)
667     return GNUNET_SYSERR;
668   return ret;
669 }
670
671
672 /**
673  * Get a random item for replication.  Returns a single, not expired,
674  * random item from those with the highest replication counters.  The
675  * item's replication counter is decremented by one IF it was positive
676  * before.  Call @a proc with all values ZERO or NULL if the datastore
677  * is empty.
678  *
679  * @param cls closure with the `struct Plugin`
680  * @param proc function to call the value (once only).
681  * @param proc_cls closure for @a proc
682  */
683 static void
684 postgres_plugin_get_replication (void *cls,
685                                  PluginDatumProcessor proc,
686                                  void *proc_cls)
687 {
688   struct Plugin *plugin = cls;
689   struct GNUNET_PQ_QueryParam params[] = {
690     GNUNET_PQ_query_param_end
691   };
692   struct ReplCtx rc;
693   struct ProcessResultContext prc;
694   enum GNUNET_DB_QueryStatus res;
695
696   rc.plugin = plugin;
697   rc.proc = proc;
698   rc.proc_cls = proc_cls;
699   prc.plugin = plugin;
700   prc.proc = &repl_proc;
701   prc.proc_cls = &rc;
702   res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
703                                               "select_replication_order",
704                                               params,
705                                               &process_result,
706                                               &prc);
707   if (0 > res)
708     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
709           GNUNET_TIME_UNIT_ZERO_ABS, 0);
710 }
711
712
713 /**
714  * Get a random item for expiration.  Call @a proc with all values
715  * ZERO or NULL if the datastore is empty.
716  *
717  * @param cls closure with the `struct Plugin`
718  * @param proc function to call the value (once only).
719  * @param proc_cls closure for @a proc
720  */
721 static void
722 postgres_plugin_get_expiration (void *cls,
723                                 PluginDatumProcessor proc,
724                                 void *proc_cls)
725 {
726   struct Plugin *plugin = cls;
727   struct GNUNET_TIME_Absolute now;
728   struct GNUNET_PQ_QueryParam params[] = {
729     GNUNET_PQ_query_param_absolute_time (&now),
730     GNUNET_PQ_query_param_end
731   };
732   struct ProcessResultContext prc;
733
734   now = GNUNET_TIME_absolute_get ();
735   prc.plugin = plugin;
736   prc.proc = proc;
737   prc.proc_cls = proc_cls;
738   (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
739                                                "select_expiration_order",
740                                                params,
741                                                &process_result,
742                                                &prc);
743 }
744
745
746 /**
747  * Closure for #process_keys.
748  */
749 struct ProcessKeysContext
750 {
751
752   /**
753    * Function to call for each key.
754    */
755   PluginKeyProcessor proc;
756
757   /**
758    * Closure for @e proc.
759    */
760   void *proc_cls;
761 };
762
763
764 /**
765  * Function to be called with the results of a SELECT statement
766  * that has returned @a num_results results.
767  *
768  * @param cls closure with a `struct ProcessKeysContext`
769  * @param result the postgres result
770  * @param num_result the number of results in @a result
771  */
772 static void
773 process_keys (void *cls,
774               PGresult *result,
775               unsigned int num_results)
776 {
777   struct ProcessKeysContext *pkc = cls;
778
779   for (unsigned i=0;i<num_results;i++)
780   {
781     struct GNUNET_HashCode key;
782     struct GNUNET_PQ_ResultSpec rs[] = {
783       GNUNET_PQ_result_spec_auto_from_type ("hash",
784                                             &key),
785       GNUNET_PQ_result_spec_end
786     };
787
788     if (GNUNET_OK !=
789         GNUNET_PQ_extract_result (result,
790                                   rs,
791                                   i))
792     {
793       GNUNET_break (0);
794       continue;
795     }
796     pkc->proc (pkc->proc_cls,
797                &key,
798                1);
799     GNUNET_PQ_cleanup_result (rs);
800   }
801 }
802
803
804 /**
805  * Get all of the keys in the datastore.
806  *
807  * @param cls closure with the `struct Plugin *`
808  * @param proc function to call on each key
809  * @param proc_cls closure for @a proc
810  */
811 static void
812 postgres_plugin_get_keys (void *cls,
813                           PluginKeyProcessor proc,
814                           void *proc_cls)
815 {
816   struct Plugin *plugin = cls;
817   struct GNUNET_PQ_QueryParam params[] = {
818     GNUNET_PQ_query_param_end
819   };
820   struct ProcessKeysContext pkc;
821
822   pkc.proc = proc;
823   pkc.proc_cls = proc_cls;
824   (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
825                                                "get_keys",
826                                                params,
827                                                &process_keys,
828                                                &pkc);
829   proc (proc_cls,
830         NULL,
831         0);
832 }
833
834
835 /**
836  * Drop database.
837  *
838  * @param cls closure with the `struct Plugin *`
839  */
840 static void
841 postgres_plugin_drop (void *cls)
842 {
843   struct Plugin *plugin = cls;
844   struct GNUNET_PQ_ExecuteStatement es[] = {
845     GNUNET_PQ_make_execute ("DROP TABLE gn090"),
846     GNUNET_PQ_EXECUTE_STATEMENT_END
847   };
848
849   if (GNUNET_OK !=
850       GNUNET_PQ_exec_statements (plugin->dbh,
851                                  es))
852     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
853                      "postgres",
854                      _("Failed to drop table from database.\n"));
855 }
856
857
858 /**
859  * Remove a particular key in the datastore.
860  *
861  * @param cls closure
862  * @param key key for the content
863  * @param size number of bytes in data
864  * @param data content stored
865  * @param cont continuation called with success or failure status
866  * @param cont_cls continuation closure for @a cont
867  */
868 static void
869 postgres_plugin_remove_key (void *cls,
870                             const struct GNUNET_HashCode *key,
871                             uint32_t size,
872                             const void *data,
873                             PluginRemoveCont cont,
874                             void *cont_cls)
875 {
876   struct Plugin *plugin = cls;
877   enum GNUNET_DB_QueryStatus ret;
878   struct GNUNET_PQ_QueryParam params[] = {
879     GNUNET_PQ_query_param_auto_from_type (key),
880     GNUNET_PQ_query_param_fixed_size (data, size),
881     GNUNET_PQ_query_param_end
882   };
883
884   ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
885                                             "remove",
886                                             params);
887   if (0 > ret)
888   {
889     cont (cont_cls,
890           key,
891           size,
892           GNUNET_SYSERR,
893           _("Postgress exec failure"));
894     return;
895   }
896   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == ret)
897   {
898     cont (cont_cls,
899           key,
900           size,
901           GNUNET_NO,
902           NULL);
903     return;
904   }
905   plugin->env->duc (plugin->env->cls,
906                     - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
907   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
908                    "datastore-postgres",
909                    "Deleted %u bytes from database\n",
910                    (unsigned int) size);
911   cont (cont_cls,
912         key,
913         size,
914         GNUNET_OK,
915         NULL);
916 }
917
918
919 /**
920  * Entry point for the plugin.
921  *
922  * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
923  * @return our `struct Plugin *`
924  */
925 void *
926 libgnunet_plugin_datastore_postgres_init (void *cls)
927 {
928   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
929   struct GNUNET_DATASTORE_PluginFunctions *api;
930   struct Plugin *plugin;
931
932   plugin = GNUNET_new (struct Plugin);
933   plugin->env = env;
934   if (GNUNET_OK != init_connection (plugin))
935   {
936     GNUNET_free (plugin);
937     return NULL;
938   }
939   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
940   api->cls = plugin;
941   api->estimate_size = &postgres_plugin_estimate_size;
942   api->put = &postgres_plugin_put;
943   api->get_key = &postgres_plugin_get_key;
944   api->get_replication = &postgres_plugin_get_replication;
945   api->get_expiration = &postgres_plugin_get_expiration;
946   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
947   api->get_keys = &postgres_plugin_get_keys;
948   api->drop = &postgres_plugin_drop;
949   api->remove_key = &postgres_plugin_remove_key;
950   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
951                    "datastore-postgres",
952                    _("Postgres database running\n"));
953   return api;
954 }
955
956
957 /**
958  * Exit point from the plugin.
959  *
960  * @param cls our `struct Plugin *`
961  * @return always NULL
962  */
963 void *
964 libgnunet_plugin_datastore_postgres_done (void *cls)
965 {
966   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
967   struct Plugin *plugin = api->cls;
968
969   PQfinish (plugin->dbh);
970   GNUNET_free (plugin);
971   GNUNET_free (api);
972   return NULL;
973 }
974
975 /* end of plugin_datastore_postgres.c */