more client code
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <postgresql/libpq-fe.h>
30
31 #define DEBUG_POSTGRES GNUNET_NO
32
33 /**
34  * After how many ms "busy" should a DB operation fail for good?
35  * A low value makes sure that we are more responsive to requests
36  * (especially PUTs).  A high value guarantees a higher success
37  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
38  *
39  * The default value of 1s should ensure that users do not experience
40  * huge latencies while at the same time allowing operations to succeed
41  * with reasonable probability.
42  */
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
44
45
46 /**
47  * Context for all functions in this plugin.
48  */
49 struct Plugin 
50 {
51   /**
52    * Our execution environment.
53    */
54   struct GNUNET_DATASTORE_PluginEnvironment *env;
55
56   /**
57    * Native Postgres database handle.
58    */
59   PGconn *dbh;
60
61 };
62
63
64 /**
65  * Check if the result obtained from Postgres has
66  * the desired status code.  If not, log an error, clear the
67  * result and return GNUNET_SYSERR.
68  * 
69  * @param plugin global context
70  * @param ret result to check
71  * @param expected_status expected return value
72  * @param command name of SQL command that was run
73  * @param args arguments to SQL command
74  * @param line line number for error reporting
75  * @return GNUNET_OK if the result is acceptable
76  */
77 static int
78 check_result (struct Plugin *plugin,
79               PGresult * ret,
80               int expected_status,
81               const char *command, const char *args, int line)
82 {
83   if (ret == NULL)
84     {
85       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
86                        "datastore-postgres",
87                        "Postgres failed to allocate result for `%s:%s' at %d\n",
88                        command, args, line);
89       return GNUNET_SYSERR;
90     }
91   if (PQresultStatus (ret) != expected_status)
92     {
93       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
94                        "datastore-postgres",
95                        _("`%s:%s' failed at %s:%d with error: %s"),
96                        command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
97       PQclear (ret);
98       return GNUNET_SYSERR;
99     }
100   return GNUNET_OK;
101 }
102
103 /**
104  * Run simple SQL statement (without results).
105  *
106  * @param plugin global context
107  * @param sql statement to run
108  * @param line code line for error reporting
109  */
110 static int
111 pq_exec (struct Plugin *plugin,
112          const char *sql, int line)
113 {
114   PGresult *ret;
115   ret = PQexec (plugin->dbh, sql);
116   if (GNUNET_OK != check_result (plugin,
117                                  ret, 
118                                  PGRES_COMMAND_OK, "PQexec", sql, line))
119     return GNUNET_SYSERR;
120   PQclear (ret);
121   return GNUNET_OK;
122 }
123
124 /**
125  * Prepare SQL statement.
126  *
127  * @param plugin global context
128  * @param name name for the prepared SQL statement
129  * @param sql SQL code to prepare
130  * @param nparams number of parameters in sql
131  * @param line code line for error reporting
132  * @return GNUNET_OK on success
133  */
134 static int
135 pq_prepare (struct Plugin *plugin,
136             const char *name, const char *sql, int nparams, int line)
137 {
138   PGresult *ret;
139   ret = PQprepare (plugin->dbh, name, sql, nparams, NULL);
140   if (GNUNET_OK !=
141       check_result (plugin, 
142                     ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
143     return GNUNET_SYSERR;
144   PQclear (ret);
145   return GNUNET_OK;
146 }
147
148 /**
149  * @brief Get a database handle
150  *
151  * @param plugin global context
152  * @return GNUNET_OK on success, GNUNET_SYSERR on error
153  */
154 static int
155 init_connection (struct Plugin *plugin)
156 {
157   char *conninfo;
158   PGresult *ret;
159
160   /* Open database and precompile statements */
161   conninfo = NULL;
162   (void) GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
163                                                 "datastore-postgres",
164                                                 "CONFIG",
165                                                 &conninfo);
166   plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
167   if (NULL == plugin->dbh)
168     {
169       /* FIXME: warn about out-of-memory? */
170       GNUNET_free_non_null (conninfo);
171       return GNUNET_SYSERR;
172     }
173   if (PQstatus (plugin->dbh) != CONNECTION_OK)
174     {
175       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
176                        "datastore-postgres",
177                        _("Unable to initialize Postgres with configuration `%s': %s"),
178                        conninfo,
179                        PQerrorMessage (plugin->dbh));
180       PQfinish (plugin->dbh);
181       plugin->dbh = NULL;
182       GNUNET_free_non_null (conninfo);
183       return GNUNET_SYSERR;
184     }
185   GNUNET_free_non_null (conninfo);
186   ret = PQexec (plugin->dbh,
187                 "CREATE TABLE gn090 ("
188                 "  repl INTEGER NOT NULL DEFAULT 0,"
189                 "  type INTEGER NOT NULL DEFAULT 0,"
190                 "  prio INTEGER NOT NULL DEFAULT 0,"
191                 "  anonLevel INTEGER NOT NULL DEFAULT 0,"
192                 "  expire BIGINT NOT NULL DEFAULT 0,"
193                 "  rvalue BIGINT NOT NULL DEFAULT 0,"
194                 "  hash BYTEA NOT NULL DEFAULT '',"
195                 "  vhash BYTEA NOT NULL DEFAULT '',"
196                 "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
197   if ( (ret == NULL) || 
198        ( (PQresultStatus (ret) != PGRES_COMMAND_OK) && 
199          (0 != strcmp ("42P07",    /* duplicate table */
200                        PQresultErrorField
201                        (ret,
202                         PG_DIAG_SQLSTATE)))))
203     {
204       (void) check_result (plugin,
205                            ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090", __LINE__);
206       PQfinish (plugin->dbh);
207       plugin->dbh = NULL;
208       return GNUNET_SYSERR;
209     }
210   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
211     {
212       if ((GNUNET_OK !=
213            pq_exec (plugin, "CREATE INDEX idx_hash ON gn090 (hash)", __LINE__)) ||
214           (GNUNET_OK !=
215            pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)",
216                     __LINE__))
217           || (GNUNET_OK !=
218               pq_exec (plugin, "CREATE INDEX idx_prio ON gn090 (prio)", __LINE__))
219           || (GNUNET_OK !=
220               pq_exec (plugin, "CREATE INDEX idx_expire ON gn090 (expire)", __LINE__))
221           || (GNUNET_OK !=
222               pq_exec (plugin, "CREATE INDEX idx_prio_anon ON gn090 (prio,anonLevel)",
223                        __LINE__))
224           || (GNUNET_OK !=
225               pq_exec
226               (plugin, "CREATE INDEX idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)",
227                __LINE__))
228           || (GNUNET_OK !=
229               pq_exec
230               (plugin, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)",
231                __LINE__))
232           || (GNUNET_OK !=
233               pq_exec (plugin, "CREATE INDEX idx_expire_hash ON gn090 (expire,hash)",
234                        __LINE__)))
235         {
236           PQclear (ret);
237           PQfinish (plugin->dbh);
238           plugin->dbh = NULL;
239           return GNUNET_SYSERR;
240         }
241     }
242   PQclear (ret);
243   ret = PQexec (plugin->dbh,
244                 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
245   if (GNUNET_OK != 
246       check_result (plugin,
247                     ret, PGRES_COMMAND_OK,
248                     "ALTER TABLE", "gn090", __LINE__))
249     {
250       PQfinish (plugin->dbh);
251       plugin->dbh = NULL;
252       return GNUNET_SYSERR;
253     }
254   PQclear (ret);
255   ret = PQexec (plugin->dbh,
256                 "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
257   if (GNUNET_OK !=
258       check_result (plugin,
259                     ret, PGRES_COMMAND_OK,
260                     "ALTER TABLE", "gn090", __LINE__))
261     {
262       PQfinish (plugin->dbh);
263       plugin->dbh = NULL;
264       return GNUNET_SYSERR;
265     }
266   PQclear (ret);
267   ret = PQexec (plugin->dbh,
268                 "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
269   if (GNUNET_OK !=
270       check_result (plugin,
271                     ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090", __LINE__))
272     {
273       PQfinish (plugin->dbh);
274       plugin->dbh = NULL;
275       return GNUNET_SYSERR;
276     }
277   PQclear (ret);
278   if ((GNUNET_OK !=
279        pq_prepare (plugin,
280                    "getvt",
281                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
282                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
283                    "ORDER BY oid ASC LIMIT 1 OFFSET $4",
284                    4,
285                    __LINE__)) ||
286       (GNUNET_OK !=
287        pq_prepare (plugin,
288                    "gett",
289                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
290                    "WHERE hash=$1 AND type=$2 "
291                    "ORDER BY oid ASC LIMIT 1 OFFSET $3",
292                    3,
293                    __LINE__)) ||
294       (GNUNET_OK !=
295        pq_prepare (plugin,
296                    "getv",
297                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
298                    "WHERE hash=$1 AND vhash=$2 "
299                    "ORDER BY oid ASC LIMIT 1 OFFSET $3",
300                    3,
301                    __LINE__)) ||
302       (GNUNET_OK !=
303        pq_prepare (plugin,
304                    "get",
305                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
306                    "WHERE hash=$1 "
307                    "ORDER BY oid ASC LIMIT 1 OFFSET $2",
308                    2,
309                    __LINE__)) ||
310       (GNUNET_OK !=
311        pq_prepare (plugin,
312                    "put",
313                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
314                    "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)",
315                    9,
316                    __LINE__)) ||
317       (GNUNET_OK !=
318        pq_prepare (plugin,
319                    "update",
320                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
321                    "WHERE oid = $3",
322                    3,
323                    __LINE__)) ||
324       (GNUNET_OK !=
325        pq_prepare (plugin,
326                    "decrepl",
327                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
328                    "WHERE oid = $1",
329                    1,
330                    __LINE__)) ||
331       (GNUNET_OK !=
332        pq_prepare (plugin,
333                    "select_non_anonymous",
334                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
335                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
336                    1,
337                    __LINE__)) ||
338       (GNUNET_OK !=
339        pq_prepare (plugin,
340                    "select_expiration_order",
341                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 
342                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "                      
343                    "UNION "                                             
344                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 
345                    "ORDER BY prio ASC LIMIT 1) "                        
346                    "ORDER BY expire ASC LIMIT 1",
347                    1,
348                    __LINE__)) ||
349       (GNUNET_OK !=
350        pq_prepare (plugin,
351                    "select_replication_order",
352                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " \
353                    "ORDER BY repl DESC,RANDOM() LIMIT 1",
354                    0,
355                    __LINE__)) ||
356       (GNUNET_OK !=
357        pq_prepare (plugin,
358                    "delrow",
359                    "DELETE FROM gn090 " "WHERE oid=$1", 
360                    1,
361                    __LINE__)))
362     {
363       PQfinish (plugin->dbh);
364       plugin->dbh = NULL;
365       return GNUNET_SYSERR;
366     }
367   return GNUNET_OK;
368 }
369
370
371 /**
372  * Delete the row identified by the given rowid (qid
373  * in postgres).
374  *
375  * @param plugin global context
376  * @param rowid which row to delete
377  * @return GNUNET_OK on success
378  */
379 static int
380 delete_by_rowid (struct Plugin *plugin,
381                  unsigned int rowid)
382 {
383   uint32_t browid;
384   const char *paramValues[] = { (const char *) &browid };
385   int paramLengths[] = { sizeof (browid) };
386   const int paramFormats[] = { 1 };
387   PGresult *ret;
388
389   browid = htonl (rowid);
390   ret = PQexecPrepared (plugin->dbh,
391                         "delrow",
392                         1, paramValues, paramLengths, paramFormats, 1);
393   if (GNUNET_OK !=
394       check_result (plugin,
395                     ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
396                     __LINE__))
397     {
398       return GNUNET_SYSERR;
399     }
400   PQclear (ret);
401   return GNUNET_OK;
402 }
403
404
405 /**
406  * Get an estimate of how much space the database is
407  * currently using.
408  *
409  * @param cls our "struct Plugin*"
410  * @return number of bytes used on disk
411  */
412 static unsigned long long
413 postgres_plugin_estimate_size (void *cls)
414 {
415   struct Plugin *plugin = cls;
416   unsigned long long total;
417   PGresult *ret;
418
419   ret = PQexecParams (plugin->dbh,
420                       "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090",
421                       0, NULL, NULL, NULL, NULL, 1);
422   if (GNUNET_OK != check_result (plugin,
423                                  ret,
424                                  PGRES_TUPLES_OK,
425                                  "PQexecParams",
426                                  "get_size",
427                                  __LINE__))
428     {
429       return 0;
430     }
431   if ((PQntuples (ret) != 1) ||
432       (PQnfields (ret) != 1) ||
433       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
434     {
435       GNUNET_break (0);
436       PQclear (ret);
437       return 0;
438     }
439   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
440   PQclear (ret);
441   return total;
442 }
443
444
445 /**
446  * Store an item in the datastore.
447  *
448  * @param cls closure
449  * @param key key for the item
450  * @param size number of bytes in data
451  * @param data content stored
452  * @param type type of the content
453  * @param priority priority of the content
454  * @param anonymity anonymity-level for the content
455  * @param replication replication-level for the content
456  * @param expiration expiration time for the content
457  * @param msg set to error message
458  * @return GNUNET_OK on success
459  */
460 static int
461 postgres_plugin_put (void *cls,
462                      const GNUNET_HashCode * key,
463                      uint32_t size,
464                      const void *data,
465                      enum GNUNET_BLOCK_Type type,
466                      uint32_t priority,
467                      uint32_t anonymity,
468                      uint32_t replication,
469                      struct GNUNET_TIME_Absolute expiration,
470                      char **msg)
471 {
472   struct Plugin *plugin = cls;
473   GNUNET_HashCode vhash;
474   PGresult *ret;
475   uint32_t btype = htonl (type);
476   uint32_t bprio = htonl (priority);
477   uint32_t banon = htonl (anonymity);
478   uint32_t brepl = htonl (replication);
479   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
480   const char *paramValues[] = {
481     (const char *) &brepl,
482     (const char *) &btype,
483     (const char *) &bprio,
484     (const char *) &banon,
485     (const char *) &bexpi,
486     (const char *) key,
487     (const char *) &vhash,
488     (const char *) data
489   };
490   int paramLengths[] = {
491     sizeof (brepl),
492     sizeof (btype),
493     sizeof (bprio),
494     sizeof (banon),
495     sizeof (bexpi),
496     sizeof (GNUNET_HashCode),
497     sizeof (GNUNET_HashCode),
498     size
499   };
500   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
501
502   GNUNET_CRYPTO_hash (data, size, &vhash);
503   ret = PQexecPrepared (plugin->dbh,
504                         "put", 8, paramValues, paramLengths, paramFormats, 1);
505   if (GNUNET_OK != check_result (plugin, ret,
506                                  PGRES_COMMAND_OK,
507                                  "PQexecPrepared", "put", __LINE__))
508     return GNUNET_SYSERR;
509   PQclear (ret);
510   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
511 #if DEBUG_POSTGRES
512   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
513                    "datastore-postgres",
514                    "Stored %u bytes in database\n",
515                    (unsigned int) size);
516 #endif
517   return GNUNET_OK;
518 }
519
520
521 /**
522  * Function invoked to process the result and call
523  * the processor.
524  *
525  * @param plugin global plugin data
526  * @param proc function to call the value (once only).
527  * @param proc_cls closure for proc
528  * @param res result from exec
529  * @param line line number for error messages
530  */
531 static void 
532 process_result (struct Plugin *plugin,
533                 PluginDatumProcessor proc, void *proc_cls,
534                 PGresult *res,
535                 int line)
536 {
537   int iret;
538   enum GNUNET_BLOCK_Type type;
539   uint32_t anonymity;
540   uint32_t priority;
541   uint32_t size;
542   unsigned int rowid;
543   struct GNUNET_TIME_Absolute expiration_time;
544   GNUNET_HashCode key;
545
546   if (GNUNET_OK != check_result (plugin,
547                                  res,
548                                  PGRES_TUPLES_OK,
549                                  "PQexecPrepared",
550                                  "select",
551                                  line))
552     {
553 #if DEBUG_POSTGRES
554       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
555                        "datastore-postgres",
556                        "Ending iteration (postgres error)\n");
557 #endif
558       proc (proc_cls, 
559             NULL, 0, NULL, 0, 0, 0, 
560             GNUNET_TIME_UNIT_ZERO_ABS, 0);      
561       return;
562     }
563
564   if (0 == PQntuples (res))
565     {
566       /* no result */
567 #if DEBUG_POSTGRES
568       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
569                        "datastore-postgres",
570                        "Ending iteration (no more results)\n");
571 #endif
572       proc (proc_cls, 
573             NULL, 0, NULL, 0, 0, 0, 
574             GNUNET_TIME_UNIT_ZERO_ABS, 0);
575       PQclear (res);
576       return; 
577     }
578   if ((1 != PQntuples (res)) ||
579       (7 != PQnfields (res)) ||
580       (sizeof (uint32_t) != PQfsize (res, 0)) ||
581       (sizeof (uint32_t) != PQfsize (res, 6)))
582     {
583       GNUNET_break (0);
584       proc (proc_cls, 
585             NULL, 0, NULL, 0, 0, 0, 
586             GNUNET_TIME_UNIT_ZERO_ABS, 0);
587       PQclear (res);
588       return;
589     }
590   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
591   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
592       (sizeof (uint32_t) != PQfsize (res, 1)) ||
593       (sizeof (uint32_t) != PQfsize (res, 2)) ||
594       (sizeof (uint64_t) != PQfsize (res, 3)) ||
595       (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)) )
596     {
597       GNUNET_break (0);
598       PQclear (res);
599       delete_by_rowid (plugin, rowid);
600       proc (proc_cls, 
601             NULL, 0, NULL, 0, 0, 0, 
602             GNUNET_TIME_UNIT_ZERO_ABS, 0);
603       return;
604     }
605
606   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
607   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
608   anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
609   expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
610   memcpy (&key, 
611           PQgetvalue (res, 0, 4), 
612           sizeof (GNUNET_HashCode));
613   size = PQgetlength (res, 0, 5);
614 #if DEBUG_POSTGRES
615   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
616                    "datastore-postgres",
617                    "Found result of size %u bytes and type %u in database\n",
618                    (unsigned int) size,
619                    (unsigned int) type);
620 #endif
621   iret = proc (proc_cls,
622                &key,
623                size,
624                PQgetvalue (res, 0, 5),
625                (enum GNUNET_BLOCK_Type) type,
626                priority,
627                anonymity,
628                expiration_time,
629                rowid);
630   PQclear (res);
631   if (iret == GNUNET_NO)
632     {
633 #if DEBUG_POSTGRES
634       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
635                   "Processor asked for item %u to be removed.\n",
636                   rowid);
637 #endif
638       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
639         {
640 #if DEBUG_POSTGRES
641           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
642                            "datastore-postgres",
643                            "Deleting %u bytes from database\n",
644                            (unsigned int) size);
645 #endif
646           plugin->env->duc (plugin->env->cls,
647                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
648 #if DEBUG_POSTGRES
649           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
650                            "datastore-postgres",
651                            "Deleted %u bytes from database\n",
652                            (unsigned int) size);
653 #endif
654         }
655     }
656 }
657
658
659 /**
660  * Iterate over the results for a particular key
661  * in the datastore.
662  *
663  * @param cls closure
664  * @param offset offset of the result (modulo num-results); 
665  *        specific ordering does not matter for the offset
666  * @param key maybe NULL (to match all entries)
667  * @param vhash hash of the value, maybe NULL (to
668  *        match all values that have the right key).
669  *        Note that for DBlocks there is no difference
670  *        betwen key and vhash, but for other blocks
671  *        there may be!
672  * @param type entries of which type are relevant?
673  *     Use 0 for any type.
674  * @param proc function to call on the matching value;
675  *        will be called once with a NULL if no value matches
676  * @param proc_cls closure for iter
677  */
678 static void
679 postgres_plugin_get_key (void *cls,
680                          uint64_t offset,
681                          const GNUNET_HashCode *key,
682                          const GNUNET_HashCode *vhash,
683                          enum GNUNET_BLOCK_Type type,
684                          PluginDatumProcessor proc, void *proc_cls)
685 {
686   struct Plugin *plugin = cls;
687   const int paramFormats[] = { 1, 1, 1, 1, 1 };
688   int paramLengths[4];
689   const char *paramValues[4];
690   int nparams;
691   const char *pname;
692   PGresult *ret;
693   uint64_t total;
694   uint64_t blimit_off;
695   uint32_t btype;
696
697   GNUNET_assert (key != NULL);
698   paramValues[0] = (const char*) key;
699   paramLengths[0] = sizeof (GNUNET_HashCode);
700   btype = htonl (type);
701   if (type != 0)
702     {
703       if (vhash != NULL)
704         {
705           paramValues[1] = (const char *) vhash;
706           paramLengths[1] = sizeof (GNUNET_HashCode);
707           paramValues[2] = (const char *) &btype;
708           paramLengths[2] = sizeof (btype);
709           paramValues[3] = (const char *) &blimit_off;
710           paramLengths[3] = sizeof (blimit_off);
711           nparams = 4;
712           pname = "getvt";
713           ret = PQexecParams (plugin->dbh,
714                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
715                               3,
716                               NULL,
717                               paramValues, 
718                               paramLengths,
719                               paramFormats, 1);
720         }
721       else
722         {
723           paramValues[1] = (const char *) &btype;
724           paramLengths[1] = sizeof (btype);
725           paramValues[2] = (const char *) &blimit_off;
726           paramLengths[2] = sizeof (blimit_off);
727           nparams = 3;
728           pname = "gett";
729           ret = PQexecParams (plugin->dbh,
730                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
731                               2,
732                               NULL,
733                               paramValues, 
734                               paramLengths, 
735                               paramFormats, 1);
736         }
737     }
738   else
739     {
740       if (vhash != NULL)
741         {
742           paramValues[1] = (const char *) vhash;
743           paramLengths[1] = sizeof (GNUNET_HashCode);
744           paramValues[2] = (const char *) &blimit_off;
745           paramLengths[2] = sizeof (blimit_off);
746           nparams = 3;
747           pname = "getv";
748           ret = PQexecParams (plugin->dbh,
749                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
750                               2,
751                               NULL,
752                               paramValues, 
753                               paramLengths,
754                               paramFormats, 1);
755         }
756       else
757         {
758           paramValues[1] = (const char *) &blimit_off;
759           paramLengths[1] = sizeof (blimit_off);
760           nparams = 2;
761           pname = "get";
762           ret = PQexecParams (plugin->dbh,
763                               "SELECT count(*) FROM gn090 WHERE hash=$1",
764                               1,
765                               NULL,
766                               paramValues, 
767                               paramLengths,
768                               paramFormats, 1);
769         }
770     }
771   if (GNUNET_OK != check_result (plugin,
772                                  ret,
773                                  PGRES_TUPLES_OK,
774                                  "PQexecParams",
775                                  pname,
776                                  __LINE__))
777     {
778       proc (proc_cls, 
779             NULL, 0, NULL, 0, 0, 0, 
780             GNUNET_TIME_UNIT_ZERO_ABS, 0);
781       return;
782     }
783   if ((PQntuples (ret) != 1) ||
784       (PQnfields (ret) != 1) ||
785       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
786     {
787       GNUNET_break (0);
788       PQclear (ret);
789       proc (proc_cls, 
790             NULL, 0, NULL, 0, 0, 0, 
791             GNUNET_TIME_UNIT_ZERO_ABS, 0);
792       return;
793     }
794   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
795   PQclear (ret);
796   if (total == 0)
797     {
798       proc (proc_cls, 
799             NULL, 0, NULL, 0, 0, 0, 
800             GNUNET_TIME_UNIT_ZERO_ABS, 0);
801       return;
802     }
803   blimit_off = GNUNET_htonll (offset % total);
804   ret = PQexecPrepared (plugin->dbh,
805                         pname,
806                         nparams,
807                         paramValues, 
808                         paramLengths,
809                         paramFormats, 1);
810   process_result (plugin,
811                   proc, proc_cls,
812                   ret, __LINE__);
813 }
814
815
816 /**
817  * Select a subset of the items in the datastore and call
818  * the given iterator for each of them.
819  *
820  * @param cls our "struct Plugin*"
821  * @param type entries of which type should be considered?
822  *        Use 0 for any type.
823  * @param proc function to call on the matching value;
824  *        will be called with a NULL if no value matches
825  * @param proc_cls closure for proc
826  */
827 static void
828 postgres_plugin_get_zero_anonymity (void *cls,
829                                     uint64_t offset,
830                                     enum GNUNET_BLOCK_Type type,
831                                     PluginDatumProcessor proc, void *proc_cls)
832 {
833   struct Plugin *plugin = cls;
834   uint32_t btype;
835   uint64_t boff;
836   const int paramFormats[] = { 1, 1 };
837   int paramLengths[] = { sizeof (btype), sizeof (boff) };
838   const char *paramValues[] = { (const char*) &btype, (const char*) &boff };
839   PGresult *ret;
840
841   btype = htonl ((uint32_t) type);
842   boff = GNUNET_htonll (offset);
843   ret = PQexecPrepared (plugin->dbh,
844                         "select_non_anonymous",
845                         2,
846                         paramValues, 
847                         paramLengths,
848                         paramFormats, 1);
849   process_result (plugin,
850                   proc, proc_cls,
851                   ret, __LINE__);
852 }
853
854
855 /**
856  * Context for 'repl_iter' function.
857  */
858 struct ReplCtx
859 {
860   
861   /**
862    * Plugin handle.
863    */
864   struct Plugin *plugin;
865   
866   /**
867    * Function to call for the result (or the NULL).
868    */
869   PluginDatumProcessor proc;
870   
871   /**
872    * Closure for proc.
873    */
874   void *proc_cls;
875 };
876
877
878 /**
879  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
880  * Decrements the replication counter and calls the original
881  * iterator.
882  *
883  * @param cls closure
884  * @param next_cls closure to pass to the "next" function.
885  * @param key key for the content
886  * @param size number of bytes in data
887  * @param data content stored
888  * @param type type of the content
889  * @param priority priority of the content
890  * @param anonymity anonymity-level for the content
891  * @param expiration expiration time for the content
892  * @param uid unique identifier for the datum;
893  *        maybe 0 if no unique identifier is available
894  *
895  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
896  *         (continue on call to "next", of course),
897  *         GNUNET_NO to delete the item and continue (if supported)
898  */
899 static int
900 repl_proc (void *cls,
901            const GNUNET_HashCode *key,
902            uint32_t size,
903            const void *data,
904            enum GNUNET_BLOCK_Type type,
905            uint32_t priority,
906            uint32_t anonymity,
907            struct GNUNET_TIME_Absolute expiration, 
908            uint64_t uid)
909 {
910   struct ReplCtx *rc = cls;
911   struct Plugin *plugin = rc->plugin;
912   int ret;
913   PGresult *qret;
914   uint32_t boid;
915
916   ret = rc->proc (rc->proc_cls,
917                   key,
918                   size, data, 
919                   type, priority, anonymity, expiration,
920                   uid);
921   if (NULL != key)
922     {
923       boid = htonl ( (uint32_t) uid);
924       const char *paramValues[] = {
925         (const char *) &boid,
926       };
927       int paramLengths[] = {
928         sizeof (boid),
929       };
930       const int paramFormats[] = { 1 };
931       qret = PQexecPrepared (plugin->dbh,
932                             "decrepl",
933                             1, paramValues, paramLengths, paramFormats, 1);
934       if (GNUNET_OK != check_result (plugin,
935                                      qret,
936                                      PGRES_COMMAND_OK,
937                                      "PQexecPrepared", 
938                                      "decrepl", __LINE__))
939         return GNUNET_SYSERR;
940       PQclear (qret);
941     }
942   return ret;
943 }
944
945
946 /**
947  * Get a random item for replication.  Returns a single, not expired, random item
948  * from those with the highest replication counters.  The item's 
949  * replication counter is decremented by one IF it was positive before.
950  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
951  *
952  * @param cls closure
953  * @param proc function to call the value (once only).
954  * @param proc_cls closure for proc
955  */
956 static void
957 postgres_plugin_get_replication (void *cls,
958                                  PluginDatumProcessor proc, void *proc_cls)
959 {
960   struct Plugin *plugin = cls;
961   struct ReplCtx rc;
962   PGresult *ret;
963
964   rc.plugin = plugin;
965   rc.proc = proc;
966   rc.proc_cls = proc_cls;
967   ret = PQexecPrepared (plugin->dbh,
968                         "select_replication_order",
969                         0,
970                         NULL, NULL, NULL, 1);
971   process_result (plugin,
972                   &repl_proc, &rc,
973                   ret, __LINE__);
974 }
975
976
977 /**
978  * Get a random item for expiration.
979  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
980  *
981  * @param cls closure
982  * @param proc function to call the value (once only).
983  * @param proc_cls closure for proc
984  */
985 static void
986 postgres_plugin_get_expiration (void *cls,
987                                 PluginDatumProcessor proc, void *proc_cls)
988 {
989   struct Plugin *plugin = cls;
990   uint64_t btime;
991   const int paramFormats[] = { 1 };
992   int paramLengths[] = { sizeof (btime) };
993   const char *paramValues[] = { (const char*) &btime };
994   PGresult *ret;
995   
996   btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
997   ret = PQexecPrepared (plugin->dbh,
998                         "select_expiration_order",
999                         1,
1000                         paramValues,
1001                         paramLengths,
1002                         paramFormats, 
1003                         1);
1004   process_result (plugin,
1005                   proc, proc_cls,
1006                   ret, __LINE__);
1007 }
1008
1009
1010 /**
1011  * Update the priority for a particular key in the datastore.  If
1012  * the expiration time in value is different than the time found in
1013  * the datastore, the higher value should be kept.  For the
1014  * anonymity level, the lower value is to be used.  The specified
1015  * priority should be added to the existing priority, ignoring the
1016  * priority in value.
1017  *
1018  * Note that it is possible for multiple values to match this put.
1019  * In that case, all of the respective values are updated.
1020  *
1021  * @param cls our "struct Plugin*"
1022  * @param uid unique identifier of the datum
1023  * @param delta by how much should the priority
1024  *     change?  If priority + delta < 0 the
1025  *     priority should be set to 0 (never go
1026  *     negative).
1027  * @param expire new expiration time should be the
1028  *     MAX of any existing expiration time and
1029  *     this value
1030  * @param msg set to error message
1031  * @return GNUNET_OK on success
1032  */
1033 static int
1034 postgres_plugin_update (void *cls,
1035                         uint64_t uid,
1036                         int delta, struct GNUNET_TIME_Absolute expire,
1037                         char **msg)
1038 {
1039   struct Plugin *plugin = cls;
1040   PGresult *ret;
1041   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
1042   uint32_t boid = htonl ( (uint32_t) uid);
1043   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
1044   const char *paramValues[] = {
1045     (const char *) &bdelta,
1046     (const char *) &bexpire,
1047     (const char *) &boid,
1048   };
1049   int paramLengths[] = {
1050     sizeof (bdelta),
1051     sizeof (bexpire),
1052     sizeof (boid),
1053   };
1054   const int paramFormats[] = { 1, 1, 1 };
1055
1056   ret = PQexecPrepared (plugin->dbh,
1057                         "update",
1058                         3, paramValues, paramLengths, paramFormats, 1);
1059   if (GNUNET_OK != check_result (plugin,
1060                                  ret,
1061                                  PGRES_COMMAND_OK,
1062                                  "PQexecPrepared", "update", __LINE__))
1063     return GNUNET_SYSERR;
1064   PQclear (ret);
1065   return GNUNET_OK;
1066 }
1067
1068
1069 /**
1070  * Drop database.
1071  */
1072 static void 
1073 postgres_plugin_drop (void *cls)
1074 {
1075   struct Plugin *plugin = cls;
1076
1077   pq_exec (plugin, "DROP TABLE gn090", __LINE__);
1078 }
1079
1080
1081 /**
1082  * Entry point for the plugin.
1083  *
1084  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1085  * @return our "struct Plugin*"
1086  */
1087 void *
1088 libgnunet_plugin_datastore_postgres_init (void *cls)
1089 {
1090   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1091   struct GNUNET_DATASTORE_PluginFunctions *api;
1092   struct Plugin *plugin;
1093
1094   plugin = GNUNET_malloc (sizeof (struct Plugin));
1095   plugin->env = env;
1096   if (GNUNET_OK != init_connection (plugin))
1097     {
1098       GNUNET_free (plugin);
1099       return NULL;
1100     }
1101   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1102   api->cls = plugin;
1103   api->estimate_size = &postgres_plugin_estimate_size;
1104   api->put = &postgres_plugin_put;
1105   api->update = &postgres_plugin_update;
1106   api->get_key = &postgres_plugin_get_key;
1107   api->get_replication = &postgres_plugin_get_replication;
1108   api->get_expiration = &postgres_plugin_get_expiration;
1109   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
1110   api->drop = &postgres_plugin_drop;
1111   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1112                    "datastore-postgres",
1113                    _("Postgres database running\n"));
1114   return api;
1115 }
1116
1117
1118 /**
1119  * Exit point from the plugin.
1120  * @param cls our "struct Plugin*"
1121  * @return always NULL
1122  */
1123 void *
1124 libgnunet_plugin_datastore_postgres_done (void *cls)
1125 {
1126   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1127   struct Plugin *plugin = api->cls;
1128   
1129   PQfinish (plugin->dbh);
1130   GNUNET_free (plugin);
1131   GNUNET_free (api);
1132   return NULL;
1133 }
1134
1135 /* end of plugin_datastore_postgres.c */