more pq work
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include "gnunet_postgres_lib.h"
30 #include "gnunet_pq_lib.h"
31
32
33 /**
34  * After how many ms "busy" should a DB operation fail for good?
35  * A low value makes sure that we are more responsive to requests
36  * (especially PUTs).  A high value guarantees a higher success
37  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
38  *
39  * The default value of 1s should ensure that users do not experience
40  * huge latencies while at the same time allowing operations to succeed
41  * with reasonable probability.
42  */
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
44
45
46 /**
47  * Context for all functions in this plugin.
48  */
49 struct Plugin
50 {
51   /**
52    * Our execution environment.
53    */
54   struct GNUNET_DATASTORE_PluginEnvironment *env;
55
56   /**
57    * Native Postgres database handle.
58    */
59   PGconn *dbh;
60
61 };
62
63
64 /**
65  * @brief Get a database handle
66  *
67  * @param plugin global context
68  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
69  */
70 static int
71 init_connection (struct Plugin *plugin)
72 {
73   struct GNUNET_PQ_ExecuteStatement es[] = {
74     /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
75      * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
76      * we do math or inequality tests, so we can't handle the entire range of uint32_t.
77      * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
78      * PostgreSQL also recommends against using WITH OIDS.
79      */
80     GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS gn090 ("
81                             "  repl INTEGER NOT NULL DEFAULT 0,"
82                             "  type INTEGER NOT NULL DEFAULT 0,"
83                             "  prio INTEGER NOT NULL DEFAULT 0,"
84                             "  anonLevel INTEGER NOT NULL DEFAULT 0,"
85                             "  expire BIGINT NOT NULL DEFAULT 0,"
86                             "  rvalue BIGINT NOT NULL DEFAULT 0,"
87                             "  hash BYTEA NOT NULL DEFAULT '',"
88                             "  vhash BYTEA NOT NULL DEFAULT '',"
89                             "  value BYTEA NOT NULL DEFAULT '')"
90                             "WITH OIDS"),
91     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)"),
92     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)"),
93     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)"),
94     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)"),
95     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)"),
96     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)"),
97     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"),
98     GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"),
99     GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN"),
100     GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN"),
101     GNUNET_PQ_EXECUTE_STATEMENT_END
102   };
103 #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
104   struct GNUNET_PQ_PreparedStatement ps[] = {
105     GNUNET_PQ_make_prepare ("get",
106                             "SELECT " RESULT_COLUMNS " FROM gn090 "
107                             "WHERE oid >= $1::bigint AND "
108                             "(rvalue >= $2 OR 0 = $3::smallint) AND "
109                             "(hash = $4 OR 0 = $5::smallint) AND "
110                             "(type = $6 OR 0 = $7::smallint) "
111                             "ORDER BY oid ASC LIMIT 1",
112                             7),
113     GNUNET_PQ_make_prepare ("put",
114                             "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
115                             "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
116                             9),
117     GNUNET_PQ_make_prepare ("update",
118                             "UPDATE gn090 "
119                             "SET prio = prio + $1, "
120                             "repl = repl + $2, "
121                             "expire = GREATEST(expire, $3) "
122                             "WHERE hash = $4 AND vhash = $5",
123                             5),
124     GNUNET_PQ_make_prepare ("decrepl",
125                             "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
126                             "WHERE oid = $1",
127                             1),
128     GNUNET_PQ_make_prepare ("select_non_anonymous",
129                             "SELECT " RESULT_COLUMNS " FROM gn090 "
130                             "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
131                             "ORDER BY oid ASC LIMIT 1",
132                             2),
133     GNUNET_PQ_make_prepare ("select_expiration_order",
134                             "(SELECT " RESULT_COLUMNS " FROM gn090 "
135                             "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
136                             "UNION "
137                             "(SELECT " RESULT_COLUMNS " FROM gn090 "
138                             "ORDER BY prio ASC LIMIT 1) "
139                             "ORDER BY expire ASC LIMIT 1",
140                             1),
141     GNUNET_PQ_make_prepare ("select_replication_order",
142                             "SELECT " RESULT_COLUMNS " FROM gn090 "
143                             "ORDER BY repl DESC,RANDOM() LIMIT 1",
144                             0),
145     GNUNET_PQ_make_prepare ("delrow",
146                             "DELETE FROM gn090 " "WHERE oid=$1",
147                             1),
148     GNUNET_PQ_make_prepare ("remove", "DELETE FROM gn090 "
149                             "WHERE hash = $1 AND "
150                             "value = $2",
151                             2),
152     GNUNET_PQ_make_prepare ("get_keys",
153                             "SELECT hash FROM gn090",
154                             0),
155     GNUNET_PQ_PREPARED_STATEMENT_END
156   };
157 #undef RESULT_COLUMNS
158
159   plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
160                                             "datastore-postgres");
161   if (NULL == plugin->dbh)
162     return GNUNET_SYSERR;
163
164   if ( (GNUNET_OK !=
165         GNUNET_PQ_exec_statements (plugin->dbh,
166                                    es)) ||
167        (GNUNET_OK !=
168         GNUNET_PQ_prepare_statements (plugin->dbh,
169                                       ps)) )
170   {
171     PQfinish (plugin->dbh);
172     plugin->dbh = NULL;
173     return GNUNET_SYSERR;
174   }
175   return GNUNET_OK;
176 }
177
178
179 /**
180  * Get an estimate of how much space the database is
181  * currently using.
182  *
183  * @param cls our `struct Plugin *`
184  * @return number of bytes used on disk
185  */
186 static void
187 postgres_plugin_estimate_size (void *cls, unsigned long long *estimate)
188 {
189   struct Plugin *plugin = cls;
190   unsigned long long total;
191   PGresult *ret;
192
193   if (NULL == estimate)
194     return;
195   ret =
196       PQexecParams (plugin->dbh,
197                     "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
198                     NULL, NULL, NULL, NULL, 1);
199   if (GNUNET_OK !=
200       GNUNET_POSTGRES_check_result (plugin->dbh,
201                                     ret,
202                                     PGRES_TUPLES_OK,
203                                     "PQexecParams",
204                                     "get_size"))
205   {
206     *estimate = 0;
207     return;
208   }
209   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
210   {
211     GNUNET_break (0);
212     PQclear (ret);
213     *estimate = 0;
214     return;
215   }
216   if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
217   {
218     GNUNET_break (0 == PQgetlength (ret, 0, 0));
219     PQclear (ret);
220     *estimate = 0;
221     return;
222   }
223   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
224   PQclear (ret);
225   *estimate = total;
226 }
227
228
229 /**
230  * Store an item in the datastore.
231  *
232  * @param cls closure with the `struct Plugin`
233  * @param key key for the item
234  * @param absent true if the key was not found in the bloom filter
235  * @param size number of bytes in data
236  * @param data content stored
237  * @param type type of the content
238  * @param priority priority of the content
239  * @param anonymity anonymity-level for the content
240  * @param replication replication-level for the content
241  * @param expiration expiration time for the content
242  * @param cont continuation called with success or failure status
243  * @param cont_cls continuation closure
244  */
245 static void
246 postgres_plugin_put (void *cls,
247                      const struct GNUNET_HashCode *key,
248                      bool absent,
249                      uint32_t size,
250                      const void *data,
251                      enum GNUNET_BLOCK_Type type,
252                      uint32_t priority,
253                      uint32_t anonymity,
254                      uint32_t replication,
255                      struct GNUNET_TIME_Absolute expiration,
256                      PluginPutCont cont,
257                      void *cont_cls)
258 {
259   struct Plugin *plugin = cls;
260   struct GNUNET_HashCode vhash;
261   enum GNUNET_PQ_QueryStatus ret;
262
263   GNUNET_CRYPTO_hash (data,
264                       size,
265                       &vhash);
266   if (! absent)
267   {
268     struct GNUNET_PQ_QueryParam params[] = {
269       GNUNET_PQ_query_param_uint32 (&priority),
270       GNUNET_PQ_query_param_uint32 (&replication),
271       GNUNET_PQ_query_param_absolute_time (&expiration),
272       GNUNET_PQ_query_param_auto_from_type (key),
273       GNUNET_PQ_query_param_auto_from_type (&vhash),
274       GNUNET_PQ_query_param_end
275     };
276     ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
277                                               "update",
278                                               params);
279     if (0 > ret)
280     {
281       cont (cont_cls,
282             key,
283             size,
284             GNUNET_SYSERR,
285             _("Postgress exec failure"));
286       return;
287     }
288     bool affected = (0 != ret);
289     if (affected)
290     {
291       cont (cont_cls,
292             key,
293             size,
294             GNUNET_NO,
295             NULL);
296       return;
297     }
298   }
299
300   {
301     uint32_t utype = (uint32_t) type;
302     uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
303                                                 UINT64_MAX);
304     struct GNUNET_PQ_QueryParam params[] = {
305       GNUNET_PQ_query_param_uint32 (&replication),
306       GNUNET_PQ_query_param_uint32 (&utype),
307       GNUNET_PQ_query_param_uint32 (&priority),
308       GNUNET_PQ_query_param_uint32 (&anonymity),
309       GNUNET_PQ_query_param_absolute_time (&expiration),
310       GNUNET_PQ_query_param_uint64 (&rvalue),
311       GNUNET_PQ_query_param_auto_from_type (key),
312       GNUNET_PQ_query_param_auto_from_type (&vhash),
313       GNUNET_PQ_query_param_fixed_size (data, size),
314       GNUNET_PQ_query_param_end
315     };
316
317     ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
318                                               "put",
319                                               params);
320     if (0 > ret)
321     {
322       cont (cont_cls,
323             key,
324             size,
325             GNUNET_SYSERR,
326             "Postgress exec failure");
327       return;
328     }
329   }
330   plugin->env->duc (plugin->env->cls,
331                     size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
332   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
333                    "datastore-postgres",
334                    "Stored %u bytes in database\n",
335                    (unsigned int) size);
336   cont (cont_cls,
337         key,
338         size,
339         GNUNET_OK,
340         NULL);
341 }
342
343
344 /**
345  * Function invoked to process the result and call the processor.
346  *
347  * @param plugin global plugin data
348  * @param proc function to call the value (once only).
349  * @param proc_cls closure for proc
350  * @param res result from exec
351  * @param filename filename for error messages
352  * @param line line number for error messages
353  */
354 static void
355 process_result (struct Plugin *plugin,
356                 PluginDatumProcessor proc,
357                 void *proc_cls,
358                 PGresult * res,
359                 const char *filename, int line)
360 {
361   int iret;
362   uint32_t rowid;
363   uint32_t utype;
364   uint32_t anonymity;
365   uint32_t replication;
366   uint32_t priority;
367   size_t size;
368   void *data;
369   struct GNUNET_TIME_Absolute expiration_time;
370   struct GNUNET_HashCode key;
371   struct GNUNET_PQ_ResultSpec rs[] = {
372     GNUNET_PQ_result_spec_uint32 ("repl", &replication),
373     GNUNET_PQ_result_spec_uint32 ("type", &utype),
374     GNUNET_PQ_result_spec_uint32 ("prio", &priority),
375     GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
376     GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
377     GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
378     GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
379     GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
380     GNUNET_PQ_result_spec_end
381   };
382
383   if (GNUNET_OK !=
384       GNUNET_POSTGRES_check_result_ (plugin->dbh,
385                                      res,
386                                      PGRES_TUPLES_OK,
387                                      "PQexecPrepared",
388                                      "select",
389                                      filename, line))
390   {
391     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
392                      "datastore-postgres",
393                      "Ending iteration (postgres error)\n");
394     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
395     return;
396   }
397
398   if (0 == PQntuples (res))
399   {
400     /* no result */
401     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
402                      "datastore-postgres",
403                      "Ending iteration (no more results)\n");
404     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
405     PQclear (res);
406     return;
407   }
408   if (1 != PQntuples (res))
409   {
410     GNUNET_break (0);
411     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
412     PQclear (res);
413     return;
414   }
415   if (GNUNET_OK !=
416       GNUNET_PQ_extract_result (res,
417                                 rs,
418                                 0))
419   {
420     GNUNET_break (0);
421     PQclear (res);
422     GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
423                                      "delrow",
424                                      rowid);
425     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
426     return;
427   }
428
429   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
430                    "datastore-postgres",
431                    "Found result of size %u bytes and type %u in database\n",
432                    (unsigned int) size,
433                    (unsigned int) utype);
434   iret = proc (proc_cls,
435                &key,
436                size,
437                data,
438                (enum GNUNET_BLOCK_Type) utype,
439                priority,
440                anonymity,
441                replication,
442                expiration_time,
443                rowid);
444   PQclear (res);
445   if (iret == GNUNET_NO)
446   {
447     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
448                 "Processor asked for item %u to be removed.\n",
449                 (unsigned int) rowid);
450     if (GNUNET_OK ==
451         GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
452                                          "delrow",
453                                          rowid))
454     {
455       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
456                        "datastore-postgres",
457                        "Deleting %u bytes from database\n",
458                        (unsigned int) size);
459       plugin->env->duc (plugin->env->cls,
460                         - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
461       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
462                        "datastore-postgres",
463                        "Deleted %u bytes from database\n",
464                        (unsigned int) size);
465     }
466   }
467 }
468
469
470 /**
471  * Get one of the results for a particular key in the datastore.
472  *
473  * @param cls closure with the 'struct Plugin'
474  * @param next_uid return the result with lowest uid >= next_uid
475  * @param random if true, return a random result instead of using next_uid
476  * @param key maybe NULL (to match all entries)
477  * @param type entries of which type are relevant?
478  *     Use 0 for any type.
479  * @param proc function to call on the matching value;
480  *        will be called with NULL if nothing matches
481  * @param proc_cls closure for @a proc
482  */
483 static void
484 postgres_plugin_get_key (void *cls,
485                          uint64_t next_uid,
486                          bool random,
487                          const struct GNUNET_HashCode *key,
488                          enum GNUNET_BLOCK_Type type,
489                          PluginDatumProcessor proc,
490                          void *proc_cls)
491 {
492   struct Plugin *plugin = cls;
493   uint32_t utype = type;
494   uint16_t use_rvalue = random;
495   uint16_t use_key = NULL != key;
496   uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
497   uint64_t rvalue;
498   struct GNUNET_PQ_QueryParam params[] = {
499     GNUNET_PQ_query_param_uint64 (&next_uid),
500     GNUNET_PQ_query_param_uint64 (&rvalue),
501     GNUNET_PQ_query_param_uint16 (&use_rvalue),
502     GNUNET_PQ_query_param_auto_from_type (key),
503     GNUNET_PQ_query_param_uint16 (&use_key),
504     GNUNET_PQ_query_param_uint32 (&utype),
505     GNUNET_PQ_query_param_uint16 (&use_type),
506     GNUNET_PQ_query_param_end
507   };
508   PGresult *ret;
509
510   if (random)
511   {
512     rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
513                                        UINT64_MAX);
514     next_uid = 0;
515   }
516   else
517     rvalue = 0;
518
519   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
520                                  "get",
521                                  params);
522   process_result (plugin,
523                   proc,
524                   proc_cls,
525                   ret,
526                   __FILE__, __LINE__);
527 }
528
529
530 /**
531  * Select a subset of the items in the datastore and call
532  * the given iterator for each of them.
533  *
534  * @param cls our `struct Plugin *`
535  * @param next_uid return the result with lowest uid >= next_uid
536  * @param type entries of which type should be considered?
537  *        Must not be zero (ANY).
538  * @param proc function to call on the matching value;
539  *        will be called with NULL if no value matches
540  * @param proc_cls closure for @a proc
541  */
542 static void
543 postgres_plugin_get_zero_anonymity (void *cls,
544                                     uint64_t next_uid,
545                                     enum GNUNET_BLOCK_Type type,
546                                     PluginDatumProcessor proc,
547                                     void *proc_cls)
548 {
549   struct Plugin *plugin = cls;
550   uint32_t utype = type;
551   struct GNUNET_PQ_QueryParam params[] = {
552     GNUNET_PQ_query_param_uint32 (&utype),
553     GNUNET_PQ_query_param_uint64 (&next_uid),
554     GNUNET_PQ_query_param_end
555   };
556   PGresult *ret;
557
558   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
559                                  "select_non_anonymous",
560                                  params);
561
562   process_result (plugin,
563                   proc, proc_cls,
564                   ret,
565                   __FILE__, __LINE__);
566 }
567
568
569 /**
570  * Context for #repl_iter() function.
571  */
572 struct ReplCtx
573 {
574
575   /**
576    * Plugin handle.
577    */
578   struct Plugin *plugin;
579
580   /**
581    * Function to call for the result (or the NULL).
582    */
583   PluginDatumProcessor proc;
584
585   /**
586    * Closure for @e proc.
587    */
588   void *proc_cls;
589 };
590
591
592 /**
593  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
594  * Decrements the replication counter and calls the original
595  * iterator.
596  *
597  * @param cls closure with the `struct ReplCtx *`
598  * @param key key for the content
599  * @param size number of bytes in @a data
600  * @param data content stored
601  * @param type type of the content
602  * @param priority priority of the content
603  * @param anonymity anonymity-level for the content
604  * @param replication replication-level for the content
605  * @param expiration expiration time for the content
606  * @param uid unique identifier for the datum;
607  *        maybe 0 if no unique identifier is available
608  * @return #GNUNET_SYSERR to abort the iteration,
609  *         #GNUNET_OK to continue
610  *         (continue on call to "next", of course),
611  *         #GNUNET_NO to delete the item and continue (if supported)
612  */
613 static int
614 repl_proc (void *cls,
615            const struct GNUNET_HashCode *key,
616            uint32_t size,
617            const void *data,
618            enum GNUNET_BLOCK_Type type,
619            uint32_t priority,
620            uint32_t anonymity,
621            uint32_t replication,
622            struct GNUNET_TIME_Absolute expiration,
623            uint64_t uid)
624 {
625   struct ReplCtx *rc = cls;
626   struct Plugin *plugin = rc->plugin;
627   int ret;
628   uint32_t oid = (uint32_t) uid;
629   struct GNUNET_PQ_QueryParam params[] = {
630     GNUNET_PQ_query_param_uint32 (&oid),
631     GNUNET_PQ_query_param_end
632   };
633   PGresult *qret;
634
635   ret = rc->proc (rc->proc_cls,
636                   key,
637                   size,
638                   data,
639                   type,
640                   priority,
641                   anonymity,
642                   replication,
643                   expiration,
644                   uid);
645   if (NULL == key)
646     return ret;
647   qret = GNUNET_PQ_exec_prepared (plugin->dbh,
648                                   "decrepl",
649                                   params);
650   if (GNUNET_OK !=
651       GNUNET_POSTGRES_check_result (plugin->dbh,
652                                     qret,
653                                     PGRES_COMMAND_OK,
654                                     "PQexecPrepared",
655                                     "decrepl"))
656     return GNUNET_SYSERR;
657   PQclear (qret);
658   return ret;
659 }
660
661
662 /**
663  * Get a random item for replication.  Returns a single, not expired,
664  * random item from those with the highest replication counters.  The
665  * item's replication counter is decremented by one IF it was positive
666  * before.  Call @a proc with all values ZERO or NULL if the datastore
667  * is empty.
668  *
669  * @param cls closure with the `struct Plugin`
670  * @param proc function to call the value (once only).
671  * @param proc_cls closure for @a proc
672  */
673 static void
674 postgres_plugin_get_replication (void *cls,
675                                  PluginDatumProcessor proc,
676                                  void *proc_cls)
677 {
678   struct Plugin *plugin = cls;
679   struct ReplCtx rc;
680   PGresult *ret;
681
682   rc.plugin = plugin;
683   rc.proc = proc;
684   rc.proc_cls = proc_cls;
685   ret = PQexecPrepared (plugin->dbh,
686                         "select_replication_order", 0, NULL, NULL,
687                         NULL, 1);
688   process_result (plugin,
689                   &repl_proc,
690                   &rc,
691                   ret,
692                   __FILE__, __LINE__);
693 }
694
695
696 /**
697  * Get a random item for expiration.  Call @a proc with all values
698  * ZERO or NULL if the datastore is empty.
699  *
700  * @param cls closure with the `struct Plugin`
701  * @param proc function to call the value (once only).
702  * @param proc_cls closure for @a proc
703  */
704 static void
705 postgres_plugin_get_expiration (void *cls,
706                                 PluginDatumProcessor proc,
707                                 void *proc_cls)
708 {
709   struct Plugin *plugin = cls;
710   struct GNUNET_TIME_Absolute now;
711   struct GNUNET_PQ_QueryParam params[] = {
712     GNUNET_PQ_query_param_absolute_time (&now),
713     GNUNET_PQ_query_param_end
714   };
715   PGresult *ret;
716
717   now = GNUNET_TIME_absolute_get ();
718   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
719                                  "select_expiration_order",
720                                  params);
721   process_result (plugin,
722                   proc, proc_cls,
723                   ret,
724                   __FILE__, __LINE__);
725 }
726
727
728 /**
729  * Get all of the keys in the datastore.
730  *
731  * @param cls closure with the `struct Plugin *`
732  * @param proc function to call on each key
733  * @param proc_cls closure for @a proc
734  */
735 static void
736 postgres_plugin_get_keys (void *cls,
737                           PluginKeyProcessor proc,
738                           void *proc_cls)
739 {
740   struct Plugin *plugin = cls;
741   int ret;
742   int i;
743   struct GNUNET_HashCode key;
744   PGresult * res;
745
746   res = PQexecPrepared (plugin->dbh,
747                         "get_keys",
748                         0, NULL, NULL, NULL, 1);
749   ret = PQntuples (res);
750   for (i=0;i<ret;i++)
751   {
752     if (sizeof (struct GNUNET_HashCode) !=
753         PQgetlength (res, i, 0))
754     {
755       GNUNET_memcpy (&key,
756               PQgetvalue (res, i, 0),
757               sizeof (struct GNUNET_HashCode));
758       proc (proc_cls, &key, 1);
759     }
760   }
761   PQclear (res);
762   proc (proc_cls, NULL, 0);
763 }
764
765
766 /**
767  * Drop database.
768  *
769  * @param cls closure with the `struct Plugin *`
770  */
771 static void
772 postgres_plugin_drop (void *cls)
773 {
774   struct Plugin *plugin = cls;
775
776   if (GNUNET_OK !=
777       GNUNET_POSTGRES_exec (plugin->dbh,
778                             "DROP TABLE gn090"))
779     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
780                      "postgres",
781                      _("Failed to drop table from database.\n"));
782 }
783
784
785 /**
786  * Remove a particular key in the datastore.
787  *
788  * @param cls closure
789  * @param key key for the content
790  * @param size number of bytes in data
791  * @param data content stored
792  * @param cont continuation called with success or failure status
793  * @param cont_cls continuation closure for @a cont
794  */
795 static void
796 postgres_plugin_remove_key (void *cls,
797                             const struct GNUNET_HashCode *key,
798                             uint32_t size,
799                             const void *data,
800                             PluginRemoveCont cont,
801                             void *cont_cls)
802 {
803   struct Plugin *plugin = cls;
804   enum GNUNET_PQ_QueryStatus ret;
805   struct GNUNET_PQ_QueryParam params[] = {
806     GNUNET_PQ_query_param_auto_from_type (key),
807     GNUNET_PQ_query_param_fixed_size (data, size),
808     GNUNET_PQ_query_param_end
809   };
810
811   ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
812                                             "remove",
813                                             params);
814   if (0 > ret)
815   {
816     cont (cont_cls,
817           key,
818           size,
819           GNUNET_SYSERR,
820           _("Postgress exec failure"));
821     return;
822   }
823   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS == ret)
824   {
825     cont (cont_cls,
826           key,
827           size,
828           GNUNET_NO,
829           NULL);
830     return;
831   }
832   plugin->env->duc (plugin->env->cls,
833                     - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
834   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
835                    "datastore-postgres",
836                    "Deleted %u bytes from database\n",
837                    (unsigned int) size);
838   cont (cont_cls,
839         key,
840         size,
841         GNUNET_OK,
842         NULL);
843 }
844
845
846 /**
847  * Entry point for the plugin.
848  *
849  * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
850  * @return our `struct Plugin *`
851  */
852 void *
853 libgnunet_plugin_datastore_postgres_init (void *cls)
854 {
855   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
856   struct GNUNET_DATASTORE_PluginFunctions *api;
857   struct Plugin *plugin;
858
859   plugin = GNUNET_new (struct Plugin);
860   plugin->env = env;
861   if (GNUNET_OK != init_connection (plugin))
862   {
863     GNUNET_free (plugin);
864     return NULL;
865   }
866   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
867   api->cls = plugin;
868   api->estimate_size = &postgres_plugin_estimate_size;
869   api->put = &postgres_plugin_put;
870   api->get_key = &postgres_plugin_get_key;
871   api->get_replication = &postgres_plugin_get_replication;
872   api->get_expiration = &postgres_plugin_get_expiration;
873   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
874   api->get_keys = &postgres_plugin_get_keys;
875   api->drop = &postgres_plugin_drop;
876   api->remove_key = &postgres_plugin_remove_key;
877   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
878                    "datastore-postgres",
879                    _("Postgres database running\n"));
880   return api;
881 }
882
883
884 /**
885  * Exit point from the plugin.
886  *
887  * @param cls our `struct Plugin *`
888  * @return always NULL
889  */
890 void *
891 libgnunet_plugin_datastore_postgres_done (void *cls)
892 {
893   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
894   struct Plugin *plugin = api->cls;
895
896   PQfinish (plugin->dbh);
897   GNUNET_free (plugin);
898   GNUNET_free (api);
899   return NULL;
900 }
901
902 /* end of plugin_datastore_postgres.c */