-keep track of messages passed to mq
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include "gnunet_postgres_lib.h"
30
31
32 /**
33  * After how many ms "busy" should a DB operation fail for good?
34  * A low value makes sure that we are more responsive to requests
35  * (especially PUTs).  A high value guarantees a higher success
36  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
37  *
38  * The default value of 1s should ensure that users do not experience
39  * huge latencies while at the same time allowing operations to succeed
40  * with reasonable probability.
41  */
42 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
43
44
45 /**
46  * Context for all functions in this plugin.
47  */
48 struct Plugin
49 {
50   /**
51    * Our execution environment.
52    */
53   struct GNUNET_DATASTORE_PluginEnvironment *env;
54
55   /**
56    * Native Postgres database handle.
57    */
58   PGconn *dbh;
59
60 };
61
62
63 /**
64  * @brief Get a database handle
65  *
66  * @param plugin global context
67  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
68  */
69 static int
70 init_connection (struct Plugin *plugin)
71 {
72   PGresult *ret;
73
74   plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres");
75   if (NULL == plugin->dbh)
76     return GNUNET_SYSERR;
77
78   ret =
79       PQexec (plugin->dbh,
80               "CREATE TABLE gn090 (" "  repl INTEGER NOT NULL DEFAULT 0,"
81               "  type INTEGER NOT NULL DEFAULT 0,"
82               "  prio INTEGER NOT NULL DEFAULT 0,"
83               "  anonLevel INTEGER NOT NULL DEFAULT 0,"
84               "  expire BIGINT NOT NULL DEFAULT 0,"
85               "  rvalue BIGINT NOT NULL DEFAULT 0,"
86               "  hash BYTEA NOT NULL DEFAULT '',"
87               "  vhash BYTEA NOT NULL DEFAULT '',"
88               "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
89   if ( (NULL == ret) ||
90        ((PQresultStatus (ret) != PGRES_COMMAND_OK) &&
91         (0 != strcmp ("42P07",    /* duplicate table */
92                       PQresultErrorField
93                       (ret,
94                        PG_DIAG_SQLSTATE)))))
95   {
96     (void) GNUNET_POSTGRES_check_result (plugin->dbh,
97                                          ret,
98                                          PGRES_COMMAND_OK,
99                                          "CREATE TABLE",
100                                          "gn090");
101     PQfinish (plugin->dbh);
102     plugin->dbh = NULL;
103     return GNUNET_SYSERR;
104   }
105
106   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
107   {
108     if ((GNUNET_OK !=
109          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash ON gn090 (hash)")) ||
110         (GNUNET_OK !=
111          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)")) ||
112         (GNUNET_OK !=
113          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_prio ON gn090 (prio)")) ||
114         (GNUNET_OK !=
115          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire ON gn090 (expire)")) ||
116         (GNUNET_OK !=
117          GNUNET_POSTGRES_exec (plugin->dbh,
118                   "CREATE INDEX idx_prio_anon ON gn090 (prio,anonLevel)")) ||
119         (GNUNET_OK !=
120          GNUNET_POSTGRES_exec (plugin->dbh,
121                   "CREATE INDEX idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
122         (GNUNET_OK !=
123          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
124         (GNUNET_OK !=
125          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire_hash ON gn090 (expire,hash)")))
126     {
127       PQclear (ret);
128       PQfinish (plugin->dbh);
129       plugin->dbh = NULL;
130       return GNUNET_SYSERR;
131     }
132   }
133   PQclear (ret);
134
135   ret =
136       PQexec (plugin->dbh,
137               "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
138   if (GNUNET_OK !=
139       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
140   {
141     PQfinish (plugin->dbh);
142     plugin->dbh = NULL;
143     return GNUNET_SYSERR;
144   }
145   PQclear (ret);
146   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
147   if (GNUNET_OK !=
148       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
149   {
150     PQfinish (plugin->dbh);
151     plugin->dbh = NULL;
152     return GNUNET_SYSERR;
153   }
154   PQclear (ret);
155   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
156   if (GNUNET_OK !=
157       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
158   {
159     PQfinish (plugin->dbh);
160     plugin->dbh = NULL;
161     return GNUNET_SYSERR;
162   }
163   PQclear (ret);
164   if ((GNUNET_OK !=
165        GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
166                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
167                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
168                    "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
169       (GNUNET_OK !=
170        GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
171                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
172                    "WHERE hash=$1 AND type=$2 "
173                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
174       (GNUNET_OK !=
175        GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
176                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
177                    "WHERE hash=$1 AND vhash=$2 "
178                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
179       (GNUNET_OK !=
180        GNUNET_POSTGRES_prepare (plugin->dbh, "get",
181                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
182                    "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) ||
183       (GNUNET_OK !=
184        GNUNET_POSTGRES_prepare (plugin->dbh, "put",
185                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
186                    "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) ||
187       (GNUNET_OK !=
188        GNUNET_POSTGRES_prepare (plugin->dbh, "update",
189                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
190                    "WHERE oid = $3", 3)) ||
191       (GNUNET_OK !=
192        GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
193                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
194                    "WHERE oid = $1", 1)) ||
195       (GNUNET_OK !=
196        GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
197                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
198                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
199                    1)) ||
200       (GNUNET_OK !=
201        GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
202                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
203                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION "
204                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
205                    "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1",
206                    1)) ||
207       (GNUNET_OK !=
208        GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
209                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
210                    "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
211       (GNUNET_OK !=
212        GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
213       (GNUNET_OK !=
214        GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
215   {
216     PQfinish (plugin->dbh);
217     plugin->dbh = NULL;
218     return GNUNET_SYSERR;
219   }
220   return GNUNET_OK;
221 }
222
223
224 /**
225  * Get an estimate of how much space the database is
226  * currently using.
227  *
228  * @param cls our `struct Plugin *`
229  * @return number of bytes used on disk
230  */
231 static void
232 postgres_plugin_estimate_size (void *cls, unsigned long long *estimate)
233 {
234   struct Plugin *plugin = cls;
235   unsigned long long total;
236   PGresult *ret;
237
238   if (NULL == estimate)
239     return;
240   ret =
241       PQexecParams (plugin->dbh,
242                     "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
243                     NULL, NULL, NULL, NULL, 1);
244   if (GNUNET_OK !=
245       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", "get_size"))
246   {
247     *estimate = 0;
248     return;
249   }
250   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
251   {
252     GNUNET_break (0);
253     PQclear (ret);
254     *estimate = 0;
255     return;
256   }
257   if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
258   {
259     GNUNET_break (0 == PQgetlength (ret, 0, 0));
260     PQclear (ret);
261     *estimate = 0;
262     return;
263   }
264   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
265   PQclear (ret);
266   *estimate = total;
267 }
268
269
270 /**
271  * Store an item in the datastore.
272  *
273  * @param cls closure with the `struct Plugin`
274  * @param key key for the item
275  * @param size number of bytes in data
276  * @param data content stored
277  * @param type type of the content
278  * @param priority priority of the content
279  * @param anonymity anonymity-level for the content
280  * @param replication replication-level for the content
281  * @param expiration expiration time for the content
282  * @param cont continuation called with success or failure status
283  * @param cont_cls continuation closure
284  */
285 static void
286 postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
287                      const void *data, enum GNUNET_BLOCK_Type type,
288                      uint32_t priority, uint32_t anonymity,
289                      uint32_t replication,
290                      struct GNUNET_TIME_Absolute expiration, PluginPutCont cont,
291                      void *cont_cls)
292 {
293   struct Plugin *plugin = cls;
294   struct GNUNET_HashCode vhash;
295   PGresult *ret;
296   uint32_t btype = htonl (type);
297   uint32_t bprio = htonl (priority);
298   uint32_t banon = htonl (anonymity);
299   uint32_t brepl = htonl (replication);
300   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value_us__;
301
302   const char *paramValues[] = {
303     (const char *) &brepl,
304     (const char *) &btype,
305     (const char *) &bprio,
306     (const char *) &banon,
307     (const char *) &bexpi,
308     (const char *) key,
309     (const char *) &vhash,
310     (const char *) data
311   };
312   int paramLengths[] = {
313     sizeof (brepl),
314     sizeof (btype),
315     sizeof (bprio),
316     sizeof (banon),
317     sizeof (bexpi),
318     sizeof (struct GNUNET_HashCode),
319     sizeof (struct GNUNET_HashCode),
320     size
321   };
322   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
323
324   GNUNET_CRYPTO_hash (data, size, &vhash);
325   ret =
326       PQexecPrepared (plugin->dbh, "put", 8, paramValues, paramLengths,
327                       paramFormats, 1);
328   if (GNUNET_OK !=
329       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put"))
330   {
331     cont (cont_cls, key, size, GNUNET_SYSERR, _("Postgress exec failure"));
332     return;
333   }
334   PQclear (ret);
335   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
336   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
337                    "Stored %u bytes in database\n", (unsigned int) size);
338   cont (cont_cls, key, size, GNUNET_OK, NULL);
339 }
340
341
342 /**
343  * Function invoked to process the result and call
344  * the processor.
345  *
346  * @param plugin global plugin data
347  * @param proc function to call the value (once only).
348  * @param proc_cls closure for proc
349  * @param res result from exec
350  * @param filename filename for error messages
351  * @param line line number for error messages
352  */
353 static void
354 process_result (struct Plugin *plugin, PluginDatumProcessor proc,
355                 void *proc_cls, PGresult * res,
356                 const char *filename, int line)
357 {
358   int iret;
359   enum GNUNET_BLOCK_Type type;
360   uint32_t anonymity;
361   uint32_t priority;
362   uint32_t size;
363   unsigned int rowid;
364   struct GNUNET_TIME_Absolute expiration_time;
365   struct GNUNET_HashCode key;
366
367   if (GNUNET_OK !=
368       GNUNET_POSTGRES_check_result_ (plugin->dbh, res, PGRES_TUPLES_OK, "PQexecPrepared", "select",
369                                      filename, line))
370   {
371     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
372                      "Ending iteration (postgres error)\n");
373     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
374     return;
375   }
376
377   if (0 == PQntuples (res))
378   {
379     /* no result */
380     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
381                      "Ending iteration (no more results)\n");
382     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
383     PQclear (res);
384     return;
385   }
386   if ((1 != PQntuples (res)) || (7 != PQnfields (res)) ||
387       (sizeof (uint32_t) != PQfsize (res, 0)) ||
388       (sizeof (uint32_t) != PQfsize (res, 6)))
389   {
390     GNUNET_break (0);
391     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
392     PQclear (res);
393     return;
394   }
395   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
396   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
397       (sizeof (uint32_t) != PQfsize (res, 1)) ||
398       (sizeof (uint32_t) != PQfsize (res, 2)) ||
399       (sizeof (uint64_t) != PQfsize (res, 3)) ||
400       (sizeof (struct GNUNET_HashCode) != PQgetlength (res, 0, 4)))
401   {
402     GNUNET_break (0);
403     PQclear (res);
404     GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid);
405     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
406     return;
407   }
408
409   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
410   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
411   anonymity = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
412   expiration_time.abs_value_us =
413       GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
414   memcpy (&key, PQgetvalue (res, 0, 4), sizeof (struct GNUNET_HashCode));
415   size = PQgetlength (res, 0, 5);
416   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
417                    "Found result of size %u bytes and type %u in database\n",
418                    (unsigned int) size, (unsigned int) type);
419   iret =
420       proc (proc_cls, &key, size, PQgetvalue (res, 0, 5),
421             (enum GNUNET_BLOCK_Type) type, priority, anonymity, expiration_time,
422             rowid);
423   PQclear (res);
424   if (iret == GNUNET_NO)
425   {
426     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427                 "Processor asked for item %u to be removed.\n", rowid);
428     if (GNUNET_OK == GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid))
429     {
430       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
431                        "Deleting %u bytes from database\n",
432                        (unsigned int) size);
433       plugin->env->duc (plugin->env->cls,
434                         -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
435       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
436                        "Deleted %u bytes from database\n", (unsigned int) size);
437     }
438   }
439 }
440
441
442 /**
443  * Iterate over the results for a particular key
444  * in the datastore.
445  *
446  * @param cls closure with the 'struct Plugin'
447  * @param offset offset of the result (modulo num-results);
448  *        specific ordering does not matter for the offset
449  * @param key maybe NULL (to match all entries)
450  * @param vhash hash of the value, maybe NULL (to
451  *        match all values that have the right key).
452  *        Note that for DBlocks there is no difference
453  *        betwen key and vhash, but for other blocks
454  *        there may be!
455  * @param type entries of which type are relevant?
456  *     Use 0 for any type.
457  * @param proc function to call on the matching value;
458  *        will be called once with a NULL if no value matches
459  * @param proc_cls closure for iter
460  */
461 static void
462 postgres_plugin_get_key (void *cls, uint64_t offset,
463                          const struct GNUNET_HashCode * key,
464                          const struct GNUNET_HashCode * vhash,
465                          enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
466                          void *proc_cls)
467 {
468   struct Plugin *plugin = cls;
469   const int paramFormats[] = { 1, 1, 1, 1, 1 };
470   int paramLengths[4];
471   const char *paramValues[4];
472   int nparams;
473   const char *pname;
474   PGresult *ret;
475   uint64_t total;
476   uint64_t blimit_off;
477   uint32_t btype;
478
479   GNUNET_assert (key != NULL);
480   paramValues[0] = (const char *) key;
481   paramLengths[0] = sizeof (struct GNUNET_HashCode);
482   btype = htonl (type);
483   if (type != 0)
484   {
485     if (vhash != NULL)
486     {
487       paramValues[1] = (const char *) vhash;
488       paramLengths[1] = sizeof (struct GNUNET_HashCode);
489       paramValues[2] = (const char *) &btype;
490       paramLengths[2] = sizeof (btype);
491       paramValues[3] = (const char *) &blimit_off;
492       paramLengths[3] = sizeof (blimit_off);
493       nparams = 4;
494       pname = "getvt";
495       ret =
496           PQexecParams (plugin->dbh,
497                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
498                         3, NULL, paramValues, paramLengths, paramFormats, 1);
499     }
500     else
501     {
502       paramValues[1] = (const char *) &btype;
503       paramLengths[1] = sizeof (btype);
504       paramValues[2] = (const char *) &blimit_off;
505       paramLengths[2] = sizeof (blimit_off);
506       nparams = 3;
507       pname = "gett";
508       ret =
509           PQexecParams (plugin->dbh,
510                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
511                         2, NULL, paramValues, paramLengths, paramFormats, 1);
512     }
513   }
514   else
515   {
516     if (vhash != NULL)
517     {
518       paramValues[1] = (const char *) vhash;
519       paramLengths[1] = sizeof (struct GNUNET_HashCode);
520       paramValues[2] = (const char *) &blimit_off;
521       paramLengths[2] = sizeof (blimit_off);
522       nparams = 3;
523       pname = "getv";
524       ret =
525           PQexecParams (plugin->dbh,
526                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
527                         2, NULL, paramValues, paramLengths, paramFormats, 1);
528     }
529     else
530     {
531       paramValues[1] = (const char *) &blimit_off;
532       paramLengths[1] = sizeof (blimit_off);
533       nparams = 2;
534       pname = "get";
535       ret =
536           PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1",
537                         1, NULL, paramValues, paramLengths, paramFormats, 1);
538     }
539   }
540   if (GNUNET_OK !=
541       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", pname))
542   {
543     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
544     return;
545   }
546   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
547       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
548   {
549     GNUNET_break (0);
550     PQclear (ret);
551     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
552     return;
553   }
554   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
555   PQclear (ret);
556   if (total == 0)
557   {
558     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
559     return;
560   }
561   blimit_off = GNUNET_htonll (offset % total);
562   ret =
563       PQexecPrepared (plugin->dbh, pname, nparams, paramValues, paramLengths,
564                       paramFormats, 1);
565   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
566 }
567
568
569 /**
570  * Select a subset of the items in the datastore and call
571  * the given iterator for each of them.
572  *
573  * @param cls our "struct Plugin*"
574  * @param offset offset of the result (modulo num-results);
575  *        specific ordering does not matter for the offset
576  * @param type entries of which type should be considered?
577  *        Use 0 for any type.
578  * @param proc function to call on the matching value;
579  *        will be called with a NULL if no value matches
580  * @param proc_cls closure for proc
581  */
582 static void
583 postgres_plugin_get_zero_anonymity (void *cls, uint64_t offset,
584                                     enum GNUNET_BLOCK_Type type,
585                                     PluginDatumProcessor proc, void *proc_cls)
586 {
587   struct Plugin *plugin = cls;
588   uint32_t btype;
589   uint64_t boff;
590   const int paramFormats[] = { 1, 1 };
591   int paramLengths[] = { sizeof (btype), sizeof (boff) };
592   const char *paramValues[] = { (const char *) &btype, (const char *) &boff };
593   PGresult *ret;
594
595   btype = htonl ((uint32_t) type);
596   boff = GNUNET_htonll (offset);
597   ret =
598       PQexecPrepared (plugin->dbh, "select_non_anonymous", 2, paramValues,
599                       paramLengths, paramFormats, 1);
600   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
601 }
602
603
604 /**
605  * Context for 'repl_iter' function.
606  */
607 struct ReplCtx
608 {
609
610   /**
611    * Plugin handle.
612    */
613   struct Plugin *plugin;
614
615   /**
616    * Function to call for the result (or the NULL).
617    */
618   PluginDatumProcessor proc;
619
620   /**
621    * Closure for proc.
622    */
623   void *proc_cls;
624 };
625
626
627 /**
628  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
629  * Decrements the replication counter and calls the original
630  * iterator.
631  *
632  * @param cls closure with the 'struct ReplCtx*'
633  * @param key key for the content
634  * @param size number of bytes in data
635  * @param data content stored
636  * @param type type of the content
637  * @param priority priority of the content
638  * @param anonymity anonymity-level for the content
639  * @param expiration expiration time for the content
640  * @param uid unique identifier for the datum;
641  *        maybe 0 if no unique identifier is available
642  *
643  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
644  *         (continue on call to "next", of course),
645  *         GNUNET_NO to delete the item and continue (if supported)
646  */
647 static int
648 repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
649            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
650            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
651            uint64_t uid)
652 {
653   struct ReplCtx *rc = cls;
654   struct Plugin *plugin = rc->plugin;
655   int ret;
656   PGresult *qret;
657   uint32_t boid;
658
659   ret =
660       rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
661                 expiration, uid);
662   if (NULL != key)
663   {
664     boid = htonl ((uint32_t) uid);
665     const char *paramValues[] = {
666       (const char *) &boid,
667     };
668     int paramLengths[] = {
669       sizeof (boid),
670     };
671     const int paramFormats[] = { 1 };
672     qret =
673         PQexecPrepared (plugin->dbh, "decrepl", 1, paramValues, paramLengths,
674                         paramFormats, 1);
675     if (GNUNET_OK !=
676         GNUNET_POSTGRES_check_result (plugin->dbh, qret, PGRES_COMMAND_OK, "PQexecPrepared",
677                       "decrepl"))
678       return GNUNET_SYSERR;
679     PQclear (qret);
680   }
681   return ret;
682 }
683
684
685 /**
686  * Get a random item for replication.  Returns a single, not expired, random item
687  * from those with the highest replication counters.  The item's
688  * replication counter is decremented by one IF it was positive before.
689  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
690  *
691  * @param cls closure with the 'struct Plugin'
692  * @param proc function to call the value (once only).
693  * @param proc_cls closure for proc
694  */
695 static void
696 postgres_plugin_get_replication (void *cls, PluginDatumProcessor proc,
697                                  void *proc_cls)
698 {
699   struct Plugin *plugin = cls;
700   struct ReplCtx rc;
701   PGresult *ret;
702
703   rc.plugin = plugin;
704   rc.proc = proc;
705   rc.proc_cls = proc_cls;
706   ret =
707       PQexecPrepared (plugin->dbh, "select_replication_order", 0, NULL, NULL,
708                       NULL, 1);
709   process_result (plugin, &repl_proc, &rc, ret, __FILE__, __LINE__);
710 }
711
712
713 /**
714  * Get a random item for expiration.
715  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
716  *
717  * @param cls closure with the 'struct Plugin'
718  * @param proc function to call the value (once only).
719  * @param proc_cls closure for proc
720  */
721 static void
722 postgres_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
723                                 void *proc_cls)
724 {
725   struct Plugin *plugin = cls;
726   uint64_t btime;
727   const int paramFormats[] = { 1 };
728   int paramLengths[] = { sizeof (btime) };
729   const char *paramValues[] = { (const char *) &btime };
730   PGresult *ret;
731
732   btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value_us);
733   ret =
734       PQexecPrepared (plugin->dbh, "select_expiration_order", 1, paramValues,
735                       paramLengths, paramFormats, 1);
736   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
737 }
738
739
740 /**
741  * Update the priority for a particular key in the datastore.  If
742  * the expiration time in value is different than the time found in
743  * the datastore, the higher value should be kept.  For the
744  * anonymity level, the lower value is to be used.  The specified
745  * priority should be added to the existing priority, ignoring the
746  * priority in value.
747  *
748  * Note that it is possible for multiple values to match this put.
749  * In that case, all of the respective values are updated.
750  *
751  * @param cls our "struct Plugin*"
752  * @param uid unique identifier of the datum
753  * @param delta by how much should the priority
754  *     change?  If priority + delta < 0 the
755  *     priority should be set to 0 (never go
756  *     negative).
757  * @param expire new expiration time should be the
758  *     MAX of any existing expiration time and
759  *     this value
760  * @param cont continuation called with success or failure status
761  * @param cons_cls continuation closure
762  */
763 static void
764 postgres_plugin_update (void *cls, uint64_t uid, int delta,
765                         struct GNUNET_TIME_Absolute expire,
766                         PluginUpdateCont cont, void *cont_cls)
767 {
768   struct Plugin *plugin = cls;
769   PGresult *ret;
770   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
771   uint32_t boid = htonl ((uint32_t) uid);
772   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value_us__;
773
774   const char *paramValues[] = {
775     (const char *) &bdelta,
776     (const char *) &bexpire,
777     (const char *) &boid,
778   };
779   int paramLengths[] = {
780     sizeof (bdelta),
781     sizeof (bexpire),
782     sizeof (boid),
783   };
784   const int paramFormats[] = { 1, 1, 1 };
785
786   ret =
787       PQexecPrepared (plugin->dbh, "update", 3, paramValues, paramLengths,
788                       paramFormats, 1);
789   if (GNUNET_OK !=
790       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "update"))
791   {
792     cont (cont_cls, GNUNET_SYSERR, NULL);
793     return;
794   }
795   PQclear (ret);
796   cont (cont_cls, GNUNET_OK, NULL);
797 }
798
799
800
801 /**
802  * Get all of the keys in the datastore.
803  *
804  * @param cls closure with the 'struct Plugin'
805  * @param proc function to call on each key
806  * @param proc_cls closure for proc
807  */
808 static void
809 postgres_plugin_get_keys (void *cls,
810                           PluginKeyProcessor proc,
811                           void *proc_cls)
812 {
813   struct Plugin *plugin = cls;
814   int ret;
815   int i;
816   struct GNUNET_HashCode key;
817   PGresult * res;
818
819   res = PQexecPrepared (plugin->dbh, "get_keys", 0, NULL, NULL, NULL, 1);
820   ret = PQntuples (res);
821   for (i=0;i<ret;i++)
822   {
823     if (sizeof (struct GNUNET_HashCode) != PQgetlength (res, i, 0))
824     {
825       memcpy (&key, PQgetvalue (res, i, 0), sizeof (struct GNUNET_HashCode));
826       proc (proc_cls, &key, 1);
827     }
828   }
829   PQclear (res);
830   proc (proc_cls, NULL, 0);
831 }
832
833
834
835 /**
836  * Drop database.
837  *
838  * @param cls closure with the 'struct Plugin'
839  */
840 static void
841 postgres_plugin_drop (void *cls)
842 {
843   struct Plugin *plugin = cls;
844
845   if (GNUNET_OK != GNUNET_POSTGRES_exec (plugin->dbh, "DROP TABLE gn090"))
846     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "postgres", _("Failed to drop table from database.\n"));
847 }
848
849
850 /**
851  * Entry point for the plugin.
852  *
853  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
854  * @return our "struct Plugin*"
855  */
856 void *
857 libgnunet_plugin_datastore_postgres_init (void *cls)
858 {
859   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
860   struct GNUNET_DATASTORE_PluginFunctions *api;
861   struct Plugin *plugin;
862
863   plugin = GNUNET_new (struct Plugin);
864   plugin->env = env;
865   if (GNUNET_OK != init_connection (plugin))
866   {
867     GNUNET_free (plugin);
868     return NULL;
869   }
870   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
871   api->cls = plugin;
872   api->estimate_size = &postgres_plugin_estimate_size;
873   api->put = &postgres_plugin_put;
874   api->update = &postgres_plugin_update;
875   api->get_key = &postgres_plugin_get_key;
876   api->get_replication = &postgres_plugin_get_replication;
877   api->get_expiration = &postgres_plugin_get_expiration;
878   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
879   api->get_keys = &postgres_plugin_get_keys;
880   api->drop = &postgres_plugin_drop;
881   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "datastore-postgres",
882                    _("Postgres database running\n"));
883   return api;
884 }
885
886
887 /**
888  * Exit point from the plugin.
889  * @param cls our "struct Plugin*"
890  * @return always NULL
891  */
892 void *
893 libgnunet_plugin_datastore_postgres_done (void *cls)
894 {
895   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
896   struct Plugin *plugin = api->cls;
897
898   PQfinish (plugin->dbh);
899   GNUNET_free (plugin);
900   GNUNET_free (api);
901   return NULL;
902 }
903
904 /* end of plugin_datastore_postgres.c */