use enum GNUNET_ATS_Network_Type instead of uint32_t where appropriate
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include "gnunet_postgres_lib.h"
30 #include <postgresql/libpq-fe.h>
31
32
33 /**
34  * After how many ms "busy" should a DB operation fail for good?
35  * A low value makes sure that we are more responsive to requests
36  * (especially PUTs).  A high value guarantees a higher success
37  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
38  *
39  * The default value of 1s should ensure that users do not experience
40  * huge latencies while at the same time allowing operations to succeed
41  * with reasonable probability.
42  */
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
44
45
46 /**
47  * Context for all functions in this plugin.
48  */
49 struct Plugin
50 {
51   /**
52    * Our execution environment.
53    */
54   struct GNUNET_DATASTORE_PluginEnvironment *env;
55
56   /**
57    * Native Postgres database handle.
58    */
59   PGconn *dbh;
60
61 };
62
63
64 /**
65  * @brief Get a database handle
66  *
67  * @param plugin global context
68  * @return GNUNET_OK on success, GNUNET_SYSERR on error
69  */
70 static int
71 init_connection (struct Plugin *plugin)
72 {
73   PGresult *ret;
74
75   plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres");
76   if (NULL == plugin->dbh)
77     return GNUNET_SYSERR;
78   ret =
79       PQexec (plugin->dbh,
80               "CREATE TABLE gn090 (" "  repl INTEGER NOT NULL DEFAULT 0,"
81               "  type INTEGER NOT NULL DEFAULT 0,"
82               "  prio INTEGER NOT NULL DEFAULT 0,"
83               "  anonLevel INTEGER NOT NULL DEFAULT 0,"
84               "  expire BIGINT NOT NULL DEFAULT 0,"
85               "  rvalue BIGINT NOT NULL DEFAULT 0,"
86               "  hash BYTEA NOT NULL DEFAULT '',"
87               "  vhash BYTEA NOT NULL DEFAULT '',"
88               "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
89   if ((ret == NULL) || ((PQresultStatus (ret) != PGRES_COMMAND_OK) && (0 != strcmp ("42P07",    /* duplicate table */
90                                                                                     PQresultErrorField
91                                                                                     (ret,
92                                                                                      PG_DIAG_SQLSTATE)))))
93   {
94     (void) GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090");
95     PQfinish (plugin->dbh);
96     plugin->dbh = NULL;
97     return GNUNET_SYSERR;
98   }
99   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
100   {
101     if ((GNUNET_OK !=
102          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash ON gn090 (hash)")) ||
103         (GNUNET_OK !=
104          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)")) ||
105         (GNUNET_OK !=
106          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_prio ON gn090 (prio)")) ||
107         (GNUNET_OK !=
108          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire ON gn090 (expire)")) ||
109         (GNUNET_OK !=
110          GNUNET_POSTGRES_exec (plugin->dbh,
111                   "CREATE INDEX idx_prio_anon ON gn090 (prio,anonLevel)")) ||
112         (GNUNET_OK !=
113          GNUNET_POSTGRES_exec (plugin->dbh,
114                   "CREATE INDEX idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
115         (GNUNET_OK !=
116          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
117         (GNUNET_OK !=
118          GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire_hash ON gn090 (expire,hash)")))
119     {
120       PQclear (ret);
121       PQfinish (plugin->dbh);
122       plugin->dbh = NULL;
123       return GNUNET_SYSERR;
124     }
125   }
126   PQclear (ret);
127   ret =
128       PQexec (plugin->dbh,
129               "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
130   if (GNUNET_OK !=
131       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
132   {
133     PQfinish (plugin->dbh);
134     plugin->dbh = NULL;
135     return GNUNET_SYSERR;
136   }
137   PQclear (ret);
138   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
139   if (GNUNET_OK !=
140       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
141   {
142     PQfinish (plugin->dbh);
143     plugin->dbh = NULL;
144     return GNUNET_SYSERR;
145   }
146   PQclear (ret);
147   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
148   if (GNUNET_OK !=
149       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
150   {
151     PQfinish (plugin->dbh);
152     plugin->dbh = NULL;
153     return GNUNET_SYSERR;
154   }
155   PQclear (ret);
156   if ((GNUNET_OK !=
157        GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
158                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
159                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
160                    "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
161       (GNUNET_OK !=
162        GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
163                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
164                    "WHERE hash=$1 AND type=$2 "
165                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
166       (GNUNET_OK !=
167        GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
168                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
169                    "WHERE hash=$1 AND vhash=$2 "
170                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
171       (GNUNET_OK !=
172        GNUNET_POSTGRES_prepare (plugin->dbh, "get",
173                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
174                    "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) ||
175       (GNUNET_OK !=
176        GNUNET_POSTGRES_prepare (plugin->dbh, "put",
177                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
178                    "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) ||
179       (GNUNET_OK !=
180        GNUNET_POSTGRES_prepare (plugin->dbh, "update",
181                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
182                    "WHERE oid = $3", 3)) ||
183       (GNUNET_OK !=
184        GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
185                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
186                    "WHERE oid = $1", 1)) ||
187       (GNUNET_OK !=
188        GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
189                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
190                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
191                    1)) ||
192       (GNUNET_OK !=
193        GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
194                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
195                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION "
196                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
197                    "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1",
198                    1)) ||
199       (GNUNET_OK !=
200        GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
201                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
202                    "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
203       (GNUNET_OK !=
204        GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
205       (GNUNET_OK !=
206        GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
207   {
208     PQfinish (plugin->dbh);
209     plugin->dbh = NULL;
210     return GNUNET_SYSERR;
211   }
212   return GNUNET_OK;
213 }
214
215
216 /**
217  * Get an estimate of how much space the database is
218  * currently using.
219  *
220  * @param cls our "struct Plugin*"
221  * @return number of bytes used on disk
222  */
223 static unsigned long long
224 postgres_plugin_estimate_size (void *cls)
225 {
226   struct Plugin *plugin = cls;
227   unsigned long long total;
228   PGresult *ret;
229
230   ret =
231       PQexecParams (plugin->dbh,
232                     "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
233                     NULL, NULL, NULL, NULL, 1);
234   if (GNUNET_OK !=
235       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", "get_size"))
236   {
237     return 0;
238   }
239   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
240   {
241     GNUNET_break (0);
242     PQclear (ret);
243     return 0;
244   }
245   if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
246   {
247     GNUNET_break (0 == PQgetlength (ret, 0, 0));
248     PQclear (ret);
249     return 0;
250   }
251   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
252   PQclear (ret);
253   return total;
254 }
255
256
257 /**
258  * Store an item in the datastore.
259  *
260  * @param cls closure with the 'struct Plugin'
261  * @param key key for the item
262  * @param size number of bytes in data
263  * @param data content stored
264  * @param type type of the content
265  * @param priority priority of the content
266  * @param anonymity anonymity-level for the content
267  * @param replication replication-level for the content
268  * @param expiration expiration time for the content
269  * @param msg set to error message
270  * @return GNUNET_OK on success
271  */
272 static int
273 postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
274                      const void *data, enum GNUNET_BLOCK_Type type,
275                      uint32_t priority, uint32_t anonymity,
276                      uint32_t replication,
277                      struct GNUNET_TIME_Absolute expiration, char **msg)
278 {
279   struct Plugin *plugin = cls;
280   struct GNUNET_HashCode vhash;
281   PGresult *ret;
282   uint32_t btype = htonl (type);
283   uint32_t bprio = htonl (priority);
284   uint32_t banon = htonl (anonymity);
285   uint32_t brepl = htonl (replication);
286   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value_us__;
287
288   const char *paramValues[] = {
289     (const char *) &brepl,
290     (const char *) &btype,
291     (const char *) &bprio,
292     (const char *) &banon,
293     (const char *) &bexpi,
294     (const char *) key,
295     (const char *) &vhash,
296     (const char *) data
297   };
298   int paramLengths[] = {
299     sizeof (brepl),
300     sizeof (btype),
301     sizeof (bprio),
302     sizeof (banon),
303     sizeof (bexpi),
304     sizeof (struct GNUNET_HashCode),
305     sizeof (struct GNUNET_HashCode),
306     size
307   };
308   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
309
310   GNUNET_CRYPTO_hash (data, size, &vhash);
311   ret =
312       PQexecPrepared (plugin->dbh, "put", 8, paramValues, paramLengths,
313                       paramFormats, 1);
314   if (GNUNET_OK !=
315       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put"))
316     return GNUNET_SYSERR;
317   PQclear (ret);
318   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
319   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
320                    "Stored %u bytes in database\n", (unsigned int) size);
321   return GNUNET_OK;
322 }
323
324
325 /**
326  * Function invoked to process the result and call
327  * the processor.
328  *
329  * @param plugin global plugin data
330  * @param proc function to call the value (once only).
331  * @param proc_cls closure for proc
332  * @param res result from exec
333  * @param filename filename for error messages
334  * @param line line number for error messages
335  */
336 static void
337 process_result (struct Plugin *plugin, PluginDatumProcessor proc,
338                 void *proc_cls, PGresult * res,
339                 const char *filename, int line)
340 {
341   int iret;
342   enum GNUNET_BLOCK_Type type;
343   uint32_t anonymity;
344   uint32_t priority;
345   uint32_t size;
346   unsigned int rowid;
347   struct GNUNET_TIME_Absolute expiration_time;
348   struct GNUNET_HashCode key;
349
350   if (GNUNET_OK !=
351       GNUNET_POSTGRES_check_result_ (plugin->dbh, res, PGRES_TUPLES_OK, "PQexecPrepared", "select",
352                                      filename, line))
353   {
354     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
355                      "Ending iteration (postgres error)\n");
356     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
357     return;
358   }
359
360   if (0 == PQntuples (res))
361   {
362     /* no result */
363     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
364                      "Ending iteration (no more results)\n");
365     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
366     PQclear (res);
367     return;
368   }
369   if ((1 != PQntuples (res)) || (7 != PQnfields (res)) ||
370       (sizeof (uint32_t) != PQfsize (res, 0)) ||
371       (sizeof (uint32_t) != PQfsize (res, 6)))
372   {
373     GNUNET_break (0);
374     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
375     PQclear (res);
376     return;
377   }
378   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
379   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
380       (sizeof (uint32_t) != PQfsize (res, 1)) ||
381       (sizeof (uint32_t) != PQfsize (res, 2)) ||
382       (sizeof (uint64_t) != PQfsize (res, 3)) ||
383       (sizeof (struct GNUNET_HashCode) != PQgetlength (res, 0, 4)))
384   {
385     GNUNET_break (0);
386     PQclear (res);
387     GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid);
388     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
389     return;
390   }
391
392   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
393   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
394   anonymity = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
395   expiration_time.abs_value_us =
396       GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
397   memcpy (&key, PQgetvalue (res, 0, 4), sizeof (struct GNUNET_HashCode));
398   size = PQgetlength (res, 0, 5);
399   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
400                    "Found result of size %u bytes and type %u in database\n",
401                    (unsigned int) size, (unsigned int) type);
402   iret =
403       proc (proc_cls, &key, size, PQgetvalue (res, 0, 5),
404             (enum GNUNET_BLOCK_Type) type, priority, anonymity, expiration_time,
405             rowid);
406   PQclear (res);
407   if (iret == GNUNET_NO)
408   {
409     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
410                 "Processor asked for item %u to be removed.\n", rowid);
411     if (GNUNET_OK == GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid))
412     {
413       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
414                        "Deleting %u bytes from database\n",
415                        (unsigned int) size);
416       plugin->env->duc (plugin->env->cls,
417                         -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
418       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
419                        "Deleted %u bytes from database\n", (unsigned int) size);
420     }
421   }
422 }
423
424
425 /**
426  * Iterate over the results for a particular key
427  * in the datastore.
428  *
429  * @param cls closure with the 'struct Plugin'
430  * @param offset offset of the result (modulo num-results);
431  *        specific ordering does not matter for the offset
432  * @param key maybe NULL (to match all entries)
433  * @param vhash hash of the value, maybe NULL (to
434  *        match all values that have the right key).
435  *        Note that for DBlocks there is no difference
436  *        betwen key and vhash, but for other blocks
437  *        there may be!
438  * @param type entries of which type are relevant?
439  *     Use 0 for any type.
440  * @param proc function to call on the matching value;
441  *        will be called once with a NULL if no value matches
442  * @param proc_cls closure for iter
443  */
444 static void
445 postgres_plugin_get_key (void *cls, uint64_t offset,
446                          const struct GNUNET_HashCode * key,
447                          const struct GNUNET_HashCode * vhash,
448                          enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
449                          void *proc_cls)
450 {
451   struct Plugin *plugin = cls;
452   const int paramFormats[] = { 1, 1, 1, 1, 1 };
453   int paramLengths[4];
454   const char *paramValues[4];
455   int nparams;
456   const char *pname;
457   PGresult *ret;
458   uint64_t total;
459   uint64_t blimit_off;
460   uint32_t btype;
461
462   GNUNET_assert (key != NULL);
463   paramValues[0] = (const char *) key;
464   paramLengths[0] = sizeof (struct GNUNET_HashCode);
465   btype = htonl (type);
466   if (type != 0)
467   {
468     if (vhash != NULL)
469     {
470       paramValues[1] = (const char *) vhash;
471       paramLengths[1] = sizeof (struct GNUNET_HashCode);
472       paramValues[2] = (const char *) &btype;
473       paramLengths[2] = sizeof (btype);
474       paramValues[3] = (const char *) &blimit_off;
475       paramLengths[3] = sizeof (blimit_off);
476       nparams = 4;
477       pname = "getvt";
478       ret =
479           PQexecParams (plugin->dbh,
480                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
481                         3, NULL, paramValues, paramLengths, paramFormats, 1);
482     }
483     else
484     {
485       paramValues[1] = (const char *) &btype;
486       paramLengths[1] = sizeof (btype);
487       paramValues[2] = (const char *) &blimit_off;
488       paramLengths[2] = sizeof (blimit_off);
489       nparams = 3;
490       pname = "gett";
491       ret =
492           PQexecParams (plugin->dbh,
493                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
494                         2, NULL, paramValues, paramLengths, paramFormats, 1);
495     }
496   }
497   else
498   {
499     if (vhash != NULL)
500     {
501       paramValues[1] = (const char *) vhash;
502       paramLengths[1] = sizeof (struct GNUNET_HashCode);
503       paramValues[2] = (const char *) &blimit_off;
504       paramLengths[2] = sizeof (blimit_off);
505       nparams = 3;
506       pname = "getv";
507       ret =
508           PQexecParams (plugin->dbh,
509                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
510                         2, NULL, paramValues, paramLengths, paramFormats, 1);
511     }
512     else
513     {
514       paramValues[1] = (const char *) &blimit_off;
515       paramLengths[1] = sizeof (blimit_off);
516       nparams = 2;
517       pname = "get";
518       ret =
519           PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1",
520                         1, NULL, paramValues, paramLengths, paramFormats, 1);
521     }
522   }
523   if (GNUNET_OK !=
524       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", pname))
525   {
526     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
527     return;
528   }
529   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
530       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
531   {
532     GNUNET_break (0);
533     PQclear (ret);
534     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
535     return;
536   }
537   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
538   PQclear (ret);
539   if (total == 0)
540   {
541     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
542     return;
543   }
544   blimit_off = GNUNET_htonll (offset % total);
545   ret =
546       PQexecPrepared (plugin->dbh, pname, nparams, paramValues, paramLengths,
547                       paramFormats, 1);
548   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
549 }
550
551
552 /**
553  * Select a subset of the items in the datastore and call
554  * the given iterator for each of them.
555  *
556  * @param cls our "struct Plugin*"
557  * @param offset offset of the result (modulo num-results);
558  *        specific ordering does not matter for the offset
559  * @param type entries of which type should be considered?
560  *        Use 0 for any type.
561  * @param proc function to call on the matching value;
562  *        will be called with a NULL if no value matches
563  * @param proc_cls closure for proc
564  */
565 static void
566 postgres_plugin_get_zero_anonymity (void *cls, uint64_t offset,
567                                     enum GNUNET_BLOCK_Type type,
568                                     PluginDatumProcessor proc, void *proc_cls)
569 {
570   struct Plugin *plugin = cls;
571   uint32_t btype;
572   uint64_t boff;
573   const int paramFormats[] = { 1, 1 };
574   int paramLengths[] = { sizeof (btype), sizeof (boff) };
575   const char *paramValues[] = { (const char *) &btype, (const char *) &boff };
576   PGresult *ret;
577
578   btype = htonl ((uint32_t) type);
579   boff = GNUNET_htonll (offset);
580   ret =
581       PQexecPrepared (plugin->dbh, "select_non_anonymous", 2, paramValues,
582                       paramLengths, paramFormats, 1);
583   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
584 }
585
586
587 /**
588  * Context for 'repl_iter' function.
589  */
590 struct ReplCtx
591 {
592
593   /**
594    * Plugin handle.
595    */
596   struct Plugin *plugin;
597
598   /**
599    * Function to call for the result (or the NULL).
600    */
601   PluginDatumProcessor proc;
602
603   /**
604    * Closure for proc.
605    */
606   void *proc_cls;
607 };
608
609
610 /**
611  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
612  * Decrements the replication counter and calls the original
613  * iterator.
614  *
615  * @param cls closure with the 'struct ReplCtx*'
616  * @param key key for the content
617  * @param size number of bytes in data
618  * @param data content stored
619  * @param type type of the content
620  * @param priority priority of the content
621  * @param anonymity anonymity-level for the content
622  * @param expiration expiration time for the content
623  * @param uid unique identifier for the datum;
624  *        maybe 0 if no unique identifier is available
625  *
626  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
627  *         (continue on call to "next", of course),
628  *         GNUNET_NO to delete the item and continue (if supported)
629  */
630 static int
631 repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
632            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
633            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
634            uint64_t uid)
635 {
636   struct ReplCtx *rc = cls;
637   struct Plugin *plugin = rc->plugin;
638   int ret;
639   PGresult *qret;
640   uint32_t boid;
641
642   ret =
643       rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
644                 expiration, uid);
645   if (NULL != key)
646   {
647     boid = htonl ((uint32_t) uid);
648     const char *paramValues[] = {
649       (const char *) &boid,
650     };
651     int paramLengths[] = {
652       sizeof (boid),
653     };
654     const int paramFormats[] = { 1 };
655     qret =
656         PQexecPrepared (plugin->dbh, "decrepl", 1, paramValues, paramLengths,
657                         paramFormats, 1);
658     if (GNUNET_OK !=
659         GNUNET_POSTGRES_check_result (plugin->dbh, qret, PGRES_COMMAND_OK, "PQexecPrepared",
660                       "decrepl"))
661       return GNUNET_SYSERR;
662     PQclear (qret);
663   }
664   return ret;
665 }
666
667
668 /**
669  * Get a random item for replication.  Returns a single, not expired, random item
670  * from those with the highest replication counters.  The item's
671  * replication counter is decremented by one IF it was positive before.
672  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
673  *
674  * @param cls closure with the 'struct Plugin'
675  * @param proc function to call the value (once only).
676  * @param proc_cls closure for proc
677  */
678 static void
679 postgres_plugin_get_replication (void *cls, PluginDatumProcessor proc,
680                                  void *proc_cls)
681 {
682   struct Plugin *plugin = cls;
683   struct ReplCtx rc;
684   PGresult *ret;
685
686   rc.plugin = plugin;
687   rc.proc = proc;
688   rc.proc_cls = proc_cls;
689   ret =
690       PQexecPrepared (plugin->dbh, "select_replication_order", 0, NULL, NULL,
691                       NULL, 1);
692   process_result (plugin, &repl_proc, &rc, ret, __FILE__, __LINE__);
693 }
694
695
696 /**
697  * Get a random item for expiration.
698  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
699  *
700  * @param cls closure with the 'struct Plugin'
701  * @param proc function to call the value (once only).
702  * @param proc_cls closure for proc
703  */
704 static void
705 postgres_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
706                                 void *proc_cls)
707 {
708   struct Plugin *plugin = cls;
709   uint64_t btime;
710   const int paramFormats[] = { 1 };
711   int paramLengths[] = { sizeof (btime) };
712   const char *paramValues[] = { (const char *) &btime };
713   PGresult *ret;
714
715   btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value_us);
716   ret =
717       PQexecPrepared (plugin->dbh, "select_expiration_order", 1, paramValues,
718                       paramLengths, paramFormats, 1);
719   process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
720 }
721
722
723 /**
724  * Update the priority for a particular key in the datastore.  If
725  * the expiration time in value is different than the time found in
726  * the datastore, the higher value should be kept.  For the
727  * anonymity level, the lower value is to be used.  The specified
728  * priority should be added to the existing priority, ignoring the
729  * priority in value.
730  *
731  * Note that it is possible for multiple values to match this put.
732  * In that case, all of the respective values are updated.
733  *
734  * @param cls our "struct Plugin*"
735  * @param uid unique identifier of the datum
736  * @param delta by how much should the priority
737  *     change?  If priority + delta < 0 the
738  *     priority should be set to 0 (never go
739  *     negative).
740  * @param expire new expiration time should be the
741  *     MAX of any existing expiration time and
742  *     this value
743  * @param msg set to error message
744  * @return GNUNET_OK on success
745  */
746 static int
747 postgres_plugin_update (void *cls, uint64_t uid, int delta,
748                         struct GNUNET_TIME_Absolute expire, char **msg)
749 {
750   struct Plugin *plugin = cls;
751   PGresult *ret;
752   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
753   uint32_t boid = htonl ((uint32_t) uid);
754   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value_us__;
755
756   const char *paramValues[] = {
757     (const char *) &bdelta,
758     (const char *) &bexpire,
759     (const char *) &boid,
760   };
761   int paramLengths[] = {
762     sizeof (bdelta),
763     sizeof (bexpire),
764     sizeof (boid),
765   };
766   const int paramFormats[] = { 1, 1, 1 };
767
768   ret =
769       PQexecPrepared (plugin->dbh, "update", 3, paramValues, paramLengths,
770                       paramFormats, 1);
771   if (GNUNET_OK !=
772       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "update"))
773     return GNUNET_SYSERR;
774   PQclear (ret);
775   return GNUNET_OK;
776 }
777
778
779
780 /**
781  * Get all of the keys in the datastore.
782  *
783  * @param cls closure with the 'struct Plugin'
784  * @param proc function to call on each key
785  * @param proc_cls closure for proc
786  */
787 static void
788 postgres_plugin_get_keys (void *cls,
789                           PluginKeyProcessor proc,
790                           void *proc_cls)
791 {
792   struct Plugin *plugin = cls;
793   int ret;
794   int i;
795   struct GNUNET_HashCode key;
796   PGresult * res;
797
798   res = PQexecPrepared (plugin->dbh, "get_keys", 0, NULL, NULL, NULL, 1);
799   ret = PQntuples (res);
800   for (i=0;i<ret;i++)
801   {
802     if (sizeof (struct GNUNET_HashCode) != PQgetlength (res, i, 0))
803     {
804       memcpy (&key, PQgetvalue (res, i, 0), sizeof (struct GNUNET_HashCode));
805       proc (proc_cls, &key, 1);
806     }
807   }
808   PQclear (res);
809 }
810
811
812
813 /**
814  * Drop database.
815  *
816  * @param cls closure with the 'struct Plugin'
817  */
818 static void
819 postgres_plugin_drop (void *cls)
820 {
821   struct Plugin *plugin = cls;
822
823   if (GNUNET_OK != GNUNET_POSTGRES_exec (plugin->dbh, "DROP TABLE gn090"))
824     GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "postgres", _("Failed to drop table from database.\n"));
825 }
826
827
828 /**
829  * Entry point for the plugin.
830  *
831  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
832  * @return our "struct Plugin*"
833  */
834 void *
835 libgnunet_plugin_datastore_postgres_init (void *cls)
836 {
837   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
838   struct GNUNET_DATASTORE_PluginFunctions *api;
839   struct Plugin *plugin;
840
841   plugin = GNUNET_malloc (sizeof (struct Plugin));
842   plugin->env = env;
843   if (GNUNET_OK != init_connection (plugin))
844   {
845     GNUNET_free (plugin);
846     return NULL;
847   }
848   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
849   api->cls = plugin;
850   api->estimate_size = &postgres_plugin_estimate_size;
851   api->put = &postgres_plugin_put;
852   api->update = &postgres_plugin_update;
853   api->get_key = &postgres_plugin_get_key;
854   api->get_replication = &postgres_plugin_get_replication;
855   api->get_expiration = &postgres_plugin_get_expiration;
856   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
857   api->get_keys = &postgres_plugin_get_keys;
858   api->drop = &postgres_plugin_drop;
859   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "datastore-postgres",
860                    _("Postgres database running\n"));
861   return api;
862 }
863
864
865 /**
866  * Exit point from the plugin.
867  * @param cls our "struct Plugin*"
868  * @return always NULL
869  */
870 void *
871 libgnunet_plugin_datastore_postgres_done (void *cls)
872 {
873   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
874   struct Plugin *plugin = api->cls;
875
876   PQfinish (plugin->dbh);
877   GNUNET_free (plugin);
878   GNUNET_free (api);
879   return NULL;
880 }
881
882 /* end of plugin_datastore_postgres.c */