fix use-after-free on exit
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include "gnunet_postgres_lib.h"
30
31
32 /**
33  * After how many ms "busy" should a DB operation fail for good?
34  * A low value makes sure that we are more responsive to requests
35  * (especially PUTs).  A high value guarantees a higher success
36  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
37  *
38  * The default value of 1s should ensure that users do not experience
39  * huge latencies while at the same time allowing operations to succeed
40  * with reasonable probability.
41  */
42 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
43
44
45 /**
46  * Context for all functions in this plugin.
47  */
48 struct Plugin
49 {
50   /**
51    * Our execution environment.
52    */
53   struct GNUNET_DATASTORE_PluginEnvironment *env;
54
55   /**
56    * Native Postgres database handle.
57    */
58   PGconn *dbh;
59
60 };
61
62
63 /**
64  * @brief Get a database handle
65  *
66  * @param plugin global context
67  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
68  */
69 static int
70 init_connection (struct Plugin *plugin)
71 {
72   PGresult *ret;
73
74   plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres");
75   if (NULL == plugin->dbh)
76     return GNUNET_SYSERR;
77
78   ret =
79       PQexec (plugin->dbh,
80               "CREATE TABLE gn090 (" "  repl INTEGER NOT NULL DEFAULT 0,"
81               "  type INTEGER NOT NULL DEFAULT 0,"
82               "  prio INTEGER NOT NULL DEFAULT 0,"
83               "  anonLevel INTEGER NOT NULL DEFAULT 0,"
84               "  expire BIGINT NOT NULL DEFAULT 0,"
85               "  rvalue BIGINT NOT NULL DEFAULT 0,"
86               "  hash BYTEA NOT NULL DEFAULT '',"
87               "  vhash BYTEA NOT NULL DEFAULT '',"
88               "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
89   if ( (NULL == ret) ||
90        ((PQresultStatus (ret) != PGRES_COMMAND_OK) &&
91         (0 != strcmp ("42P07",    /* duplicate table */
92                       PQresultErrorField
93                       (ret,
94                        PG_DIAG_SQLSTATE)))))
95   {
96     (void) GNUNET_POSTGRES_check_result (plugin->dbh,
97                                          ret,
98                                          PGRES_COMMAND_OK,
99                                          "CREATE TABLE",
100                                          "gn090");
101     PQfinish (plugin->dbh);
102     plugin->dbh = NULL;
103     return GNUNET_SYSERR;
104   }
105
106   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
107   {
108     if ((GNUNET_OK !=
109          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash ON gn090 (hash)")) ||
110         (GNUNET_OK !=
111          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)")) ||
112         (GNUNET_OK !=
113          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_prio ON gn090 (prio)")) ||
114         (GNUNET_OK !=
115          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire ON gn090 (expire)")) ||
116         (GNUNET_OK !=
117          GNUNET_POSTGRES_exec (plugin->dbh,
118                   "CREATE INDEX idx_prio_anon ON gn090 (prio,anonLevel)")) ||
119         (GNUNET_OK !=
120          GNUNET_POSTGRES_exec (plugin->dbh,
121                   "CREATE INDEX idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
122         (GNUNET_OK !=
123          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
124         (GNUNET_OK !=
125          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire_hash ON gn090 (expire,hash)")))
126     {
127       PQclear (ret);
128       PQfinish (plugin->dbh);
129       plugin->dbh = NULL;
130       return GNUNET_SYSERR;
131     }
132   }
133   PQclear (ret);
134
135   ret =
136       PQexec (plugin->dbh,
137               "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
138   if (GNUNET_OK !=
139       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
140   {
141     PQfinish (plugin->dbh);
142     plugin->dbh = NULL;
143     return GNUNET_SYSERR;
144   }
145   PQclear (ret);
146   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
147   if (GNUNET_OK !=
148       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
149   {
150     PQfinish (plugin->dbh);
151     plugin->dbh = NULL;
152     return GNUNET_SYSERR;
153   }
154   PQclear (ret);
155   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
156   if (GNUNET_OK !=
157       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
158   {
159     PQfinish (plugin->dbh);
160     plugin->dbh = NULL;
161     return GNUNET_SYSERR;
162   }
163   PQclear (ret);
164   if ((GNUNET_OK !=
165        GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
166                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
167                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
168                    "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
169       (GNUNET_OK !=
170        GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
171                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
172                    "WHERE hash=$1 AND type=$2 "
173                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
174       (GNUNET_OK !=
175        GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
176                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
177                    "WHERE hash=$1 AND vhash=$2 "
178                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
179       (GNUNET_OK !=
180        GNUNET_POSTGRES_prepare (plugin->dbh, "get",
181                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
182                    "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) ||
183       (GNUNET_OK !=
184        GNUNET_POSTGRES_prepare (plugin->dbh, "put",
185                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
186                    "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) ||
187       (GNUNET_OK !=
188        GNUNET_POSTGRES_prepare (plugin->dbh, "update",
189                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
190                    "WHERE oid = $3", 3)) ||
191       (GNUNET_OK !=
192        GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
193                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
194                    "WHERE oid = $1", 1)) ||
195       (GNUNET_OK !=
196        GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
197                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
198                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
199                    1)) ||
200       (GNUNET_OK !=
201        GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
202                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
203                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION "
204                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
205                    "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1",
206                    1)) ||
207       (GNUNET_OK !=
208        GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
209                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
210                    "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
211       (GNUNET_OK !=
212        GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
213       (GNUNET_OK !=
214        GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
215   {
216     PQfinish (plugin->dbh);
217     plugin->dbh = NULL;
218     return GNUNET_SYSERR;
219   }
220   return GNUNET_OK;
221 }
222
223
224 /**
225  * Get an estimate of how much space the database is
226  * currently using.
227  *
228  * @param cls our `struct Plugin *`
229  * @return number of bytes used on disk
230  */
231 static void
232 postgres_plugin_estimate_size (void *cls, unsigned long long *estimate)
233 {
234   struct Plugin *plugin = cls;
235   unsigned long long total;
236   PGresult *ret;
237
238   if (NULL == estimate)
239     return;
240   ret =
241       PQexecParams (plugin->dbh,
242                     "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
243                     NULL, NULL, NULL, NULL, 1);
244   if (GNUNET_OK !=
245       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", "get_size"))
246   {
247     *estimate = 0;
248     return;
249   }
250   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
251   {
252     GNUNET_break (0);
253     PQclear (ret);
254     *estimate = 0;
255     return;
256   }
257   if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
258   {
259     GNUNET_break (0 == PQgetlength (ret, 0, 0));
260     PQclear (ret);
261     *estimate = 0;
262     return;
263   }
264   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
265   PQclear (ret);
266   *estimate = total;
267 }
268
269
270 /**
271  * Store an item in the datastore.
272  *
273  * @param cls closure with the `struct Plugin`
274  * @param key key for the item
275  * @param size number of bytes in data
276  * @param data content stored
277  * @param type type of the content
278  * @param priority priority of the content
279  * @param anonymity anonymity-level for the content
280  * @param replication replication-level for the content
281  * @param expiration expiration time for the content
282  * @param msg set to error message
283  * @return #GNUNET_OK on success
284  */
285 static int
286 postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
287                      const void *data, enum GNUNET_BLOCK_Type type,
288                      uint32_t priority, uint32_t anonymity,
289                      uint32_t replication,
290                      struct GNUNET_TIME_Absolute expiration, char **msg)
291 {
292   struct Plugin *plugin = cls;
293   struct GNUNET_HashCode vhash;
294   PGresult *ret;
295   uint32_t btype = htonl (type);
296   uint32_t bprio = htonl (priority);
297   uint32_t banon = htonl (anonymity);
298   uint32_t brepl = htonl (replication);
299   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value_us__;
300
301   const char *paramValues[] = {
302     (const char *) &brepl,
303     (const char *) &btype,
304     (const char *) &bprio,
305     (const char *) &banon,
306     (const char *) &bexpi,
307     (const char *) key,
308     (const char *) &vhash,
309     (const char *) data
310   };
311   int paramLengths[] = {
312     sizeof (brepl),
313     sizeof (btype),
314     sizeof (bprio),
315     sizeof (banon),
316     sizeof (bexpi),
317     sizeof (struct GNUNET_HashCode),
318     sizeof (struct GNUNET_HashCode),
319     size
320   };
321   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
322
323   GNUNET_CRYPTO_hash (data, size, &vhash);
324   ret =
325       PQexecPrepared (plugin->dbh, "put", 8, paramValues, paramLengths,
326                       paramFormats, 1);
327   if (GNUNET_OK !=
328       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put"))
329     return GNUNET_SYSERR;
330   PQclear (ret);
331   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
332   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
333                    "Stored %u bytes in database\n", (unsigned int) size);
334   return GNUNET_OK;
335 }
336
337
338 /**
339  * Function invoked to process the result and call
340  * the processor.
341  *
342  * @param plugin global plugin data
343  * @param proc function to call the value (once only).
344  * @param proc_cls closure for proc
345  * @param res result from exec
346  * @param filename filename for error messages
347  * @param line line number for error messages
348  */
349 static void
350 process_result (struct Plugin *plugin, PluginDatumProcessor proc,
351                 void *proc_cls, PGresult * res,
352                 const char *filename, int line)
353 {
354   int iret;
355   enum GNUNET_BLOCK_Type type;
356   uint32_t anonymity;
357   uint32_t priority;
358   uint32_t size;
359   unsigned int rowid;
360   struct GNUNET_TIME_Absolute expiration_time;
361   struct GNUNET_HashCode key;
362
363   if (GNUNET_OK !=
364       GNUNET_POSTGRES_check_result_ (plugin->dbh, res, PGRES_TUPLES_OK, "PQexecPrepared", "select",
365                                      filename, line))
366   {
367     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
368                      "Ending iteration (postgres error)\n");
369     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
370     return;
371   }
372
373   if (0 == PQntuples (res))
374   {
375     /* no result */
376     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
377                      "Ending iteration (no more results)\n");
378     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
379     PQclear (res);
380     return;
381   }
382   if ((1 != PQntuples (res)) || (7 != PQnfields (res)) ||
383       (sizeof (uint32_t) != PQfsize (res, 0)) ||
384       (sizeof (uint32_t) != PQfsize (res, 6)))
385   {
386     GNUNET_break (0);
387     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
388     PQclear (res);
389     return;
390   }
391   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
392   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
393       (sizeof (uint32_t) != PQfsize (res, 1)) ||
394       (sizeof (uint32_t) != PQfsize (res, 2)) ||
395       (sizeof (uint64_t) != PQfsize (res, 3)) ||
396       (sizeof (struct GNUNET_HashCode) != PQgetlength (res, 0, 4)))
397   {
398     GNUNET_break (0);
399     PQclear (res);
400     GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid);
401     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
402     return;
403   }
404
405   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
406   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
407   anonymity = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
408   expiration_time.abs_value_us =
409       GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
410   memcpy (&key, PQgetvalue (res, 0, 4), sizeof (struct GNUNET_HashCode));
411   size = PQgetlength (res, 0, 5);
412   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
413                    "Found result of size %u bytes and type %u in database\n",
414                    (unsigned int) size, (unsigned int) type);
415   iret =
416       proc (proc_cls, &key, size, PQgetvalue (res, 0, 5),
417             (enum GNUNET_BLOCK_Type) type, priority, anonymity, expiration_time,
418             rowid);
419   PQclear (res);
420   if (iret == GNUNET_NO)
421   {
422     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
423                 "Processor asked for item %u to be removed.\n", rowid);
424     if (GNUNET_OK == GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid))
425     {
426       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
427                        "Deleting %u bytes from database\n",
428                        (unsigned int) size);
429       plugin->env->duc (plugin->env->cls,
430                         -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
431       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
432                        "Deleted %u bytes from database\n", (unsigned int) size);
433     }
434   }
435 }
436
437
438 /**
439  * Iterate over the results for a particular key
440  * in the datastore.
441  *
442  * @param cls closure with the 'struct Plugin'
443  * @param offset offset of the result (modulo num-results);
444  *        specific ordering does not matter for the offset
445  * @param key maybe NULL (to match all entries)
446  * @param vhash hash of the value, maybe NULL (to
447  *        match all values that have the right key).
448  *        Note that for DBlocks there is no difference
449  *        betwen key and vhash, but for other blocks
450  *        there may be!
451  * @param type entries of which type are relevant?
452  *     Use 0 for any type.
453  * @param proc function to call on the matching value;
454  *        will be called once with a NULL if no value matches
455  * @param proc_cls closure for iter
456  */
457 static void
458 postgres_plugin_get_key (void *cls, uint64_t offset,
459                          const struct GNUNET_HashCode * key,
460                          const struct GNUNET_HashCode * vhash,
461                          enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
462                          void *proc_cls)
463 {
464   struct Plugin *plugin = cls;
465   const int paramFormats[] = { 1, 1, 1, 1, 1 };
466   int paramLengths[4];
467   const char *paramValues[4];
468   int nparams;
469   const char *pname;
470   PGresult *ret;
471   uint64_t total;
472   uint64_t blimit_off;
473   uint32_t btype;
474
475   GNUNET_assert (key != NULL);
476   paramValues[0] = (const char *) key;
477   paramLengths[0] = sizeof (struct GNUNET_HashCode);
478   btype = htonl (type);
479   if (type != 0)
480   {
481     if (vhash != NULL)
482     {
483       paramValues[1] = (const char *) vhash;
484       paramLengths[1] = sizeof (struct GNUNET_HashCode);
485       paramValues[2] = (const char *) &btype;
486       paramLengths[2] = sizeof (btype);
487       paramValues[3] = (const char *) &blimit_off;
488       paramLengths[3] = sizeof (blimit_off);
489       nparams = 4;
490       pname = "getvt";
491       ret =
492           PQexecParams (plugin->dbh,
493                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
494                         3, NULL, paramValues, paramLengths, paramFormats, 1);
495     }
496     else
497     {
498       paramValues[1] = (const char *) &btype;
499       paramLengths[1] = sizeof (btype);
500       paramValues[2] = (const char *) &blimit_off;
501       paramLengths[2] = sizeof (blimit_off);
502       nparams = 3;
503       pname = "gett";
504       ret =
505           PQexecParams (plugin->dbh,
506                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
507                         2, NULL, paramValues, paramLengths, paramFormats, 1);
508     }
509   }
510   else
511   {
512     if (vhash != NULL)
513     {
514       paramValues[1] = (const char *) vhash;
515       paramLengths[1] = sizeof (struct GNUNET_HashCode);
516       paramValues[2] = (const char *) &blimit_off;
517       paramLengths[2] = sizeof (blimit_off);
518       nparams = 3;
519       pname = "getv";
520       ret =
521           PQexecParams (plugin->dbh,
522                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
523                         2, NULL, paramValues, paramLengths, paramFormats, 1);
524     }
525     else
526     {
527       paramValues[1] = (const char *) &blimit_off;
528       paramLengths[1] = sizeof (blimit_off);
529       nparams = 2;
530       pname = "get";
531       ret =
532           PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1",
533                         1, NULL, paramValues, paramLengths, paramFormats, 1);
534     }
535   }
536   if (GNUNET_OK !=
537       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", pname))
538   {
539     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
540     return;
541   }
542   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
543       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
544   {
545     GNUNET_break (0);
546     PQclear (ret);
547     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
548     return;
549   }
550   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
551   PQclear (ret);
552   if (total == 0)
553   {
554     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
555     return;
556   }
557   blimit_off = GNUNET_htonll (offset % total);
558   ret =
559       PQexecPrepared (plugin->dbh, pname, nparams, paramValues, paramLengths,
560                       paramFormats, 1);
561   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
562 }
563
564
565 /**
566  * Select a subset of the items in the datastore and call
567  * the given iterator for each of them.
568  *
569  * @param cls our "struct Plugin*"
570  * @param offset offset of the result (modulo num-results);
571  *        specific ordering does not matter for the offset
572  * @param type entries of which type should be considered?
573  *        Use 0 for any type.
574  * @param proc function to call on the matching value;
575  *        will be called with a NULL if no value matches
576  * @param proc_cls closure for proc
577  */
578 static void
579 postgres_plugin_get_zero_anonymity (void *cls, uint64_t offset,
580                                     enum GNUNET_BLOCK_Type type,
581                                     PluginDatumProcessor proc, void *proc_cls)
582 {
583   struct Plugin *plugin = cls;
584   uint32_t btype;
585   uint64_t boff;
586   const int paramFormats[] = { 1, 1 };
587   int paramLengths[] = { sizeof (btype), sizeof (boff) };
588   const char *paramValues[] = { (const char *) &btype, (const char *) &boff };
589   PGresult *ret;
590
591   btype = htonl ((uint32_t) type);
592   boff = GNUNET_htonll (offset);
593   ret =
594       PQexecPrepared (plugin->dbh, "select_non_anonymous", 2, paramValues,
595                       paramLengths, paramFormats, 1);
596   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
597 }
598
599
600 /**
601  * Context for 'repl_iter' function.
602  */
603 struct ReplCtx
604 {
605
606   /**
607    * Plugin handle.
608    */
609   struct Plugin *plugin;
610
611   /**
612    * Function to call for the result (or the NULL).
613    */
614   PluginDatumProcessor proc;
615
616   /**
617    * Closure for proc.
618    */
619   void *proc_cls;
620 };
621
622
623 /**
624  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
625  * Decrements the replication counter and calls the original
626  * iterator.
627  *
628  * @param cls closure with the 'struct ReplCtx*'
629  * @param key key for the content
630  * @param size number of bytes in data
631  * @param data content stored
632  * @param type type of the content
633  * @param priority priority of the content
634  * @param anonymity anonymity-level for the content
635  * @param expiration expiration time for the content
636  * @param uid unique identifier for the datum;
637  *        maybe 0 if no unique identifier is available
638  *
639  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
640  *         (continue on call to "next", of course),
641  *         GNUNET_NO to delete the item and continue (if supported)
642  */
643 static int
644 repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
645            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
646            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
647            uint64_t uid)
648 {
649   struct ReplCtx *rc = cls;
650   struct Plugin *plugin = rc->plugin;
651   int ret;
652   PGresult *qret;
653   uint32_t boid;
654
655   ret =
656       rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
657                 expiration, uid);
658   if (NULL != key)
659   {
660     boid = htonl ((uint32_t) uid);
661     const char *paramValues[] = {
662       (const char *) &boid,
663     };
664     int paramLengths[] = {
665       sizeof (boid),
666     };
667     const int paramFormats[] = { 1 };
668     qret =
669         PQexecPrepared (plugin->dbh, "decrepl", 1, paramValues, paramLengths,
670                         paramFormats, 1);
671     if (GNUNET_OK !=
672         GNUNET_POSTGRES_check_result (plugin->dbh, qret, PGRES_COMMAND_OK, "PQexecPrepared",
673                       "decrepl"))
674       return GNUNET_SYSERR;
675     PQclear (qret);
676   }
677   return ret;
678 }
679
680
681 /**
682  * Get a random item for replication.  Returns a single, not expired, random item
683  * from those with the highest replication counters.  The item's
684  * replication counter is decremented by one IF it was positive before.
685  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
686  *
687  * @param cls closure with the 'struct Plugin'
688  * @param proc function to call the value (once only).
689  * @param proc_cls closure for proc
690  */
691 static void
692 postgres_plugin_get_replication (void *cls, PluginDatumProcessor proc,
693                                  void *proc_cls)
694 {
695   struct Plugin *plugin = cls;
696   struct ReplCtx rc;
697   PGresult *ret;
698
699   rc.plugin = plugin;
700   rc.proc = proc;
701   rc.proc_cls = proc_cls;
702   ret =
703       PQexecPrepared (plugin->dbh, "select_replication_order", 0, NULL, NULL,
704                       NULL, 1);
705   process_result (plugin, &repl_proc, &rc, ret, __FILE__, __LINE__);
706 }
707
708
709 /**
710  * Get a random item for expiration.
711  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
712  *
713  * @param cls closure with the 'struct Plugin'
714  * @param proc function to call the value (once only).
715  * @param proc_cls closure for proc
716  */
717 static void
718 postgres_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
719                                 void *proc_cls)
720 {
721   struct Plugin *plugin = cls;
722   uint64_t btime;
723   const int paramFormats[] = { 1 };
724   int paramLengths[] = { sizeof (btime) };
725   const char *paramValues[] = { (const char *) &btime };
726   PGresult *ret;
727
728   btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value_us);
729   ret =
730       PQexecPrepared (plugin->dbh, "select_expiration_order", 1, paramValues,
731                       paramLengths, paramFormats, 1);
732   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
733 }
734
735
736 /**
737  * Update the priority for a particular key in the datastore.  If
738  * the expiration time in value is different than the time found in
739  * the datastore, the higher value should be kept.  For the
740  * anonymity level, the lower value is to be used.  The specified
741  * priority should be added to the existing priority, ignoring the
742  * priority in value.
743  *
744  * Note that it is possible for multiple values to match this put.
745  * In that case, all of the respective values are updated.
746  *
747  * @param cls our "struct Plugin*"
748  * @param uid unique identifier of the datum
749  * @param delta by how much should the priority
750  *     change?  If priority + delta < 0 the
751  *     priority should be set to 0 (never go
752  *     negative).
753  * @param expire new expiration time should be the
754  *     MAX of any existing expiration time and
755  *     this value
756  * @param msg set to error message
757  * @return GNUNET_OK on success
758  */
759 static int
760 postgres_plugin_update (void *cls, uint64_t uid, int delta,
761                         struct GNUNET_TIME_Absolute expire, char **msg)
762 {
763   struct Plugin *plugin = cls;
764   PGresult *ret;
765   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
766   uint32_t boid = htonl ((uint32_t) uid);
767   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value_us__;
768
769   const char *paramValues[] = {
770     (const char *) &bdelta,
771     (const char *) &bexpire,
772     (const char *) &boid,
773   };
774   int paramLengths[] = {
775     sizeof (bdelta),
776     sizeof (bexpire),
777     sizeof (boid),
778   };
779   const int paramFormats[] = { 1, 1, 1 };
780
781   ret =
782       PQexecPrepared (plugin->dbh, "update", 3, paramValues, paramLengths,
783                       paramFormats, 1);
784   if (GNUNET_OK !=
785       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "update"))
786     return GNUNET_SYSERR;
787   PQclear (ret);
788   return GNUNET_OK;
789 }
790
791
792
793 /**
794  * Get all of the keys in the datastore.
795  *
796  * @param cls closure with the 'struct Plugin'
797  * @param proc function to call on each key
798  * @param proc_cls closure for proc
799  */
800 static void
801 postgres_plugin_get_keys (void *cls,
802                           PluginKeyProcessor proc,
803                           void *proc_cls)
804 {
805   struct Plugin *plugin = cls;
806   int ret;
807   int i;
808   struct GNUNET_HashCode key;
809   PGresult * res;
810
811   res = PQexecPrepared (plugin->dbh, "get_keys", 0, NULL, NULL, NULL, 1);
812   ret = PQntuples (res);
813   for (i=0;i<ret;i++)
814   {
815     if (sizeof (struct GNUNET_HashCode) != PQgetlength (res, i, 0))
816     {
817       memcpy (&key, PQgetvalue (res, i, 0), sizeof (struct GNUNET_HashCode));
818       proc (proc_cls, &key, 1);
819     }
820   }
821   PQclear (res);
822 }
823
824
825
826 /**
827  * Drop database.
828  *
829  * @param cls closure with the 'struct Plugin'
830  */
831 static void
832 postgres_plugin_drop (void *cls)
833 {
834   struct Plugin *plugin = cls;
835
836   if (GNUNET_OK != GNUNET_POSTGRES_exec (plugin->dbh, "DROP TABLE gn090"))
837     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "postgres", _("Failed to drop table from database.\n"));
838 }
839
840
841 /**
842  * Entry point for the plugin.
843  *
844  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
845  * @return our "struct Plugin*"
846  */
847 void *
848 libgnunet_plugin_datastore_postgres_init (void *cls)
849 {
850   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
851   struct GNUNET_DATASTORE_PluginFunctions *api;
852   struct Plugin *plugin;
853
854   plugin = GNUNET_new (struct Plugin);
855   plugin->env = env;
856   if (GNUNET_OK != init_connection (plugin))
857   {
858     GNUNET_free (plugin);
859     return NULL;
860   }
861   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
862   api->cls = plugin;
863   api->estimate_size = &postgres_plugin_estimate_size;
864   api->put = &postgres_plugin_put;
865   api->update = &postgres_plugin_update;
866   api->get_key = &postgres_plugin_get_key;
867   api->get_replication = &postgres_plugin_get_replication;
868   api->get_expiration = &postgres_plugin_get_expiration;
869   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
870   api->get_keys = &postgres_plugin_get_keys;
871   api->drop = &postgres_plugin_drop;
872   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "datastore-postgres",
873                    _("Postgres database running\n"));
874   return api;
875 }
876
877
878 /**
879  * Exit point from the plugin.
880  * @param cls our "struct Plugin*"
881  * @return always NULL
882  */
883 void *
884 libgnunet_plugin_datastore_postgres_done (void *cls)
885 {
886   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
887   struct Plugin *plugin = api->cls;
888
889   PQfinish (plugin->dbh);
890   GNUNET_free (plugin);
891   GNUNET_free (api);
892   return NULL;
893 }
894
895 /* end of plugin_datastore_postgres.c */