adding configure code for --enable-benchmarks, --enable-expensive-tests, some clean up
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <postgresql/libpq-fe.h>
30
31 #define DEBUG_POSTGRES GNUNET_NO
32
33 /**
34  * After how many ms "busy" should a DB operation fail for good?
35  * A low value makes sure that we are more responsive to requests
36  * (especially PUTs).  A high value guarantees a higher success
37  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
38  *
39  * The default value of 1s should ensure that users do not experience
40  * huge latencies while at the same time allowing operations to succeed
41  * with reasonable probability.
42  */
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
44
45
46 /**
47  * Context for all functions in this plugin.
48  */
49 struct Plugin 
50 {
51   /**
52    * Our execution environment.
53    */
54   struct GNUNET_DATASTORE_PluginEnvironment *env;
55
56   /**
57    * Native Postgres database handle.
58    */
59   PGconn *dbh;
60
61 };
62
63
64 /**
65  * Check if the result obtained from Postgres has
66  * the desired status code.  If not, log an error, clear the
67  * result and return GNUNET_SYSERR.
68  * 
69  * @param plugin global context
70  * @param ret result to check
71  * @param expected_status expected return value
72  * @param command name of SQL command that was run
73  * @param args arguments to SQL command
74  * @param line line number for error reporting
75  * @return GNUNET_OK if the result is acceptable
76  */
77 static int
78 check_result (struct Plugin *plugin,
79               PGresult * ret,
80               int expected_status,
81               const char *command, const char *args, int line)
82 {
83   if (ret == NULL)
84     {
85       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
86                        "datastore-postgres",
87                        "Postgres failed to allocate result for `%s:%s' at %d\n",
88                        command, args, line);
89       return GNUNET_SYSERR;
90     }
91   if (PQresultStatus (ret) != expected_status)
92     {
93       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
94                        "datastore-postgres",
95                        _("`%s:%s' failed at %s:%d with error: %s"),
96                        command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
97       PQclear (ret);
98       return GNUNET_SYSERR;
99     }
100   return GNUNET_OK;
101 }
102
103 /**
104  * Run simple SQL statement (without results).
105  *
106  * @param plugin global context
107  * @param sql statement to run
108  * @param line code line for error reporting
109  */
110 static int
111 pq_exec (struct Plugin *plugin,
112          const char *sql, int line)
113 {
114   PGresult *ret;
115   ret = PQexec (plugin->dbh, sql);
116   if (GNUNET_OK != check_result (plugin,
117                                  ret, 
118                                  PGRES_COMMAND_OK, "PQexec", sql, line))
119     return GNUNET_SYSERR;
120   PQclear (ret);
121   return GNUNET_OK;
122 }
123
124 /**
125  * Prepare SQL statement.
126  *
127  * @param plugin global context
128  * @param name name for the prepared SQL statement
129  * @param sql SQL code to prepare
130  * @param nparams number of parameters in sql
131  * @param line code line for error reporting
132  * @return GNUNET_OK on success
133  */
134 static int
135 pq_prepare (struct Plugin *plugin,
136             const char *name, const char *sql, int nparams, int line)
137 {
138   PGresult *ret;
139   ret = PQprepare (plugin->dbh, name, sql, nparams, NULL);
140   if (GNUNET_OK !=
141       check_result (plugin, 
142                     ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
143     return GNUNET_SYSERR;
144   PQclear (ret);
145   return GNUNET_OK;
146 }
147
148 /**
149  * @brief Get a database handle
150  *
151  * @param plugin global context
152  * @return GNUNET_OK on success, GNUNET_SYSERR on error
153  */
154 static int
155 init_connection (struct Plugin *plugin)
156 {
157   char *conninfo;
158   PGresult *ret;
159
160   /* Open database and precompile statements */
161   conninfo = NULL;
162   GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
163                                          "datastore-postgres",
164                                          "CONFIG",
165                                          &conninfo);
166   plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
167   if (NULL == plugin->dbh)
168     {
169       /* FIXME: warn about out-of-memory? */
170       GNUNET_free_non_null (conninfo);
171       return GNUNET_SYSERR;
172     }
173   if (PQstatus (plugin->dbh) != CONNECTION_OK)
174     {
175       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
176                        "datastore-postgres",
177                        _("Unable to initialize Postgres with configuration `%s': %s"),
178                        conninfo,
179                        PQerrorMessage (plugin->dbh));
180       PQfinish (plugin->dbh);
181       plugin->dbh = NULL;
182       GNUNET_free_non_null (conninfo);
183       return GNUNET_SYSERR;
184     }
185   GNUNET_free_non_null (conninfo);
186   ret = PQexec (plugin->dbh,
187                 "CREATE TABLE gn090 ("
188                 "  repl INTEGER NOT NULL DEFAULT 0,"
189                 "  type INTEGER NOT NULL DEFAULT 0,"
190                 "  prio INTEGER NOT NULL DEFAULT 0,"
191                 "  anonLevel INTEGER NOT NULL DEFAULT 0,"
192                 "  expire BIGINT NOT NULL DEFAULT 0,"
193                 "  hash BYTEA NOT NULL DEFAULT '',"
194                 "  vhash BYTEA NOT NULL DEFAULT '',"
195                 "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
196   if ( (ret == NULL) || 
197        ( (PQresultStatus (ret) != PGRES_COMMAND_OK) && 
198          (0 != strcmp ("42P07",    /* duplicate table */
199                        PQresultErrorField
200                        (ret,
201                         PG_DIAG_SQLSTATE)))))
202     {
203       (void) check_result (plugin,
204                            ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090", __LINE__);
205       PQfinish (plugin->dbh);
206       plugin->dbh = NULL;
207       return GNUNET_SYSERR;
208     }
209   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
210     {
211       if ((GNUNET_OK !=
212            pq_exec (plugin, "CREATE INDEX idx_hash ON gn090 (hash)", __LINE__)) ||
213           (GNUNET_OK !=
214            pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)",
215                     __LINE__))
216           || (GNUNET_OK !=
217               pq_exec (plugin, "CREATE INDEX idx_prio ON gn090 (prio)", __LINE__))
218           || (GNUNET_OK !=
219               pq_exec (plugin, "CREATE INDEX idx_expire ON gn090 (expire)", __LINE__))
220           || (GNUNET_OK !=
221               pq_exec (plugin, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)",
222                        __LINE__))
223           || (GNUNET_OK !=
224               pq_exec
225               (plugin, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)",
226                __LINE__))
227           || (GNUNET_OK !=
228               pq_exec (plugin, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)",
229                        __LINE__)))
230         {
231           PQclear (ret);
232           PQfinish (plugin->dbh);
233           plugin->dbh = NULL;
234           return GNUNET_SYSERR;
235         }
236     }
237   PQclear (ret);
238   ret = PQexec (plugin->dbh,
239                 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
240   if (GNUNET_OK != 
241       check_result (plugin,
242                     ret, PGRES_COMMAND_OK,
243                     "ALTER TABLE", "gn090", __LINE__))
244     {
245       PQfinish (plugin->dbh);
246       plugin->dbh = NULL;
247       return GNUNET_SYSERR;
248     }
249   PQclear (ret);
250   ret = PQexec (plugin->dbh,
251                 "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
252   if (GNUNET_OK !=
253       check_result (plugin,
254                     ret, PGRES_COMMAND_OK,
255                     "ALTER TABLE", "gn090", __LINE__))
256     {
257       PQfinish (plugin->dbh);
258       plugin->dbh = NULL;
259       return GNUNET_SYSERR;
260     }
261   PQclear (ret);
262   ret = PQexec (plugin->dbh,
263                 "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
264   if (GNUNET_OK !=
265       check_result (plugin,
266                     ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090", __LINE__))
267     {
268       PQfinish (plugin->dbh);
269       plugin->dbh = NULL;
270       return GNUNET_SYSERR;
271     }
272   PQclear (ret);
273   if ((GNUNET_OK !=
274        pq_prepare (plugin,
275                    "getvt",
276                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
277                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
278                    "ORDER BY oid ASC LIMIT 1 OFFSET $4",
279                    4,
280                    __LINE__)) ||
281       (GNUNET_OK !=
282        pq_prepare (plugin,
283                    "gett",
284                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
285                    "WHERE hash=$1 AND type=$2 "
286                    "ORDER BY oid ASC LIMIT 1 OFFSET $3",
287                    3,
288                    __LINE__)) ||
289       (GNUNET_OK !=
290        pq_prepare (plugin,
291                    "getv",
292                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
293                    "WHERE hash=$1 AND vhash=$2 "
294                    "ORDER BY oid ASC LIMIT 1 OFFSET $3",
295                    3,
296                    __LINE__)) ||
297       (GNUNET_OK !=
298        pq_prepare (plugin,
299                    "get",
300                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
301                    "WHERE hash=$1 "
302                    "ORDER BY oid ASC LIMIT 1 OFFSET $2",
303                    2,
304                    __LINE__)) ||
305       (GNUNET_OK !=
306        pq_prepare (plugin,
307                    "put",
308                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, hash, vhash, value) "
309                    "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
310                    8,
311                    __LINE__)) ||
312       (GNUNET_OK !=
313        pq_prepare (plugin,
314                    "update",
315                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
316                    "WHERE oid = $3",
317                    3,
318                    __LINE__)) ||
319       (GNUNET_OK !=
320        pq_prepare (plugin,
321                    "decrepl",
322                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
323                    "WHERE oid = $1",
324                    1,
325                    __LINE__)) ||
326       (GNUNET_OK !=
327        pq_prepare (plugin,
328                    "select_non_anonymous",
329                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
330                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
331                    1,
332                    __LINE__)) ||
333       (GNUNET_OK !=
334        pq_prepare (plugin,
335                    "select_expiration_order",
336                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 
337                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "                      
338                    "UNION "                                             
339                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 
340                    "ORDER BY prio ASC LIMIT 1) "                        
341                    "ORDER BY expire ASC LIMIT 1",
342                    1,
343                    __LINE__)) ||
344       (GNUNET_OK !=
345        pq_prepare (plugin,
346                    "select_replication_order",
347                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " \
348                    "ORDER BY repl DESC,RANDOM() LIMIT 1",
349                    0,
350                    __LINE__)) ||
351       (GNUNET_OK !=
352        pq_prepare (plugin,
353                    "delrow",
354                    "DELETE FROM gn090 " "WHERE oid=$1", 
355                    1,
356                    __LINE__)))
357     {
358       PQfinish (plugin->dbh);
359       plugin->dbh = NULL;
360       return GNUNET_SYSERR;
361     }
362   return GNUNET_OK;
363 }
364
365
366 /**
367  * Delete the row identified by the given rowid (qid
368  * in postgres).
369  *
370  * @param plugin global context
371  * @param rowid which row to delete
372  * @return GNUNET_OK on success
373  */
374 static int
375 delete_by_rowid (struct Plugin *plugin,
376                  unsigned int rowid)
377 {
378   uint32_t browid;
379   const char *paramValues[] = { (const char *) &browid };
380   int paramLengths[] = { sizeof (browid) };
381   const int paramFormats[] = { 1 };
382   PGresult *ret;
383
384   browid = htonl (rowid);
385   ret = PQexecPrepared (plugin->dbh,
386                         "delrow",
387                         1, paramValues, paramLengths, paramFormats, 1);
388   if (GNUNET_OK !=
389       check_result (plugin,
390                     ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
391                     __LINE__))
392     {
393       return GNUNET_SYSERR;
394     }
395   PQclear (ret);
396   return GNUNET_OK;
397 }
398
399
400 /**
401  * Get an estimate of how much space the database is
402  * currently using.
403  *
404  * @param cls our "struct Plugin*"
405  * @return number of bytes used on disk
406  */
407 static unsigned long long
408 postgres_plugin_estimate_size (void *cls)
409 {
410   struct Plugin *plugin = cls;
411   unsigned long long total;
412   PGresult *ret;
413
414   ret = PQexecParams (plugin->dbh,
415                       "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090",
416                       0, NULL, NULL, NULL, NULL, 1);
417   if (GNUNET_OK != check_result (plugin,
418                                  ret,
419                                  PGRES_TUPLES_OK,
420                                  "PQexecParams",
421                                  "get_size",
422                                  __LINE__))
423     {
424       return 0;
425     }
426   if ((PQntuples (ret) != 1) ||
427       (PQnfields (ret) != 1) ||
428       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
429     {
430       GNUNET_break (0);
431       PQclear (ret);
432       return 0;
433     }
434   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
435   PQclear (ret);
436   return total;
437 }
438
439
440 /**
441  * Store an item in the datastore.
442  *
443  * @param cls closure
444  * @param key key for the item
445  * @param size number of bytes in data
446  * @param data content stored
447  * @param type type of the content
448  * @param priority priority of the content
449  * @param anonymity anonymity-level for the content
450  * @param replication replication-level for the content
451  * @param expiration expiration time for the content
452  * @param msg set to error message
453  * @return GNUNET_OK on success
454  */
455 static int
456 postgres_plugin_put (void *cls,
457                      const GNUNET_HashCode * key,
458                      uint32_t size,
459                      const void *data,
460                      enum GNUNET_BLOCK_Type type,
461                      uint32_t priority,
462                      uint32_t anonymity,
463                      uint32_t replication,
464                      struct GNUNET_TIME_Absolute expiration,
465                      char **msg)
466 {
467   struct Plugin *plugin = cls;
468   GNUNET_HashCode vhash;
469   PGresult *ret;
470   uint32_t btype = htonl (type);
471   uint32_t bprio = htonl (priority);
472   uint32_t banon = htonl (anonymity);
473   uint32_t brepl = htonl (replication);
474   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
475   const char *paramValues[] = {
476     (const char *) &brepl,
477     (const char *) &btype,
478     (const char *) &bprio,
479     (const char *) &banon,
480     (const char *) &bexpi,
481     (const char *) key,
482     (const char *) &vhash,
483     (const char *) data
484   };
485   int paramLengths[] = {
486     sizeof (brepl),
487     sizeof (btype),
488     sizeof (bprio),
489     sizeof (banon),
490     sizeof (bexpi),
491     sizeof (GNUNET_HashCode),
492     sizeof (GNUNET_HashCode),
493     size
494   };
495   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
496
497   GNUNET_CRYPTO_hash (data, size, &vhash);
498   ret = PQexecPrepared (plugin->dbh,
499                         "put", 8, paramValues, paramLengths, paramFormats, 1);
500   if (GNUNET_OK != check_result (plugin, ret,
501                                  PGRES_COMMAND_OK,
502                                  "PQexecPrepared", "put", __LINE__))
503     return GNUNET_SYSERR;
504   PQclear (ret);
505   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
506 #if DEBUG_POSTGRES
507   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
508                    "datastore-postgres",
509                    "Stored %u bytes in database\n",
510                    (unsigned int) size);
511 #endif
512   return GNUNET_OK;
513 }
514
515
516 /**
517  * Function invoked to process the result and call
518  * the processor.
519  *
520  * @param plugin global plugin data
521  * @param proc function to call the value (once only).
522  * @param proc_cls closure for proc
523  * @param res result from exec
524  */
525 static void 
526 process_result (struct Plugin *plugin,
527                 PluginDatumProcessor proc, void *proc_cls,
528                 PGresult *res,
529                 int line)
530 {
531   int iret;
532   enum GNUNET_BLOCK_Type type;
533   uint32_t anonymity;
534   uint32_t priority;
535   uint32_t size;
536   unsigned int rowid;
537   struct GNUNET_TIME_Absolute expiration_time;
538   GNUNET_HashCode key;
539
540   if (GNUNET_OK != check_result (plugin,
541                                  res,
542                                  PGRES_TUPLES_OK,
543                                  "PQexecPrepared",
544                                  "select",
545                                  line))
546     {
547 #if DEBUG_POSTGRES
548       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
549                        "datastore-postgres",
550                        "Ending iteration (postgres error)\n");
551 #endif
552       proc (proc_cls, 
553             NULL, 0, NULL, 0, 0, 0, 
554             GNUNET_TIME_UNIT_ZERO_ABS, 0);      
555       return;
556     }
557
558   if (0 == PQntuples (res))
559     {
560       /* no result */
561 #if DEBUG_POSTGRES
562       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
563                        "datastore-postgres",
564                        "Ending iteration (no more results)\n");
565 #endif
566       proc (proc_cls, 
567             NULL, 0, NULL, 0, 0, 0, 
568             GNUNET_TIME_UNIT_ZERO_ABS, 0);
569       PQclear (res);
570       return; 
571     }
572   if ((1 != PQntuples (res)) ||
573       (7 != PQnfields (res)) ||
574       (sizeof (uint32_t) != PQfsize (res, 0)) ||
575       (sizeof (uint32_t) != PQfsize (res, 6)))
576     {
577       GNUNET_break (0);
578       proc (proc_cls, 
579             NULL, 0, NULL, 0, 0, 0, 
580             GNUNET_TIME_UNIT_ZERO_ABS, 0);
581       PQclear (res);
582       return;
583     }
584   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
585   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
586       (sizeof (uint32_t) != PQfsize (res, 1)) ||
587       (sizeof (uint32_t) != PQfsize (res, 2)) ||
588       (sizeof (uint64_t) != PQfsize (res, 3)) ||
589       (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)) )
590     {
591       GNUNET_break (0);
592       PQclear (res);
593       delete_by_rowid (plugin, rowid);
594       proc (proc_cls, 
595             NULL, 0, NULL, 0, 0, 0, 
596             GNUNET_TIME_UNIT_ZERO_ABS, 0);
597       return;
598     }
599
600   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
601   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
602   anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
603   expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
604   memcpy (&key, 
605           PQgetvalue (res, 0, 4), 
606           sizeof (GNUNET_HashCode));
607   size = PQgetlength (res, 0, 5);
608 #if DEBUG_POSTGRES
609   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
610                    "datastore-postgres",
611                    "Found result of size %u bytes and type %u in database\n",
612                    (unsigned int) size,
613                    (unsigned int) type);
614 #endif
615   iret = proc (proc_cls,
616                &key,
617                size,
618                PQgetvalue (res, 0, 5),
619                (enum GNUNET_BLOCK_Type) type,
620                priority,
621                anonymity,
622                expiration_time,
623                rowid);
624   PQclear (res);
625   if (iret == GNUNET_NO)
626     {
627 #if DEBUG_POSTGRES
628       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629                   "Processor asked for item %u to be removed.\n",
630                   rowid);
631 #endif
632       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
633         {
634 #if DEBUG_POSTGRES
635           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
636                            "datastore-postgres",
637                            "Deleting %u bytes from database\n",
638                            (unsigned int) size);
639 #endif
640           plugin->env->duc (plugin->env->cls,
641                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
642 #if DEBUG_POSTGRES
643           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
644                            "datastore-postgres",
645                            "Deleted %u bytes from database\n",
646                            (unsigned int) size);
647 #endif
648         }
649     }
650 }
651
652
653 /**
654  * Iterate over the results for a particular key
655  * in the datastore.
656  *
657  * @param cls closure
658  * @param key maybe NULL (to match all entries)
659  * @param vhash hash of the value, maybe NULL (to
660  *        match all values that have the right key).
661  *        Note that for DBlocks there is no difference
662  *        betwen key and vhash, but for other blocks
663  *        there may be!
664  * @param type entries of which type are relevant?
665  *     Use 0 for any type.
666  * @param iter function to call on each matching value;
667  *        will be called once with a NULL value at the end
668  * @param iter_cls closure for iter
669  */
670 static void
671 postgres_plugin_get_key (void *cls,
672                          uint64_t offset,
673                          const GNUNET_HashCode *key,
674                          const GNUNET_HashCode *vhash,
675                          enum GNUNET_BLOCK_Type type,
676                          PluginDatumProcessor proc, void *proc_cls)
677 {
678   struct Plugin *plugin = cls;
679   const int paramFormats[] = { 1, 1, 1, 1, 1 };
680   int paramLengths[4];
681   const char *paramValues[4];
682   int nparams;
683   const char *pname;
684   PGresult *ret;
685   uint64_t total;
686   uint64_t blimit_off;
687   uint32_t btype;
688
689   GNUNET_assert (key != NULL);
690   paramValues[0] = (const char*) key;
691   paramLengths[0] = sizeof (GNUNET_HashCode);
692   btype = htonl (type);
693   if (type != 0)
694     {
695       if (vhash != NULL)
696         {
697           paramValues[1] = (const char *) vhash;
698           paramLengths[1] = sizeof (GNUNET_HashCode);
699           paramValues[2] = (const char *) &btype;
700           paramLengths[2] = sizeof (btype);
701           paramValues[3] = (const char *) &blimit_off;
702           paramLengths[3] = sizeof (blimit_off);
703           nparams = 4;
704           pname = "getvt";
705           ret = PQexecParams (plugin->dbh,
706                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
707                               3,
708                               NULL,
709                               paramValues, 
710                               paramLengths,
711                               paramFormats, 1);
712         }
713       else
714         {
715           paramValues[1] = (const char *) &btype;
716           paramLengths[1] = sizeof (btype);
717           paramValues[2] = (const char *) &blimit_off;
718           paramLengths[2] = sizeof (blimit_off);
719           nparams = 3;
720           pname = "gett";
721           ret = PQexecParams (plugin->dbh,
722                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
723                               2,
724                               NULL,
725                               paramValues, 
726                               paramLengths, 
727                               paramFormats, 1);
728         }
729     }
730   else
731     {
732       if (vhash != NULL)
733         {
734           paramValues[1] = (const char *) vhash;
735           paramLengths[1] = sizeof (GNUNET_HashCode);
736           paramValues[2] = (const char *) &blimit_off;
737           paramLengths[2] = sizeof (blimit_off);
738           nparams = 3;
739           pname = "getv";
740           ret = PQexecParams (plugin->dbh,
741                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
742                               2,
743                               NULL,
744                               paramValues, 
745                               paramLengths,
746                               paramFormats, 1);
747         }
748       else
749         {
750           paramValues[1] = (const char *) &blimit_off;
751           paramLengths[1] = sizeof (blimit_off);
752           nparams = 2;
753           pname = "get";
754           ret = PQexecParams (plugin->dbh,
755                               "SELECT count(*) FROM gn090 WHERE hash=$1",
756                               1,
757                               NULL,
758                               paramValues, 
759                               paramLengths,
760                               paramFormats, 1);
761         }
762     }
763   if (GNUNET_OK != check_result (plugin,
764                                  ret,
765                                  PGRES_TUPLES_OK,
766                                  "PQexecParams",
767                                  pname,
768                                  __LINE__))
769     {
770       proc (proc_cls, 
771             NULL, 0, NULL, 0, 0, 0, 
772             GNUNET_TIME_UNIT_ZERO_ABS, 0);
773       return;
774     }
775   if ((PQntuples (ret) != 1) ||
776       (PQnfields (ret) != 1) ||
777       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
778     {
779       GNUNET_break (0);
780       PQclear (ret);
781       proc (proc_cls, 
782             NULL, 0, NULL, 0, 0, 0, 
783             GNUNET_TIME_UNIT_ZERO_ABS, 0);
784       return;
785     }
786   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
787   PQclear (ret);
788   if (total == 0)
789     {
790       proc (proc_cls, 
791             NULL, 0, NULL, 0, 0, 0, 
792             GNUNET_TIME_UNIT_ZERO_ABS, 0);
793       return;
794     }
795   blimit_off = GNUNET_htonll (offset % total);
796   ret = PQexecPrepared (plugin->dbh,
797                         pname,
798                         nparams,
799                         paramValues, 
800                         paramLengths,
801                         paramFormats, 1);
802   process_result (plugin,
803                   proc, proc_cls,
804                   ret, __LINE__);
805 }
806
807
808 /**
809  * Select a subset of the items in the datastore and call
810  * the given iterator for each of them.
811  *
812  * @param cls our "struct Plugin*"
813  * @param type entries of which type should be considered?
814  *        Use 0 for any type.
815  * @param iter function to call on each matching value;
816  *        will be called once with a NULL value at the end
817  * @param iter_cls closure for iter
818  */
819 static void
820 postgres_plugin_get_zero_anonymity (void *cls,
821                                     uint64_t offset,
822                                     enum GNUNET_BLOCK_Type type,
823                                     PluginDatumProcessor proc, void *proc_cls)
824 {
825   struct Plugin *plugin = cls;
826   uint32_t btype;
827   uint64_t boff;
828   const int paramFormats[] = { 1, 1 };
829   int paramLengths[] = { sizeof (btype), sizeof (boff) };
830   const char *paramValues[] = { (const char*) &btype, (const char*) &boff };
831   PGresult *ret;
832
833   btype = htonl ((uint32_t) type);
834   boff = GNUNET_htonll (offset);
835   ret = PQexecPrepared (plugin->dbh,
836                         "select_non_anonymous",
837                         2,
838                         paramValues, 
839                         paramLengths,
840                         paramFormats, 1);
841   process_result (plugin,
842                   proc, proc_cls,
843                   ret, __LINE__);
844 }
845
846
847 /**
848  * Context for 'repl_iter' function.
849  */
850 struct ReplCtx
851 {
852   
853   /**
854    * Plugin handle.
855    */
856   struct Plugin *plugin;
857   
858   /**
859    * Function to call for the result (or the NULL).
860    */
861   PluginDatumProcessor proc;
862   
863   /**
864    * Closure for proc.
865    */
866   void *proc_cls;
867 };
868
869
870 /**
871  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
872  * Decrements the replication counter and calls the original
873  * iterator.
874  *
875  * @param cls closure
876  * @param next_cls closure to pass to the "next" function.
877  * @param key key for the content
878  * @param size number of bytes in data
879  * @param data content stored
880  * @param type type of the content
881  * @param priority priority of the content
882  * @param anonymity anonymity-level for the content
883  * @param expiration expiration time for the content
884  * @param uid unique identifier for the datum;
885  *        maybe 0 if no unique identifier is available
886  *
887  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
888  *         (continue on call to "next", of course),
889  *         GNUNET_NO to delete the item and continue (if supported)
890  */
891 static int
892 repl_proc (void *cls,
893            const GNUNET_HashCode *key,
894            uint32_t size,
895            const void *data,
896            enum GNUNET_BLOCK_Type type,
897            uint32_t priority,
898            uint32_t anonymity,
899            struct GNUNET_TIME_Absolute expiration, 
900            uint64_t uid)
901 {
902   struct ReplCtx *rc = cls;
903   struct Plugin *plugin = rc->plugin;
904   int ret;
905   PGresult *qret;
906   uint32_t boid;
907
908   ret = rc->proc (rc->proc_cls,
909                   key,
910                   size, data, 
911                   type, priority, anonymity, expiration,
912                   uid);
913   if (NULL != key)
914     {
915       boid = htonl ( (uint32_t) uid);
916       const char *paramValues[] = {
917         (const char *) &boid,
918       };
919       int paramLengths[] = {
920         sizeof (boid),
921       };
922       const int paramFormats[] = { 1 };
923       qret = PQexecPrepared (plugin->dbh,
924                             "decrepl",
925                             1, paramValues, paramLengths, paramFormats, 1);
926       if (GNUNET_OK != check_result (plugin,
927                                      qret,
928                                      PGRES_COMMAND_OK,
929                                      "PQexecPrepared", 
930                                      "decrepl", __LINE__))
931         return GNUNET_SYSERR;
932       PQclear (qret);
933     }
934   return ret;
935 }
936
937
938 /**
939  * Get a random item for replication.  Returns a single, not expired, random item
940  * from those with the highest replication counters.  The item's 
941  * replication counter is decremented by one IF it was positive before.
942  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
943  *
944  * @param cls closure
945  * @param proc function to call the value (once only).
946  * @param proc_cls closure for iter
947  */
948 static void
949 postgres_plugin_get_replication (void *cls,
950                                  PluginDatumProcessor proc, void *proc_cls)
951 {
952   struct Plugin *plugin = cls;
953   struct ReplCtx rc;
954   PGresult *ret;
955
956   rc.plugin = plugin;
957   rc.proc = proc;
958   rc.proc_cls = proc_cls;
959   ret = PQexecPrepared (plugin->dbh,
960                         "select_replication_order",
961                         0,
962                         NULL, NULL, NULL, 1);
963   process_result (plugin,
964                   &repl_proc, &rc,
965                   ret, __LINE__);
966 }
967
968
969 /**
970  * Get a random item for expiration.
971  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
972  *
973  * @param cls closure
974  * @param proc function to call the value (once only).
975  * @param proc_cls closure for iter
976  */
977 static void
978 postgres_plugin_get_expiration (void *cls,
979                                 PluginDatumProcessor proc, void *proc_cls)
980 {
981   struct Plugin *plugin = cls;
982   uint64_t btime;
983   const int paramFormats[] = { 1 };
984   int paramLengths[] = { sizeof (btime) };
985   const char *paramValues[] = { (const char*) &btime };
986   PGresult *ret;
987   
988   btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
989   ret = PQexecPrepared (plugin->dbh,
990                         "select_expiration_order",
991                         1,
992                         paramValues,
993                         paramLengths,
994                         paramFormats, 
995                         1);
996   process_result (plugin,
997                   proc, proc_cls,
998                   ret, __LINE__);
999 }
1000
1001
1002 /**
1003  * Update the priority for a particular key in the datastore.  If
1004  * the expiration time in value is different than the time found in
1005  * the datastore, the higher value should be kept.  For the
1006  * anonymity level, the lower value is to be used.  The specified
1007  * priority should be added to the existing priority, ignoring the
1008  * priority in value.
1009  *
1010  * Note that it is possible for multiple values to match this put.
1011  * In that case, all of the respective values are updated.
1012  *
1013  * @param cls our "struct Plugin*"
1014  * @param uid unique identifier of the datum
1015  * @param delta by how much should the priority
1016  *     change?  If priority + delta < 0 the
1017  *     priority should be set to 0 (never go
1018  *     negative).
1019  * @param expire new expiration time should be the
1020  *     MAX of any existing expiration time and
1021  *     this value
1022  * @param msg set to error message
1023  * @return GNUNET_OK on success
1024  */
1025 static int
1026 postgres_plugin_update (void *cls,
1027                         uint64_t uid,
1028                         int delta, struct GNUNET_TIME_Absolute expire,
1029                         char **msg)
1030 {
1031   struct Plugin *plugin = cls;
1032   PGresult *ret;
1033   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
1034   uint32_t boid = htonl ( (uint32_t) uid);
1035   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
1036   const char *paramValues[] = {
1037     (const char *) &bdelta,
1038     (const char *) &bexpire,
1039     (const char *) &boid,
1040   };
1041   int paramLengths[] = {
1042     sizeof (bdelta),
1043     sizeof (bexpire),
1044     sizeof (boid),
1045   };
1046   const int paramFormats[] = { 1, 1, 1 };
1047
1048   ret = PQexecPrepared (plugin->dbh,
1049                         "update",
1050                         3, paramValues, paramLengths, paramFormats, 1);
1051   if (GNUNET_OK != check_result (plugin,
1052                                  ret,
1053                                  PGRES_COMMAND_OK,
1054                                  "PQexecPrepared", "update", __LINE__))
1055     return GNUNET_SYSERR;
1056   PQclear (ret);
1057   return GNUNET_OK;
1058 }
1059
1060
1061 /**
1062  * Drop database.
1063  */
1064 static void 
1065 postgres_plugin_drop (void *cls)
1066 {
1067   struct Plugin *plugin = cls;
1068
1069   pq_exec (plugin, "DROP TABLE gn090", __LINE__);
1070 }
1071
1072
1073 /**
1074  * Entry point for the plugin.
1075  *
1076  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1077  * @return our "struct Plugin*"
1078  */
1079 void *
1080 libgnunet_plugin_datastore_postgres_init (void *cls)
1081 {
1082   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1083   struct GNUNET_DATASTORE_PluginFunctions *api;
1084   struct Plugin *plugin;
1085
1086   plugin = GNUNET_malloc (sizeof (struct Plugin));
1087   plugin->env = env;
1088   if (GNUNET_OK != init_connection (plugin))
1089     {
1090       GNUNET_free (plugin);
1091       return NULL;
1092     }
1093   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1094   api->cls = plugin;
1095   api->estimate_size = &postgres_plugin_estimate_size;
1096   api->put = &postgres_plugin_put;
1097   api->update = &postgres_plugin_update;
1098   api->get_key = &postgres_plugin_get_key;
1099   api->get_replication = &postgres_plugin_get_replication;
1100   api->get_expiration = &postgres_plugin_get_expiration;
1101   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
1102   api->drop = &postgres_plugin_drop;
1103   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1104                    "datastore-postgres",
1105                    _("Postgres database running\n"));
1106   return api;
1107 }
1108
1109
1110 /**
1111  * Exit point from the plugin.
1112  * @param cls our "struct Plugin*"
1113  * @return always NULL
1114  */
1115 void *
1116 libgnunet_plugin_datastore_postgres_done (void *cls)
1117 {
1118   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1119   struct Plugin *plugin = api->cls;
1120   
1121   PQfinish (plugin->dbh);
1122   GNUNET_free (plugin);
1123   GNUNET_free (api);
1124   return NULL;
1125 }
1126
1127 /* end of plugin_datastore_postgres.c */