fix
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <postgresql/libpq-fe.h>
30
31 #define DEBUG_POSTGRES GNUNET_NO
32
33 /**
34  * After how many ms "busy" should a DB operation fail for good?
35  * A low value makes sure that we are more responsive to requests
36  * (especially PUTs).  A high value guarantees a higher success
37  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
38  *
39  * The default value of 1s should ensure that users do not experience
40  * huge latencies while at the same time allowing operations to succeed
41  * with reasonable probability.
42  */
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
44
45
46 /**
47  * Context for all functions in this plugin.
48  */
49 struct Plugin 
50 {
51   /**
52    * Our execution environment.
53    */
54   struct GNUNET_DATASTORE_PluginEnvironment *env;
55
56   /**
57    * Native Postgres database handle.
58    */
59   PGconn *dbh;
60
61 };
62
63
64 /**
65  * Check if the result obtained from Postgres has
66  * the desired status code.  If not, log an error, clear the
67  * result and return GNUNET_SYSERR.
68  * 
69  * @param plugin global context
70  * @param ret result to check
71  * @param expected_status expected return value
72  * @param command name of SQL command that was run
73  * @param args arguments to SQL command
74  * @param line line number for error reporting
75  * @return GNUNET_OK if the result is acceptable
76  */
77 static int
78 check_result (struct Plugin *plugin,
79               PGresult * ret,
80               int expected_status,
81               const char *command, const char *args, int line)
82 {
83   if (ret == NULL)
84     {
85       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
86                        "datastore-postgres",
87                        "Postgres failed to allocate result for `%s:%s' at %d\n",
88                        command, args, line);
89       return GNUNET_SYSERR;
90     }
91   if (PQresultStatus (ret) != expected_status)
92     {
93       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
94                        "datastore-postgres",
95                        _("`%s:%s' failed at %s:%d with error: %s"),
96                        command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
97       PQclear (ret);
98       return GNUNET_SYSERR;
99     }
100   return GNUNET_OK;
101 }
102
103 /**
104  * Run simple SQL statement (without results).
105  *
106  * @param plugin global context
107  * @param sql statement to run
108  * @param line code line for error reporting
109  */
110 static int
111 pq_exec (struct Plugin *plugin,
112          const char *sql, int line)
113 {
114   PGresult *ret;
115   ret = PQexec (plugin->dbh, sql);
116   if (GNUNET_OK != check_result (plugin,
117                                  ret, 
118                                  PGRES_COMMAND_OK, "PQexec", sql, line))
119     return GNUNET_SYSERR;
120   PQclear (ret);
121   return GNUNET_OK;
122 }
123
124 /**
125  * Prepare SQL statement.
126  *
127  * @param plugin global context
128  * @param name name for the prepared SQL statement
129  * @param sql SQL code to prepare
130  * @param nparams number of parameters in sql
131  * @param line code line for error reporting
132  * @return GNUNET_OK on success
133  */
134 static int
135 pq_prepare (struct Plugin *plugin,
136             const char *name, const char *sql, int nparams, int line)
137 {
138   PGresult *ret;
139   ret = PQprepare (plugin->dbh, name, sql, nparams, NULL);
140   if (GNUNET_OK !=
141       check_result (plugin, 
142                     ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
143     return GNUNET_SYSERR;
144   PQclear (ret);
145   return GNUNET_OK;
146 }
147
148 /**
149  * @brief Get a database handle
150  *
151  * @param plugin global context
152  * @return GNUNET_OK on success, GNUNET_SYSERR on error
153  */
154 static int
155 init_connection (struct Plugin *plugin)
156 {
157   char *conninfo;
158   PGresult *ret;
159
160   /* Open database and precompile statements */
161   conninfo = NULL;
162   GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
163                                          "datastore-postgres",
164                                          "CONFIG",
165                                          &conninfo);
166   plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
167   if (NULL == plugin->dbh)
168     {
169       /* FIXME: warn about out-of-memory? */
170       GNUNET_free_non_null (conninfo);
171       return GNUNET_SYSERR;
172     }
173   if (PQstatus (plugin->dbh) != CONNECTION_OK)
174     {
175       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
176                        "datastore-postgres",
177                        _("Unable to initialize Postgres with configuration `%s': %s"),
178                        conninfo,
179                        PQerrorMessage (plugin->dbh));
180       PQfinish (plugin->dbh);
181       plugin->dbh = NULL;
182       GNUNET_free_non_null (conninfo);
183       return GNUNET_SYSERR;
184     }
185   GNUNET_free_non_null (conninfo);
186   ret = PQexec (plugin->dbh,
187                 "CREATE TABLE gn090 ("
188                 "  repl INTEGER NOT NULL DEFAULT 0,"
189                 "  type INTEGER NOT NULL DEFAULT 0,"
190                 "  prio INTEGER NOT NULL DEFAULT 0,"
191                 "  anonLevel INTEGER NOT NULL DEFAULT 0,"
192                 "  expire BIGINT NOT NULL DEFAULT 0,"
193                 "  hash BYTEA NOT NULL DEFAULT '',"
194                 "  vhash BYTEA NOT NULL DEFAULT '',"
195                 "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
196   if ( (ret == NULL) || 
197        ( (PQresultStatus (ret) != PGRES_COMMAND_OK) && 
198          (0 != strcmp ("42P07",    /* duplicate table */
199                        PQresultErrorField
200                        (ret,
201                         PG_DIAG_SQLSTATE)))))
202     {
203       (void) check_result (plugin,
204                            ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090", __LINE__);
205       PQfinish (plugin->dbh);
206       plugin->dbh = NULL;
207       return GNUNET_SYSERR;
208     }
209   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
210     {
211       if ((GNUNET_OK !=
212            pq_exec (plugin, "CREATE INDEX idx_hash ON gn090 (hash)", __LINE__)) ||
213           (GNUNET_OK !=
214            pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)",
215                     __LINE__))
216           || (GNUNET_OK !=
217               pq_exec (plugin, "CREATE INDEX idx_prio ON gn090 (prio)", __LINE__))
218           || (GNUNET_OK !=
219               pq_exec (plugin, "CREATE INDEX idx_expire ON gn090 (expire)", __LINE__))
220           || (GNUNET_OK !=
221               pq_exec (plugin, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)",
222                        __LINE__))
223           || (GNUNET_OK !=
224               pq_exec
225               (plugin, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)",
226                __LINE__))
227           || (GNUNET_OK !=
228               pq_exec (plugin, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)",
229                        __LINE__)))
230         {
231           PQclear (ret);
232           PQfinish (plugin->dbh);
233           plugin->dbh = NULL;
234           return GNUNET_SYSERR;
235         }
236     }
237   PQclear (ret);
238   ret = PQexec (plugin->dbh,
239                 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
240   if (GNUNET_OK != 
241       check_result (plugin,
242                     ret, PGRES_COMMAND_OK,
243                     "ALTER TABLE", "gn090", __LINE__))
244     {
245       PQfinish (plugin->dbh);
246       plugin->dbh = NULL;
247       return GNUNET_SYSERR;
248     }
249   PQclear (ret);
250   ret = PQexec (plugin->dbh,
251                 "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
252   if (GNUNET_OK !=
253       check_result (plugin,
254                     ret, PGRES_COMMAND_OK,
255                     "ALTER TABLE", "gn090", __LINE__))
256     {
257       PQfinish (plugin->dbh);
258       plugin->dbh = NULL;
259       return GNUNET_SYSERR;
260     }
261   PQclear (ret);
262   ret = PQexec (plugin->dbh,
263                 "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
264   if (GNUNET_OK !=
265       check_result (plugin,
266                     ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090", __LINE__))
267     {
268       PQfinish (plugin->dbh);
269       plugin->dbh = NULL;
270       return GNUNET_SYSERR;
271     }
272   PQclear (ret);
273   if ((GNUNET_OK !=
274        pq_prepare (plugin,
275                    "getvt",
276                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
277                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
278                    "ORDER BY oid ASC LIMIT 1 OFFSET $4",
279                    4,
280                    __LINE__)) ||
281       (GNUNET_OK !=
282        pq_prepare (plugin,
283                    "gett",
284                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
285                    "WHERE hash=$1 AND type=$2 "
286                    "ORDER BY oid ASC LIMIT 1 OFFSET $3",
287                    3,
288                    __LINE__)) ||
289       (GNUNET_OK !=
290        pq_prepare (plugin,
291                    "getv",
292                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
293                    "WHERE hash=$1 AND vhash=$2 "
294                    "ORDER BY oid ASC LIMIT 1 OFFSET $3",
295                    3,
296                    __LINE__)) ||
297       (GNUNET_OK !=
298        pq_prepare (plugin,
299                    "get",
300                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
301                    "WHERE hash=$1 "
302                    "ORDER BY oid ASC LIMIT 1 OFFSET $2",
303                    2,
304                    __LINE__)) ||
305       (GNUNET_OK !=
306        pq_prepare (plugin,
307                    "put",
308                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, hash, vhash, value) "
309                    "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
310                    8,
311                    __LINE__)) ||
312       (GNUNET_OK !=
313        pq_prepare (plugin,
314                    "update",
315                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
316                    "WHERE oid = $3",
317                    3,
318                    __LINE__)) ||
319       (GNUNET_OK !=
320        pq_prepare (plugin,
321                    "decrepl",
322                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
323                    "WHERE oid = $1",
324                    1,
325                    __LINE__)) ||
326       (GNUNET_OK !=
327        pq_prepare (plugin,
328                    "select_non_anonymous",
329                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
330                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
331                    1,
332                    __LINE__)) ||
333       (GNUNET_OK !=
334        pq_prepare (plugin,
335                    "select_expiration_order",
336                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 
337                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "                      
338                    "UNION "                                             
339                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 
340                    "ORDER BY prio ASC LIMIT 1) "                        
341                    "ORDER BY expire ASC LIMIT 1",
342                    1,
343                    __LINE__)) ||
344       (GNUNET_OK !=
345        pq_prepare (plugin,
346                    "select_replication_order",
347                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " \
348                    "ORDER BY repl DESC,RANDOM() LIMIT 1",
349                    0,
350                    __LINE__)) ||
351       (GNUNET_OK !=
352        pq_prepare (plugin,
353                    "delrow",
354                    "DELETE FROM gn090 " "WHERE oid=$1", 
355                    1,
356                    __LINE__)))
357     {
358       PQfinish (plugin->dbh);
359       plugin->dbh = NULL;
360       return GNUNET_SYSERR;
361     }
362   return GNUNET_OK;
363 }
364
365
366 /**
367  * Delete the row identified by the given rowid (qid
368  * in postgres).
369  *
370  * @param plugin global context
371  * @param rowid which row to delete
372  * @return GNUNET_OK on success
373  */
374 static int
375 delete_by_rowid (struct Plugin *plugin,
376                  unsigned int rowid)
377 {
378   uint32_t browid;
379   const char *paramValues[] = { (const char *) &browid };
380   int paramLengths[] = { sizeof (browid) };
381   const int paramFormats[] = { 1 };
382   PGresult *ret;
383
384   browid = htonl (rowid);
385   ret = PQexecPrepared (plugin->dbh,
386                         "delrow",
387                         1, paramValues, paramLengths, paramFormats, 1);
388   if (GNUNET_OK !=
389       check_result (plugin,
390                     ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
391                     __LINE__))
392     {
393       return GNUNET_SYSERR;
394     }
395   PQclear (ret);
396   return GNUNET_OK;
397 }
398
399
400 /**
401  * Get an estimate of how much space the database is
402  * currently using.
403  *
404  * @param cls our "struct Plugin*"
405  * @return number of bytes used on disk
406  */
407 static unsigned long long
408 postgres_plugin_estimate_size (void *cls)
409 {
410   struct Plugin *plugin = cls;
411   unsigned long long total;
412   PGresult *ret;
413
414   ret = PQexecParams (plugin->dbh,
415                       "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090",
416                       0, NULL, NULL, NULL, NULL, 1);
417   if (GNUNET_OK != check_result (plugin,
418                                  ret,
419                                  PGRES_TUPLES_OK,
420                                  "PQexecParams",
421                                  "get_size",
422                                  __LINE__))
423     {
424       return 0;
425     }
426   if ((PQntuples (ret) != 1) ||
427       (PQnfields (ret) != 1) ||
428       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
429     {
430       GNUNET_break (0);
431       PQclear (ret);
432       return 0;
433     }
434   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
435   PQclear (ret);
436   return total;
437 }
438
439
440 /**
441  * Store an item in the datastore.
442  *
443  * @param cls closure
444  * @param key key for the item
445  * @param size number of bytes in data
446  * @param data content stored
447  * @param type type of the content
448  * @param priority priority of the content
449  * @param anonymity anonymity-level for the content
450  * @param replication replication-level for the content
451  * @param expiration expiration time for the content
452  * @param msg set to error message
453  * @return GNUNET_OK on success
454  */
455 static int
456 postgres_plugin_put (void *cls,
457                      const GNUNET_HashCode * key,
458                      uint32_t size,
459                      const void *data,
460                      enum GNUNET_BLOCK_Type type,
461                      uint32_t priority,
462                      uint32_t anonymity,
463                      uint32_t replication,
464                      struct GNUNET_TIME_Absolute expiration,
465                      char **msg)
466 {
467   struct Plugin *plugin = cls;
468   GNUNET_HashCode vhash;
469   PGresult *ret;
470   uint32_t btype = htonl (type);
471   uint32_t bprio = htonl (priority);
472   uint32_t banon = htonl (anonymity);
473   uint32_t brepl = htonl (replication);
474   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
475   const char *paramValues[] = {
476     (const char *) &brepl,
477     (const char *) &btype,
478     (const char *) &bprio,
479     (const char *) &banon,
480     (const char *) &bexpi,
481     (const char *) key,
482     (const char *) &vhash,
483     (const char *) data
484   };
485   int paramLengths[] = {
486     sizeof (brepl),
487     sizeof (btype),
488     sizeof (bprio),
489     sizeof (banon),
490     sizeof (bexpi),
491     sizeof (GNUNET_HashCode),
492     sizeof (GNUNET_HashCode),
493     size
494   };
495   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
496
497   GNUNET_CRYPTO_hash (data, size, &vhash);
498   ret = PQexecPrepared (plugin->dbh,
499                         "put", 8, paramValues, paramLengths, paramFormats, 1);
500   if (GNUNET_OK != check_result (plugin, ret,
501                                  PGRES_COMMAND_OK,
502                                  "PQexecPrepared", "put", __LINE__))
503     return GNUNET_SYSERR;
504   PQclear (ret);
505   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
506 #if DEBUG_POSTGRES
507   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
508                    "datastore-postgres",
509                    "Stored %u bytes in database\n",
510                    (unsigned int) size);
511 #endif
512   return GNUNET_OK;
513 }
514
515
516 /**
517  * Function invoked to process the result and call
518  * the processor.
519  *
520  * @param plugin global plugin data
521  * @param proc function to call the value (once only).
522  * @param proc_cls closure for proc
523  * @param res result from exec
524  */
525 static void 
526 process_result (struct Plugin *plugin,
527                 PluginDatumProcessor proc, void *proc_cls,
528                 PGresult *res)
529 {
530   int iret;
531   enum GNUNET_BLOCK_Type type;
532   uint32_t anonymity;
533   uint32_t priority;
534   uint32_t size;
535   unsigned int rowid;
536   struct GNUNET_TIME_Absolute expiration_time;
537   GNUNET_HashCode key;
538
539   if (GNUNET_OK != check_result (plugin,
540                                  res,
541                                  PGRES_TUPLES_OK,
542                                  "PQexecPrepared",
543                                  "select",
544                                  __LINE__))
545     {
546 #if DEBUG_POSTGRES
547       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
548                        "datastore-postgres",
549                        "Ending iteration (postgres error)\n");
550 #endif
551       proc (proc_cls, 
552             NULL, 0, NULL, 0, 0, 0, 
553             GNUNET_TIME_UNIT_ZERO_ABS, 0);      
554       return;
555     }
556
557   if (0 == PQntuples (res))
558     {
559       /* no result */
560 #if DEBUG_POSTGRES
561       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
562                        "datastore-postgres",
563                        "Ending iteration (no more results)\n");
564 #endif
565       proc (proc_cls, 
566             NULL, 0, NULL, 0, 0, 0, 
567             GNUNET_TIME_UNIT_ZERO_ABS, 0);
568       PQclear (res);
569       return; 
570     }
571   if ((1 != PQntuples (res)) ||
572       (7 != PQnfields (res)) ||
573       (sizeof (uint32_t) != PQfsize (res, 0)) ||
574       (sizeof (uint32_t) != PQfsize (res, 6)))
575     {
576       GNUNET_break (0);
577       proc (proc_cls, 
578             NULL, 0, NULL, 0, 0, 0, 
579             GNUNET_TIME_UNIT_ZERO_ABS, 0);
580       PQclear (res);
581       return;
582     }
583   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
584   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
585       (sizeof (uint32_t) != PQfsize (res, 1)) ||
586       (sizeof (uint32_t) != PQfsize (res, 2)) ||
587       (sizeof (uint64_t) != PQfsize (res, 3)) ||
588       (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)) )
589     {
590       GNUNET_break (0);
591       PQclear (res);
592       delete_by_rowid (plugin, rowid);
593       proc (proc_cls, 
594             NULL, 0, NULL, 0, 0, 0, 
595             GNUNET_TIME_UNIT_ZERO_ABS, 0);
596       return;
597     }
598
599   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
600   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
601   anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
602   expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
603   memcpy (&key, 
604           PQgetvalue (res, 0, 4), 
605           sizeof (GNUNET_HashCode));
606   size = PQgetlength (res, 0, 5);
607 #if DEBUG_POSTGRES
608   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
609                    "datastore-postgres",
610                    "Found result of size %u bytes and type %u in database\n",
611                    (unsigned int) size,
612                    (unsigned int) type);
613 #endif
614   iret = proc (proc_cls,
615                &key,
616                size,
617                PQgetvalue (res, 0, 5),
618                (enum GNUNET_BLOCK_Type) type,
619                priority,
620                anonymity,
621                expiration_time,
622                rowid);
623   PQclear (res);
624   if (iret == GNUNET_NO)
625     {
626 #if DEBUG_POSTGRES
627       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
628                   "Processor asked for item %u to be removed.\n",
629                   rowid);
630 #endif
631       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
632         {
633 #if DEBUG_POSTGRES
634           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
635                            "datastore-postgres",
636                            "Deleting %u bytes from database\n",
637                            (unsigned int) size);
638 #endif
639           plugin->env->duc (plugin->env->cls,
640                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
641 #if DEBUG_POSTGRES
642           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
643                            "datastore-postgres",
644                            "Deleted %u bytes from database\n",
645                            (unsigned int) size);
646 #endif
647         }
648     }
649 }
650
651
652 /**
653  * Iterate over the results for a particular key
654  * in the datastore.
655  *
656  * @param cls closure
657  * @param key maybe NULL (to match all entries)
658  * @param vhash hash of the value, maybe NULL (to
659  *        match all values that have the right key).
660  *        Note that for DBlocks there is no difference
661  *        betwen key and vhash, but for other blocks
662  *        there may be!
663  * @param type entries of which type are relevant?
664  *     Use 0 for any type.
665  * @param iter function to call on each matching value;
666  *        will be called once with a NULL value at the end
667  * @param iter_cls closure for iter
668  */
669 static void
670 postgres_plugin_get_key (void *cls,
671                          uint64_t offset,
672                          const GNUNET_HashCode *key,
673                          const GNUNET_HashCode *vhash,
674                          enum GNUNET_BLOCK_Type type,
675                          PluginDatumProcessor proc, void *proc_cls)
676 {
677   struct Plugin *plugin = cls;
678   const int paramFormats[] = { 1, 1, 1, 1, 1 };
679   int paramLengths[4];
680   const char *paramValues[4];
681   int nparams;
682   const char *pname;
683   PGresult *ret;
684   uint64_t total;
685   uint64_t blimit_off;
686   uint32_t btype;
687
688   GNUNET_assert (key != NULL);
689   paramValues[0] = (const char*) key;
690   paramLengths[0] = sizeof (GNUNET_HashCode);
691   btype = htonl (type);
692   if (type != 0)
693     {
694       if (vhash != NULL)
695         {
696           paramValues[1] = (const char *) vhash;
697           paramLengths[1] = sizeof (GNUNET_HashCode);
698           paramValues[2] = (const char *) &btype;
699           paramLengths[2] = sizeof (btype);
700           paramValues[3] = (const char *) &blimit_off;
701           paramLengths[3] = sizeof (blimit_off);
702           nparams = 4;
703           pname = "getvt";
704           ret = PQexecParams (plugin->dbh,
705                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
706                               3,
707                               NULL,
708                               paramValues, 
709                               paramLengths,
710                               paramFormats, 1);
711         }
712       else
713         {
714           paramValues[1] = (const char *) &btype;
715           paramLengths[1] = sizeof (btype);
716           paramValues[2] = (const char *) &blimit_off;
717           paramLengths[2] = sizeof (blimit_off);
718           nparams = 3;
719           pname = "gett";
720           ret = PQexecParams (plugin->dbh,
721                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
722                               2,
723                               NULL,
724                               paramValues, 
725                               paramLengths, 
726                               paramFormats, 1);
727         }
728     }
729   else
730     {
731       if (vhash != NULL)
732         {
733           paramValues[1] = (const char *) vhash;
734           paramLengths[1] = sizeof (GNUNET_HashCode);
735           paramValues[2] = (const char *) &blimit_off;
736           paramLengths[2] = sizeof (blimit_off);
737           nparams = 3;
738           pname = "getv";
739           ret = PQexecParams (plugin->dbh,
740                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
741                               2,
742                               NULL,
743                               paramValues, 
744                               paramLengths,
745                               paramFormats, 1);
746         }
747       else
748         {
749           paramValues[1] = (const char *) &blimit_off;
750           paramLengths[1] = sizeof (blimit_off);
751           nparams = 2;
752           pname = "get";
753           ret = PQexecParams (plugin->dbh,
754                               "SELECT count(*) FROM gn090 WHERE hash=$1",
755                               1,
756                               NULL,
757                               paramValues, 
758                               paramLengths,
759                               paramFormats, 1);
760         }
761     }
762   if (GNUNET_OK != check_result (plugin,
763                                  ret,
764                                  PGRES_TUPLES_OK,
765                                  "PQexecParams",
766                                  pname,
767                                  __LINE__))
768     {
769       proc (proc_cls, 
770             NULL, 0, NULL, 0, 0, 0, 
771             GNUNET_TIME_UNIT_ZERO_ABS, 0);
772       return;
773     }
774   if ((PQntuples (ret) != 1) ||
775       (PQnfields (ret) != 1) ||
776       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
777     {
778       GNUNET_break (0);
779       PQclear (ret);
780       proc (proc_cls, 
781             NULL, 0, NULL, 0, 0, 0, 
782             GNUNET_TIME_UNIT_ZERO_ABS, 0);
783       return;
784     }
785   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
786   PQclear (ret);
787   if (total == 0)
788     {
789       proc (proc_cls, 
790             NULL, 0, NULL, 0, 0, 0, 
791             GNUNET_TIME_UNIT_ZERO_ABS, 0);
792       return;
793     }
794   blimit_off = GNUNET_htonll (offset % total);
795   ret = PQexecPrepared (plugin->dbh,
796                         pname,
797                         nparams,
798                         paramValues, 
799                         paramLengths,
800                         paramFormats, 1);
801   process_result (plugin,
802                   proc, proc_cls,
803                   ret);
804 }
805
806
807 /**
808  * Select a subset of the items in the datastore and call
809  * the given iterator for each of them.
810  *
811  * @param cls our "struct Plugin*"
812  * @param type entries of which type should be considered?
813  *        Use 0 for any type.
814  * @param iter function to call on each matching value;
815  *        will be called once with a NULL value at the end
816  * @param iter_cls closure for iter
817  */
818 static void
819 postgres_plugin_get_zero_anonymity (void *cls,
820                                     uint64_t offset,
821                                     enum GNUNET_BLOCK_Type type,
822                                     PluginDatumProcessor proc, void *proc_cls)
823 {
824   struct Plugin *plugin = cls;
825   uint32_t btype;
826   uint64_t boff;
827   const int paramFormats[] = { 1, 1 };
828   int paramLengths[] = { sizeof (btype), sizeof (boff) };
829   const char *paramValues[] = { (const char*) &btype, (const char*) &boff };
830   PGresult *ret;
831
832   btype = htonl ((uint32_t) type);
833   boff = GNUNET_htonll (offset);
834   ret = PQexecPrepared (plugin->dbh,
835                         "select_non_anonymous",
836                         2,
837                         paramValues, 
838                         paramLengths,
839                         paramFormats, 1);
840   process_result (plugin,
841                   proc, proc_cls,
842                   ret);
843 }
844
845
846 /**
847  * Context for 'repl_iter' function.
848  */
849 struct ReplCtx
850 {
851   
852   /**
853    * Plugin handle.
854    */
855   struct Plugin *plugin;
856   
857   /**
858    * Function to call for the result (or the NULL).
859    */
860   PluginDatumProcessor proc;
861   
862   /**
863    * Closure for proc.
864    */
865   void *proc_cls;
866 };
867
868
869 /**
870  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
871  * Decrements the replication counter and calls the original
872  * iterator.
873  *
874  * @param cls closure
875  * @param next_cls closure to pass to the "next" function.
876  * @param key key for the content
877  * @param size number of bytes in data
878  * @param data content stored
879  * @param type type of the content
880  * @param priority priority of the content
881  * @param anonymity anonymity-level for the content
882  * @param expiration expiration time for the content
883  * @param uid unique identifier for the datum;
884  *        maybe 0 if no unique identifier is available
885  *
886  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
887  *         (continue on call to "next", of course),
888  *         GNUNET_NO to delete the item and continue (if supported)
889  */
890 static int
891 repl_proc (void *cls,
892            const GNUNET_HashCode *key,
893            uint32_t size,
894            const void *data,
895            enum GNUNET_BLOCK_Type type,
896            uint32_t priority,
897            uint32_t anonymity,
898            struct GNUNET_TIME_Absolute expiration, 
899            uint64_t uid)
900 {
901   struct ReplCtx *rc = cls;
902   struct Plugin *plugin = rc->plugin;
903   int ret;
904   PGresult *qret;
905   uint32_t boid;
906
907   ret = rc->proc (rc->proc_cls,
908                   key,
909                   size, data, 
910                   type, priority, anonymity, expiration,
911                   uid);
912   if (NULL != key)
913     {
914       boid = htonl ( (uint32_t) uid);
915       const char *paramValues[] = {
916         (const char *) &boid,
917       };
918       int paramLengths[] = {
919         sizeof (boid),
920       };
921       const int paramFormats[] = { 1 };
922       qret = PQexecPrepared (plugin->dbh,
923                             "decrepl",
924                             1, paramValues, paramLengths, paramFormats, 1);
925       if (GNUNET_OK != check_result (plugin,
926                                      qret,
927                                      PGRES_COMMAND_OK,
928                                      "PQexecPrepared", 
929                                      "decrepl", __LINE__))
930         return GNUNET_SYSERR;
931       PQclear (qret);
932     }
933   return ret;
934 }
935
936
937 /**
938  * Get a random item for replication.  Returns a single, not expired, random item
939  * from those with the highest replication counters.  The item's 
940  * replication counter is decremented by one IF it was positive before.
941  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
942  *
943  * @param cls closure
944  * @param proc function to call the value (once only).
945  * @param proc_cls closure for iter
946  */
947 static void
948 postgres_plugin_get_replication (void *cls,
949                                  PluginDatumProcessor proc, void *proc_cls)
950 {
951   struct Plugin *plugin = cls;
952   struct ReplCtx rc;
953   PGresult *ret;
954
955   rc.plugin = plugin;
956   rc.proc = proc;
957   rc.proc_cls = proc_cls;
958   ret = PQexecPrepared (plugin->dbh,
959                         "select_replication_order",
960                         0,
961                         NULL, NULL, NULL, 1);
962   process_result (plugin,
963                   &repl_proc, &rc,
964                   ret);
965 }
966
967
968 /**
969  * Get a random item for expiration.
970  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
971  *
972  * @param cls closure
973  * @param proc function to call the value (once only).
974  * @param proc_cls closure for iter
975  */
976 static void
977 postgres_plugin_get_expiration (void *cls,
978                                 PluginDatumProcessor proc, void *proc_cls)
979 {
980   struct Plugin *plugin = cls;
981   uint64_t btime;
982   const int paramFormats[] = { 1 };
983   int paramLengths[] = { sizeof (btime) };
984   const char *paramValues[] = { (const char*) &btime };
985   PGresult *ret;
986   
987   btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
988   ret = PQexecPrepared (plugin->dbh,
989                         "select_expiration_order",
990                         1,
991                         paramValues,
992                         paramLengths,
993                         paramFormats, 
994                         1);
995   process_result (plugin,
996                   proc, proc_cls,
997                   ret);
998 }
999
1000
1001 /**
1002  * Update the priority for a particular key in the datastore.  If
1003  * the expiration time in value is different than the time found in
1004  * the datastore, the higher value should be kept.  For the
1005  * anonymity level, the lower value is to be used.  The specified
1006  * priority should be added to the existing priority, ignoring the
1007  * priority in value.
1008  *
1009  * Note that it is possible for multiple values to match this put.
1010  * In that case, all of the respective values are updated.
1011  *
1012  * @param cls our "struct Plugin*"
1013  * @param uid unique identifier of the datum
1014  * @param delta by how much should the priority
1015  *     change?  If priority + delta < 0 the
1016  *     priority should be set to 0 (never go
1017  *     negative).
1018  * @param expire new expiration time should be the
1019  *     MAX of any existing expiration time and
1020  *     this value
1021  * @param msg set to error message
1022  * @return GNUNET_OK on success
1023  */
1024 static int
1025 postgres_plugin_update (void *cls,
1026                         uint64_t uid,
1027                         int delta, struct GNUNET_TIME_Absolute expire,
1028                         char **msg)
1029 {
1030   struct Plugin *plugin = cls;
1031   PGresult *ret;
1032   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
1033   uint32_t boid = htonl ( (uint32_t) uid);
1034   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
1035   const char *paramValues[] = {
1036     (const char *) &bdelta,
1037     (const char *) &bexpire,
1038     (const char *) &boid,
1039   };
1040   int paramLengths[] = {
1041     sizeof (bdelta),
1042     sizeof (bexpire),
1043     sizeof (boid),
1044   };
1045   const int paramFormats[] = { 1, 1, 1 };
1046
1047   ret = PQexecPrepared (plugin->dbh,
1048                         "update",
1049                         3, paramValues, paramLengths, paramFormats, 1);
1050   if (GNUNET_OK != check_result (plugin,
1051                                  ret,
1052                                  PGRES_COMMAND_OK,
1053                                  "PQexecPrepared", "update", __LINE__))
1054     return GNUNET_SYSERR;
1055   PQclear (ret);
1056   return GNUNET_OK;
1057 }
1058
1059
1060 /**
1061  * Drop database.
1062  */
1063 static void 
1064 postgres_plugin_drop (void *cls)
1065 {
1066   struct Plugin *plugin = cls;
1067
1068   pq_exec (plugin, "DROP TABLE gn090", __LINE__);
1069 }
1070
1071
1072 /**
1073  * Entry point for the plugin.
1074  *
1075  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1076  * @return our "struct Plugin*"
1077  */
1078 void *
1079 libgnunet_plugin_datastore_postgres_init (void *cls)
1080 {
1081   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1082   struct GNUNET_DATASTORE_PluginFunctions *api;
1083   struct Plugin *plugin;
1084
1085   plugin = GNUNET_malloc (sizeof (struct Plugin));
1086   plugin->env = env;
1087   if (GNUNET_OK != init_connection (plugin))
1088     {
1089       GNUNET_free (plugin);
1090       return NULL;
1091     }
1092   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1093   api->cls = plugin;
1094   api->estimate_size = &postgres_plugin_estimate_size;
1095   api->put = &postgres_plugin_put;
1096   api->update = &postgres_plugin_update;
1097   api->get_key = &postgres_plugin_get_key;
1098   api->get_replication = &postgres_plugin_get_replication;
1099   api->get_expiration = &postgres_plugin_get_expiration;
1100   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
1101   api->drop = &postgres_plugin_drop;
1102   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1103                    "datastore-postgres",
1104                    _("Postgres database running\n"));
1105   return api;
1106 }
1107
1108
1109 /**
1110  * Exit point from the plugin.
1111  * @param cls our "struct Plugin*"
1112  * @return always NULL
1113  */
1114 void *
1115 libgnunet_plugin_datastore_postgres_done (void *cls)
1116 {
1117   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1118   struct Plugin *plugin = api->cls;
1119   
1120   PQfinish (plugin->dbh);
1121   GNUNET_free (plugin);
1122   GNUNET_free (api);
1123   return NULL;
1124 }
1125
1126 /* end of plugin_datastore_postgres.c */