fix
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "plugin_datastore.h"
29 #include <postgresql/libpq-fe.h>
30
31 #define DEBUG_POSTGRES GNUNET_NO
32
33 #define SELECT_IT_LOW_PRIORITY "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
34                                "WHERE (prio = $1 AND oid > $2) "                        \
35                                "ORDER BY prio ASC,oid ASC LIMIT 1) "\
36                                "UNION "\
37                                "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
38                                "WHERE (prio > $1 AND oid != $2)"\
39                                "ORDER BY prio ASC,oid ASC LIMIT 1)"\
40                                "ORDER BY prio ASC,oid ASC LIMIT 1"
41
42 #define SELECT_IT_NON_ANONYMOUS "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
43                                 "WHERE (prio = $1 AND oid < $2)"\
44                                 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
45                                 "UNION "\
46                                 "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
47                                 "WHERE (prio < $1 AND oid != $2)"\
48                                 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
49                                 "ORDER BY prio DESC,oid DESC LIMIT 1"
50
51 #define SELECT_IT_EXPIRATION_TIME "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
52                                   "WHERE (expire = $1 AND oid > $2) "\
53                                   "ORDER BY expire ASC,oid ASC LIMIT 1) "\
54                                   "UNION "\
55                                   "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
56                                   "WHERE (expire > $1 AND oid != $2) "          \
57                                   "ORDER BY expire ASC,oid ASC LIMIT 1)"\
58                                   "ORDER BY expire ASC,oid ASC LIMIT 1"
59
60
61 #define SELECT_IT_MIGRATION_ORDER "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
62                                   "WHERE (expire = $1 AND oid < $2)"\
63                                   " AND expire > $3 AND type!=3"\
64                                   " ORDER BY expire DESC,oid DESC LIMIT 1) "\
65                                   "UNION "\
66                                   "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
67                                   "WHERE (expire < $1 AND oid != $2)"           \
68                                   " AND expire > $3 AND type!=3"\
69                                   " ORDER BY expire DESC,oid DESC LIMIT 1)"\
70                                   "ORDER BY expire DESC,oid DESC LIMIT 1"
71
72 /**
73  * After how many ms "busy" should a DB operation fail for good?
74  * A low value makes sure that we are more responsive to requests
75  * (especially PUTs).  A high value guarantees a higher success
76  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
77  *
78  * The default value of 1s should ensure that users do not experience
79  * huge latencies while at the same time allowing operations to succeed
80  * with reasonable probability.
81  */
82 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
83
84
85 struct NextRequestClosure
86 {
87   struct Plugin *plugin;
88   PluginIterator iter;
89   void *iter_cls;
90   const char *paramValues[5];
91   const char *pname;
92   int paramLengths[5];
93   int nparams; 
94   uint64_t bnow;
95   GNUNET_HashCode key;
96   GNUNET_HashCode vhash;
97   long long count;
98   uint64_t off;
99   uint64_t blimit_off;
100   unsigned long long total;
101   uint64_t blast_expire;
102   uint32_t blast_rowid;
103   uint32_t blast_prio;
104   uint32_t btype;
105   int end_it;
106 };
107
108
109 /**
110  * Context for all functions in this plugin.
111  */
112 struct Plugin 
113 {
114   /**
115    * Our execution environment.
116    */
117   struct GNUNET_DATASTORE_PluginEnvironment *env;
118
119   /**
120    * Native Postgres database handle.
121    */
122   PGconn *dbh;
123
124   /**
125    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
126    */
127   struct NextRequestClosure *next_task_nc;
128
129   /**
130    * Pending task with scheduler for running the next request.
131    */
132   GNUNET_SCHEDULER_TaskIdentifier next_task;
133
134   unsigned long long payload;
135
136   unsigned int lastSync;
137   
138 };
139
140
141 /**
142  * Check if the result obtained from Postgres has
143  * the desired status code.  If not, log an error, clear the
144  * result and return GNUNET_SYSERR.
145  * 
146  * @return GNUNET_OK if the result is acceptable
147  */
148 static int
149 check_result (struct Plugin *plugin,
150               PGresult * ret,
151               int expected_status,
152               const char *command, const char *args, int line)
153 {
154   if (ret == NULL)
155     {
156       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
157                        "datastore-postgres",
158                        "Postgres failed to allocate result for `%s:%s' at %d\n",
159                        command, args, line);
160       return GNUNET_SYSERR;
161     }
162   if (PQresultStatus (ret) != expected_status)
163     {
164       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
165                        "datastore-postgres",
166                        _("`%s:%s' failed at %s:%d with error: %s"),
167                        command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
168       PQclear (ret);
169       return GNUNET_SYSERR;
170     }
171   return GNUNET_OK;
172 }
173
174 /**
175  * Run simple SQL statement (without results).
176  */
177 static int
178 pq_exec (struct Plugin *plugin,
179          const char *sql, int line)
180 {
181   PGresult *ret;
182   ret = PQexec (plugin->dbh, sql);
183   if (GNUNET_OK != check_result (plugin,
184                                  ret, 
185                                  PGRES_COMMAND_OK, "PQexec", sql, line))
186     return GNUNET_SYSERR;
187   PQclear (ret);
188   return GNUNET_OK;
189 }
190
191 /**
192  * Prepare SQL statement.
193  */
194 static int
195 pq_prepare (struct Plugin *plugin,
196             const char *name, const char *sql, int nparms, int line)
197 {
198   PGresult *ret;
199   ret = PQprepare (plugin->dbh, name, sql, nparms, NULL);
200   if (GNUNET_OK !=
201       check_result (plugin, 
202                     ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
203     return GNUNET_SYSERR;
204   PQclear (ret);
205   return GNUNET_OK;
206 }
207
208 /**
209  * @brief Get a database handle
210  * @return GNUNET_OK on success, GNUNET_SYSERR on error
211  */
212 static int
213 init_connection (struct Plugin *plugin)
214 {
215   char *conninfo;
216   PGresult *ret;
217
218   /* Open database and precompile statements */
219   conninfo = NULL;
220   GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
221                                          "datastore-postgres",
222                                          "CONFIG",
223                                          &conninfo);
224   plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
225   GNUNET_free_non_null (conninfo);
226   if (NULL == plugin->dbh)
227     {
228       /* FIXME: warn about out-of-memory? */
229       return GNUNET_SYSERR;
230     }
231   if (PQstatus (plugin->dbh) != CONNECTION_OK)
232     {
233       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
234                        "postgres",
235                        _("Unable to initialize Postgres: %s"),
236                        PQerrorMessage (plugin->dbh));
237       PQfinish (plugin->dbh);
238       plugin->dbh = NULL;
239       return GNUNET_SYSERR;
240     }
241   ret = PQexec (plugin->dbh,
242                 "CREATE TABLE gn080 ("
243                 "  size INTEGER NOT NULL DEFAULT 0,"
244                 "  type INTEGER NOT NULL DEFAULT 0,"
245                 "  prio INTEGER NOT NULL DEFAULT 0,"
246                 "  anonLevel INTEGER NOT NULL DEFAULT 0,"
247                 "  expire BIGINT NOT NULL DEFAULT 0,"
248                 "  hash BYTEA NOT NULL DEFAULT '',"
249                 "  vhash BYTEA NOT NULL DEFAULT '',"
250                 "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
251   if ( (ret == NULL) || 
252        ( (PQresultStatus (ret) != PGRES_COMMAND_OK) && 
253          (0 != strcmp ("42P07",    /* duplicate table */
254                        PQresultErrorField
255                        (ret,
256                         PG_DIAG_SQLSTATE)))))
257     {
258       check_result (plugin,
259                     ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn080", __LINE__);
260       PQfinish (plugin->dbh);
261       plugin->dbh = NULL;
262       return GNUNET_SYSERR;
263     }
264   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
265     {
266       if ((GNUNET_OK !=
267            pq_exec (plugin, "CREATE INDEX idx_hash ON gn080 (hash)", __LINE__)) ||
268           (GNUNET_OK !=
269            pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)",
270                     __LINE__))
271           || (GNUNET_OK !=
272               pq_exec (plugin, "CREATE INDEX idx_prio ON gn080 (prio)", __LINE__))
273           || (GNUNET_OK !=
274               pq_exec (plugin, "CREATE INDEX idx_expire ON gn080 (expire)", __LINE__))
275           || (GNUNET_OK !=
276               pq_exec (plugin, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)",
277                        __LINE__))
278           || (GNUNET_OK !=
279               pq_exec
280               (plugin, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
281                __LINE__))
282           || (GNUNET_OK !=
283               pq_exec (plugin, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)",
284                        __LINE__)))
285         {
286           PQclear (ret);
287           PQfinish (plugin->dbh);
288           plugin->dbh = NULL;
289           return GNUNET_SYSERR;
290         }
291     }
292   PQclear (ret);
293 #if 1
294   ret = PQexec (plugin->dbh,
295                 "ALTER TABLE gn080 ALTER value SET STORAGE EXTERNAL");
296   if (GNUNET_OK != 
297       check_result (plugin,
298                     ret, PGRES_COMMAND_OK,
299                     "ALTER TABLE", "gn080", __LINE__))
300     {
301       PQfinish (plugin->dbh);
302       plugin->dbh = NULL;
303       return GNUNET_SYSERR;
304     }
305   PQclear (ret);
306   ret = PQexec (plugin->dbh,
307                 "ALTER TABLE gn080 ALTER hash SET STORAGE PLAIN");
308   if (GNUNET_OK !=
309       check_result (plugin,
310                     ret, PGRES_COMMAND_OK,
311                     "ALTER TABLE", "gn080", __LINE__))
312     {
313       PQfinish (plugin->dbh);
314       plugin->dbh = NULL;
315       return GNUNET_SYSERR;
316     }
317   PQclear (ret);
318   ret = PQexec (plugin->dbh,
319                 "ALTER TABLE gn080 ALTER vhash SET STORAGE PLAIN");
320   if (GNUNET_OK !=
321       check_result (plugin,
322                     ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn080", __LINE__))
323     {
324       PQfinish (plugin->dbh);
325       plugin->dbh = NULL;
326       return GNUNET_SYSERR;
327     }
328   PQclear (ret);
329 #endif
330   if ((GNUNET_OK !=
331        pq_prepare (plugin,
332                    "getvt",
333                    "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
334                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
335                    "AND oid > $4 ORDER BY oid ASC LIMIT 1 OFFSET $5",
336                    5,
337                    __LINE__)) ||
338       (GNUNET_OK !=
339        pq_prepare (plugin,
340                    "gett",
341                    "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
342                    "WHERE hash=$1 AND type=$2"
343                    "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
344                    4,
345                    __LINE__)) ||
346       (GNUNET_OK !=
347        pq_prepare (plugin,
348                    "getv",
349                    "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
350                    "WHERE hash=$1 AND vhash=$2"
351                    "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
352                    4,
353                    __LINE__)) ||
354       (GNUNET_OK !=
355        pq_prepare (plugin,
356                    "get",
357                    "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
358                    "WHERE hash=$1"
359                    "AND oid > $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
360                    3,
361                    __LINE__)) ||
362       (GNUNET_OK !=
363        pq_prepare (plugin,
364                    "put",
365                    "INSERT INTO gn080 (size, type, prio, anonLevel, expire, hash, vhash, value) "
366                    "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
367                    8,
368                    __LINE__)) ||
369       (GNUNET_OK !=
370        pq_prepare (plugin,
371                    "update",
372                    "UPDATE gn080 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
373                    "WHERE oid = $3",
374                    3,
375                    __LINE__)) ||
376       (GNUNET_OK !=
377        pq_prepare (plugin,
378                    "select_low_priority",
379                    SELECT_IT_LOW_PRIORITY,
380                    2,
381                    __LINE__)) ||
382       (GNUNET_OK !=
383        pq_prepare (plugin,
384                    "select_non_anonymous",
385                    SELECT_IT_NON_ANONYMOUS,
386                    2,
387                    __LINE__)) ||
388       (GNUNET_OK !=
389        pq_prepare (plugin,
390                    "select_expiration_time",
391                    SELECT_IT_EXPIRATION_TIME,
392                    2,
393                    __LINE__)) ||
394       (GNUNET_OK !=
395        pq_prepare (plugin,
396                    "select_migration_order",
397                    SELECT_IT_MIGRATION_ORDER,
398                    3,
399                    __LINE__)) ||
400       (GNUNET_OK !=
401        pq_prepare (plugin,
402                    "delrow",
403                    "DELETE FROM gn080 " "WHERE oid=$1", 1, __LINE__)))
404     {
405       PQfinish (plugin->dbh);
406       plugin->dbh = NULL;
407       return GNUNET_SYSERR;
408     }
409   return GNUNET_OK;
410 }
411
412
413 /**
414  * Delete the row identified by the given rowid (qid
415  * in postgres).
416  *
417  * @return GNUNET_OK on success
418  */
419 static int
420 delete_by_rowid (struct Plugin *plugin,
421                  unsigned int rowid)
422 {
423   const char *paramValues[] = { (const char *) &rowid };
424   int paramLengths[] = { sizeof (rowid) };
425   const int paramFormats[] = { 1 };
426   PGresult *ret;
427
428   ret = PQexecPrepared (plugin->dbh,
429                         "delrow",
430                         1, paramValues, paramLengths, paramFormats, 1);
431   if (GNUNET_OK !=
432       check_result (plugin,
433                     ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
434                     __LINE__))
435     {
436       return GNUNET_SYSERR;
437     }
438   PQclear (ret);
439   return GNUNET_OK;
440 }
441
442
443 /**
444  * Get an estimate of how much space the database is
445  * currently using.
446  *
447  * @param cls our "struct Plugin*"
448  * @return number of bytes used on disk
449  */
450 static unsigned long long
451 postgres_plugin_get_size (void *cls)
452 {
453   struct Plugin *plugin = cls;
454   double ret;
455
456   ret = plugin->payload;
457   return (unsigned long long) (ret * 1.00);
458   /* benchmarking shows XX% overhead */
459 }
460
461
462 /**
463  * Store an item in the datastore.
464  *
465  * @param cls closure
466  * @param key key for the item
467  * @param size number of bytes in data
468  * @param data content stored
469  * @param type type of the content
470  * @param priority priority of the content
471  * @param anonymity anonymity-level for the content
472  * @param expiration expiration time for the content
473  * @param msg set to error message
474  * @return GNUNET_OK on success
475  */
476 static int
477 postgres_plugin_put (void *cls,
478                      const GNUNET_HashCode * key,
479                      uint32_t size,
480                      const void *data,
481                      enum GNUNET_BLOCK_Type type,
482                      uint32_t priority,
483                      uint32_t anonymity,
484                      struct GNUNET_TIME_Absolute expiration,
485                      char **msg)
486 {
487   struct Plugin *plugin = cls;
488   GNUNET_HashCode vhash;
489   PGresult *ret;
490   uint32_t bsize = htonl (size);
491   uint32_t btype = htonl (type);
492   uint32_t bprio = htonl (priority);
493   uint32_t banon = htonl (anonymity);
494   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).value__;
495   const char *paramValues[] = {
496     (const char *) &bsize,
497     (const char *) &btype,
498     (const char *) &bprio,
499     (const char *) &banon,
500     (const char *) &bexpi,
501     (const char *) key,
502     (const char *) &vhash,
503     (const char *) data
504   };
505   int paramLengths[] = {
506     sizeof (bsize),
507     sizeof (btype),
508     sizeof (bprio),
509     sizeof (banon),
510     sizeof (bexpi),
511     sizeof (GNUNET_HashCode),
512     sizeof (GNUNET_HashCode),
513     size
514   };
515   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
516
517   GNUNET_CRYPTO_hash (data, size, &vhash);
518   ret = PQexecPrepared (plugin->dbh,
519                         "put", 8, paramValues, paramLengths, paramFormats, 1);
520   if (GNUNET_OK != check_result (plugin, ret,
521                                  PGRES_COMMAND_OK,
522                                  "PQexecPrepared", "put", __LINE__))
523     return GNUNET_SYSERR;
524   PQclear (ret);
525   plugin->payload += size;
526   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
527                    "postgres",
528                    "Stored %u bytes in database, new payload is %llu\n",
529                    (unsigned int) size,
530                    (unsigned long long) plugin->payload);
531   return GNUNET_OK;
532 }
533
534 /**
535  * Function invoked on behalf of a "PluginIterator"
536  * asking the database plugin to call the iterator
537  * with the next item.
538  *
539  * @param cls the 'struct NextRequestClosure'
540  * @param tc scheduler context
541  */
542 static void 
543 postgres_next_request_cont (void *next_cls,
544                             const struct GNUNET_SCHEDULER_TaskContext *tc)
545 {
546   struct NextRequestClosure *nrc = next_cls;
547   struct Plugin *plugin = nrc->plugin;
548   const int paramFormats[] = { 1, 1, 1, 1, 1 };
549   int iret;
550   PGresult *res;
551   enum GNUNET_BLOCK_Type type;
552   uint32_t anonymity;
553   uint32_t priority;
554   uint32_t size;
555   unsigned int rowid;
556   struct GNUNET_TIME_Absolute expiration_time;
557   GNUNET_HashCode key;
558
559   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
560   plugin->next_task_nc = NULL;
561   if ( (GNUNET_YES == nrc->end_it) ||
562        (nrc->count == nrc->total) )
563     {
564       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
565                        "postgres",
566                        "Ending iteration (%s)\n",
567                        (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set");
568       nrc->iter (nrc->iter_cls, 
569                  NULL, NULL, 0, NULL, 0, 0, 0, 
570                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
571       GNUNET_free (nrc);
572       return;
573     }
574   
575   if (nrc->count == 0)
576     nrc->blimit_off = GNUNET_htonll (nrc->off);
577   else
578     nrc->blimit_off = GNUNET_htonll (0);
579   if (nrc->count + nrc->off == nrc->total)
580     nrc->blast_rowid = htonl (0); /* back to start */
581   
582   res = PQexecPrepared (plugin->dbh,
583                         nrc->pname,
584                         nrc->nparams,
585                         nrc->paramValues, 
586                         nrc->paramLengths,
587                         paramFormats, 1);
588   if (GNUNET_OK != check_result (plugin,
589                                  res,
590                                  PGRES_TUPLES_OK,
591                                  "PQexecPrepared",
592                                  nrc->pname,
593                                  __LINE__))
594     {
595       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
596                        "postgres",
597                        "Ending iteration (postgres error)\n");
598       nrc->iter (nrc->iter_cls, 
599                  NULL, NULL, 0, NULL, 0, 0, 0, 
600                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
601       GNUNET_free (nrc);
602       return;
603     }
604
605   if (0 == PQntuples (res))
606     {
607       /* no result */
608       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
609                        "postgres",
610                        "Ending iteration (no more results)\n");
611       nrc->iter (nrc->iter_cls, 
612                  NULL, NULL, 0, NULL, 0, 0, 0, 
613                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
614       PQclear (res);
615       GNUNET_free (nrc);
616       return; 
617     }
618   if ((1 != PQntuples (res)) ||
619       (8 != PQnfields (res)) ||
620       (sizeof (uint32_t) != PQfsize (res, 0)) ||
621       (sizeof (uint32_t) != PQfsize (res, 7)))
622     {
623       GNUNET_break (0);
624       nrc->iter (nrc->iter_cls, 
625                  NULL, NULL, 0, NULL, 0, 0, 0, 
626                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
627       PQclear (res);
628       GNUNET_free (nrc);
629       return;
630     }
631   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 7));
632   size = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
633   if ((sizeof (uint32_t) != PQfsize (res, 1)) ||
634       (sizeof (uint32_t) != PQfsize (res, 2)) ||
635       (sizeof (uint32_t) != PQfsize (res, 3)) ||
636       (sizeof (uint64_t) != PQfsize (res, 4)) ||
637       (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 5)) ||
638       (size != PQgetlength (res, 0, 6)))
639     {
640       GNUNET_break (0);
641       PQclear (res);
642       delete_by_rowid (plugin, rowid);
643       nrc->iter (nrc->iter_cls, 
644                  NULL, NULL, 0, NULL, 0, 0, 0, 
645                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
646       GNUNET_free (nrc);
647       return;
648     }
649
650   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
651   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
652   anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 3));
653   expiration_time.value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 4));
654   size = PQgetlength (res, 0, 6);
655   memcpy (&key, PQgetvalue (res, 0, 5), sizeof (GNUNET_HashCode));
656
657   nrc->blast_prio = htonl (priority);
658   nrc->blast_expire = GNUNET_htonll (expiration_time.value);
659   nrc->blast_rowid = htonl (rowid);
660   nrc->count++;
661
662   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
663                    "postgres",
664                    "Found result of size %u bytes and type %u in database\n",
665                    (unsigned int) size,
666                    (unsigned int) type);
667   iret = nrc->iter (nrc->iter_cls,
668                     nrc,
669                     &key,
670                     size,
671                     PQgetvalue (res, 0, 6),
672                     (enum GNUNET_BLOCK_Type) type,
673                     priority,
674                     anonymity,
675                     expiration_time,
676                     rowid);
677   PQclear (res);
678   if (iret == GNUNET_SYSERR)
679     {
680       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
681                        "postgres",
682                        "Ending iteration (client error)\n");
683       return;
684     }
685   if (iret == GNUNET_NO)
686     {
687       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
688         {
689           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
690                            "postgres",
691                            "Deleting %u bytes from database, current payload is %llu\n",
692                            (unsigned int) size,
693                            (unsigned long long) plugin->payload);
694           GNUNET_assert (plugin->payload >= size);
695           plugin->payload -= size;
696           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
697                            "postgres",
698                            "Deleted %u bytes from database, new payload is %llu\n",
699                            (unsigned int) size,
700                            (unsigned long long) plugin->payload);
701         }
702     }
703 }
704
705
706 /**
707  * Function invoked on behalf of a "PluginIterator"
708  * asking the database plugin to call the iterator
709  * with the next item.
710  *
711  * @param next_cls whatever argument was given
712  *        to the PluginIterator as "next_cls".
713  * @param end_it set to GNUNET_YES if we
714  *        should terminate the iteration early
715  *        (iterator should be still called once more
716  *         to signal the end of the iteration).
717  */
718 static void 
719 postgres_plugin_next_request (void *next_cls,
720                               int end_it)
721 {
722   struct NextRequestClosure *nrc = next_cls;
723
724   if (GNUNET_YES == end_it)
725     nrc->end_it = GNUNET_YES;
726   nrc->plugin->next_task_nc = nrc;
727   nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (nrc->plugin->env->sched,
728                                                      &postgres_next_request_cont,
729                                                      nrc);
730 }
731
732
733 /**
734  * Update the priority for a particular key in the datastore.  If
735  * the expiration time in value is different than the time found in
736  * the datastore, the higher value should be kept.  For the
737  * anonymity level, the lower value is to be used.  The specified
738  * priority should be added to the existing priority, ignoring the
739  * priority in value.
740  *
741  * Note that it is possible for multiple values to match this put.
742  * In that case, all of the respective values are updated.
743  *
744  * @param cls our "struct Plugin*"
745  * @param uid unique identifier of the datum
746  * @param delta by how much should the priority
747  *     change?  If priority + delta < 0 the
748  *     priority should be set to 0 (never go
749  *     negative).
750  * @param expire new expiration time should be the
751  *     MAX of any existing expiration time and
752  *     this value
753  * @param msg set to error message
754  * @return GNUNET_OK on success
755  */
756 static int
757 postgres_plugin_update (void *cls,
758                         uint64_t uid,
759                         int delta, struct GNUNET_TIME_Absolute expire,
760                         char **msg)
761 {
762   struct Plugin *plugin = cls;
763   PGresult *ret;
764   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
765   uint32_t boid = htonl ( (uint32_t) uid);
766   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).value__;
767   const char *paramValues[] = {
768     (const char *) &bdelta,
769     (const char *) &bexpire,
770     (const char *) &boid,
771   };
772   int paramLengths[] = {
773     sizeof (bdelta),
774     sizeof (bexpire),
775     sizeof (boid),
776   };
777   const int paramFormats[] = { 1, 1, 1 };
778
779   ret = PQexecPrepared (plugin->dbh,
780                         "update",
781                         3, paramValues, paramLengths, paramFormats, 1);
782   if (GNUNET_OK != check_result (plugin,
783                                  ret,
784                                  PGRES_COMMAND_OK,
785                                  "PQexecPrepared", "update", __LINE__))
786     return GNUNET_SYSERR;
787   PQclear (ret);
788   return GNUNET_OK;
789 }
790
791
792 /**
793  * Call a method for each key in the database and
794  * call the callback method on it.
795  *
796  * @param type entries of which type should be considered?
797  * @param iter maybe NULL (to just count); iter
798  *     should return GNUNET_SYSERR to abort the
799  *     iteration, GNUNET_NO to delete the entry and
800  *     continue and GNUNET_OK to continue iterating
801  */
802 static void
803 postgres_iterate (struct Plugin *plugin,
804                   unsigned int type,
805                   int is_asc,
806                   unsigned int iter_select,
807                   PluginIterator iter, void *iter_cls)
808 {
809   struct NextRequestClosure *nrc;
810
811   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
812   nrc->plugin = plugin;
813   nrc->iter = iter;
814   nrc->iter_cls = iter_cls;
815   if (is_asc)
816     {
817       nrc->blast_prio = htonl (0);
818       nrc->blast_rowid = htonl (0);
819       nrc->blast_expire = htonl (0);
820     }
821   else
822     {
823       nrc->blast_prio = htonl (0x7FFFFFFFL);
824       nrc->blast_rowid = htonl (0xFFFFFFFF);
825       nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
826     }
827   switch (iter_select)
828     {
829     case 0:
830       nrc->pname = "select_low_priority";
831       nrc->nparams = 2;
832       nrc->paramValues[0] = (const char *) &nrc->blast_prio;
833       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
834       nrc->paramLengths[0] = sizeof (nrc->blast_prio);
835       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
836       break;
837     case 1:
838       nrc->pname = "select_non_anonymous";
839       nrc->nparams = 2;
840       nrc->paramValues[0] = (const char *) &nrc->blast_prio;
841       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
842       nrc->paramLengths[0] = sizeof (nrc->blast_prio);
843       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
844       break;
845     case 2:
846       nrc->pname = "select_expiration_time";
847       nrc->nparams = 2;
848       nrc->paramValues[0] = (const char *) &nrc->blast_expire;
849       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
850       nrc->paramLengths[0] = sizeof (nrc->blast_expire);
851       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
852       break;
853     case 3:
854       nrc->pname = "select_migration_order";
855       nrc->nparams = 3;
856       nrc->paramValues[0] = (const char *) &nrc->blast_expire;
857       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
858       nrc->paramValues[2] = (const char *) &nrc->bnow;
859       nrc->paramLengths[0] = sizeof (nrc->blast_expire);
860       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
861       nrc->paramLengths[2] = sizeof (nrc->bnow);
862       break;
863     default:
864       GNUNET_break (0);
865       iter (iter_cls, 
866             NULL, NULL, 0, NULL, 0, 0, 0, 
867             GNUNET_TIME_UNIT_ZERO_ABS, 0);
868       return;
869     }
870   nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).value__;
871   postgres_plugin_next_request (nrc,
872                                 GNUNET_NO);
873 }
874
875
876 /**
877  * Select a subset of the items in the datastore and call
878  * the given iterator for each of them.
879  *
880  * @param cls our "struct Plugin*"
881  * @param type entries of which type should be considered?
882  *        Use 0 for any type.
883  * @param iter function to call on each matching value;
884  *        will be called once with a NULL value at the end
885  * @param iter_cls closure for iter
886  */
887 static void
888 postgres_plugin_iter_low_priority (void *cls,
889                                    enum GNUNET_BLOCK_Type type,
890                                    PluginIterator iter,
891                                    void *iter_cls)
892 {
893   struct Plugin *plugin = cls;
894   
895   postgres_iterate (plugin,
896                     type, 
897                     GNUNET_YES, 0, 
898                     iter, iter_cls);
899 }
900
901
902
903
904 /**
905  * Iterate over the results for a particular key
906  * in the datastore.
907  *
908  * @param cls closure
909  * @param key maybe NULL (to match all entries)
910  * @param vhash hash of the value, maybe NULL (to
911  *        match all values that have the right key).
912  *        Note that for DBlocks there is no difference
913  *        betwen key and vhash, but for other blocks
914  *        there may be!
915  * @param type entries of which type are relevant?
916  *     Use 0 for any type.
917  * @param iter function to call on each matching value;
918  *        will be called once with a NULL value at the end
919  * @param iter_cls closure for iter
920  */
921 static void
922 postgres_plugin_get (void *cls,
923                      const GNUNET_HashCode * key,
924                      const GNUNET_HashCode * vhash,
925                      enum GNUNET_BLOCK_Type type,
926                      PluginIterator iter, void *iter_cls)
927 {
928   struct Plugin *plugin = cls;
929   struct NextRequestClosure *nrc;
930   const int paramFormats[] = { 1, 1, 1, 1, 1 };
931   PGresult *ret;
932
933   if (key == NULL)
934     {
935       postgres_plugin_iter_low_priority (plugin, type, 
936                                          iter, iter_cls);
937       return;
938     }
939   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
940   nrc->plugin = plugin;
941   nrc->iter = iter;
942   nrc->iter_cls = iter_cls;
943   nrc->key = *key;
944   if (vhash != NULL)
945     nrc->vhash = *vhash;
946   nrc->paramValues[0] = (const char*) &nrc->key;
947   nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
948   nrc->btype = htonl (type);
949   if (type != 0)
950     {
951       if (vhash != NULL)
952         {
953           nrc->paramValues[1] = (const char *) &nrc->vhash;
954           nrc->paramLengths[1] = sizeof (nrc->vhash);
955           nrc->paramValues[2] = (const char *) &nrc->btype;
956           nrc->paramLengths[2] = sizeof (nrc->btype);
957           nrc->paramValues[3] = (const char *) &nrc->blast_rowid;
958           nrc->paramLengths[3] = sizeof (nrc->blast_rowid);
959           nrc->paramValues[4] = (const char *) &nrc->blimit_off;
960           nrc->paramLengths[4] = sizeof (nrc->blimit_off);
961           nrc->nparams = 5;
962           nrc->pname = "getvt";
963           ret = PQexecParams (plugin->dbh,
964                               "SELECT count(*) FROM gn080 WHERE hash=$1 AND vhash=$2 AND type=$3",
965                               3,
966                               NULL,
967                               nrc->paramValues, 
968                               nrc->paramLengths,
969                               paramFormats, 1);
970         }
971       else
972         {
973           nrc->paramValues[1] = (const char *) &nrc->btype;
974           nrc->paramLengths[1] = sizeof (nrc->btype);
975           nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
976           nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
977           nrc->paramValues[3] = (const char *) &nrc->blimit_off;
978           nrc->paramLengths[3] = sizeof (nrc->blimit_off);
979           nrc->nparams = 4;
980           nrc->pname = "gett";
981           ret = PQexecParams (plugin->dbh,
982                               "SELECT count(*) FROM gn080 WHERE hash=$1 AND type=$2",
983                               2,
984                               NULL,
985                               nrc->paramValues, 
986                               nrc->paramLengths, 
987                               paramFormats, 1);
988         }
989     }
990   else
991     {
992       if (vhash != NULL)
993         {
994           nrc->paramValues[1] = (const char *) &nrc->vhash;
995           nrc->paramLengths[1] = sizeof (nrc->vhash);
996           nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
997           nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
998           nrc->paramValues[3] = (const char *) &nrc->blimit_off;
999           nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1000           nrc->nparams = 4;
1001           nrc->pname = "getv";
1002           ret = PQexecParams (plugin->dbh,
1003                               "SELECT count(*) FROM gn080 WHERE hash=$1 AND vhash=$2",
1004                               2,
1005                               NULL,
1006                               nrc->paramValues, 
1007                               nrc->paramLengths,
1008                               paramFormats, 1);
1009         }
1010       else
1011         {
1012           nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
1013           nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
1014           nrc->paramValues[2] = (const char *) &nrc->blimit_off;
1015           nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1016           nrc->nparams = 3;
1017           nrc->pname = "get";
1018           ret = PQexecParams (plugin->dbh,
1019                               "SELECT count(*) FROM gn080 WHERE hash=$1",
1020                               1,
1021                               NULL,
1022                               nrc->paramValues, 
1023                               nrc->paramLengths,
1024                               paramFormats, 1);
1025         }
1026     }
1027   if (GNUNET_OK != check_result (plugin,
1028                                  ret,
1029                                  PGRES_TUPLES_OK,
1030                                  "PQexecParams",
1031                                  nrc->pname,
1032                                  __LINE__))
1033     {
1034       iter (iter_cls, 
1035             NULL, NULL, 0, NULL, 0, 0, 0, 
1036             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1037       return;
1038     }
1039   if ((PQntuples (ret) != 1) ||
1040       (PQnfields (ret) != 1) ||
1041       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
1042     {
1043       GNUNET_break (0);
1044       PQclear (ret);
1045       iter (iter_cls, 
1046             NULL, NULL, 0, NULL, 0, 0, 0, 
1047             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1048       return;
1049     }
1050   nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
1051   fprintf (stderr, "Total number of results: %llu\n",
1052            (unsigned long long) nrc->total);
1053   PQclear (ret);
1054   if (nrc->total == 0)
1055     {
1056       iter (iter_cls, 
1057             NULL, NULL, 0, NULL, 0, 0, 0, 
1058             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1059       return;
1060     }
1061   nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1062                                        nrc->total);
1063   postgres_plugin_next_request (nrc,
1064                                 GNUNET_NO);
1065 }
1066
1067
1068 /**
1069  * Select a subset of the items in the datastore and call
1070  * the given iterator for each of them.
1071  *
1072  * @param cls our "struct Plugin*"
1073  * @param type entries of which type should be considered?
1074  *        Use 0 for any type.
1075  * @param iter function to call on each matching value;
1076  *        will be called once with a NULL value at the end
1077  * @param iter_cls closure for iter
1078  */
1079 static void
1080 postgres_plugin_iter_zero_anonymity (void *cls,
1081                                      enum GNUNET_BLOCK_Type type,
1082                                      PluginIterator iter,
1083                                      void *iter_cls)
1084 {
1085   struct Plugin *plugin = cls;
1086
1087   postgres_iterate (plugin, 
1088                     type, GNUNET_NO, 1,
1089                     iter, iter_cls);
1090 }
1091
1092
1093 /**
1094  * Select a subset of the items in the datastore and call
1095  * the given iterator for each of them.
1096  *
1097  * @param cls our "struct Plugin*"
1098  * @param type entries of which type should be considered?
1099  *        Use 0 for any type.
1100  * @param iter function to call on each matching value;
1101  *        will be called once with a NULL value at the end
1102  * @param iter_cls closure for iter
1103  */
1104 static void
1105 postgres_plugin_iter_ascending_expiration (void *cls,
1106                                            enum GNUNET_BLOCK_Type type,
1107                                            PluginIterator iter,
1108                                            void *iter_cls)
1109 {
1110   struct Plugin *plugin = cls;
1111
1112   postgres_iterate (plugin, type, GNUNET_YES, 2,
1113                     iter, iter_cls);
1114 }
1115
1116
1117
1118 /**
1119  * Select a subset of the items in the datastore and call
1120  * the given iterator for each of them.
1121  *
1122  * @param cls our "struct Plugin*"
1123  * @param type entries of which type should be considered?
1124  *        Use 0 for any type.
1125  * @param iter function to call on each matching value;
1126  *        will be called once with a NULL value at the end
1127  * @param iter_cls closure for iter
1128  */
1129 static void
1130 postgres_plugin_iter_migration_order (void *cls,
1131                                       enum GNUNET_BLOCK_Type type,
1132                                       PluginIterator iter,
1133                                       void *iter_cls)
1134 {
1135   struct Plugin *plugin = cls;
1136
1137   postgres_iterate (plugin, 0, GNUNET_NO, 3, 
1138                     iter, iter_cls);
1139 }
1140
1141
1142
1143 /**
1144  * Select a subset of the items in the datastore and call
1145  * the given iterator for each of them.
1146  *
1147  * @param cls our "struct Plugin*"
1148  * @param type entries of which type should be considered?
1149  *        Use 0 for any type.
1150  * @param iter function to call on each matching value;
1151  *        will be called once with a NULL value at the end
1152  * @param iter_cls closure for iter
1153  */
1154 static void
1155 postgres_plugin_iter_all_now (void *cls,
1156                               enum GNUNET_BLOCK_Type type,
1157                               PluginIterator iter,
1158                               void *iter_cls)
1159 {
1160   struct Plugin *plugin = cls;
1161
1162   postgres_iterate (plugin, 
1163                     0, GNUNET_YES, 0, 
1164                     iter, iter_cls);
1165 }
1166
1167
1168 /**
1169  * Drop database.
1170  */
1171 static void 
1172 postgres_plugin_drop (void *cls)
1173 {
1174   struct Plugin *plugin = cls;
1175
1176   pq_exec (plugin, "DROP TABLE gn080", __LINE__);
1177 }
1178
1179
1180 /**
1181  * Entry point for the plugin.
1182  *
1183  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1184  * @return our "struct Plugin*"
1185  */
1186 void *
1187 libgnunet_plugin_datastore_postgres_init (void *cls)
1188 {
1189   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1190   struct GNUNET_DATASTORE_PluginFunctions *api;
1191   struct Plugin *plugin;
1192
1193   plugin = GNUNET_malloc (sizeof (struct Plugin));
1194   plugin->env = env;
1195   if (GNUNET_OK != init_connection (plugin))
1196     {
1197       GNUNET_free (plugin);
1198       return NULL;
1199     }
1200   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1201   api->cls = plugin;
1202   api->get_size = &postgres_plugin_get_size;
1203   api->put = &postgres_plugin_put;
1204   api->next_request = &postgres_plugin_next_request;
1205   api->get = &postgres_plugin_get;
1206   api->update = &postgres_plugin_update;
1207   api->iter_low_priority = &postgres_plugin_iter_low_priority;
1208   api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity;
1209   api->iter_ascending_expiration = &postgres_plugin_iter_ascending_expiration;
1210   api->iter_migration_order = &postgres_plugin_iter_migration_order;
1211   api->iter_all_now = &postgres_plugin_iter_all_now;
1212   api->drop = &postgres_plugin_drop;
1213   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1214                    "postgres", _("Postgres database running\n"));
1215   return api;
1216 }
1217
1218
1219 /**
1220  * Exit point from the plugin.
1221  * @param cls our "struct Plugin*"
1222  * @return always NULL
1223  */
1224 void *
1225 libgnunet_plugin_datastore_postgres_done (void *cls)
1226 {
1227   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1228   struct Plugin *plugin = api->cls;
1229   
1230   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1231     {
1232       GNUNET_SCHEDULER_cancel (plugin->env->sched,
1233                                plugin->next_task);
1234       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1235       GNUNET_free (plugin->next_task_nc);
1236       plugin->next_task_nc = NULL;
1237     }
1238   PQfinish (plugin->dbh);
1239   GNUNET_free (plugin);
1240   GNUNET_free (api);
1241   return NULL;
1242 }
1243
1244 /* end of plugin_datastore_postgres.c */