Rename DEFAULT_QUALITY to MLP_DEFAULT_QUALITY to avoid conflicting with W32 GDI
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include "gnunet_postgres_lib.h"
30
31
32 /**
33  * After how many ms "busy" should a DB operation fail for good?
34  * A low value makes sure that we are more responsive to requests
35  * (especially PUTs).  A high value guarantees a higher success
36  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
37  *
38  * The default value of 1s should ensure that users do not experience
39  * huge latencies while at the same time allowing operations to succeed
40  * with reasonable probability.
41  */
42 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
43
44
45 /**
46  * Context for all functions in this plugin.
47  */
48 struct Plugin
49 {
50   /**
51    * Our execution environment.
52    */
53   struct GNUNET_DATASTORE_PluginEnvironment *env;
54
55   /**
56    * Native Postgres database handle.
57    */
58   PGconn *dbh;
59
60 };
61
62
63 /**
64  * @brief Get a database handle
65  *
66  * @param plugin global context
67  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
68  */
69 static int
70 init_connection (struct Plugin *plugin)
71 {
72   PGresult *ret;
73
74   plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres");
75   if (NULL == plugin->dbh)
76     return GNUNET_SYSERR;
77
78   ret =
79       PQexec (plugin->dbh,
80               "CREATE TABLE gn090 (" "  repl INTEGER NOT NULL DEFAULT 0,"
81               "  type INTEGER NOT NULL DEFAULT 0,"
82               "  prio INTEGER NOT NULL DEFAULT 0,"
83               "  anonLevel INTEGER NOT NULL DEFAULT 0,"
84               "  expire BIGINT NOT NULL DEFAULT 0,"
85               "  rvalue BIGINT NOT NULL DEFAULT 0,"
86               "  hash BYTEA NOT NULL DEFAULT '',"
87               "  vhash BYTEA NOT NULL DEFAULT '',"
88               "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
89   if ( (NULL == ret) ||
90        ((PQresultStatus (ret) != PGRES_COMMAND_OK) &&
91         (0 != strcmp ("42P07",    /* duplicate table */
92                       PQresultErrorField
93                       (ret,
94                        PG_DIAG_SQLSTATE)))))
95   {
96     (void) GNUNET_POSTGRES_check_result (plugin->dbh,
97                                          ret,
98                                          PGRES_COMMAND_OK,
99                                          "CREATE TABLE",
100                                          "gn090");
101     PQfinish (plugin->dbh);
102     plugin->dbh = NULL;
103     return GNUNET_SYSERR;
104   }
105
106   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
107   {
108     if ((GNUNET_OK !=
109          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash ON gn090 (hash)")) ||
110         (GNUNET_OK !=
111          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)")) ||
112         (GNUNET_OK !=
113          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_prio ON gn090 (prio)")) ||
114         (GNUNET_OK !=
115          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire ON gn090 (expire)")) ||
116         (GNUNET_OK !=
117          GNUNET_POSTGRES_exec (plugin->dbh,
118                   "CREATE INDEX idx_prio_anon ON gn090 (prio,anonLevel)")) ||
119         (GNUNET_OK !=
120          GNUNET_POSTGRES_exec (plugin->dbh,
121                   "CREATE INDEX idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
122         (GNUNET_OK !=
123          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
124         (GNUNET_OK !=
125          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire_hash ON gn090 (expire,hash)")))
126     {
127       PQclear (ret);
128       PQfinish (plugin->dbh);
129       plugin->dbh = NULL;
130       return GNUNET_SYSERR;
131     }
132   }
133   PQclear (ret);
134
135   ret =
136       PQexec (plugin->dbh,
137               "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
138   if (GNUNET_OK !=
139       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
140   {
141     PQfinish (plugin->dbh);
142     plugin->dbh = NULL;
143     return GNUNET_SYSERR;
144   }
145   PQclear (ret);
146   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
147   if (GNUNET_OK !=
148       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
149   {
150     PQfinish (plugin->dbh);
151     plugin->dbh = NULL;
152     return GNUNET_SYSERR;
153   }
154   PQclear (ret);
155   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
156   if (GNUNET_OK !=
157       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
158   {
159     PQfinish (plugin->dbh);
160     plugin->dbh = NULL;
161     return GNUNET_SYSERR;
162   }
163   PQclear (ret);
164   if ((GNUNET_OK !=
165        GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
166                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
167                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
168                    "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
169       (GNUNET_OK !=
170        GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
171                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
172                    "WHERE hash=$1 AND type=$2 "
173                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
174       (GNUNET_OK !=
175        GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
176                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
177                    "WHERE hash=$1 AND vhash=$2 "
178                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
179       (GNUNET_OK !=
180        GNUNET_POSTGRES_prepare (plugin->dbh, "get",
181                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
182                    "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) ||
183       (GNUNET_OK !=
184        GNUNET_POSTGRES_prepare (plugin->dbh, "put",
185                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
186                    "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) ||
187       (GNUNET_OK !=
188        GNUNET_POSTGRES_prepare (plugin->dbh, "update",
189                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
190                    "WHERE oid = $3", 3)) ||
191       (GNUNET_OK !=
192        GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
193                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
194                    "WHERE oid = $1", 1)) ||
195       (GNUNET_OK !=
196        GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
197                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
198                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
199                    1)) ||
200       (GNUNET_OK !=
201        GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
202                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
203                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION "
204                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
205                    "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1",
206                    1)) ||
207       (GNUNET_OK !=
208        GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
209                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
210                    "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
211       (GNUNET_OK !=
212        GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
213       (GNUNET_OK !=
214        GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
215   {
216     PQfinish (plugin->dbh);
217     plugin->dbh = NULL;
218     return GNUNET_SYSERR;
219   }
220   return GNUNET_OK;
221 }
222
223
224 /**
225  * Get an estimate of how much space the database is
226  * currently using.
227  *
228  * @param cls our `struct Plugin *`
229  * @return number of bytes used on disk
230  */
231 static unsigned long long
232 postgres_plugin_estimate_size (void *cls)
233 {
234   struct Plugin *plugin = cls;
235   unsigned long long total;
236   PGresult *ret;
237
238   ret =
239       PQexecParams (plugin->dbh,
240                     "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
241                     NULL, NULL, NULL, NULL, 1);
242   if (GNUNET_OK !=
243       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", "get_size"))
244   {
245     return 0;
246   }
247   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
248   {
249     GNUNET_break (0);
250     PQclear (ret);
251     return 0;
252   }
253   if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
254   {
255     GNUNET_break (0 == PQgetlength (ret, 0, 0));
256     PQclear (ret);
257     return 0;
258   }
259   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
260   PQclear (ret);
261   return total;
262 }
263
264
265 /**
266  * Store an item in the datastore.
267  *
268  * @param cls closure with the `struct Plugin`
269  * @param key key for the item
270  * @param size number of bytes in data
271  * @param data content stored
272  * @param type type of the content
273  * @param priority priority of the content
274  * @param anonymity anonymity-level for the content
275  * @param replication replication-level for the content
276  * @param expiration expiration time for the content
277  * @param msg set to error message
278  * @return #GNUNET_OK on success
279  */
280 static int
281 postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
282                      const void *data, enum GNUNET_BLOCK_Type type,
283                      uint32_t priority, uint32_t anonymity,
284                      uint32_t replication,
285                      struct GNUNET_TIME_Absolute expiration, char **msg)
286 {
287   struct Plugin *plugin = cls;
288   struct GNUNET_HashCode vhash;
289   PGresult *ret;
290   uint32_t btype = htonl (type);
291   uint32_t bprio = htonl (priority);
292   uint32_t banon = htonl (anonymity);
293   uint32_t brepl = htonl (replication);
294   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value_us__;
295
296   const char *paramValues[] = {
297     (const char *) &brepl,
298     (const char *) &btype,
299     (const char *) &bprio,
300     (const char *) &banon,
301     (const char *) &bexpi,
302     (const char *) key,
303     (const char *) &vhash,
304     (const char *) data
305   };
306   int paramLengths[] = {
307     sizeof (brepl),
308     sizeof (btype),
309     sizeof (bprio),
310     sizeof (banon),
311     sizeof (bexpi),
312     sizeof (struct GNUNET_HashCode),
313     sizeof (struct GNUNET_HashCode),
314     size
315   };
316   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
317
318   GNUNET_CRYPTO_hash (data, size, &vhash);
319   ret =
320       PQexecPrepared (plugin->dbh, "put", 8, paramValues, paramLengths,
321                       paramFormats, 1);
322   if (GNUNET_OK !=
323       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put"))
324     return GNUNET_SYSERR;
325   PQclear (ret);
326   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
327   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
328                    "Stored %u bytes in database\n", (unsigned int) size);
329   return GNUNET_OK;
330 }
331
332
333 /**
334  * Function invoked to process the result and call
335  * the processor.
336  *
337  * @param plugin global plugin data
338  * @param proc function to call the value (once only).
339  * @param proc_cls closure for proc
340  * @param res result from exec
341  * @param filename filename for error messages
342  * @param line line number for error messages
343  */
344 static void
345 process_result (struct Plugin *plugin, PluginDatumProcessor proc,
346                 void *proc_cls, PGresult * res,
347                 const char *filename, int line)
348 {
349   int iret;
350   enum GNUNET_BLOCK_Type type;
351   uint32_t anonymity;
352   uint32_t priority;
353   uint32_t size;
354   unsigned int rowid;
355   struct GNUNET_TIME_Absolute expiration_time;
356   struct GNUNET_HashCode key;
357
358   if (GNUNET_OK !=
359       GNUNET_POSTGRES_check_result_ (plugin->dbh, res, PGRES_TUPLES_OK, "PQexecPrepared", "select",
360                                      filename, line))
361   {
362     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
363                      "Ending iteration (postgres error)\n");
364     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
365     return;
366   }
367
368   if (0 == PQntuples (res))
369   {
370     /* no result */
371     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
372                      "Ending iteration (no more results)\n");
373     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
374     PQclear (res);
375     return;
376   }
377   if ((1 != PQntuples (res)) || (7 != PQnfields (res)) ||
378       (sizeof (uint32_t) != PQfsize (res, 0)) ||
379       (sizeof (uint32_t) != PQfsize (res, 6)))
380   {
381     GNUNET_break (0);
382     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
383     PQclear (res);
384     return;
385   }
386   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
387   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
388       (sizeof (uint32_t) != PQfsize (res, 1)) ||
389       (sizeof (uint32_t) != PQfsize (res, 2)) ||
390       (sizeof (uint64_t) != PQfsize (res, 3)) ||
391       (sizeof (struct GNUNET_HashCode) != PQgetlength (res, 0, 4)))
392   {
393     GNUNET_break (0);
394     PQclear (res);
395     GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid);
396     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
397     return;
398   }
399
400   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
401   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
402   anonymity = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
403   expiration_time.abs_value_us =
404       GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
405   memcpy (&key, PQgetvalue (res, 0, 4), sizeof (struct GNUNET_HashCode));
406   size = PQgetlength (res, 0, 5);
407   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
408                    "Found result of size %u bytes and type %u in database\n",
409                    (unsigned int) size, (unsigned int) type);
410   iret =
411       proc (proc_cls, &key, size, PQgetvalue (res, 0, 5),
412             (enum GNUNET_BLOCK_Type) type, priority, anonymity, expiration_time,
413             rowid);
414   PQclear (res);
415   if (iret == GNUNET_NO)
416   {
417     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
418                 "Processor asked for item %u to be removed.\n", rowid);
419     if (GNUNET_OK == GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid))
420     {
421       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
422                        "Deleting %u bytes from database\n",
423                        (unsigned int) size);
424       plugin->env->duc (plugin->env->cls,
425                         -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
426       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
427                        "Deleted %u bytes from database\n", (unsigned int) size);
428     }
429   }
430 }
431
432
433 /**
434  * Iterate over the results for a particular key
435  * in the datastore.
436  *
437  * @param cls closure with the 'struct Plugin'
438  * @param offset offset of the result (modulo num-results);
439  *        specific ordering does not matter for the offset
440  * @param key maybe NULL (to match all entries)
441  * @param vhash hash of the value, maybe NULL (to
442  *        match all values that have the right key).
443  *        Note that for DBlocks there is no difference
444  *        betwen key and vhash, but for other blocks
445  *        there may be!
446  * @param type entries of which type are relevant?
447  *     Use 0 for any type.
448  * @param proc function to call on the matching value;
449  *        will be called once with a NULL if no value matches
450  * @param proc_cls closure for iter
451  */
452 static void
453 postgres_plugin_get_key (void *cls, uint64_t offset,
454                          const struct GNUNET_HashCode * key,
455                          const struct GNUNET_HashCode * vhash,
456                          enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
457                          void *proc_cls)
458 {
459   struct Plugin *plugin = cls;
460   const int paramFormats[] = { 1, 1, 1, 1, 1 };
461   int paramLengths[4];
462   const char *paramValues[4];
463   int nparams;
464   const char *pname;
465   PGresult *ret;
466   uint64_t total;
467   uint64_t blimit_off;
468   uint32_t btype;
469
470   GNUNET_assert (key != NULL);
471   paramValues[0] = (const char *) key;
472   paramLengths[0] = sizeof (struct GNUNET_HashCode);
473   btype = htonl (type);
474   if (type != 0)
475   {
476     if (vhash != NULL)
477     {
478       paramValues[1] = (const char *) vhash;
479       paramLengths[1] = sizeof (struct GNUNET_HashCode);
480       paramValues[2] = (const char *) &btype;
481       paramLengths[2] = sizeof (btype);
482       paramValues[3] = (const char *) &blimit_off;
483       paramLengths[3] = sizeof (blimit_off);
484       nparams = 4;
485       pname = "getvt";
486       ret =
487           PQexecParams (plugin->dbh,
488                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
489                         3, NULL, paramValues, paramLengths, paramFormats, 1);
490     }
491     else
492     {
493       paramValues[1] = (const char *) &btype;
494       paramLengths[1] = sizeof (btype);
495       paramValues[2] = (const char *) &blimit_off;
496       paramLengths[2] = sizeof (blimit_off);
497       nparams = 3;
498       pname = "gett";
499       ret =
500           PQexecParams (plugin->dbh,
501                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
502                         2, NULL, paramValues, paramLengths, paramFormats, 1);
503     }
504   }
505   else
506   {
507     if (vhash != NULL)
508     {
509       paramValues[1] = (const char *) vhash;
510       paramLengths[1] = sizeof (struct GNUNET_HashCode);
511       paramValues[2] = (const char *) &blimit_off;
512       paramLengths[2] = sizeof (blimit_off);
513       nparams = 3;
514       pname = "getv";
515       ret =
516           PQexecParams (plugin->dbh,
517                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
518                         2, NULL, paramValues, paramLengths, paramFormats, 1);
519     }
520     else
521     {
522       paramValues[1] = (const char *) &blimit_off;
523       paramLengths[1] = sizeof (blimit_off);
524       nparams = 2;
525       pname = "get";
526       ret =
527           PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1",
528                         1, NULL, paramValues, paramLengths, paramFormats, 1);
529     }
530   }
531   if (GNUNET_OK !=
532       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", pname))
533   {
534     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
535     return;
536   }
537   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
538       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
539   {
540     GNUNET_break (0);
541     PQclear (ret);
542     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
543     return;
544   }
545   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
546   PQclear (ret);
547   if (total == 0)
548   {
549     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
550     return;
551   }
552   blimit_off = GNUNET_htonll (offset % total);
553   ret =
554       PQexecPrepared (plugin->dbh, pname, nparams, paramValues, paramLengths,
555                       paramFormats, 1);
556   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
557 }
558
559
560 /**
561  * Select a subset of the items in the datastore and call
562  * the given iterator for each of them.
563  *
564  * @param cls our "struct Plugin*"
565  * @param offset offset of the result (modulo num-results);
566  *        specific ordering does not matter for the offset
567  * @param type entries of which type should be considered?
568  *        Use 0 for any type.
569  * @param proc function to call on the matching value;
570  *        will be called with a NULL if no value matches
571  * @param proc_cls closure for proc
572  */
573 static void
574 postgres_plugin_get_zero_anonymity (void *cls, uint64_t offset,
575                                     enum GNUNET_BLOCK_Type type,
576                                     PluginDatumProcessor proc, void *proc_cls)
577 {
578   struct Plugin *plugin = cls;
579   uint32_t btype;
580   uint64_t boff;
581   const int paramFormats[] = { 1, 1 };
582   int paramLengths[] = { sizeof (btype), sizeof (boff) };
583   const char *paramValues[] = { (const char *) &btype, (const char *) &boff };
584   PGresult *ret;
585
586   btype = htonl ((uint32_t) type);
587   boff = GNUNET_htonll (offset);
588   ret =
589       PQexecPrepared (plugin->dbh, "select_non_anonymous", 2, paramValues,
590                       paramLengths, paramFormats, 1);
591   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
592 }
593
594
595 /**
596  * Context for 'repl_iter' function.
597  */
598 struct ReplCtx
599 {
600
601   /**
602    * Plugin handle.
603    */
604   struct Plugin *plugin;
605
606   /**
607    * Function to call for the result (or the NULL).
608    */
609   PluginDatumProcessor proc;
610
611   /**
612    * Closure for proc.
613    */
614   void *proc_cls;
615 };
616
617
618 /**
619  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
620  * Decrements the replication counter and calls the original
621  * iterator.
622  *
623  * @param cls closure with the 'struct ReplCtx*'
624  * @param key key for the content
625  * @param size number of bytes in data
626  * @param data content stored
627  * @param type type of the content
628  * @param priority priority of the content
629  * @param anonymity anonymity-level for the content
630  * @param expiration expiration time for the content
631  * @param uid unique identifier for the datum;
632  *        maybe 0 if no unique identifier is available
633  *
634  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
635  *         (continue on call to "next", of course),
636  *         GNUNET_NO to delete the item and continue (if supported)
637  */
638 static int
639 repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
640            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
641            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
642            uint64_t uid)
643 {
644   struct ReplCtx *rc = cls;
645   struct Plugin *plugin = rc->plugin;
646   int ret;
647   PGresult *qret;
648   uint32_t boid;
649
650   ret =
651       rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
652                 expiration, uid);
653   if (NULL != key)
654   {
655     boid = htonl ((uint32_t) uid);
656     const char *paramValues[] = {
657       (const char *) &boid,
658     };
659     int paramLengths[] = {
660       sizeof (boid),
661     };
662     const int paramFormats[] = { 1 };
663     qret =
664         PQexecPrepared (plugin->dbh, "decrepl", 1, paramValues, paramLengths,
665                         paramFormats, 1);
666     if (GNUNET_OK !=
667         GNUNET_POSTGRES_check_result (plugin->dbh, qret, PGRES_COMMAND_OK, "PQexecPrepared",
668                       "decrepl"))
669       return GNUNET_SYSERR;
670     PQclear (qret);
671   }
672   return ret;
673 }
674
675
676 /**
677  * Get a random item for replication.  Returns a single, not expired, random item
678  * from those with the highest replication counters.  The item's
679  * replication counter is decremented by one IF it was positive before.
680  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
681  *
682  * @param cls closure with the 'struct Plugin'
683  * @param proc function to call the value (once only).
684  * @param proc_cls closure for proc
685  */
686 static void
687 postgres_plugin_get_replication (void *cls, PluginDatumProcessor proc,
688                                  void *proc_cls)
689 {
690   struct Plugin *plugin = cls;
691   struct ReplCtx rc;
692   PGresult *ret;
693
694   rc.plugin = plugin;
695   rc.proc = proc;
696   rc.proc_cls = proc_cls;
697   ret =
698       PQexecPrepared (plugin->dbh, "select_replication_order", 0, NULL, NULL,
699                       NULL, 1);
700   process_result (plugin, &repl_proc, &rc, ret, __FILE__, __LINE__);
701 }
702
703
704 /**
705  * Get a random item for expiration.
706  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
707  *
708  * @param cls closure with the 'struct Plugin'
709  * @param proc function to call the value (once only).
710  * @param proc_cls closure for proc
711  */
712 static void
713 postgres_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
714                                 void *proc_cls)
715 {
716   struct Plugin *plugin = cls;
717   uint64_t btime;
718   const int paramFormats[] = { 1 };
719   int paramLengths[] = { sizeof (btime) };
720   const char *paramValues[] = { (const char *) &btime };
721   PGresult *ret;
722
723   btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value_us);
724   ret =
725       PQexecPrepared (plugin->dbh, "select_expiration_order", 1, paramValues,
726                       paramLengths, paramFormats, 1);
727   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
728 }
729
730
731 /**
732  * Update the priority for a particular key in the datastore.  If
733  * the expiration time in value is different than the time found in
734  * the datastore, the higher value should be kept.  For the
735  * anonymity level, the lower value is to be used.  The specified
736  * priority should be added to the existing priority, ignoring the
737  * priority in value.
738  *
739  * Note that it is possible for multiple values to match this put.
740  * In that case, all of the respective values are updated.
741  *
742  * @param cls our "struct Plugin*"
743  * @param uid unique identifier of the datum
744  * @param delta by how much should the priority
745  *     change?  If priority + delta < 0 the
746  *     priority should be set to 0 (never go
747  *     negative).
748  * @param expire new expiration time should be the
749  *     MAX of any existing expiration time and
750  *     this value
751  * @param msg set to error message
752  * @return GNUNET_OK on success
753  */
754 static int
755 postgres_plugin_update (void *cls, uint64_t uid, int delta,
756                         struct GNUNET_TIME_Absolute expire, char **msg)
757 {
758   struct Plugin *plugin = cls;
759   PGresult *ret;
760   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
761   uint32_t boid = htonl ((uint32_t) uid);
762   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value_us__;
763
764   const char *paramValues[] = {
765     (const char *) &bdelta,
766     (const char *) &bexpire,
767     (const char *) &boid,
768   };
769   int paramLengths[] = {
770     sizeof (bdelta),
771     sizeof (bexpire),
772     sizeof (boid),
773   };
774   const int paramFormats[] = { 1, 1, 1 };
775
776   ret =
777       PQexecPrepared (plugin->dbh, "update", 3, paramValues, paramLengths,
778                       paramFormats, 1);
779   if (GNUNET_OK !=
780       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "update"))
781     return GNUNET_SYSERR;
782   PQclear (ret);
783   return GNUNET_OK;
784 }
785
786
787
788 /**
789  * Get all of the keys in the datastore.
790  *
791  * @param cls closure with the 'struct Plugin'
792  * @param proc function to call on each key
793  * @param proc_cls closure for proc
794  */
795 static void
796 postgres_plugin_get_keys (void *cls,
797                           PluginKeyProcessor proc,
798                           void *proc_cls)
799 {
800   struct Plugin *plugin = cls;
801   int ret;
802   int i;
803   struct GNUNET_HashCode key;
804   PGresult * res;
805
806   res = PQexecPrepared (plugin->dbh, "get_keys", 0, NULL, NULL, NULL, 1);
807   ret = PQntuples (res);
808   for (i=0;i<ret;i++)
809   {
810     if (sizeof (struct GNUNET_HashCode) != PQgetlength (res, i, 0))
811     {
812       memcpy (&key, PQgetvalue (res, i, 0), sizeof (struct GNUNET_HashCode));
813       proc (proc_cls, &key, 1);
814     }
815   }
816   PQclear (res);
817 }
818
819
820
821 /**
822  * Drop database.
823  *
824  * @param cls closure with the 'struct Plugin'
825  */
826 static void
827 postgres_plugin_drop (void *cls)
828 {
829   struct Plugin *plugin = cls;
830
831   if (GNUNET_OK != GNUNET_POSTGRES_exec (plugin->dbh, "DROP TABLE gn090"))
832     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "postgres", _("Failed to drop table from database.\n"));
833 }
834
835
836 /**
837  * Entry point for the plugin.
838  *
839  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
840  * @return our "struct Plugin*"
841  */
842 void *
843 libgnunet_plugin_datastore_postgres_init (void *cls)
844 {
845   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
846   struct GNUNET_DATASTORE_PluginFunctions *api;
847   struct Plugin *plugin;
848
849   plugin = GNUNET_new (struct Plugin);
850   plugin->env = env;
851   if (GNUNET_OK != init_connection (plugin))
852   {
853     GNUNET_free (plugin);
854     return NULL;
855   }
856   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
857   api->cls = plugin;
858   api->estimate_size = &postgres_plugin_estimate_size;
859   api->put = &postgres_plugin_put;
860   api->update = &postgres_plugin_update;
861   api->get_key = &postgres_plugin_get_key;
862   api->get_replication = &postgres_plugin_get_replication;
863   api->get_expiration = &postgres_plugin_get_expiration;
864   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
865   api->get_keys = &postgres_plugin_get_keys;
866   api->drop = &postgres_plugin_drop;
867   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "datastore-postgres",
868                    _("Postgres database running\n"));
869   return api;
870 }
871
872
873 /**
874  * Exit point from the plugin.
875  * @param cls our "struct Plugin*"
876  * @return always NULL
877  */
878 void *
879 libgnunet_plugin_datastore_postgres_done (void *cls)
880 {
881   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
882   struct Plugin *plugin = api->cls;
883
884   PQfinish (plugin->dbh);
885   GNUNET_free (plugin);
886   GNUNET_free (api);
887   return NULL;
888 }
889
890 /* end of plugin_datastore_postgres.c */