fixes
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <postgresql/libpq-fe.h>
30
31 #define DEBUG_POSTGRES GNUNET_NO
32
33 /**
34  * After how many ms "busy" should a DB operation fail for good?
35  * A low value makes sure that we are more responsive to requests
36  * (especially PUTs).  A high value guarantees a higher success
37  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
38  *
39  * The default value of 1s should ensure that users do not experience
40  * huge latencies while at the same time allowing operations to succeed
41  * with reasonable probability.
42  */
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
44
45
46 /**
47  * Closure for 'postgres_next_request_cont'.
48  */
49 struct NextRequestClosure
50 {
51   /**
52    * Global plugin data.
53    */
54   struct Plugin *plugin;
55   
56   /**
57    * Function to call for each matching entry.
58    */
59   PluginIterator iter;
60   
61   /**
62    * Closure for 'iter'.
63    */
64   void *iter_cls;
65   
66   /**
67    * Parameters for the prepared statement.
68    */
69   const char *paramValues[5];
70   
71   /**
72    * Name of the prepared statement to run.
73    */
74   const char *pname;
75   
76   /**
77    * Size of values pointed to by paramValues.
78    */
79   int paramLengths[5];
80   
81   /**
82    * Number of paramters in paramValues/paramLengths.
83    */
84   int nparams; 
85   
86   /**
87    * Current time (possible parameter), big-endian.
88    */
89   uint64_t bnow;
90   
91   /**
92    * Key (possible parameter)
93    */
94   GNUNET_HashCode key;
95   
96   /**
97    * Hash of value (possible parameter)
98    */
99   GNUNET_HashCode vhash;
100   
101   /**
102    * Number of entries found so far
103    */
104   unsigned long long count;
105   
106   /**
107    * Offset this iteration starts at.
108    */
109   uint64_t off;
110   
111   /**
112    * Current offset to use in query, big-endian.
113    */
114   uint64_t blimit_off;
115   
116   /**
117    * Current total number of entries found so far, big-endian.
118    */
119   uint64_t bcount;
120   
121   /**
122    *  Overall number of matching entries.
123    */
124   unsigned long long total;
125   
126   /**
127    * Type of block (possible paramter), big-endian.
128    */
129   uint32_t btype;
130   
131   /**
132    * Flag set to GNUNET_YES to stop iteration.
133    */
134   int end_it;
135
136   /**
137    * Flag to indicate that there should only be one result.
138    */
139   int one_shot;
140 };
141
142
143 /**
144  * Context for all functions in this plugin.
145  */
146 struct Plugin 
147 {
148   /**
149    * Our execution environment.
150    */
151   struct GNUNET_DATASTORE_PluginEnvironment *env;
152
153   /**
154    * Native Postgres database handle.
155    */
156   PGconn *dbh;
157
158   /**
159    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
160    */
161   struct NextRequestClosure *next_task_nc;
162
163   /**
164    * Pending task with scheduler for running the next request.
165    */
166   GNUNET_SCHEDULER_TaskIdentifier next_task;
167
168 };
169
170
171 /**
172  * Check if the result obtained from Postgres has
173  * the desired status code.  If not, log an error, clear the
174  * result and return GNUNET_SYSERR.
175  * 
176  * @param plugin global context
177  * @param ret result to check
178  * @param expected_status expected return value
179  * @param command name of SQL command that was run
180  * @param args arguments to SQL command
181  * @param line line number for error reporting
182  * @return GNUNET_OK if the result is acceptable
183  */
184 static int
185 check_result (struct Plugin *plugin,
186               PGresult * ret,
187               int expected_status,
188               const char *command, const char *args, int line)
189 {
190   if (ret == NULL)
191     {
192       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
193                        "datastore-postgres",
194                        "Postgres failed to allocate result for `%s:%s' at %d\n",
195                        command, args, line);
196       return GNUNET_SYSERR;
197     }
198   if (PQresultStatus (ret) != expected_status)
199     {
200       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
201                        "datastore-postgres",
202                        _("`%s:%s' failed at %s:%d with error: %s"),
203                        command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
204       PQclear (ret);
205       return GNUNET_SYSERR;
206     }
207   return GNUNET_OK;
208 }
209
210 /**
211  * Run simple SQL statement (without results).
212  *
213  * @param plugin global context
214  * @param sql statement to run
215  * @param line code line for error reporting
216  */
217 static int
218 pq_exec (struct Plugin *plugin,
219          const char *sql, int line)
220 {
221   PGresult *ret;
222   ret = PQexec (plugin->dbh, sql);
223   if (GNUNET_OK != check_result (plugin,
224                                  ret, 
225                                  PGRES_COMMAND_OK, "PQexec", sql, line))
226     return GNUNET_SYSERR;
227   PQclear (ret);
228   return GNUNET_OK;
229 }
230
231 /**
232  * Prepare SQL statement.
233  *
234  * @param plugin global context
235  * @param name name for the prepared SQL statement
236  * @param sql SQL code to prepare
237  * @param nparams number of parameters in sql
238  * @param line code line for error reporting
239  * @return GNUNET_OK on success
240  */
241 static int
242 pq_prepare (struct Plugin *plugin,
243             const char *name, const char *sql, int nparams, int line)
244 {
245   PGresult *ret;
246   ret = PQprepare (plugin->dbh, name, sql, nparams, NULL);
247   if (GNUNET_OK !=
248       check_result (plugin, 
249                     ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
250     return GNUNET_SYSERR;
251   PQclear (ret);
252   return GNUNET_OK;
253 }
254
255 /**
256  * @brief Get a database handle
257  *
258  * @param plugin global context
259  * @return GNUNET_OK on success, GNUNET_SYSERR on error
260  */
261 static int
262 init_connection (struct Plugin *plugin)
263 {
264   char *conninfo;
265   PGresult *ret;
266
267   /* Open database and precompile statements */
268   conninfo = NULL;
269   GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
270                                          "datastore-postgres",
271                                          "CONFIG",
272                                          &conninfo);
273   plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
274   if (NULL == plugin->dbh)
275     {
276       /* FIXME: warn about out-of-memory? */
277       GNUNET_free_non_null (conninfo);
278       return GNUNET_SYSERR;
279     }
280   if (PQstatus (plugin->dbh) != CONNECTION_OK)
281     {
282       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
283                        "datastore-postgres",
284                        _("Unable to initialize Postgres with configuration `%s': %s"),
285                        conninfo,
286                        PQerrorMessage (plugin->dbh));
287       PQfinish (plugin->dbh);
288       plugin->dbh = NULL;
289       GNUNET_free_non_null (conninfo);
290       return GNUNET_SYSERR;
291     }
292   GNUNET_free_non_null (conninfo);
293   ret = PQexec (plugin->dbh,
294                 "CREATE TABLE gn090 ("
295                 "  repl INTEGER NOT NULL DEFAULT 0,"
296                 "  type INTEGER NOT NULL DEFAULT 0,"
297                 "  prio INTEGER NOT NULL DEFAULT 0,"
298                 "  anonLevel INTEGER NOT NULL DEFAULT 0,"
299                 "  expire BIGINT NOT NULL DEFAULT 0,"
300                 "  hash BYTEA NOT NULL DEFAULT '',"
301                 "  vhash BYTEA NOT NULL DEFAULT '',"
302                 "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
303   if ( (ret == NULL) || 
304        ( (PQresultStatus (ret) != PGRES_COMMAND_OK) && 
305          (0 != strcmp ("42P07",    /* duplicate table */
306                        PQresultErrorField
307                        (ret,
308                         PG_DIAG_SQLSTATE)))))
309     {
310       (void) check_result (plugin,
311                            ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090", __LINE__);
312       PQfinish (plugin->dbh);
313       plugin->dbh = NULL;
314       return GNUNET_SYSERR;
315     }
316   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
317     {
318       if ((GNUNET_OK !=
319            pq_exec (plugin, "CREATE INDEX idx_hash ON gn090 (hash)", __LINE__)) ||
320           (GNUNET_OK !=
321            pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)",
322                     __LINE__))
323           || (GNUNET_OK !=
324               pq_exec (plugin, "CREATE INDEX idx_prio ON gn090 (prio)", __LINE__))
325           || (GNUNET_OK !=
326               pq_exec (plugin, "CREATE INDEX idx_expire ON gn090 (expire)", __LINE__))
327           || (GNUNET_OK !=
328               pq_exec (plugin, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)",
329                        __LINE__))
330           || (GNUNET_OK !=
331               pq_exec
332               (plugin, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)",
333                __LINE__))
334           || (GNUNET_OK !=
335               pq_exec (plugin, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)",
336                        __LINE__)))
337         {
338           PQclear (ret);
339           PQfinish (plugin->dbh);
340           plugin->dbh = NULL;
341           return GNUNET_SYSERR;
342         }
343     }
344   PQclear (ret);
345   ret = PQexec (plugin->dbh,
346                 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
347   if (GNUNET_OK != 
348       check_result (plugin,
349                     ret, PGRES_COMMAND_OK,
350                     "ALTER TABLE", "gn090", __LINE__))
351     {
352       PQfinish (plugin->dbh);
353       plugin->dbh = NULL;
354       return GNUNET_SYSERR;
355     }
356   PQclear (ret);
357   ret = PQexec (plugin->dbh,
358                 "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
359   if (GNUNET_OK !=
360       check_result (plugin,
361                     ret, PGRES_COMMAND_OK,
362                     "ALTER TABLE", "gn090", __LINE__))
363     {
364       PQfinish (plugin->dbh);
365       plugin->dbh = NULL;
366       return GNUNET_SYSERR;
367     }
368   PQclear (ret);
369   ret = PQexec (plugin->dbh,
370                 "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
371   if (GNUNET_OK !=
372       check_result (plugin,
373                     ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090", __LINE__))
374     {
375       PQfinish (plugin->dbh);
376       plugin->dbh = NULL;
377       return GNUNET_SYSERR;
378     }
379   PQclear (ret);
380   if ((GNUNET_OK !=
381        pq_prepare (plugin,
382                    "getvt",
383                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
384                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
385                    "ORDER BY oid ASC LIMIT 1 OFFSET $4",
386                    4,
387                    __LINE__)) ||
388       (GNUNET_OK !=
389        pq_prepare (plugin,
390                    "gett",
391                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
392                    "WHERE hash=$1 AND type=$2 "
393                    "ORDER BY oid ASC LIMIT 1 OFFSET $3",
394                    3,
395                    __LINE__)) ||
396       (GNUNET_OK !=
397        pq_prepare (plugin,
398                    "getv",
399                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
400                    "WHERE hash=$1 AND vhash=$2 "
401                    "ORDER BY oid ASC LIMIT 1 OFFSET $3",
402                    3,
403                    __LINE__)) ||
404       (GNUNET_OK !=
405        pq_prepare (plugin,
406                    "get",
407                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
408                    "WHERE hash=$1 "
409                    "ORDER BY oid ASC LIMIT 1 OFFSET $2",
410                    2,
411                    __LINE__)) ||
412       (GNUNET_OK !=
413        pq_prepare (plugin,
414                    "put",
415                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, hash, vhash, value) "
416                    "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
417                    8,
418                    __LINE__)) ||
419       (GNUNET_OK !=
420        pq_prepare (plugin,
421                    "update",
422                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
423                    "WHERE oid = $3",
424                    3,
425                    __LINE__)) ||
426       (GNUNET_OK !=
427        pq_prepare (plugin,
428                    "decrepl",
429                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
430                    "WHERE oid = $1",
431                    1,
432                    __LINE__)) ||
433       (GNUNET_OK !=
434        pq_prepare (plugin,
435                    "select_non_anonymous",
436                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
437                    "WHERE anonLevel = 0 ORDER BY oid DESC LIMIT 1 OFFSET $1",
438                    1,
439                    __LINE__)) ||
440       (GNUNET_OK !=
441        pq_prepare (plugin,
442                    "select_expiration_order",
443                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 
444                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "                      
445                    "UNION "                                             
446                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 
447                    "ORDER BY prio ASC LIMIT 1) "                        
448                    "ORDER BY expire ASC LIMIT 1",
449                    1,
450                    __LINE__)) ||
451       (GNUNET_OK !=
452        pq_prepare (plugin,
453                    "select_replication_order",
454                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " \
455                    "ORDER BY repl DESC,RANDOM() LIMIT 1",
456                    0,
457                    __LINE__)) ||
458       (GNUNET_OK !=
459        pq_prepare (plugin,
460                    "delrow",
461                    "DELETE FROM gn090 " "WHERE oid=$1", 
462                    1,
463                    __LINE__)))
464     {
465       PQfinish (plugin->dbh);
466       plugin->dbh = NULL;
467       return GNUNET_SYSERR;
468     }
469   return GNUNET_OK;
470 }
471
472
473 /**
474  * Delete the row identified by the given rowid (qid
475  * in postgres).
476  *
477  * @param plugin global context
478  * @param rowid which row to delete
479  * @return GNUNET_OK on success
480  */
481 static int
482 delete_by_rowid (struct Plugin *plugin,
483                  unsigned int rowid)
484 {
485   const char *paramValues[] = { (const char *) &rowid };
486   int paramLengths[] = { sizeof (rowid) };
487   const int paramFormats[] = { 1 };
488   PGresult *ret;
489
490   ret = PQexecPrepared (plugin->dbh,
491                         "delrow",
492                         1, paramValues, paramLengths, paramFormats, 1);
493   if (GNUNET_OK !=
494       check_result (plugin,
495                     ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
496                     __LINE__))
497     {
498       return GNUNET_SYSERR;
499     }
500   PQclear (ret);
501   return GNUNET_OK;
502 }
503
504
505 /**
506  * Get an estimate of how much space the database is
507  * currently using.
508  *
509  * @param cls our "struct Plugin*"
510  * @return number of bytes used on disk
511  */
512 static unsigned long long
513 postgres_plugin_get_size (void *cls)
514 {
515   struct Plugin *plugin = cls;
516   unsigned long long total;
517   PGresult *ret;
518
519   ret = PQexecParams (plugin->dbh,
520                       "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090",
521                       0, NULL, NULL, NULL, NULL, 1);
522   if (GNUNET_OK != check_result (plugin,
523                                  ret,
524                                  PGRES_TUPLES_OK,
525                                  "PQexecParams",
526                                  "get_size",
527                                  __LINE__))
528     {
529       return 0;
530     }
531   if ((PQntuples (ret) != 1) ||
532       (PQnfields (ret) != 1) ||
533       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
534     {
535       GNUNET_break (0);
536       PQclear (ret);
537       return 0;
538     }
539   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
540   PQclear (ret);
541   return total;
542 }
543
544
545 /**
546  * Store an item in the datastore.
547  *
548  * @param cls closure
549  * @param key key for the item
550  * @param size number of bytes in data
551  * @param data content stored
552  * @param type type of the content
553  * @param priority priority of the content
554  * @param anonymity anonymity-level for the content
555  * @param replication replication-level for the content
556  * @param expiration expiration time for the content
557  * @param msg set to error message
558  * @return GNUNET_OK on success
559  */
560 static int
561 postgres_plugin_put (void *cls,
562                      const GNUNET_HashCode * key,
563                      uint32_t size,
564                      const void *data,
565                      enum GNUNET_BLOCK_Type type,
566                      uint32_t priority,
567                      uint32_t anonymity,
568                      uint32_t replication,
569                      struct GNUNET_TIME_Absolute expiration,
570                      char **msg)
571 {
572   struct Plugin *plugin = cls;
573   GNUNET_HashCode vhash;
574   PGresult *ret;
575   uint32_t btype = htonl (type);
576   uint32_t bprio = htonl (priority);
577   uint32_t banon = htonl (anonymity);
578   uint32_t brepl = htonl (replication);
579   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
580   const char *paramValues[] = {
581     (const char *) &brepl,
582     (const char *) &btype,
583     (const char *) &bprio,
584     (const char *) &banon,
585     (const char *) &bexpi,
586     (const char *) key,
587     (const char *) &vhash,
588     (const char *) data
589   };
590   int paramLengths[] = {
591     sizeof (brepl),
592     sizeof (btype),
593     sizeof (bprio),
594     sizeof (banon),
595     sizeof (bexpi),
596     sizeof (GNUNET_HashCode),
597     sizeof (GNUNET_HashCode),
598     size
599   };
600   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
601
602   GNUNET_CRYPTO_hash (data, size, &vhash);
603   ret = PQexecPrepared (plugin->dbh,
604                         "put", 8, paramValues, paramLengths, paramFormats, 1);
605   if (GNUNET_OK != check_result (plugin, ret,
606                                  PGRES_COMMAND_OK,
607                                  "PQexecPrepared", "put", __LINE__))
608     return GNUNET_SYSERR;
609   PQclear (ret);
610   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
611 #if DEBUG_POSTGRES
612   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
613                    "datastore-postgres",
614                    "Stored %u bytes in database\n",
615                    (unsigned int) size);
616 #endif
617   return GNUNET_OK;
618 }
619
620
621 /**
622  * Function invoked on behalf of a "PluginIterator"
623  * asking the database plugin to call the iterator
624  * with the next item.
625  *
626  * @param next_cls the 'struct NextRequestClosure'
627  * @param tc scheduler context
628  */
629 static void 
630 postgres_next_request_cont (void *next_cls,
631                             const struct GNUNET_SCHEDULER_TaskContext *tc)
632 {
633   struct NextRequestClosure *nrc = next_cls;
634   struct Plugin *plugin = nrc->plugin;
635   const int paramFormats[] = { 1, 1, 1, 1, 1 };
636   int iret;
637   PGresult *res;
638   enum GNUNET_BLOCK_Type type;
639   uint32_t anonymity;
640   uint32_t priority;
641   uint32_t size;
642   unsigned int rowid;
643   struct GNUNET_TIME_Absolute expiration_time;
644   GNUNET_HashCode key;
645
646   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
647   plugin->next_task_nc = NULL;
648   if ( (GNUNET_YES == nrc->end_it) ||
649        (nrc->count == nrc->total) )
650     {
651 #if DEBUG_POSTGRES
652       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
653                        "datastore-postgres",
654                        "Ending iteration (%s)\n",
655                        (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set");
656 #endif
657       nrc->iter (nrc->iter_cls, 
658                  NULL, NULL, 0, NULL, 0, 0, 0, 
659                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
660       GNUNET_free (nrc);
661       return;
662     }  
663   if (nrc->off == nrc->total)
664     nrc->off = 0;
665   nrc->blimit_off = GNUNET_htonll (nrc->off);
666   nrc->bcount = GNUNET_htonll ((uint64_t) nrc->count);
667   res = PQexecPrepared (plugin->dbh,
668                         nrc->pname,
669                         nrc->nparams,
670                         nrc->paramValues, 
671                         nrc->paramLengths,
672                         paramFormats, 1);
673   if (GNUNET_OK != check_result (plugin,
674                                  res,
675                                  PGRES_TUPLES_OK,
676                                  "PQexecPrepared",
677                                  nrc->pname,
678                                  __LINE__))
679     {
680 #if DEBUG_POSTGRES
681       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
682                        "datastore-postgres",
683                        "Ending iteration (postgres error)\n");
684 #endif
685       nrc->iter (nrc->iter_cls, 
686                  NULL, NULL, 0, NULL, 0, 0, 0, 
687                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
688       GNUNET_free (nrc);
689       return;
690     }
691
692   if (0 == PQntuples (res))
693     {
694       /* no result */
695 #if DEBUG_POSTGRES
696       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
697                        "datastore-postgres",
698                        "Ending iteration (no more results)\n");
699 #endif
700       nrc->iter (nrc->iter_cls, 
701                  NULL, NULL, 0, NULL, 0, 0, 0, 
702                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
703       PQclear (res);
704       GNUNET_free (nrc);
705       return; 
706     }
707   if ((1 != PQntuples (res)) ||
708       (7 != PQnfields (res)) ||
709       (sizeof (uint32_t) != PQfsize (res, 0)) ||
710       (sizeof (uint32_t) != PQfsize (res, 6)))
711     {
712       GNUNET_break (0);
713       nrc->iter (nrc->iter_cls, 
714                  NULL, NULL, 0, NULL, 0, 0, 0, 
715                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
716       PQclear (res);
717       GNUNET_free (nrc);
718       return;
719     }
720   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
721   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
722       (sizeof (uint32_t) != PQfsize (res, 1)) ||
723       (sizeof (uint32_t) != PQfsize (res, 2)) ||
724       (sizeof (uint64_t) != PQfsize (res, 3)) ||
725       (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)) )
726     {
727       GNUNET_break (0);
728       PQclear (res);
729       delete_by_rowid (plugin, rowid);
730       nrc->iter (nrc->iter_cls, 
731                  NULL, NULL, 0, NULL, 0, 0, 0, 
732                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
733       GNUNET_free (nrc);
734       return;
735     }
736
737   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
738   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
739   anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
740   expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
741   memcpy (&key, 
742           PQgetvalue (res, 0, 4), 
743           sizeof (GNUNET_HashCode));
744   size = PQgetlength (res, 0, 5);
745 #if DEBUG_POSTGRES
746   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
747                    "datastore-postgres",
748                    "Found result of size %u bytes and type %u in database\n",
749                    (unsigned int) size,
750                    (unsigned int) type);
751 #endif
752   iret = nrc->iter (nrc->iter_cls,
753                     (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
754                     &key,
755                     size,
756                     PQgetvalue (res, 0, 5),
757                     (enum GNUNET_BLOCK_Type) type,
758                     priority,
759                     anonymity,
760                     expiration_time,
761                     rowid);
762   PQclear (res);
763   if (iret != GNUNET_NO)
764     {
765       nrc->count++;
766       nrc->off++;
767     }
768   if (iret == GNUNET_SYSERR)
769     {
770 #if DEBUG_POSTGRES
771       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
772                        "datastore-postgres",
773                        "Ending iteration (client error)\n");
774 #endif
775       return;
776     }
777   if (iret == GNUNET_NO)
778     {
779       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
780         {
781 #if DEBUG_POSTGRES
782           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
783                            "datastore-postgres",
784                            "Deleting %u bytes from database\n",
785                            (unsigned int) size);
786 #endif
787           plugin->env->duc (plugin->env->cls,
788                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
789 #if DEBUG_POSTGRES
790           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
791                            "datastore-postgres",
792                            "Deleted %u bytes from database\n",
793                            (unsigned int) size);
794 #endif
795         }
796     }
797   if (nrc->one_shot == GNUNET_YES) 
798     GNUNET_free (nrc);
799 }
800
801
802 /**
803  * Function invoked on behalf of a "PluginIterator"
804  * asking the database plugin to call the iterator
805  * with the next item.
806  *
807  * @param next_cls whatever argument was given
808  *        to the PluginIterator as "next_cls".
809  * @param end_it set to GNUNET_YES if we
810  *        should terminate the iteration early
811  *        (iterator should be still called once more
812  *         to signal the end of the iteration).
813  */
814 static void 
815 postgres_plugin_next_request (void *next_cls,
816                               int end_it)
817 {
818   struct NextRequestClosure *nrc = next_cls;
819
820   if (GNUNET_YES == end_it)
821     nrc->end_it = GNUNET_YES;
822   nrc->plugin->next_task_nc = nrc;
823   nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&postgres_next_request_cont,
824                                                      nrc);
825 }
826
827
828 /**
829  * Iterate over the results for a particular key
830  * in the datastore.
831  *
832  * @param cls closure
833  * @param key maybe NULL (to match all entries)
834  * @param vhash hash of the value, maybe NULL (to
835  *        match all values that have the right key).
836  *        Note that for DBlocks there is no difference
837  *        betwen key and vhash, but for other blocks
838  *        there may be!
839  * @param type entries of which type are relevant?
840  *     Use 0 for any type.
841  * @param iter function to call on each matching value;
842  *        will be called once with a NULL value at the end
843  * @param iter_cls closure for iter
844  */
845 static void
846 postgres_plugin_get (void *cls,
847                      const GNUNET_HashCode * key,
848                      const GNUNET_HashCode * vhash,
849                      enum GNUNET_BLOCK_Type type,
850                      PluginIterator iter, void *iter_cls)
851 {
852   struct Plugin *plugin = cls;
853   struct NextRequestClosure *nrc;
854   const int paramFormats[] = { 1, 1, 1, 1, 1 };
855   PGresult *ret;
856
857   GNUNET_assert (key != NULL);
858   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
859   nrc->plugin = plugin;
860   nrc->iter = iter;
861   nrc->iter_cls = iter_cls;
862   nrc->key = *key;
863   if (vhash != NULL)
864     nrc->vhash = *vhash;
865   nrc->paramValues[0] = (const char*) &nrc->key;
866   nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
867   nrc->btype = htonl (type);
868   if (type != 0)
869     {
870       if (vhash != NULL)
871         {
872           nrc->paramValues[1] = (const char *) &nrc->vhash;
873           nrc->paramLengths[1] = sizeof (nrc->vhash);
874           nrc->paramValues[2] = (const char *) &nrc->btype;
875           nrc->paramLengths[2] = sizeof (nrc->btype);
876           nrc->paramValues[3] = (const char *) &nrc->blimit_off;
877           nrc->paramLengths[3] = sizeof (nrc->blimit_off);
878           nrc->nparams = 4;
879           nrc->pname = "getvt";
880           ret = PQexecParams (plugin->dbh,
881                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
882                               3,
883                               NULL,
884                               nrc->paramValues, 
885                               nrc->paramLengths,
886                               paramFormats, 1);
887         }
888       else
889         {
890           nrc->paramValues[1] = (const char *) &nrc->btype;
891           nrc->paramLengths[1] = sizeof (nrc->btype);
892           nrc->paramValues[2] = (const char *) &nrc->blimit_off;
893           nrc->paramLengths[2] = sizeof (nrc->blimit_off);
894           nrc->nparams = 3;
895           nrc->pname = "gett";
896           ret = PQexecParams (plugin->dbh,
897                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
898                               2,
899                               NULL,
900                               nrc->paramValues, 
901                               nrc->paramLengths, 
902                               paramFormats, 1);
903         }
904     }
905   else
906     {
907       if (vhash != NULL)
908         {
909           nrc->paramValues[1] = (const char *) &nrc->vhash;
910           nrc->paramLengths[1] = sizeof (nrc->vhash);
911           nrc->paramValues[2] = (const char *) &nrc->blimit_off;
912           nrc->paramLengths[2] = sizeof (nrc->blimit_off);
913           nrc->nparams = 3;
914           nrc->pname = "getv";
915           ret = PQexecParams (plugin->dbh,
916                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
917                               2,
918                               NULL,
919                               nrc->paramValues, 
920                               nrc->paramLengths,
921                               paramFormats, 1);
922         }
923       else
924         {
925           nrc->paramValues[1] = (const char *) &nrc->blimit_off;
926           nrc->paramLengths[1] = sizeof (nrc->blimit_off);
927           nrc->nparams = 2;
928           nrc->pname = "get";
929           ret = PQexecParams (plugin->dbh,
930                               "SELECT count(*) FROM gn090 WHERE hash=$1",
931                               1,
932                               NULL,
933                               nrc->paramValues, 
934                               nrc->paramLengths,
935                               paramFormats, 1);
936         }
937     }
938   if (GNUNET_OK != check_result (plugin,
939                                  ret,
940                                  PGRES_TUPLES_OK,
941                                  "PQexecParams",
942                                  nrc->pname,
943                                  __LINE__))
944     {
945       iter (iter_cls, 
946             NULL, NULL, 0, NULL, 0, 0, 0, 
947             GNUNET_TIME_UNIT_ZERO_ABS, 0);
948       GNUNET_free (nrc);
949       return;
950     }
951   if ((PQntuples (ret) != 1) ||
952       (PQnfields (ret) != 1) ||
953       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
954     {
955       GNUNET_break (0);
956       PQclear (ret);
957       iter (iter_cls, 
958             NULL, NULL, 0, NULL, 0, 0, 0, 
959             GNUNET_TIME_UNIT_ZERO_ABS, 0);
960       GNUNET_free (nrc);
961       return;
962     }
963   nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
964   PQclear (ret);
965   if (nrc->total == 0)
966     {
967       iter (iter_cls, 
968             NULL, NULL, 0, NULL, 0, 0, 0, 
969             GNUNET_TIME_UNIT_ZERO_ABS, 0);
970       GNUNET_free (nrc);
971       return;
972     }
973   nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
974                                        nrc->total);
975   postgres_plugin_next_request (nrc,
976                                 GNUNET_NO);
977 }
978
979
980 /**
981  * Select a subset of the items in the datastore and call
982  * the given iterator for each of them.
983  *
984  * @param cls our "struct Plugin*"
985  * @param type entries of which type should be considered?
986  *        Use 0 for any type.
987  * @param iter function to call on each matching value;
988  *        will be called once with a NULL value at the end
989  * @param iter_cls closure for iter
990  */
991 static void
992 postgres_plugin_iter_zero_anonymity (void *cls,
993                                      enum GNUNET_BLOCK_Type type,
994                                      PluginIterator iter,
995                                      void *iter_cls)
996 {
997   struct Plugin *plugin = cls;
998   struct NextRequestClosure *nrc;
999
1000   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1001   nrc->total = UINT32_MAX;
1002   nrc->btype = htonl ((uint32_t) type);
1003   nrc->plugin = plugin;
1004   nrc->iter = iter;
1005   nrc->iter_cls = iter_cls;
1006   nrc->pname = "select_non_anonymous";
1007   nrc->nparams = 1;
1008   nrc->paramLengths[0] = sizeof (nrc->bcount);
1009   nrc->paramValues[0] = (const char*) &nrc->bcount;
1010   postgres_plugin_next_request (nrc,
1011                                 GNUNET_NO);
1012 }
1013
1014 /**
1015  * Context for 'repl_iter' function.
1016  */
1017 struct ReplCtx
1018 {
1019   
1020   /**
1021    * Plugin handle.
1022    */
1023   struct Plugin *plugin;
1024   
1025   /**
1026    * Function to call for the result (or the NULL).
1027    */
1028   PluginIterator iter;
1029   
1030   /**
1031    * Closure for iter.
1032    */
1033   void *iter_cls;
1034 };
1035
1036
1037 /**
1038  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
1039  * Decrements the replication counter and calls the original
1040  * iterator.
1041  *
1042  * @param cls closure
1043  * @param next_cls closure to pass to the "next" function.
1044  * @param key key for the content
1045  * @param size number of bytes in data
1046  * @param data content stored
1047  * @param type type of the content
1048  * @param priority priority of the content
1049  * @param anonymity anonymity-level for the content
1050  * @param expiration expiration time for the content
1051  * @param uid unique identifier for the datum;
1052  *        maybe 0 if no unique identifier is available
1053  *
1054  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1055  *         (continue on call to "next", of course),
1056  *         GNUNET_NO to delete the item and continue (if supported)
1057  */
1058 static int
1059 repl_iter (void *cls,
1060            void *next_cls,
1061            const GNUNET_HashCode *key,
1062            uint32_t size,
1063            const void *data,
1064            enum GNUNET_BLOCK_Type type,
1065            uint32_t priority,
1066            uint32_t anonymity,
1067            struct GNUNET_TIME_Absolute expiration, 
1068            uint64_t uid)
1069 {
1070   struct ReplCtx *rc = cls;
1071   struct Plugin *plugin = rc->plugin;
1072   int ret;
1073   PGresult *qret;
1074   uint32_t boid;
1075
1076   ret = rc->iter (rc->iter_cls,
1077                   next_cls, key,
1078                   size, data, 
1079                   type, priority, anonymity, expiration,
1080                   uid);
1081   if (NULL != key)
1082     {
1083       boid = htonl ( (uint32_t) uid);
1084       const char *paramValues[] = {
1085         (const char *) &boid,
1086       };
1087       int paramLengths[] = {
1088         sizeof (boid),
1089       };
1090       const int paramFormats[] = { 1 };
1091       qret = PQexecPrepared (plugin->dbh,
1092                             "decrepl",
1093                             1, paramValues, paramLengths, paramFormats, 1);
1094       if (GNUNET_OK != check_result (plugin,
1095                                      qret,
1096                                      PGRES_COMMAND_OK,
1097                                      "PQexecPrepared", 
1098                                      "decrepl", __LINE__))
1099         return GNUNET_SYSERR;
1100       PQclear (qret);
1101     }
1102   return ret;
1103 }
1104
1105
1106 /**
1107  * Get a random item for replication.  Returns a single, not expired, random item
1108  * from those with the highest replication counters.  The item's 
1109  * replication counter is decremented by one IF it was positive before.
1110  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1111  *
1112  * @param cls closure
1113  * @param iter function to call the value (once only).
1114  * @param iter_cls closure for iter
1115  */
1116 static void
1117 postgres_plugin_replication_get (void *cls,
1118                                  PluginIterator iter, void *iter_cls)
1119 {
1120   struct Plugin *plugin = cls;
1121   struct NextRequestClosure *nrc;
1122   struct ReplCtx rc;
1123
1124   rc.plugin = plugin;
1125   rc.iter = iter;
1126   rc.iter_cls = iter_cls;
1127   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1128   nrc->one_shot = GNUNET_YES;
1129   nrc->total = 1;
1130   nrc->plugin = plugin;
1131   nrc->iter = &repl_iter;
1132   nrc->iter_cls = &rc;
1133   nrc->pname = "select_replication_order";
1134   nrc->nparams = 0;
1135   postgres_next_request_cont (nrc, NULL);
1136 }
1137
1138
1139 /**
1140  * Get a random item for expiration.
1141  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1142  *
1143  * @param cls closure
1144  * @param iter function to call the value (once only).
1145  * @param iter_cls closure for iter
1146  */
1147 static void
1148 postgres_plugin_expiration_get (void *cls,
1149                                 PluginIterator iter, void *iter_cls)
1150 {
1151   struct Plugin *plugin = cls;
1152   struct NextRequestClosure *nrc;
1153   uint64_t btime;
1154   
1155   btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
1156   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1157   nrc->one_shot = GNUNET_YES;
1158   nrc->total = 1;
1159   nrc->plugin = plugin;
1160   nrc->iter = iter;
1161   nrc->iter_cls = iter_cls;
1162   nrc->pname = "select_expiration_order";
1163   nrc->nparams = 1;
1164   nrc->paramValues[0] = (const char *) &btime;
1165   nrc->paramLengths[0] = sizeof (btime);
1166   postgres_next_request_cont (nrc, NULL);
1167 }
1168
1169
1170 /**
1171  * Update the priority for a particular key in the datastore.  If
1172  * the expiration time in value is different than the time found in
1173  * the datastore, the higher value should be kept.  For the
1174  * anonymity level, the lower value is to be used.  The specified
1175  * priority should be added to the existing priority, ignoring the
1176  * priority in value.
1177  *
1178  * Note that it is possible for multiple values to match this put.
1179  * In that case, all of the respective values are updated.
1180  *
1181  * @param cls our "struct Plugin*"
1182  * @param uid unique identifier of the datum
1183  * @param delta by how much should the priority
1184  *     change?  If priority + delta < 0 the
1185  *     priority should be set to 0 (never go
1186  *     negative).
1187  * @param expire new expiration time should be the
1188  *     MAX of any existing expiration time and
1189  *     this value
1190  * @param msg set to error message
1191  * @return GNUNET_OK on success
1192  */
1193 static int
1194 postgres_plugin_update (void *cls,
1195                         uint64_t uid,
1196                         int delta, struct GNUNET_TIME_Absolute expire,
1197                         char **msg)
1198 {
1199   struct Plugin *plugin = cls;
1200   PGresult *ret;
1201   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
1202   uint32_t boid = htonl ( (uint32_t) uid);
1203   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
1204   const char *paramValues[] = {
1205     (const char *) &bdelta,
1206     (const char *) &bexpire,
1207     (const char *) &boid,
1208   };
1209   int paramLengths[] = {
1210     sizeof (bdelta),
1211     sizeof (bexpire),
1212     sizeof (boid),
1213   };
1214   const int paramFormats[] = { 1, 1, 1 };
1215
1216   ret = PQexecPrepared (plugin->dbh,
1217                         "update",
1218                         3, paramValues, paramLengths, paramFormats, 1);
1219   if (GNUNET_OK != check_result (plugin,
1220                                  ret,
1221                                  PGRES_COMMAND_OK,
1222                                  "PQexecPrepared", "update", __LINE__))
1223     return GNUNET_SYSERR;
1224   PQclear (ret);
1225   return GNUNET_OK;
1226 }
1227
1228
1229 /**
1230  * Drop database.
1231  */
1232 static void 
1233 postgres_plugin_drop (void *cls)
1234 {
1235   struct Plugin *plugin = cls;
1236
1237   pq_exec (plugin, "DROP TABLE gn090", __LINE__);
1238 }
1239
1240
1241 /**
1242  * Entry point for the plugin.
1243  *
1244  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1245  * @return our "struct Plugin*"
1246  */
1247 void *
1248 libgnunet_plugin_datastore_postgres_init (void *cls)
1249 {
1250   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1251   struct GNUNET_DATASTORE_PluginFunctions *api;
1252   struct Plugin *plugin;
1253
1254   plugin = GNUNET_malloc (sizeof (struct Plugin));
1255   plugin->env = env;
1256   if (GNUNET_OK != init_connection (plugin))
1257     {
1258       GNUNET_free (plugin);
1259       return NULL;
1260     }
1261   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1262   api->cls = plugin;
1263   api->get_size = &postgres_plugin_get_size;
1264   api->put = &postgres_plugin_put;
1265   api->next_request = &postgres_plugin_next_request;
1266   api->get = &postgres_plugin_get;
1267   api->replication_get = &postgres_plugin_replication_get;
1268   api->expiration_get = &postgres_plugin_expiration_get;
1269   api->update = &postgres_plugin_update;
1270   api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity;
1271   api->drop = &postgres_plugin_drop;
1272   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1273                    "datastore-postgres",
1274                    _("Postgres database running\n"));
1275   return api;
1276 }
1277
1278
1279 /**
1280  * Exit point from the plugin.
1281  * @param cls our "struct Plugin*"
1282  * @return always NULL
1283  */
1284 void *
1285 libgnunet_plugin_datastore_postgres_done (void *cls)
1286 {
1287   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1288   struct Plugin *plugin = api->cls;
1289   
1290   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1291     {
1292       GNUNET_SCHEDULER_cancel (plugin->next_task);
1293       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1294       GNUNET_free (plugin->next_task_nc);
1295       plugin->next_task_nc = NULL;
1296     }
1297   PQfinish (plugin->dbh);
1298   GNUNET_free (plugin);
1299   GNUNET_free (api);
1300   return NULL;
1301 }
1302
1303 /* end of plugin_datastore_postgres.c */