include plugin in gnunet-transport output
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <postgresql/libpq-fe.h>
30
31 #define DEBUG_POSTGRES GNUNET_EXTRA_LOGGING
32
33 /**
34  * After how many ms "busy" should a DB operation fail for good?
35  * A low value makes sure that we are more responsive to requests
36  * (especially PUTs).  A high value guarantees a higher success
37  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
38  *
39  * The default value of 1s should ensure that users do not experience
40  * huge latencies while at the same time allowing operations to succeed
41  * with reasonable probability.
42  */
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
44
45
46 /**
47  * Context for all functions in this plugin.
48  */
49 struct Plugin
50 {
51   /**
52    * Our execution environment.
53    */
54   struct GNUNET_DATASTORE_PluginEnvironment *env;
55
56   /**
57    * Native Postgres database handle.
58    */
59   PGconn *dbh;
60
61 };
62
63
64 /**
65  * Check if the result obtained from Postgres has
66  * the desired status code.  If not, log an error, clear the
67  * result and return GNUNET_SYSERR.
68  *
69  * @param plugin global context
70  * @param ret result to check
71  * @param expected_status expected return value
72  * @param command name of SQL command that was run
73  * @param args arguments to SQL command
74  * @param line line number for error reporting
75  * @return GNUNET_OK if the result is acceptable
76  */
77 static int
78 check_result (struct Plugin *plugin, PGresult * ret, int expected_status,
79               const char *command, const char *args, int line)
80 {
81   if (ret == NULL)
82   {
83     GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
84                      "datastore-postgres",
85                      "Postgres failed to allocate result for `%s:%s' at %d\n",
86                      command, args, line);
87     return GNUNET_SYSERR;
88   }
89   if (PQresultStatus (ret) != expected_status)
90   {
91     GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
92                      "datastore-postgres",
93                      _("`%s:%s' failed at %s:%d with error: %s"), command, args,
94                      __FILE__, line, PQerrorMessage (plugin->dbh));
95     PQclear (ret);
96     return GNUNET_SYSERR;
97   }
98   return GNUNET_OK;
99 }
100
101 /**
102  * Run simple SQL statement (without results).
103  *
104  * @param plugin global context
105  * @param sql statement to run
106  * @param line code line for error reporting
107  */
108 static int
109 pq_exec (struct Plugin *plugin, const char *sql, int line)
110 {
111   PGresult *ret;
112
113   ret = PQexec (plugin->dbh, sql);
114   if (GNUNET_OK !=
115       check_result (plugin, ret, PGRES_COMMAND_OK, "PQexec", sql, line))
116     return GNUNET_SYSERR;
117   PQclear (ret);
118   return GNUNET_OK;
119 }
120
121 /**
122  * Prepare SQL statement.
123  *
124  * @param plugin global context
125  * @param name name for the prepared SQL statement
126  * @param sql SQL code to prepare
127  * @param nparams number of parameters in sql
128  * @param line code line for error reporting
129  * @return GNUNET_OK on success
130  */
131 static int
132 pq_prepare (struct Plugin *plugin, const char *name, const char *sql,
133             int nparams, int line)
134 {
135   PGresult *ret;
136
137   ret = PQprepare (plugin->dbh, name, sql, nparams, NULL);
138   if (GNUNET_OK !=
139       check_result (plugin, ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
140     return GNUNET_SYSERR;
141   PQclear (ret);
142   return GNUNET_OK;
143 }
144
145 /**
146  * @brief Get a database handle
147  *
148  * @param plugin global context
149  * @return GNUNET_OK on success, GNUNET_SYSERR on error
150  */
151 static int
152 init_connection (struct Plugin *plugin)
153 {
154   char *conninfo;
155   PGresult *ret;
156
157   /* Open database and precompile statements */
158   conninfo = NULL;
159   (void) GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
160                                                 "datastore-postgres", "CONFIG",
161                                                 &conninfo);
162   plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
163   if (NULL == plugin->dbh)
164   {
165     /* FIXME: warn about out-of-memory? */
166     GNUNET_free_non_null (conninfo);
167     return GNUNET_SYSERR;
168   }
169   if (PQstatus (plugin->dbh) != CONNECTION_OK)
170   {
171     GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "datastore-postgres",
172                      _
173                      ("Unable to initialize Postgres with configuration `%s': %s"),
174                      conninfo, PQerrorMessage (plugin->dbh));
175     PQfinish (plugin->dbh);
176     plugin->dbh = NULL;
177     GNUNET_free_non_null (conninfo);
178     return GNUNET_SYSERR;
179   }
180   GNUNET_free_non_null (conninfo);
181   ret =
182       PQexec (plugin->dbh,
183               "CREATE TABLE gn090 (" "  repl INTEGER NOT NULL DEFAULT 0,"
184               "  type INTEGER NOT NULL DEFAULT 0,"
185               "  prio INTEGER NOT NULL DEFAULT 0,"
186               "  anonLevel INTEGER NOT NULL DEFAULT 0,"
187               "  expire BIGINT NOT NULL DEFAULT 0,"
188               "  rvalue BIGINT NOT NULL DEFAULT 0,"
189               "  hash BYTEA NOT NULL DEFAULT '',"
190               "  vhash BYTEA NOT NULL DEFAULT '',"
191               "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
192   if ((ret == NULL) || ((PQresultStatus (ret) != PGRES_COMMAND_OK) && (0 != strcmp ("42P07",    /* duplicate table */
193                                                                                     PQresultErrorField
194                                                                                     (ret,
195                                                                                      PG_DIAG_SQLSTATE)))))
196   {
197     (void) check_result (plugin, ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090",
198                          __LINE__);
199     PQfinish (plugin->dbh);
200     plugin->dbh = NULL;
201     return GNUNET_SYSERR;
202   }
203   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
204   {
205     if ((GNUNET_OK !=
206          pq_exec (plugin, "CREATE INDEX idx_hash ON gn090 (hash)", __LINE__)) ||
207         (GNUNET_OK !=
208          pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)",
209                   __LINE__)) ||
210         (GNUNET_OK !=
211          pq_exec (plugin, "CREATE INDEX idx_prio ON gn090 (prio)", __LINE__)) ||
212         (GNUNET_OK !=
213          pq_exec (plugin, "CREATE INDEX idx_expire ON gn090 (expire)",
214                   __LINE__)) ||
215         (GNUNET_OK !=
216          pq_exec (plugin,
217                   "CREATE INDEX idx_prio_anon ON gn090 (prio,anonLevel)",
218                   __LINE__)) ||
219         (GNUNET_OK !=
220          pq_exec (plugin,
221                   "CREATE INDEX idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)",
222                   __LINE__)) ||
223         (GNUNET_OK !=
224          pq_exec (plugin, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)",
225                   __LINE__)) ||
226         (GNUNET_OK !=
227          pq_exec (plugin, "CREATE INDEX idx_expire_hash ON gn090 (expire,hash)",
228                   __LINE__)))
229     {
230       PQclear (ret);
231       PQfinish (plugin->dbh);
232       plugin->dbh = NULL;
233       return GNUNET_SYSERR;
234     }
235   }
236   PQclear (ret);
237   ret =
238       PQexec (plugin->dbh,
239               "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
240   if (GNUNET_OK !=
241       check_result (plugin, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090",
242                     __LINE__))
243   {
244     PQfinish (plugin->dbh);
245     plugin->dbh = NULL;
246     return GNUNET_SYSERR;
247   }
248   PQclear (ret);
249   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
250   if (GNUNET_OK !=
251       check_result (plugin, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090",
252                     __LINE__))
253   {
254     PQfinish (plugin->dbh);
255     plugin->dbh = NULL;
256     return GNUNET_SYSERR;
257   }
258   PQclear (ret);
259   ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
260   if (GNUNET_OK !=
261       check_result (plugin, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090",
262                     __LINE__))
263   {
264     PQfinish (plugin->dbh);
265     plugin->dbh = NULL;
266     return GNUNET_SYSERR;
267   }
268   PQclear (ret);
269   if ((GNUNET_OK !=
270        pq_prepare (plugin, "getvt",
271                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
272                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
273                    "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4, __LINE__)) ||
274       (GNUNET_OK !=
275        pq_prepare (plugin, "gett",
276                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
277                    "WHERE hash=$1 AND type=$2 "
278                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3, __LINE__)) ||
279       (GNUNET_OK !=
280        pq_prepare (plugin, "getv",
281                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
282                    "WHERE hash=$1 AND vhash=$2 "
283                    "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3, __LINE__)) ||
284       (GNUNET_OK !=
285        pq_prepare (plugin, "get",
286                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
287                    "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2,
288                    __LINE__)) ||
289       (GNUNET_OK !=
290        pq_prepare (plugin, "put",
291                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
292                    "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9,
293                    __LINE__)) ||
294       (GNUNET_OK !=
295        pq_prepare (plugin, "update",
296                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
297                    "WHERE oid = $3", 3, __LINE__)) ||
298       (GNUNET_OK !=
299        pq_prepare (plugin, "decrepl",
300                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
301                    "WHERE oid = $1", 1, __LINE__)) ||
302       (GNUNET_OK !=
303        pq_prepare (plugin, "select_non_anonymous",
304                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
305                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
306                    1, __LINE__)) ||
307       (GNUNET_OK !=
308        pq_prepare (plugin, "select_expiration_order",
309                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
310                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION "
311                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
312                    "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1",
313                    1, __LINE__)) ||
314       (GNUNET_OK !=
315        pq_prepare (plugin, "select_replication_order",
316                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
317                    "ORDER BY repl DESC,RANDOM() LIMIT 1", 0, __LINE__)) ||
318       (GNUNET_OK !=
319        pq_prepare (plugin, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1,
320                    __LINE__)) ||
321       (GNUNET_OK !=
322        pq_prepare (plugin, "get_keys", "SELECT hash FROM gn090", 0,
323                    __LINE__)))
324   {
325     PQfinish (plugin->dbh);
326     plugin->dbh = NULL;
327     return GNUNET_SYSERR;
328   }
329   return GNUNET_OK;
330 }
331
332
333 /**
334  * Delete the row identified by the given rowid (qid
335  * in postgres).
336  *
337  * @param plugin global context
338  * @param rowid which row to delete
339  * @return GNUNET_OK on success
340  */
341 static int
342 delete_by_rowid (struct Plugin *plugin, unsigned int rowid)
343 {
344   uint32_t browid;
345   const char *paramValues[] = { (const char *) &browid };
346   int paramLengths[] = { sizeof (browid) };
347   const int paramFormats[] = { 1 };
348   PGresult *ret;
349
350   browid = htonl (rowid);
351   ret =
352       PQexecPrepared (plugin->dbh, "delrow", 1, paramValues, paramLengths,
353                       paramFormats, 1);
354   if (GNUNET_OK !=
355       check_result (plugin, ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
356                     __LINE__))
357   {
358     return GNUNET_SYSERR;
359   }
360   PQclear (ret);
361   return GNUNET_OK;
362 }
363
364
365 /**
366  * Get an estimate of how much space the database is
367  * currently using.
368  *
369  * @param cls our "struct Plugin*"
370  * @return number of bytes used on disk
371  */
372 static unsigned long long
373 postgres_plugin_estimate_size (void *cls)
374 {
375   struct Plugin *plugin = cls;
376   unsigned long long total;
377   PGresult *ret;
378
379   ret =
380       PQexecParams (plugin->dbh,
381                     "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
382                     NULL, NULL, NULL, NULL, 1);
383   if (GNUNET_OK !=
384       check_result (plugin, ret, PGRES_TUPLES_OK, "PQexecParams", "get_size",
385                     __LINE__))
386   {
387     return 0;
388   }
389   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
390       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
391   {
392     GNUNET_break (0);
393     PQclear (ret);
394     return 0;
395   }
396   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
397   PQclear (ret);
398   return total;
399 }
400
401
402 /**
403  * Store an item in the datastore.
404  *
405  * @param cls closure
406  * @param key key for the item
407  * @param size number of bytes in data
408  * @param data content stored
409  * @param type type of the content
410  * @param priority priority of the content
411  * @param anonymity anonymity-level for the content
412  * @param replication replication-level for the content
413  * @param expiration expiration time for the content
414  * @param msg set to error message
415  * @return GNUNET_OK on success
416  */
417 static int
418 postgres_plugin_put (void *cls, const GNUNET_HashCode * key, uint32_t size,
419                      const void *data, enum GNUNET_BLOCK_Type type,
420                      uint32_t priority, uint32_t anonymity,
421                      uint32_t replication,
422                      struct GNUNET_TIME_Absolute expiration, char **msg)
423 {
424   struct Plugin *plugin = cls;
425   GNUNET_HashCode vhash;
426   PGresult *ret;
427   uint32_t btype = htonl (type);
428   uint32_t bprio = htonl (priority);
429   uint32_t banon = htonl (anonymity);
430   uint32_t brepl = htonl (replication);
431   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
432
433   const char *paramValues[] = {
434     (const char *) &brepl,
435     (const char *) &btype,
436     (const char *) &bprio,
437     (const char *) &banon,
438     (const char *) &bexpi,
439     (const char *) key,
440     (const char *) &vhash,
441     (const char *) data
442   };
443   int paramLengths[] = {
444     sizeof (brepl),
445     sizeof (btype),
446     sizeof (bprio),
447     sizeof (banon),
448     sizeof (bexpi),
449     sizeof (GNUNET_HashCode),
450     sizeof (GNUNET_HashCode),
451     size
452   };
453   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
454
455   GNUNET_CRYPTO_hash (data, size, &vhash);
456   ret =
457       PQexecPrepared (plugin->dbh, "put", 8, paramValues, paramLengths,
458                       paramFormats, 1);
459   if (GNUNET_OK !=
460       check_result (plugin, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put",
461                     __LINE__))
462     return GNUNET_SYSERR;
463   PQclear (ret);
464   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
465 #if DEBUG_POSTGRES
466   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
467                    "Stored %u bytes in database\n", (unsigned int) size);
468 #endif
469   return GNUNET_OK;
470 }
471
472
473 /**
474  * Function invoked to process the result and call
475  * the processor.
476  *
477  * @param plugin global plugin data
478  * @param proc function to call the value (once only).
479  * @param proc_cls closure for proc
480  * @param res result from exec
481  * @param line line number for error messages
482  */
483 static void
484 process_result (struct Plugin *plugin, PluginDatumProcessor proc,
485                 void *proc_cls, PGresult * res, int line)
486 {
487   int iret;
488   enum GNUNET_BLOCK_Type type;
489   uint32_t anonymity;
490   uint32_t priority;
491   uint32_t size;
492   unsigned int rowid;
493   struct GNUNET_TIME_Absolute expiration_time;
494   GNUNET_HashCode key;
495
496   if (GNUNET_OK !=
497       check_result (plugin, res, PGRES_TUPLES_OK, "PQexecPrepared", "select",
498                     line))
499   {
500 #if DEBUG_POSTGRES
501     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
502                      "Ending iteration (postgres error)\n");
503 #endif
504     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
505     return;
506   }
507
508   if (0 == PQntuples (res))
509   {
510     /* no result */
511 #if DEBUG_POSTGRES
512     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
513                      "Ending iteration (no more results)\n");
514 #endif
515     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
516     PQclear (res);
517     return;
518   }
519   if ((1 != PQntuples (res)) || (7 != PQnfields (res)) ||
520       (sizeof (uint32_t) != PQfsize (res, 0)) ||
521       (sizeof (uint32_t) != PQfsize (res, 6)))
522   {
523     GNUNET_break (0);
524     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
525     PQclear (res);
526     return;
527   }
528   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
529   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
530       (sizeof (uint32_t) != PQfsize (res, 1)) ||
531       (sizeof (uint32_t) != PQfsize (res, 2)) ||
532       (sizeof (uint64_t) != PQfsize (res, 3)) ||
533       (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)))
534   {
535     GNUNET_break (0);
536     PQclear (res);
537     delete_by_rowid (plugin, rowid);
538     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
539     return;
540   }
541
542   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
543   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
544   anonymity = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
545   expiration_time.abs_value =
546       GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
547   memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode));
548   size = PQgetlength (res, 0, 5);
549 #if DEBUG_POSTGRES
550   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
551                    "Found result of size %u bytes and type %u in database\n",
552                    (unsigned int) size, (unsigned int) type);
553 #endif
554   iret =
555       proc (proc_cls, &key, size, PQgetvalue (res, 0, 5),
556             (enum GNUNET_BLOCK_Type) type, priority, anonymity, expiration_time,
557             rowid);
558   PQclear (res);
559   if (iret == GNUNET_NO)
560   {
561 #if DEBUG_POSTGRES
562     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
563                 "Processor asked for item %u to be removed.\n", rowid);
564 #endif
565     if (GNUNET_OK == delete_by_rowid (plugin, rowid))
566     {
567 #if DEBUG_POSTGRES
568       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
569                        "Deleting %u bytes from database\n",
570                        (unsigned int) size);
571 #endif
572       plugin->env->duc (plugin->env->cls,
573                         -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
574 #if DEBUG_POSTGRES
575       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
576                        "Deleted %u bytes from database\n", (unsigned int) size);
577 #endif
578     }
579   }
580 }
581
582
583 /**
584  * Iterate over the results for a particular key
585  * in the datastore.
586  *
587  * @param cls closure
588  * @param offset offset of the result (modulo num-results);
589  *        specific ordering does not matter for the offset
590  * @param key maybe NULL (to match all entries)
591  * @param vhash hash of the value, maybe NULL (to
592  *        match all values that have the right key).
593  *        Note that for DBlocks there is no difference
594  *        betwen key and vhash, but for other blocks
595  *        there may be!
596  * @param type entries of which type are relevant?
597  *     Use 0 for any type.
598  * @param proc function to call on the matching value;
599  *        will be called once with a NULL if no value matches
600  * @param proc_cls closure for iter
601  */
602 static void
603 postgres_plugin_get_key (void *cls, uint64_t offset,
604                          const GNUNET_HashCode * key,
605                          const GNUNET_HashCode * vhash,
606                          enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
607                          void *proc_cls)
608 {
609   struct Plugin *plugin = cls;
610   const int paramFormats[] = { 1, 1, 1, 1, 1 };
611   int paramLengths[4];
612   const char *paramValues[4];
613   int nparams;
614   const char *pname;
615   PGresult *ret;
616   uint64_t total;
617   uint64_t blimit_off;
618   uint32_t btype;
619
620   GNUNET_assert (key != NULL);
621   paramValues[0] = (const char *) key;
622   paramLengths[0] = sizeof (GNUNET_HashCode);
623   btype = htonl (type);
624   if (type != 0)
625   {
626     if (vhash != NULL)
627     {
628       paramValues[1] = (const char *) vhash;
629       paramLengths[1] = sizeof (GNUNET_HashCode);
630       paramValues[2] = (const char *) &btype;
631       paramLengths[2] = sizeof (btype);
632       paramValues[3] = (const char *) &blimit_off;
633       paramLengths[3] = sizeof (blimit_off);
634       nparams = 4;
635       pname = "getvt";
636       ret =
637           PQexecParams (plugin->dbh,
638                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
639                         3, NULL, paramValues, paramLengths, paramFormats, 1);
640     }
641     else
642     {
643       paramValues[1] = (const char *) &btype;
644       paramLengths[1] = sizeof (btype);
645       paramValues[2] = (const char *) &blimit_off;
646       paramLengths[2] = sizeof (blimit_off);
647       nparams = 3;
648       pname = "gett";
649       ret =
650           PQexecParams (plugin->dbh,
651                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
652                         2, NULL, paramValues, paramLengths, paramFormats, 1);
653     }
654   }
655   else
656   {
657     if (vhash != NULL)
658     {
659       paramValues[1] = (const char *) vhash;
660       paramLengths[1] = sizeof (GNUNET_HashCode);
661       paramValues[2] = (const char *) &blimit_off;
662       paramLengths[2] = sizeof (blimit_off);
663       nparams = 3;
664       pname = "getv";
665       ret =
666           PQexecParams (plugin->dbh,
667                         "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
668                         2, NULL, paramValues, paramLengths, paramFormats, 1);
669     }
670     else
671     {
672       paramValues[1] = (const char *) &blimit_off;
673       paramLengths[1] = sizeof (blimit_off);
674       nparams = 2;
675       pname = "get";
676       ret =
677           PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1",
678                         1, NULL, paramValues, paramLengths, paramFormats, 1);
679     }
680   }
681   if (GNUNET_OK !=
682       check_result (plugin, ret, PGRES_TUPLES_OK, "PQexecParams", pname,
683                     __LINE__))
684   {
685     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
686     return;
687   }
688   if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
689       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
690   {
691     GNUNET_break (0);
692     PQclear (ret);
693     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
694     return;
695   }
696   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
697   PQclear (ret);
698   if (total == 0)
699   {
700     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
701     return;
702   }
703   blimit_off = GNUNET_htonll (offset % total);
704   ret =
705       PQexecPrepared (plugin->dbh, pname, nparams, paramValues, paramLengths,
706                       paramFormats, 1);
707   process_result (plugin, proc, proc_cls, ret, __LINE__);
708 }
709
710
711 /**
712  * Select a subset of the items in the datastore and call
713  * the given iterator for each of them.
714  *
715  * @param cls our "struct Plugin*"
716  * @param offset offset of the result (modulo num-results);
717  *        specific ordering does not matter for the offset
718  * @param type entries of which type should be considered?
719  *        Use 0 for any type.
720  * @param proc function to call on the matching value;
721  *        will be called with a NULL if no value matches
722  * @param proc_cls closure for proc
723  */
724 static void
725 postgres_plugin_get_zero_anonymity (void *cls, uint64_t offset,
726                                     enum GNUNET_BLOCK_Type type,
727                                     PluginDatumProcessor proc, void *proc_cls)
728 {
729   struct Plugin *plugin = cls;
730   uint32_t btype;
731   uint64_t boff;
732   const int paramFormats[] = { 1, 1 };
733   int paramLengths[] = { sizeof (btype), sizeof (boff) };
734   const char *paramValues[] = { (const char *) &btype, (const char *) &boff };
735   PGresult *ret;
736
737   btype = htonl ((uint32_t) type);
738   boff = GNUNET_htonll (offset);
739   ret =
740       PQexecPrepared (plugin->dbh, "select_non_anonymous", 2, paramValues,
741                       paramLengths, paramFormats, 1);
742   process_result (plugin, proc, proc_cls, ret, __LINE__);
743 }
744
745
746 /**
747  * Context for 'repl_iter' function.
748  */
749 struct ReplCtx
750 {
751
752   /**
753    * Plugin handle.
754    */
755   struct Plugin *plugin;
756
757   /**
758    * Function to call for the result (or the NULL).
759    */
760   PluginDatumProcessor proc;
761
762   /**
763    * Closure for proc.
764    */
765   void *proc_cls;
766 };
767
768
769 /**
770  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
771  * Decrements the replication counter and calls the original
772  * iterator.
773  *
774  * @param cls closure
775  * @param key key for the content
776  * @param size number of bytes in data
777  * @param data content stored
778  * @param type type of the content
779  * @param priority priority of the content
780  * @param anonymity anonymity-level for the content
781  * @param expiration expiration time for the content
782  * @param uid unique identifier for the datum;
783  *        maybe 0 if no unique identifier is available
784  *
785  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
786  *         (continue on call to "next", of course),
787  *         GNUNET_NO to delete the item and continue (if supported)
788  */
789 static int
790 repl_proc (void *cls, const GNUNET_HashCode * key, uint32_t size,
791            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
792            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
793            uint64_t uid)
794 {
795   struct ReplCtx *rc = cls;
796   struct Plugin *plugin = rc->plugin;
797   int ret;
798   PGresult *qret;
799   uint32_t boid;
800
801   ret =
802       rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
803                 expiration, uid);
804   if (NULL != key)
805   {
806     boid = htonl ((uint32_t) uid);
807     const char *paramValues[] = {
808       (const char *) &boid,
809     };
810     int paramLengths[] = {
811       sizeof (boid),
812     };
813     const int paramFormats[] = { 1 };
814     qret =
815         PQexecPrepared (plugin->dbh, "decrepl", 1, paramValues, paramLengths,
816                         paramFormats, 1);
817     if (GNUNET_OK !=
818         check_result (plugin, qret, PGRES_COMMAND_OK, "PQexecPrepared",
819                       "decrepl", __LINE__))
820       return GNUNET_SYSERR;
821     PQclear (qret);
822   }
823   return ret;
824 }
825
826
827 /**
828  * Get a random item for replication.  Returns a single, not expired, random item
829  * from those with the highest replication counters.  The item's
830  * replication counter is decremented by one IF it was positive before.
831  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
832  *
833  * @param cls closure
834  * @param proc function to call the value (once only).
835  * @param proc_cls closure for proc
836  */
837 static void
838 postgres_plugin_get_replication (void *cls, PluginDatumProcessor proc,
839                                  void *proc_cls)
840 {
841   struct Plugin *plugin = cls;
842   struct ReplCtx rc;
843   PGresult *ret;
844
845   rc.plugin = plugin;
846   rc.proc = proc;
847   rc.proc_cls = proc_cls;
848   ret =
849       PQexecPrepared (plugin->dbh, "select_replication_order", 0, NULL, NULL,
850                       NULL, 1);
851   process_result (plugin, &repl_proc, &rc, ret, __LINE__);
852 }
853
854
855 /**
856  * Get a random item for expiration.
857  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
858  *
859  * @param cls closure
860  * @param proc function to call the value (once only).
861  * @param proc_cls closure for proc
862  */
863 static void
864 postgres_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
865                                 void *proc_cls)
866 {
867   struct Plugin *plugin = cls;
868   uint64_t btime;
869   const int paramFormats[] = { 1 };
870   int paramLengths[] = { sizeof (btime) };
871   const char *paramValues[] = { (const char *) &btime };
872   PGresult *ret;
873
874   btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
875   ret =
876       PQexecPrepared (plugin->dbh, "select_expiration_order", 1, paramValues,
877                       paramLengths, paramFormats, 1);
878   process_result (plugin, proc, proc_cls, ret, __LINE__);
879 }
880
881
882 /**
883  * Update the priority for a particular key in the datastore.  If
884  * the expiration time in value is different than the time found in
885  * the datastore, the higher value should be kept.  For the
886  * anonymity level, the lower value is to be used.  The specified
887  * priority should be added to the existing priority, ignoring the
888  * priority in value.
889  *
890  * Note that it is possible for multiple values to match this put.
891  * In that case, all of the respective values are updated.
892  *
893  * @param cls our "struct Plugin*"
894  * @param uid unique identifier of the datum
895  * @param delta by how much should the priority
896  *     change?  If priority + delta < 0 the
897  *     priority should be set to 0 (never go
898  *     negative).
899  * @param expire new expiration time should be the
900  *     MAX of any existing expiration time and
901  *     this value
902  * @param msg set to error message
903  * @return GNUNET_OK on success
904  */
905 static int
906 postgres_plugin_update (void *cls, uint64_t uid, int delta,
907                         struct GNUNET_TIME_Absolute expire, char **msg)
908 {
909   struct Plugin *plugin = cls;
910   PGresult *ret;
911   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
912   uint32_t boid = htonl ((uint32_t) uid);
913   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
914
915   const char *paramValues[] = {
916     (const char *) &bdelta,
917     (const char *) &bexpire,
918     (const char *) &boid,
919   };
920   int paramLengths[] = {
921     sizeof (bdelta),
922     sizeof (bexpire),
923     sizeof (boid),
924   };
925   const int paramFormats[] = { 1, 1, 1 };
926
927   ret =
928       PQexecPrepared (plugin->dbh, "update", 3, paramValues, paramLengths,
929                       paramFormats, 1);
930   if (GNUNET_OK !=
931       check_result (plugin, ret, PGRES_COMMAND_OK, "PQexecPrepared", "update",
932                     __LINE__))
933     return GNUNET_SYSERR;
934   PQclear (ret);
935   return GNUNET_OK;
936 }
937
938
939
940 /**
941  * Get all of the keys in the datastore.
942  *
943  * @param cls closure
944  * @param proc function to call on each key
945  * @param proc_cls closure for proc
946  */
947 static void
948 postgres_plugin_get_keys (void *cls,
949                           PluginKeyProcessor proc,
950                           void *proc_cls)
951 {
952   struct Plugin *plugin = cls;
953   int ret;
954   int i;
955   GNUNET_HashCode key;
956   PGresult * res;
957
958   res = PQexecPrepared (plugin->dbh, "get_keys", 0, NULL, NULL, NULL, 1);
959   ret = PQntuples (res);
960   for (i=0;i<ret;i++)
961   {
962     if (sizeof (GNUNET_HashCode) != PQgetlength (res, i, 0))
963     {
964       memcpy (&key, PQgetvalue (res, i, 0), sizeof (GNUNET_HashCode));
965       proc (proc_cls, &key, 1);    
966     }
967   }
968   PQclear (res);
969 }
970
971
972
973 /**
974  * Drop database.
975  */
976 static void
977 postgres_plugin_drop (void *cls)
978 {
979   struct Plugin *plugin = cls;
980
981   pq_exec (plugin, "DROP TABLE gn090", __LINE__);
982 }
983
984
985 /**
986  * Entry point for the plugin.
987  *
988  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
989  * @return our "struct Plugin*"
990  */
991 void *
992 libgnunet_plugin_datastore_postgres_init (void *cls)
993 {
994   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
995   struct GNUNET_DATASTORE_PluginFunctions *api;
996   struct Plugin *plugin;
997
998   plugin = GNUNET_malloc (sizeof (struct Plugin));
999   plugin->env = env;
1000   if (GNUNET_OK != init_connection (plugin))
1001   {
1002     GNUNET_free (plugin);
1003     return NULL;
1004   }
1005   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1006   api->cls = plugin;
1007   api->estimate_size = &postgres_plugin_estimate_size;
1008   api->put = &postgres_plugin_put;
1009   api->update = &postgres_plugin_update;
1010   api->get_key = &postgres_plugin_get_key;
1011   api->get_replication = &postgres_plugin_get_replication;
1012   api->get_expiration = &postgres_plugin_get_expiration;
1013   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
1014   api->get_keys = &postgres_plugin_get_keys;
1015   api->drop = &postgres_plugin_drop;
1016   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "datastore-postgres",
1017                    _("Postgres database running\n"));
1018   return api;
1019 }
1020
1021
1022 /**
1023  * Exit point from the plugin.
1024  * @param cls our "struct Plugin*"
1025  * @return always NULL
1026  */
1027 void *
1028 libgnunet_plugin_datastore_postgres_done (void *cls)
1029 {
1030   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1031   struct Plugin *plugin = api->cls;
1032
1033   PQfinish (plugin->dbh);
1034   GNUNET_free (plugin);
1035   GNUNET_free (api);
1036   return NULL;
1037 }
1038
1039 /* end of plugin_datastore_postgres.c */