-indentation
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include "gnunet_postgres_lib.h"
30 #include <postgresql/libpq-fe.h>
31
32
33 /**
34  * After how many ms "busy" should a DB operation fail for good?
35  * A low value makes sure that we are more responsive to requests
36  * (especially PUTs).  A high value guarantees a higher success
37  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
38  *
39  * The default value of 1s should ensure that users do not experience
40  * huge latencies while at the same time allowing operations to succeed
41  * with reasonable probability.
42  */
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
44
45
46 /**
47  * Context for all functions in this plugin.
48  */
49 struct Plugin
50 {
51   /**
52    * Our execution environment.
53    */
54   struct GNUNET_DATASTORE_PluginEnvironment *env;
55
56   /**
57    * Native Postgres database handle.
58    */
59   PGconn *dbh;
60
61 };
62
63
64 /**
65  * @brief Get a database handle
66  *
67  * @param plugin global context
68  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
69  */
70 static int
71 init_connection (struct Plugin *plugin)
72 {
73   PGresult *ret;
74
75   plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres");
76   if (NULL == plugin->dbh)
77     return GNUNET_SYSERR;
78
79   ret =
80       PQexec (plugin->dbh,
81               "CREATE TABLE gn090 (" "  repl INTEGER NOT NULL DEFAULT 0,"
82               "  type INTEGER NOT NULL DEFAULT 0,"
83               "  prio INTEGER NOT NULL DEFAULT 0,"
84               "  anonLevel INTEGER NOT NULL DEFAULT 0,"
85               "  expire BIGINT NOT NULL DEFAULT 0,"
86               "  rvalue BIGINT NOT NULL DEFAULT 0,"
87               "  hash BYTEA NOT NULL DEFAULT '',"
88               "  vhash BYTEA NOT NULL DEFAULT '',"
89               "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
90   if ( (NULL == ret) ||
91        ((PQresultStatus (ret) != PGRES_COMMAND_OK) &&
92         (0 != strcmp ("42P07",    /* duplicate table */
93                       PQresultErrorField
94                       (ret,
95                        PG_DIAG_SQLSTATE)))))
96   {
97     (void) GNUNET_POSTGRES_check_result (plugin->dbh,
98                                          ret,
99                                          PGRES_COMMAND_OK,
100                                          "CREATE TABLE",
101                                          "gn090");
102     PQfinish (plugin->dbh);
103     plugin->dbh = NULL;
104     return GNUNET_SYSERR;
105   }
106
107   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
108   {
109     if ((GNUNET_OK !=
110          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash ON gn090 (hash)")) ||
111         (GNUNET_OK !=
112          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)")) ||
113         (GNUNET_OK !=
114          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_prio ON gn090 (prio)")) ||
115         (GNUNET_OK !=
116          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire ON gn090 (expire)")) ||
117         (GNUNET_OK !=
118          GNUNET_POSTGRES_exec (plugin->dbh,
119                   "CREATE INDEX idx_prio_anon ON gn090 (prio,anonLevel)")) ||
120         (GNUNET_OK !=
121          GNUNET_POSTGRES_exec (plugin->dbh,
122                   "CREATE INDEX idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
123         (GNUNET_OK !=
124          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
125         (GNUNET_OK !=
126          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire_hash ON gn090 (expire,hash)")))
127     {
128       PQclear (ret);
129       PQfinish (plugin->dbh);
130       plugin->dbh = NULL;
131       return GNUNET_SYSERR;
132     }
133   }
134   PQclear (ret);
135
136   ret =
137       PQexec (plugin->dbh,
138               "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
139   if (GNUNET_OK !=
140       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
141   {
142     PQfinish (plugin->dbh);
143     plugin->dbh = NULL;
144     return GNUNET_SYSERR;
145   }
146   PQclear (ret);
147   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
148   if (GNUNET_OK !=
149       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
150   {
151     PQfinish (plugin->dbh);
152     plugin->dbh = NULL;
153     return GNUNET_SYSERR;
154   }
155   PQclear (ret);
156   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
157   if (GNUNET_OK !=
158       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
159   {
160     PQfinish (plugin->dbh);
161     plugin->dbh = NULL;
162     return GNUNET_SYSERR;
163   }
164   PQclear (ret);
165   if ((GNUNET_OK !=
166        GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
167                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
168                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
169                    "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
170       (GNUNET_OK !=
171        GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
172                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
173                    "WHERE hash=$1 AND type=$2 "
174                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
175       (GNUNET_OK !=
176        GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
177                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
178                    "WHERE hash=$1 AND vhash=$2 "
179                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
180       (GNUNET_OK !=
181        GNUNET_POSTGRES_prepare (plugin->dbh, "get",
182                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
183                    "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) ||
184       (GNUNET_OK !=
185        GNUNET_POSTGRES_prepare (plugin->dbh, "put",
186                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
187                    "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) ||
188       (GNUNET_OK !=
189        GNUNET_POSTGRES_prepare (plugin->dbh, "update",
190                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
191                    "WHERE oid = $3", 3)) ||
192       (GNUNET_OK !=
193        GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
194                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
195                    "WHERE oid = $1", 1)) ||
196       (GNUNET_OK !=
197        GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
198                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
199                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
200                    1)) ||
201       (GNUNET_OK !=
202        GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
203                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
204                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION "
205                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
206                    "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1",
207                    1)) ||
208       (GNUNET_OK !=
209        GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
210                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
211                    "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
212       (GNUNET_OK !=
213        GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
214       (GNUNET_OK !=
215        GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
216   {
217     PQfinish (plugin->dbh);
218     plugin->dbh = NULL;
219     return GNUNET_SYSERR;
220   }
221   return GNUNET_OK;
222 }
223
224
225 /**
226  * Get an estimate of how much space the database is
227  * currently using.
228  *
229  * @param cls our `struct Plugin *`
230  * @return number of bytes used on disk
231  */
232 static unsigned long long
233 postgres_plugin_estimate_size (void *cls)
234 {
235   struct Plugin *plugin = cls;
236   unsigned long long total;
237   PGresult *ret;
238
239   ret =
240       PQexecParams (plugin->dbh,
241                     "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
242                     NULL, NULL, NULL, NULL, 1);
243   if (GNUNET_OK !=
244       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", "get_size"))
245   {
246     return 0;
247   }
248   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
249   {
250     GNUNET_break (0);
251     PQclear (ret);
252     return 0;
253   }
254   if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
255   {
256     GNUNET_break (0 == PQgetlength (ret, 0, 0));
257     PQclear (ret);
258     return 0;
259   }
260   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
261   PQclear (ret);
262   return total;
263 }
264
265
266 /**
267  * Store an item in the datastore.
268  *
269  * @param cls closure with the `struct Plugin`
270  * @param key key for the item
271  * @param size number of bytes in data
272  * @param data content stored
273  * @param type type of the content
274  * @param priority priority of the content
275  * @param anonymity anonymity-level for the content
276  * @param replication replication-level for the content
277  * @param expiration expiration time for the content
278  * @param msg set to error message
279  * @return #GNUNET_OK on success
280  */
281 static int
282 postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
283                      const void *data, enum GNUNET_BLOCK_Type type,
284                      uint32_t priority, uint32_t anonymity,
285                      uint32_t replication,
286                      struct GNUNET_TIME_Absolute expiration, char **msg)
287 {
288   struct Plugin *plugin = cls;
289   struct GNUNET_HashCode vhash;
290   PGresult *ret;
291   uint32_t btype = htonl (type);
292   uint32_t bprio = htonl (priority);
293   uint32_t banon = htonl (anonymity);
294   uint32_t brepl = htonl (replication);
295   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value_us__;
296
297   const char *paramValues[] = {
298     (const char *) &brepl,
299     (const char *) &btype,
300     (const char *) &bprio,
301     (const char *) &banon,
302     (const char *) &bexpi,
303     (const char *) key,
304     (const char *) &vhash,
305     (const char *) data
306   };
307   int paramLengths[] = {
308     sizeof (brepl),
309     sizeof (btype),
310     sizeof (bprio),
311     sizeof (banon),
312     sizeof (bexpi),
313     sizeof (struct GNUNET_HashCode),
314     sizeof (struct GNUNET_HashCode),
315     size
316   };
317   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
318
319   GNUNET_CRYPTO_hash (data, size, &vhash);
320   ret =
321       PQexecPrepared (plugin->dbh, "put", 8, paramValues, paramLengths,
322                       paramFormats, 1);
323   if (GNUNET_OK !=
324       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put"))
325     return GNUNET_SYSERR;
326   PQclear (ret);
327   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
328   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
329                    "Stored %u bytes in database\n", (unsigned int) size);
330   return GNUNET_OK;
331 }
332
333
334 /**
335  * Function invoked to process the result and call
336  * the processor.
337  *
338  * @param plugin global plugin data
339  * @param proc function to call the value (once only).
340  * @param proc_cls closure for proc
341  * @param res result from exec
342  * @param filename filename for error messages
343  * @param line line number for error messages
344  */
345 static void
346 process_result (struct Plugin *plugin, PluginDatumProcessor proc,
347                 void *proc_cls, PGresult * res,
348                 const char *filename, int line)
349 {
350   int iret;
351   enum GNUNET_BLOCK_Type type;
352   uint32_t anonymity;
353   uint32_t priority;
354   uint32_t size;
355   unsigned int rowid;
356   struct GNUNET_TIME_Absolute expiration_time;
357   struct GNUNET_HashCode key;
358
359   if (GNUNET_OK !=
360       GNUNET_POSTGRES_check_result_ (plugin->dbh, res, PGRES_TUPLES_OK, "PQexecPrepared", "select",
361                                      filename, line))
362   {
363     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
364                      "Ending iteration (postgres error)\n");
365     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
366     return;
367   }
368
369   if (0 == PQntuples (res))
370   {
371     /* no result */
372     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
373                      "Ending iteration (no more results)\n");
374     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
375     PQclear (res);
376     return;
377   }
378   if ((1 != PQntuples (res)) || (7 != PQnfields (res)) ||
379       (sizeof (uint32_t) != PQfsize (res, 0)) ||
380       (sizeof (uint32_t) != PQfsize (res, 6)))
381   {
382     GNUNET_break (0);
383     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
384     PQclear (res);
385     return;
386   }
387   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
388   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
389       (sizeof (uint32_t) != PQfsize (res, 1)) ||
390       (sizeof (uint32_t) != PQfsize (res, 2)) ||
391       (sizeof (uint64_t) != PQfsize (res, 3)) ||
392       (sizeof (struct GNUNET_HashCode) != PQgetlength (res, 0, 4)))
393   {
394     GNUNET_break (0);
395     PQclear (res);
396     GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid);
397     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
398     return;
399   }
400
401   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
402   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
403   anonymity = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
404   expiration_time.abs_value_us =
405       GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
406   memcpy (&key, PQgetvalue (res, 0, 4), sizeof (struct GNUNET_HashCode));
407   size = PQgetlength (res, 0, 5);
408   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
409                    "Found result of size %u bytes and type %u in database\n",
410                    (unsigned int) size, (unsigned int) type);
411   iret =
412       proc (proc_cls, &key, size, PQgetvalue (res, 0, 5),
413             (enum GNUNET_BLOCK_Type) type, priority, anonymity, expiration_time,
414             rowid);
415   PQclear (res);
416   if (iret == GNUNET_NO)
417   {
418     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
419                 "Processor asked for item %u to be removed.\n", rowid);
420     if (GNUNET_OK == GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid))
421     {
422       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
423                        "Deleting %u bytes from database\n",
424                        (unsigned int) size);
425       plugin->env->duc (plugin->env->cls,
426                         -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
427       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
428                        "Deleted %u bytes from database\n", (unsigned int) size);
429     }
430   }
431 }
432
433
434 /**
435  * Iterate over the results for a particular key
436  * in the datastore.
437  *
438  * @param cls closure with the 'struct Plugin'
439  * @param offset offset of the result (modulo num-results);
440  *        specific ordering does not matter for the offset
441  * @param key maybe NULL (to match all entries)
442  * @param vhash hash of the value, maybe NULL (to
443  *        match all values that have the right key).
444  *        Note that for DBlocks there is no difference
445  *        betwen key and vhash, but for other blocks
446  *        there may be!
447  * @param type entries of which type are relevant?
448  *     Use 0 for any type.
449  * @param proc function to call on the matching value;
450  *        will be called once with a NULL if no value matches
451  * @param proc_cls closure for iter
452  */
453 static void
454 postgres_plugin_get_key (void *cls, uint64_t offset,
455                          const struct GNUNET_HashCode * key,
456                          const struct GNUNET_HashCode * vhash,
457                          enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
458                          void *proc_cls)
459 {
460   struct Plugin *plugin = cls;
461   const int paramFormats[] = { 1, 1, 1, 1, 1 };
462   int paramLengths[4];
463   const char *paramValues[4];
464   int nparams;
465   const char *pname;
466   PGresult *ret;
467   uint64_t total;
468   uint64_t blimit_off;
469   uint32_t btype;
470
471   GNUNET_assert (key != NULL);
472   paramValues[0] = (const char *) key;
473   paramLengths[0] = sizeof (struct GNUNET_HashCode);
474   btype = htonl (type);
475   if (type != 0)
476   {
477     if (vhash != NULL)
478     {
479       paramValues[1] = (const char *) vhash;
480       paramLengths[1] = sizeof (struct GNUNET_HashCode);
481       paramValues[2] = (const char *) &btype;
482       paramLengths[2] = sizeof (btype);
483       paramValues[3] = (const char *) &blimit_off;
484       paramLengths[3] = sizeof (blimit_off);
485       nparams = 4;
486       pname = "getvt";
487       ret =
488           PQexecParams (plugin->dbh,
489                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
490                         3, NULL, paramValues, paramLengths, paramFormats, 1);
491     }
492     else
493     {
494       paramValues[1] = (const char *) &btype;
495       paramLengths[1] = sizeof (btype);
496       paramValues[2] = (const char *) &blimit_off;
497       paramLengths[2] = sizeof (blimit_off);
498       nparams = 3;
499       pname = "gett";
500       ret =
501           PQexecParams (plugin->dbh,
502                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
503                         2, NULL, paramValues, paramLengths, paramFormats, 1);
504     }
505   }
506   else
507   {
508     if (vhash != NULL)
509     {
510       paramValues[1] = (const char *) vhash;
511       paramLengths[1] = sizeof (struct GNUNET_HashCode);
512       paramValues[2] = (const char *) &blimit_off;
513       paramLengths[2] = sizeof (blimit_off);
514       nparams = 3;
515       pname = "getv";
516       ret =
517           PQexecParams (plugin->dbh,
518                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
519                         2, NULL, paramValues, paramLengths, paramFormats, 1);
520     }
521     else
522     {
523       paramValues[1] = (const char *) &blimit_off;
524       paramLengths[1] = sizeof (blimit_off);
525       nparams = 2;
526       pname = "get";
527       ret =
528           PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1",
529                         1, NULL, paramValues, paramLengths, paramFormats, 1);
530     }
531   }
532   if (GNUNET_OK !=
533       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", pname))
534   {
535     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
536     return;
537   }
538   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
539       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
540   {
541     GNUNET_break (0);
542     PQclear (ret);
543     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
544     return;
545   }
546   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
547   PQclear (ret);
548   if (total == 0)
549   {
550     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
551     return;
552   }
553   blimit_off = GNUNET_htonll (offset % total);
554   ret =
555       PQexecPrepared (plugin->dbh, pname, nparams, paramValues, paramLengths,
556                       paramFormats, 1);
557   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
558 }
559
560
561 /**
562  * Select a subset of the items in the datastore and call
563  * the given iterator for each of them.
564  *
565  * @param cls our "struct Plugin*"
566  * @param offset offset of the result (modulo num-results);
567  *        specific ordering does not matter for the offset
568  * @param type entries of which type should be considered?
569  *        Use 0 for any type.
570  * @param proc function to call on the matching value;
571  *        will be called with a NULL if no value matches
572  * @param proc_cls closure for proc
573  */
574 static void
575 postgres_plugin_get_zero_anonymity (void *cls, uint64_t offset,
576                                     enum GNUNET_BLOCK_Type type,
577                                     PluginDatumProcessor proc, void *proc_cls)
578 {
579   struct Plugin *plugin = cls;
580   uint32_t btype;
581   uint64_t boff;
582   const int paramFormats[] = { 1, 1 };
583   int paramLengths[] = { sizeof (btype), sizeof (boff) };
584   const char *paramValues[] = { (const char *) &btype, (const char *) &boff };
585   PGresult *ret;
586
587   btype = htonl ((uint32_t) type);
588   boff = GNUNET_htonll (offset);
589   ret =
590       PQexecPrepared (plugin->dbh, "select_non_anonymous", 2, paramValues,
591                       paramLengths, paramFormats, 1);
592   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
593 }
594
595
596 /**
597  * Context for 'repl_iter' function.
598  */
599 struct ReplCtx
600 {
601
602   /**
603    * Plugin handle.
604    */
605   struct Plugin *plugin;
606
607   /**
608    * Function to call for the result (or the NULL).
609    */
610   PluginDatumProcessor proc;
611
612   /**
613    * Closure for proc.
614    */
615   void *proc_cls;
616 };
617
618
619 /**
620  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
621  * Decrements the replication counter and calls the original
622  * iterator.
623  *
624  * @param cls closure with the 'struct ReplCtx*'
625  * @param key key for the content
626  * @param size number of bytes in data
627  * @param data content stored
628  * @param type type of the content
629  * @param priority priority of the content
630  * @param anonymity anonymity-level for the content
631  * @param expiration expiration time for the content
632  * @param uid unique identifier for the datum;
633  *        maybe 0 if no unique identifier is available
634  *
635  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
636  *         (continue on call to "next", of course),
637  *         GNUNET_NO to delete the item and continue (if supported)
638  */
639 static int
640 repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
641            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
642            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
643            uint64_t uid)
644 {
645   struct ReplCtx *rc = cls;
646   struct Plugin *plugin = rc->plugin;
647   int ret;
648   PGresult *qret;
649   uint32_t boid;
650
651   ret =
652       rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
653                 expiration, uid);
654   if (NULL != key)
655   {
656     boid = htonl ((uint32_t) uid);
657     const char *paramValues[] = {
658       (const char *) &boid,
659     };
660     int paramLengths[] = {
661       sizeof (boid),
662     };
663     const int paramFormats[] = { 1 };
664     qret =
665         PQexecPrepared (plugin->dbh, "decrepl", 1, paramValues, paramLengths,
666                         paramFormats, 1);
667     if (GNUNET_OK !=
668         GNUNET_POSTGRES_check_result (plugin->dbh, qret, PGRES_COMMAND_OK, "PQexecPrepared",
669                       "decrepl"))
670       return GNUNET_SYSERR;
671     PQclear (qret);
672   }
673   return ret;
674 }
675
676
677 /**
678  * Get a random item for replication.  Returns a single, not expired, random item
679  * from those with the highest replication counters.  The item's
680  * replication counter is decremented by one IF it was positive before.
681  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
682  *
683  * @param cls closure with the 'struct Plugin'
684  * @param proc function to call the value (once only).
685  * @param proc_cls closure for proc
686  */
687 static void
688 postgres_plugin_get_replication (void *cls, PluginDatumProcessor proc,
689                                  void *proc_cls)
690 {
691   struct Plugin *plugin = cls;
692   struct ReplCtx rc;
693   PGresult *ret;
694
695   rc.plugin = plugin;
696   rc.proc = proc;
697   rc.proc_cls = proc_cls;
698   ret =
699       PQexecPrepared (plugin->dbh, "select_replication_order", 0, NULL, NULL,
700                       NULL, 1);
701   process_result (plugin, &repl_proc, &rc, ret, __FILE__, __LINE__);
702 }
703
704
705 /**
706  * Get a random item for expiration.
707  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
708  *
709  * @param cls closure with the 'struct Plugin'
710  * @param proc function to call the value (once only).
711  * @param proc_cls closure for proc
712  */
713 static void
714 postgres_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
715                                 void *proc_cls)
716 {
717   struct Plugin *plugin = cls;
718   uint64_t btime;
719   const int paramFormats[] = { 1 };
720   int paramLengths[] = { sizeof (btime) };
721   const char *paramValues[] = { (const char *) &btime };
722   PGresult *ret;
723
724   btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value_us);
725   ret =
726       PQexecPrepared (plugin->dbh, "select_expiration_order", 1, paramValues,
727                       paramLengths, paramFormats, 1);
728   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
729 }
730
731
732 /**
733  * Update the priority for a particular key in the datastore.  If
734  * the expiration time in value is different than the time found in
735  * the datastore, the higher value should be kept.  For the
736  * anonymity level, the lower value is to be used.  The specified
737  * priority should be added to the existing priority, ignoring the
738  * priority in value.
739  *
740  * Note that it is possible for multiple values to match this put.
741  * In that case, all of the respective values are updated.
742  *
743  * @param cls our "struct Plugin*"
744  * @param uid unique identifier of the datum
745  * @param delta by how much should the priority
746  *     change?  If priority + delta < 0 the
747  *     priority should be set to 0 (never go
748  *     negative).
749  * @param expire new expiration time should be the
750  *     MAX of any existing expiration time and
751  *     this value
752  * @param msg set to error message
753  * @return GNUNET_OK on success
754  */
755 static int
756 postgres_plugin_update (void *cls, uint64_t uid, int delta,
757                         struct GNUNET_TIME_Absolute expire, char **msg)
758 {
759   struct Plugin *plugin = cls;
760   PGresult *ret;
761   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
762   uint32_t boid = htonl ((uint32_t) uid);
763   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value_us__;
764
765   const char *paramValues[] = {
766     (const char *) &bdelta,
767     (const char *) &bexpire,
768     (const char *) &boid,
769   };
770   int paramLengths[] = {
771     sizeof (bdelta),
772     sizeof (bexpire),
773     sizeof (boid),
774   };
775   const int paramFormats[] = { 1, 1, 1 };
776
777   ret =
778       PQexecPrepared (plugin->dbh, "update", 3, paramValues, paramLengths,
779                       paramFormats, 1);
780   if (GNUNET_OK !=
781       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "update"))
782     return GNUNET_SYSERR;
783   PQclear (ret);
784   return GNUNET_OK;
785 }
786
787
788
789 /**
790  * Get all of the keys in the datastore.
791  *
792  * @param cls closure with the 'struct Plugin'
793  * @param proc function to call on each key
794  * @param proc_cls closure for proc
795  */
796 static void
797 postgres_plugin_get_keys (void *cls,
798                           PluginKeyProcessor proc,
799                           void *proc_cls)
800 {
801   struct Plugin *plugin = cls;
802   int ret;
803   int i;
804   struct GNUNET_HashCode key;
805   PGresult * res;
806
807   res = PQexecPrepared (plugin->dbh, "get_keys", 0, NULL, NULL, NULL, 1);
808   ret = PQntuples (res);
809   for (i=0;i<ret;i++)
810   {
811     if (sizeof (struct GNUNET_HashCode) != PQgetlength (res, i, 0))
812     {
813       memcpy (&key, PQgetvalue (res, i, 0), sizeof (struct GNUNET_HashCode));
814       proc (proc_cls, &key, 1);
815     }
816   }
817   PQclear (res);
818 }
819
820
821
822 /**
823  * Drop database.
824  *
825  * @param cls closure with the 'struct Plugin'
826  */
827 static void
828 postgres_plugin_drop (void *cls)
829 {
830   struct Plugin *plugin = cls;
831
832   if (GNUNET_OK != GNUNET_POSTGRES_exec (plugin->dbh, "DROP TABLE gn090"))
833     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "postgres", _("Failed to drop table from database.\n"));
834 }
835
836
837 /**
838  * Entry point for the plugin.
839  *
840  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
841  * @return our "struct Plugin*"
842  */
843 void *
844 libgnunet_plugin_datastore_postgres_init (void *cls)
845 {
846   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
847   struct GNUNET_DATASTORE_PluginFunctions *api;
848   struct Plugin *plugin;
849
850   plugin = GNUNET_new (struct Plugin);
851   plugin->env = env;
852   if (GNUNET_OK != init_connection (plugin))
853   {
854     GNUNET_free (plugin);
855     return NULL;
856   }
857   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
858   api->cls = plugin;
859   api->estimate_size = &postgres_plugin_estimate_size;
860   api->put = &postgres_plugin_put;
861   api->update = &postgres_plugin_update;
862   api->get_key = &postgres_plugin_get_key;
863   api->get_replication = &postgres_plugin_get_replication;
864   api->get_expiration = &postgres_plugin_get_expiration;
865   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
866   api->get_keys = &postgres_plugin_get_keys;
867   api->drop = &postgres_plugin_drop;
868   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "datastore-postgres",
869                    _("Postgres database running\n"));
870   return api;
871 }
872
873
874 /**
875  * Exit point from the plugin.
876  * @param cls our "struct Plugin*"
877  * @return always NULL
878  */
879 void *
880 libgnunet_plugin_datastore_postgres_done (void *cls)
881 {
882   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
883   struct Plugin *plugin = api->cls;
884
885   PQfinish (plugin->dbh);
886   GNUNET_free (plugin);
887   GNUNET_free (api);
888   return NULL;
889 }
890
891 /* end of plugin_datastore_postgres.c */