-regex
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include "gnunet_postgres_lib.h"
30 #include <postgresql/libpq-fe.h>
31
32 #define DEBUG_POSTGRES GNUNET_EXTRA_LOGGING
33
34 /**
35  * After how many ms "busy" should a DB operation fail for good?
36  * A low value makes sure that we are more responsive to requests
37  * (especially PUTs).  A high value guarantees a higher success
38  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
39  *
40  * The default value of 1s should ensure that users do not experience
41  * huge latencies while at the same time allowing operations to succeed
42  * with reasonable probability.
43  */
44 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
45
46
47 /**
48  * Context for all functions in this plugin.
49  */
50 struct Plugin
51 {
52   /**
53    * Our execution environment.
54    */
55   struct GNUNET_DATASTORE_PluginEnvironment *env;
56
57   /**
58    * Native Postgres database handle.
59    */
60   PGconn *dbh;
61
62 };
63
64
65 /**
66  * @brief Get a database handle
67  *
68  * @param plugin global context
69  * @return GNUNET_OK on success, GNUNET_SYSERR on error
70  */
71 static int
72 init_connection (struct Plugin *plugin)
73 {
74   PGresult *ret;
75
76   plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres");
77   if (NULL == plugin->dbh)
78     return GNUNET_SYSERR;
79   ret =
80       PQexec (plugin->dbh,
81               "CREATE TABLE gn090 (" "  repl INTEGER NOT NULL DEFAULT 0,"
82               "  type INTEGER NOT NULL DEFAULT 0,"
83               "  prio INTEGER NOT NULL DEFAULT 0,"
84               "  anonLevel INTEGER NOT NULL DEFAULT 0,"
85               "  expire BIGINT NOT NULL DEFAULT 0,"
86               "  rvalue BIGINT NOT NULL DEFAULT 0,"
87               "  hash BYTEA NOT NULL DEFAULT '',"
88               "  vhash BYTEA NOT NULL DEFAULT '',"
89               "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
90   if ((ret == NULL) || ((PQresultStatus (ret) != PGRES_COMMAND_OK) && (0 != strcmp ("42P07",    /* duplicate table */
91                                                                                     PQresultErrorField
92                                                                                     (ret,
93                                                                                      PG_DIAG_SQLSTATE)))))
94   {
95     (void) GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090");
96     PQfinish (plugin->dbh);
97     plugin->dbh = NULL;
98     return GNUNET_SYSERR;
99   }
100   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
101   {
102     if ((GNUNET_OK !=
103          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash ON gn090 (hash)")) ||
104         (GNUNET_OK !=
105          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)")) ||
106         (GNUNET_OK !=
107          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_prio ON gn090 (prio)")) ||
108         (GNUNET_OK !=
109          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire ON gn090 (expire)")) ||
110         (GNUNET_OK !=
111          GNUNET_POSTGRES_exec (plugin->dbh,
112                   "CREATE INDEX idx_prio_anon ON gn090 (prio,anonLevel)")) ||
113         (GNUNET_OK !=
114          GNUNET_POSTGRES_exec (plugin->dbh,
115                   "CREATE INDEX idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
116         (GNUNET_OK !=
117          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
118         (GNUNET_OK !=
119          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire_hash ON gn090 (expire,hash)")))
120     {
121       PQclear (ret);
122       PQfinish (plugin->dbh);
123       plugin->dbh = NULL;
124       return GNUNET_SYSERR;
125     }
126   }
127   PQclear (ret);
128   ret =
129       PQexec (plugin->dbh,
130               "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
131   if (GNUNET_OK !=
132       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
133   {
134     PQfinish (plugin->dbh);
135     plugin->dbh = NULL;
136     return GNUNET_SYSERR;
137   }
138   PQclear (ret);
139   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
140   if (GNUNET_OK !=
141       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
142   {
143     PQfinish (plugin->dbh);
144     plugin->dbh = NULL;
145     return GNUNET_SYSERR;
146   }
147   PQclear (ret);
148   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
149   if (GNUNET_OK !=
150       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
151   {
152     PQfinish (plugin->dbh);
153     plugin->dbh = NULL;
154     return GNUNET_SYSERR;
155   }
156   PQclear (ret);
157   if ((GNUNET_OK !=
158        GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
159                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
160                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
161                    "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
162       (GNUNET_OK !=
163        GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
164                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
165                    "WHERE hash=$1 AND type=$2 "
166                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
167       (GNUNET_OK !=
168        GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
169                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
170                    "WHERE hash=$1 AND vhash=$2 "
171                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
172       (GNUNET_OK !=
173        GNUNET_POSTGRES_prepare (plugin->dbh, "get",
174                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
175                    "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) ||
176       (GNUNET_OK !=
177        GNUNET_POSTGRES_prepare (plugin->dbh, "put",
178                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
179                    "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) ||
180       (GNUNET_OK !=
181        GNUNET_POSTGRES_prepare (plugin->dbh, "update",
182                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
183                    "WHERE oid = $3", 3)) ||
184       (GNUNET_OK !=
185        GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
186                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
187                    "WHERE oid = $1", 1)) ||
188       (GNUNET_OK !=
189        GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
190                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
191                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
192                    1)) ||
193       (GNUNET_OK !=
194        GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
195                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
196                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION "
197                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
198                    "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1",
199                    1)) ||
200       (GNUNET_OK !=
201        GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
202                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
203                    "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
204       (GNUNET_OK !=
205        GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
206       (GNUNET_OK !=
207        GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
208   {
209     PQfinish (plugin->dbh);
210     plugin->dbh = NULL;
211     return GNUNET_SYSERR;
212   }
213   return GNUNET_OK;
214 }
215
216
217 /**
218  * Get an estimate of how much space the database is
219  * currently using.
220  *
221  * @param cls our "struct Plugin*"
222  * @return number of bytes used on disk
223  */
224 static unsigned long long
225 postgres_plugin_estimate_size (void *cls)
226 {
227   struct Plugin *plugin = cls;
228   unsigned long long total;
229   PGresult *ret;
230
231   ret =
232       PQexecParams (plugin->dbh,
233                     "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
234                     NULL, NULL, NULL, NULL, 1);
235   if (GNUNET_OK !=
236       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", "get_size"))
237   {
238     return 0;
239   }
240   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
241       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
242   {
243     GNUNET_break (0);
244     PQclear (ret);
245     return 0;
246   }
247   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
248   PQclear (ret);
249   return total;
250 }
251
252
253 /**
254  * Store an item in the datastore.
255  *
256  * @param cls closure with the 'struct Plugin' 
257  * @param key key for the item
258  * @param size number of bytes in data
259  * @param data content stored
260  * @param type type of the content
261  * @param priority priority of the content
262  * @param anonymity anonymity-level for the content
263  * @param replication replication-level for the content
264  * @param expiration expiration time for the content
265  * @param msg set to error message
266  * @return GNUNET_OK on success
267  */
268 static int
269 postgres_plugin_put (void *cls, const GNUNET_HashCode * key, uint32_t size,
270                      const void *data, enum GNUNET_BLOCK_Type type,
271                      uint32_t priority, uint32_t anonymity,
272                      uint32_t replication,
273                      struct GNUNET_TIME_Absolute expiration, char **msg)
274 {
275   struct Plugin *plugin = cls;
276   GNUNET_HashCode vhash;
277   PGresult *ret;
278   uint32_t btype = htonl (type);
279   uint32_t bprio = htonl (priority);
280   uint32_t banon = htonl (anonymity);
281   uint32_t brepl = htonl (replication);
282   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
283
284   const char *paramValues[] = {
285     (const char *) &brepl,
286     (const char *) &btype,
287     (const char *) &bprio,
288     (const char *) &banon,
289     (const char *) &bexpi,
290     (const char *) key,
291     (const char *) &vhash,
292     (const char *) data
293   };
294   int paramLengths[] = {
295     sizeof (brepl),
296     sizeof (btype),
297     sizeof (bprio),
298     sizeof (banon),
299     sizeof (bexpi),
300     sizeof (GNUNET_HashCode),
301     sizeof (GNUNET_HashCode),
302     size
303   };
304   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
305
306   GNUNET_CRYPTO_hash (data, size, &vhash);
307   ret =
308       PQexecPrepared (plugin->dbh, "put", 8, paramValues, paramLengths,
309                       paramFormats, 1);
310   if (GNUNET_OK !=
311       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put"))
312     return GNUNET_SYSERR;
313   PQclear (ret);
314   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
315 #if DEBUG_POSTGRES
316   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
317                    "Stored %u bytes in database\n", (unsigned int) size);
318 #endif
319   return GNUNET_OK;
320 }
321
322
323 /**
324  * Function invoked to process the result and call
325  * the processor.
326  *
327  * @param plugin global plugin data
328  * @param proc function to call the value (once only).
329  * @param proc_cls closure for proc
330  * @param res result from exec
331  * @param filename filename for error messages
332  * @param line line number for error messages
333  */
334 static void
335 process_result (struct Plugin *plugin, PluginDatumProcessor proc,
336                 void *proc_cls, PGresult * res, 
337                 const char *filename, int line)
338 {
339   int iret;
340   enum GNUNET_BLOCK_Type type;
341   uint32_t anonymity;
342   uint32_t priority;
343   uint32_t size;
344   unsigned int rowid;
345   struct GNUNET_TIME_Absolute expiration_time;
346   GNUNET_HashCode key;
347
348   if (GNUNET_OK !=
349       GNUNET_POSTGRES_check_result_ (plugin->dbh, res, PGRES_TUPLES_OK, "PQexecPrepared", "select",
350                                      filename, line))
351   {
352 #if DEBUG_POSTGRES
353     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
354                      "Ending iteration (postgres error)\n");
355 #endif
356     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
357     return;
358   }
359
360   if (0 == PQntuples (res))
361   {
362     /* no result */
363 #if DEBUG_POSTGRES
364     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
365                      "Ending iteration (no more results)\n");
366 #endif
367     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
368     PQclear (res);
369     return;
370   }
371   if ((1 != PQntuples (res)) || (7 != PQnfields (res)) ||
372       (sizeof (uint32_t) != PQfsize (res, 0)) ||
373       (sizeof (uint32_t) != PQfsize (res, 6)))
374   {
375     GNUNET_break (0);
376     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
377     PQclear (res);
378     return;
379   }
380   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
381   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
382       (sizeof (uint32_t) != PQfsize (res, 1)) ||
383       (sizeof (uint32_t) != PQfsize (res, 2)) ||
384       (sizeof (uint64_t) != PQfsize (res, 3)) ||
385       (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)))
386   {
387     GNUNET_break (0);
388     PQclear (res);
389     GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid);
390     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
391     return;
392   }
393
394   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
395   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
396   anonymity = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
397   expiration_time.abs_value =
398       GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
399   memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode));
400   size = PQgetlength (res, 0, 5);
401 #if DEBUG_POSTGRES
402   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
403                    "Found result of size %u bytes and type %u in database\n",
404                    (unsigned int) size, (unsigned int) type);
405 #endif
406   iret =
407       proc (proc_cls, &key, size, PQgetvalue (res, 0, 5),
408             (enum GNUNET_BLOCK_Type) type, priority, anonymity, expiration_time,
409             rowid);
410   PQclear (res);
411   if (iret == GNUNET_NO)
412   {
413 #if DEBUG_POSTGRES
414     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
415                 "Processor asked for item %u to be removed.\n", rowid);
416 #endif
417     if (GNUNET_OK == GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid))
418     {
419 #if DEBUG_POSTGRES
420       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
421                        "Deleting %u bytes from database\n",
422                        (unsigned int) size);
423 #endif
424       plugin->env->duc (plugin->env->cls,
425                         -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
426 #if DEBUG_POSTGRES
427       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
428                        "Deleted %u bytes from database\n", (unsigned int) size);
429 #endif
430     }
431   }
432 }
433
434
435 /**
436  * Iterate over the results for a particular key
437  * in the datastore.
438  *
439  * @param cls closure with the 'struct Plugin'
440  * @param offset offset of the result (modulo num-results);
441  *        specific ordering does not matter for the offset
442  * @param key maybe NULL (to match all entries)
443  * @param vhash hash of the value, maybe NULL (to
444  *        match all values that have the right key).
445  *        Note that for DBlocks there is no difference
446  *        betwen key and vhash, but for other blocks
447  *        there may be!
448  * @param type entries of which type are relevant?
449  *     Use 0 for any type.
450  * @param proc function to call on the matching value;
451  *        will be called once with a NULL if no value matches
452  * @param proc_cls closure for iter
453  */
454 static void
455 postgres_plugin_get_key (void *cls, uint64_t offset,
456                          const GNUNET_HashCode * key,
457                          const GNUNET_HashCode * vhash,
458                          enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
459                          void *proc_cls)
460 {
461   struct Plugin *plugin = cls;
462   const int paramFormats[] = { 1, 1, 1, 1, 1 };
463   int paramLengths[4];
464   const char *paramValues[4];
465   int nparams;
466   const char *pname;
467   PGresult *ret;
468   uint64_t total;
469   uint64_t blimit_off;
470   uint32_t btype;
471
472   GNUNET_assert (key != NULL);
473   paramValues[0] = (const char *) key;
474   paramLengths[0] = sizeof (GNUNET_HashCode);
475   btype = htonl (type);
476   if (type != 0)
477   {
478     if (vhash != NULL)
479     {
480       paramValues[1] = (const char *) vhash;
481       paramLengths[1] = sizeof (GNUNET_HashCode);
482       paramValues[2] = (const char *) &btype;
483       paramLengths[2] = sizeof (btype);
484       paramValues[3] = (const char *) &blimit_off;
485       paramLengths[3] = sizeof (blimit_off);
486       nparams = 4;
487       pname = "getvt";
488       ret =
489           PQexecParams (plugin->dbh,
490                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
491                         3, NULL, paramValues, paramLengths, paramFormats, 1);
492     }
493     else
494     {
495       paramValues[1] = (const char *) &btype;
496       paramLengths[1] = sizeof (btype);
497       paramValues[2] = (const char *) &blimit_off;
498       paramLengths[2] = sizeof (blimit_off);
499       nparams = 3;
500       pname = "gett";
501       ret =
502           PQexecParams (plugin->dbh,
503                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
504                         2, NULL, paramValues, paramLengths, paramFormats, 1);
505     }
506   }
507   else
508   {
509     if (vhash != NULL)
510     {
511       paramValues[1] = (const char *) vhash;
512       paramLengths[1] = sizeof (GNUNET_HashCode);
513       paramValues[2] = (const char *) &blimit_off;
514       paramLengths[2] = sizeof (blimit_off);
515       nparams = 3;
516       pname = "getv";
517       ret =
518           PQexecParams (plugin->dbh,
519                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
520                         2, NULL, paramValues, paramLengths, paramFormats, 1);
521     }
522     else
523     {
524       paramValues[1] = (const char *) &blimit_off;
525       paramLengths[1] = sizeof (blimit_off);
526       nparams = 2;
527       pname = "get";
528       ret =
529           PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1",
530                         1, NULL, paramValues, paramLengths, paramFormats, 1);
531     }
532   }
533   if (GNUNET_OK !=
534       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", pname))
535   {
536     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
537     return;
538   }
539   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
540       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
541   {
542     GNUNET_break (0);
543     PQclear (ret);
544     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
545     return;
546   }
547   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
548   PQclear (ret);
549   if (total == 0)
550   {
551     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
552     return;
553   }
554   blimit_off = GNUNET_htonll (offset % total);
555   ret =
556       PQexecPrepared (plugin->dbh, pname, nparams, paramValues, paramLengths,
557                       paramFormats, 1);
558   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
559 }
560
561
562 /**
563  * Select a subset of the items in the datastore and call
564  * the given iterator for each of them.
565  *
566  * @param cls our "struct Plugin*"
567  * @param offset offset of the result (modulo num-results);
568  *        specific ordering does not matter for the offset
569  * @param type entries of which type should be considered?
570  *        Use 0 for any type.
571  * @param proc function to call on the matching value;
572  *        will be called with a NULL if no value matches
573  * @param proc_cls closure for proc
574  */
575 static void
576 postgres_plugin_get_zero_anonymity (void *cls, uint64_t offset,
577                                     enum GNUNET_BLOCK_Type type,
578                                     PluginDatumProcessor proc, void *proc_cls)
579 {
580   struct Plugin *plugin = cls;
581   uint32_t btype;
582   uint64_t boff;
583   const int paramFormats[] = { 1, 1 };
584   int paramLengths[] = { sizeof (btype), sizeof (boff) };
585   const char *paramValues[] = { (const char *) &btype, (const char *) &boff };
586   PGresult *ret;
587
588   btype = htonl ((uint32_t) type);
589   boff = GNUNET_htonll (offset);
590   ret =
591       PQexecPrepared (plugin->dbh, "select_non_anonymous", 2, paramValues,
592                       paramLengths, paramFormats, 1);
593   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
594 }
595
596
597 /**
598  * Context for 'repl_iter' function.
599  */
600 struct ReplCtx
601 {
602
603   /**
604    * Plugin handle.
605    */
606   struct Plugin *plugin;
607
608   /**
609    * Function to call for the result (or the NULL).
610    */
611   PluginDatumProcessor proc;
612
613   /**
614    * Closure for proc.
615    */
616   void *proc_cls;
617 };
618
619
620 /**
621  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
622  * Decrements the replication counter and calls the original
623  * iterator.
624  *
625  * @param cls closure with the 'struct ReplCtx*'
626  * @param key key for the content
627  * @param size number of bytes in data
628  * @param data content stored
629  * @param type type of the content
630  * @param priority priority of the content
631  * @param anonymity anonymity-level for the content
632  * @param expiration expiration time for the content
633  * @param uid unique identifier for the datum;
634  *        maybe 0 if no unique identifier is available
635  *
636  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
637  *         (continue on call to "next", of course),
638  *         GNUNET_NO to delete the item and continue (if supported)
639  */
640 static int
641 repl_proc (void *cls, const GNUNET_HashCode * key, uint32_t size,
642            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
643            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
644            uint64_t uid)
645 {
646   struct ReplCtx *rc = cls;
647   struct Plugin *plugin = rc->plugin;
648   int ret;
649   PGresult *qret;
650   uint32_t boid;
651
652   ret =
653       rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
654                 expiration, uid);
655   if (NULL != key)
656   {
657     boid = htonl ((uint32_t) uid);
658     const char *paramValues[] = {
659       (const char *) &boid,
660     };
661     int paramLengths[] = {
662       sizeof (boid),
663     };
664     const int paramFormats[] = { 1 };
665     qret =
666         PQexecPrepared (plugin->dbh, "decrepl", 1, paramValues, paramLengths,
667                         paramFormats, 1);
668     if (GNUNET_OK !=
669         GNUNET_POSTGRES_check_result (plugin->dbh, qret, PGRES_COMMAND_OK, "PQexecPrepared",
670                       "decrepl"))
671       return GNUNET_SYSERR;
672     PQclear (qret);
673   }
674   return ret;
675 }
676
677
678 /**
679  * Get a random item for replication.  Returns a single, not expired, random item
680  * from those with the highest replication counters.  The item's
681  * replication counter is decremented by one IF it was positive before.
682  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
683  *
684  * @param cls closure with the 'struct Plugin'
685  * @param proc function to call the value (once only).
686  * @param proc_cls closure for proc
687  */
688 static void
689 postgres_plugin_get_replication (void *cls, PluginDatumProcessor proc,
690                                  void *proc_cls)
691 {
692   struct Plugin *plugin = cls;
693   struct ReplCtx rc;
694   PGresult *ret;
695
696   rc.plugin = plugin;
697   rc.proc = proc;
698   rc.proc_cls = proc_cls;
699   ret =
700       PQexecPrepared (plugin->dbh, "select_replication_order", 0, NULL, NULL,
701                       NULL, 1);
702   process_result (plugin, &repl_proc, &rc, ret, __FILE__, __LINE__);
703 }
704
705
706 /**
707  * Get a random item for expiration.
708  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
709  *
710  * @param cls closure with the 'struct Plugin'
711  * @param proc function to call the value (once only).
712  * @param proc_cls closure for proc
713  */
714 static void
715 postgres_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
716                                 void *proc_cls)
717 {
718   struct Plugin *plugin = cls;
719   uint64_t btime;
720   const int paramFormats[] = { 1 };
721   int paramLengths[] = { sizeof (btime) };
722   const char *paramValues[] = { (const char *) &btime };
723   PGresult *ret;
724
725   btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
726   ret =
727       PQexecPrepared (plugin->dbh, "select_expiration_order", 1, paramValues,
728                       paramLengths, paramFormats, 1);
729   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
730 }
731
732
733 /**
734  * Update the priority for a particular key in the datastore.  If
735  * the expiration time in value is different than the time found in
736  * the datastore, the higher value should be kept.  For the
737  * anonymity level, the lower value is to be used.  The specified
738  * priority should be added to the existing priority, ignoring the
739  * priority in value.
740  *
741  * Note that it is possible for multiple values to match this put.
742  * In that case, all of the respective values are updated.
743  *
744  * @param cls our "struct Plugin*"
745  * @param uid unique identifier of the datum
746  * @param delta by how much should the priority
747  *     change?  If priority + delta < 0 the
748  *     priority should be set to 0 (never go
749  *     negative).
750  * @param expire new expiration time should be the
751  *     MAX of any existing expiration time and
752  *     this value
753  * @param msg set to error message
754  * @return GNUNET_OK on success
755  */
756 static int
757 postgres_plugin_update (void *cls, uint64_t uid, int delta,
758                         struct GNUNET_TIME_Absolute expire, char **msg)
759 {
760   struct Plugin *plugin = cls;
761   PGresult *ret;
762   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
763   uint32_t boid = htonl ((uint32_t) uid);
764   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
765
766   const char *paramValues[] = {
767     (const char *) &bdelta,
768     (const char *) &bexpire,
769     (const char *) &boid,
770   };
771   int paramLengths[] = {
772     sizeof (bdelta),
773     sizeof (bexpire),
774     sizeof (boid),
775   };
776   const int paramFormats[] = { 1, 1, 1 };
777
778   ret =
779       PQexecPrepared (plugin->dbh, "update", 3, paramValues, paramLengths,
780                       paramFormats, 1);
781   if (GNUNET_OK !=
782       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "update"))
783     return GNUNET_SYSERR;
784   PQclear (ret);
785   return GNUNET_OK;
786 }
787
788
789
790 /**
791  * Get all of the keys in the datastore.
792  *
793  * @param cls closure with the 'struct Plugin'
794  * @param proc function to call on each key
795  * @param proc_cls closure for proc
796  */
797 static void
798 postgres_plugin_get_keys (void *cls,
799                           PluginKeyProcessor proc,
800                           void *proc_cls)
801 {
802   struct Plugin *plugin = cls;
803   int ret;
804   int i;
805   GNUNET_HashCode key;
806   PGresult * res;
807
808   res = PQexecPrepared (plugin->dbh, "get_keys", 0, NULL, NULL, NULL, 1);
809   ret = PQntuples (res);
810   for (i=0;i<ret;i++)
811   {
812     if (sizeof (GNUNET_HashCode) != PQgetlength (res, i, 0))
813     {
814       memcpy (&key, PQgetvalue (res, i, 0), sizeof (GNUNET_HashCode));
815       proc (proc_cls, &key, 1);    
816     }
817   }
818   PQclear (res);
819 }
820
821
822
823 /**
824  * Drop database.
825  *
826  * @param cls closure with the 'struct Plugin'
827  */
828 static void
829 postgres_plugin_drop (void *cls)
830 {
831   struct Plugin *plugin = cls;
832   
833   if (GNUNET_OK != GNUNET_POSTGRES_exec (plugin->dbh, "DROP TABLE gn090"))
834     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "postgres", _("Failed to drop table from database.\n"));
835 }
836
837
838 /**
839  * Entry point for the plugin.
840  *
841  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
842  * @return our "struct Plugin*"
843  */
844 void *
845 libgnunet_plugin_datastore_postgres_init (void *cls)
846 {
847   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
848   struct GNUNET_DATASTORE_PluginFunctions *api;
849   struct Plugin *plugin;
850
851   plugin = GNUNET_malloc (sizeof (struct Plugin));
852   plugin->env = env;
853   if (GNUNET_OK != init_connection (plugin))
854   {
855     GNUNET_free (plugin);
856     return NULL;
857   }
858   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
859   api->cls = plugin;
860   api->estimate_size = &postgres_plugin_estimate_size;
861   api->put = &postgres_plugin_put;
862   api->update = &postgres_plugin_update;
863   api->get_key = &postgres_plugin_get_key;
864   api->get_replication = &postgres_plugin_get_replication;
865   api->get_expiration = &postgres_plugin_get_expiration;
866   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
867   api->get_keys = &postgres_plugin_get_keys;
868   api->drop = &postgres_plugin_drop;
869   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "datastore-postgres",
870                    _("Postgres database running\n"));
871   return api;
872 }
873
874
875 /**
876  * Exit point from the plugin.
877  * @param cls our "struct Plugin*"
878  * @return always NULL
879  */
880 void *
881 libgnunet_plugin_datastore_postgres_done (void *cls)
882 {
883   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
884   struct Plugin *plugin = api->cls;
885
886   PQfinish (plugin->dbh);
887   GNUNET_free (plugin);
888   GNUNET_free (api);
889   return NULL;
890 }
891
892 /* end of plugin_datastore_postgres.c */