use big endian
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "plugin_datastore.h"
29 #include <postgresql/libpq-fe.h>
30
31 #define DEBUG_POSTGRES GNUNET_NO
32
33 #define SELECT_IT_LOW_PRIORITY "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
34                                "WHERE (prio = $1 AND oid > $2) "                        \
35                                "ORDER BY prio ASC,oid ASC LIMIT 1) "\
36                                "UNION "\
37                                "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
38                                "WHERE (prio > $1 AND oid != $2)"\
39                                "ORDER BY prio ASC,oid ASC LIMIT 1)"\
40                                "ORDER BY prio ASC,oid ASC LIMIT 1"
41
42 #define SELECT_IT_NON_ANONYMOUS "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
43                                 "WHERE (prio = $1 AND oid < $2)"\
44                                 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
45                                 "UNION "\
46                                 "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
47                                 "WHERE (prio < $1 AND oid != $2)"\
48                                 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
49                                 "ORDER BY prio DESC,oid DESC LIMIT 1"
50
51 #define SELECT_IT_EXPIRATION_TIME "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
52                                   "WHERE (expire = $1 AND oid > $2) "\
53                                   "ORDER BY expire ASC,oid ASC LIMIT 1) "\
54                                   "UNION "\
55                                   "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
56                                   "WHERE (expire > $1 AND oid != $2) "          \
57                                   "ORDER BY expire ASC,oid ASC LIMIT 1)"\
58                                   "ORDER BY expire ASC,oid ASC LIMIT 1"
59
60
61 #define SELECT_IT_MIGRATION_ORDER "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
62                                   "WHERE (expire = $1 AND oid < $2)"\
63                                   " AND expire > $3 AND type!=3"\
64                                   " ORDER BY expire DESC,oid DESC LIMIT 1) "\
65                                   "UNION "\
66                                   "(SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "\
67                                   "WHERE (expire < $1 AND oid != $2)"           \
68                                   " AND expire > $3 AND type!=3"\
69                                   " ORDER BY expire DESC,oid DESC LIMIT 1)"\
70                                   "ORDER BY expire DESC,oid DESC LIMIT 1"
71
72 /**
73  * After how many ms "busy" should a DB operation fail for good?
74  * A low value makes sure that we are more responsive to requests
75  * (especially PUTs).  A high value guarantees a higher success
76  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
77  *
78  * The default value of 1s should ensure that users do not experience
79  * huge latencies while at the same time allowing operations to succeed
80  * with reasonable probability.
81  */
82 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
83
84
85 struct NextRequestClosure
86 {
87   struct Plugin *plugin;
88   PluginIterator iter;
89   void *iter_cls;
90   const char *paramValues[5];
91   const char *pname;
92   int paramLengths[5];
93   int nparams; 
94   uint64_t bnow;
95   GNUNET_HashCode key;
96   GNUNET_HashCode vhash;
97   long long count;
98   uint64_t off;
99   uint64_t blimit_off;
100   unsigned long long total;
101   uint64_t blast_expire;
102   uint32_t blast_rowid;
103   uint32_t blast_prio;
104   uint32_t btype;
105   int end_it;
106 };
107
108
109 /**
110  * Context for all functions in this plugin.
111  */
112 struct Plugin 
113 {
114   /**
115    * Our execution environment.
116    */
117   struct GNUNET_DATASTORE_PluginEnvironment *env;
118
119   /**
120    * Native Postgres database handle.
121    */
122   PGconn *dbh;
123
124   /**
125    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
126    */
127   struct NextRequestClosure *next_task_nc;
128
129   /**
130    * Pending task with scheduler for running the next request.
131    */
132   GNUNET_SCHEDULER_TaskIdentifier next_task;
133
134   unsigned long long payload;
135
136   unsigned int lastSync;
137   
138 };
139
140
141 /**
142  * Check if the result obtained from Postgres has
143  * the desired status code.  If not, log an error, clear the
144  * result and return GNUNET_SYSERR.
145  * 
146  * @return GNUNET_OK if the result is acceptable
147  */
148 static int
149 check_result (struct Plugin *plugin,
150               PGresult * ret,
151               int expected_status,
152               const char *command, const char *args, int line)
153 {
154   if (ret == NULL)
155     {
156       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
157                        "datastore-postgres",
158                        "Postgres failed to allocate result for `%s:%s' at %d\n",
159                        command, args, line);
160       return GNUNET_SYSERR;
161     }
162   if (PQresultStatus (ret) != expected_status)
163     {
164       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
165                        "datastore-postgres",
166                        _("`%s:%s' failed at %s:%d with error: %s"),
167                        command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
168       PQclear (ret);
169       return GNUNET_SYSERR;
170     }
171   return GNUNET_OK;
172 }
173
174 /**
175  * Run simple SQL statement (without results).
176  */
177 static int
178 pq_exec (struct Plugin *plugin,
179          const char *sql, int line)
180 {
181   PGresult *ret;
182   ret = PQexec (plugin->dbh, sql);
183   if (GNUNET_OK != check_result (plugin,
184                                  ret, 
185                                  PGRES_COMMAND_OK, "PQexec", sql, line))
186     return GNUNET_SYSERR;
187   PQclear (ret);
188   return GNUNET_OK;
189 }
190
191 /**
192  * Prepare SQL statement.
193  */
194 static int
195 pq_prepare (struct Plugin *plugin,
196             const char *name, const char *sql, int nparms, int line)
197 {
198   PGresult *ret;
199   ret = PQprepare (plugin->dbh, name, sql, nparms, NULL);
200   if (GNUNET_OK !=
201       check_result (plugin, 
202                     ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
203     return GNUNET_SYSERR;
204   PQclear (ret);
205   return GNUNET_OK;
206 }
207
208 /**
209  * @brief Get a database handle
210  * @return GNUNET_OK on success, GNUNET_SYSERR on error
211  */
212 static int
213 init_connection (struct Plugin *plugin)
214 {
215   char *conninfo;
216   PGresult *ret;
217
218   /* Open database and precompile statements */
219   conninfo = NULL;
220   GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
221                                          "datastore-postgres",
222                                          "CONFIG",
223                                          &conninfo);
224   plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
225   GNUNET_free_non_null (conninfo);
226   if (NULL == plugin->dbh)
227     {
228       /* FIXME: warn about out-of-memory? */
229       return GNUNET_SYSERR;
230     }
231   if (PQstatus (plugin->dbh) != CONNECTION_OK)
232     {
233       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
234                        "postgres",
235                        _("Unable to initialize Postgres: %s"),
236                        PQerrorMessage (plugin->dbh));
237       PQfinish (plugin->dbh);
238       plugin->dbh = NULL;
239       return GNUNET_SYSERR;
240     }
241   ret = PQexec (plugin->dbh,
242                 "CREATE TABLE gn080 ("
243                 "  size INTEGER NOT NULL DEFAULT 0,"
244                 "  type INTEGER NOT NULL DEFAULT 0,"
245                 "  prio INTEGER NOT NULL DEFAULT 0,"
246                 "  anonLevel INTEGER NOT NULL DEFAULT 0,"
247                 "  expire BIGINT NOT NULL DEFAULT 0,"
248                 "  hash BYTEA NOT NULL DEFAULT '',"
249                 "  vhash BYTEA NOT NULL DEFAULT '',"
250                 "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
251   if ( (ret == NULL) || 
252        ( (PQresultStatus (ret) != PGRES_COMMAND_OK) && 
253          (0 != strcmp ("42P07",    /* duplicate table */
254                        PQresultErrorField
255                        (ret,
256                         PG_DIAG_SQLSTATE)))))
257     {
258       check_result (plugin,
259                     ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn080", __LINE__);
260       PQfinish (plugin->dbh);
261       plugin->dbh = NULL;
262       return GNUNET_SYSERR;
263     }
264   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
265     {
266       if ((GNUNET_OK !=
267            pq_exec (plugin, "CREATE INDEX idx_hash ON gn080 (hash)", __LINE__)) ||
268           (GNUNET_OK !=
269            pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)",
270                     __LINE__))
271           || (GNUNET_OK !=
272               pq_exec (plugin, "CREATE INDEX idx_prio ON gn080 (prio)", __LINE__))
273           || (GNUNET_OK !=
274               pq_exec (plugin, "CREATE INDEX idx_expire ON gn080 (expire)", __LINE__))
275           || (GNUNET_OK !=
276               pq_exec (plugin, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)",
277                        __LINE__))
278           || (GNUNET_OK !=
279               pq_exec
280               (plugin, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
281                __LINE__))
282           || (GNUNET_OK !=
283               pq_exec (plugin, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)",
284                        __LINE__)))
285         {
286           PQclear (ret);
287           PQfinish (plugin->dbh);
288           plugin->dbh = NULL;
289           return GNUNET_SYSERR;
290         }
291     }
292   PQclear (ret);
293 #if 1
294   ret = PQexec (plugin->dbh,
295                 "ALTER TABLE gn080 ALTER value SET STORAGE EXTERNAL");
296   if (GNUNET_OK != 
297       check_result (plugin,
298                     ret, PGRES_COMMAND_OK,
299                     "ALTER TABLE", "gn080", __LINE__))
300     {
301       PQfinish (plugin->dbh);
302       plugin->dbh = NULL;
303       return GNUNET_SYSERR;
304     }
305   PQclear (ret);
306   ret = PQexec (plugin->dbh,
307                 "ALTER TABLE gn080 ALTER hash SET STORAGE PLAIN");
308   if (GNUNET_OK !=
309       check_result (plugin,
310                     ret, PGRES_COMMAND_OK,
311                     "ALTER TABLE", "gn080", __LINE__))
312     {
313       PQfinish (plugin->dbh);
314       plugin->dbh = NULL;
315       return GNUNET_SYSERR;
316     }
317   PQclear (ret);
318   ret = PQexec (plugin->dbh,
319                 "ALTER TABLE gn080 ALTER vhash SET STORAGE PLAIN");
320   if (GNUNET_OK !=
321       check_result (plugin,
322                     ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn080", __LINE__))
323     {
324       PQfinish (plugin->dbh);
325       plugin->dbh = NULL;
326       return GNUNET_SYSERR;
327     }
328   PQclear (ret);
329 #endif
330   if ((GNUNET_OK !=
331        pq_prepare (plugin,
332                    "getvt",
333                    "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
334                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
335                    "AND oid >= $4 ORDER BY oid ASC LIMIT 1 OFFSET $5",
336                    5,
337                    __LINE__)) ||
338       (GNUNET_OK !=
339        pq_prepare (plugin,
340                    "gett",
341                    "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
342                    "WHERE hash=$1 AND type=$2"
343                    "AND oid >= $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
344                    4,
345                    __LINE__)) ||
346       (GNUNET_OK !=
347        pq_prepare (plugin,
348                    "getv",
349                    "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
350                    "WHERE hash=$1 AND vhash=$2"
351                    "AND oid >= $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
352                    4,
353                    __LINE__)) ||
354       (GNUNET_OK !=
355        pq_prepare (plugin,
356                    "get",
357                    "SELECT size, type, prio, anonLevel, expire, hash, value, oid FROM gn080 "
358                    "WHERE hash=$1"
359                    "AND oid >= $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
360                    3,
361                    __LINE__)) ||
362       (GNUNET_OK !=
363        pq_prepare (plugin,
364                    "put",
365                    "INSERT INTO gn080 (size, type, prio, anonLevel, expire, hash, vhash, value) "
366                    "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
367                    8,
368                    __LINE__)) ||
369       (GNUNET_OK !=
370        pq_prepare (plugin,
371                    "update",
372                    "UPDATE gn080 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
373                    "WHERE oid = $3",
374                    3,
375                    __LINE__)) ||
376       (GNUNET_OK !=
377        pq_prepare (plugin,
378                    "select_low_priority",
379                    SELECT_IT_LOW_PRIORITY,
380                    2,
381                    __LINE__)) ||
382       (GNUNET_OK !=
383        pq_prepare (plugin,
384                    "select_non_anonymous",
385                    SELECT_IT_NON_ANONYMOUS,
386                    2,
387                    __LINE__)) ||
388       (GNUNET_OK !=
389        pq_prepare (plugin,
390                    "select_expiration_time",
391                    SELECT_IT_EXPIRATION_TIME,
392                    2,
393                    __LINE__)) ||
394       (GNUNET_OK !=
395        pq_prepare (plugin,
396                    "select_migration_order",
397                    SELECT_IT_MIGRATION_ORDER,
398                    3,
399                    __LINE__)) ||
400       (GNUNET_OK !=
401        pq_prepare (plugin,
402                    "delrow",
403                    "DELETE FROM gn080 " "WHERE oid=$1", 1, __LINE__)))
404     {
405       PQfinish (plugin->dbh);
406       plugin->dbh = NULL;
407       return GNUNET_SYSERR;
408     }
409   return GNUNET_OK;
410 }
411
412
413 /**
414  * Delete the row identified by the given rowid (qid
415  * in postgres).
416  *
417  * @return GNUNET_OK on success
418  */
419 static int
420 delete_by_rowid (struct Plugin *plugin,
421                  unsigned int rowid)
422 {
423   const char *paramValues[] = { (const char *) &rowid };
424   int paramLengths[] = { sizeof (rowid) };
425   const int paramFormats[] = { 1 };
426   PGresult *ret;
427
428   ret = PQexecPrepared (plugin->dbh,
429                         "delrow",
430                         1, paramValues, paramLengths, paramFormats, 1);
431   if (GNUNET_OK !=
432       check_result (plugin,
433                     ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
434                     __LINE__))
435     {
436       return GNUNET_SYSERR;
437     }
438   PQclear (ret);
439   return GNUNET_OK;
440 }
441
442
443 /**
444  * Get an estimate of how much space the database is
445  * currently using.
446  *
447  * @param cls our "struct Plugin*"
448  * @return number of bytes used on disk
449  */
450 static unsigned long long
451 postgres_plugin_get_size (void *cls)
452 {
453   struct Plugin *plugin = cls;
454   double ret;
455
456   ret = plugin->payload;
457   return (unsigned long long) (ret * 1.00);
458   /* benchmarking shows XX% overhead */
459 }
460
461
462 /**
463  * Store an item in the datastore.
464  *
465  * @param cls closure
466  * @param key key for the item
467  * @param size number of bytes in data
468  * @param data content stored
469  * @param type type of the content
470  * @param priority priority of the content
471  * @param anonymity anonymity-level for the content
472  * @param expiration expiration time for the content
473  * @param msg set to error message
474  * @return GNUNET_OK on success
475  */
476 static int
477 postgres_plugin_put (void *cls,
478                      const GNUNET_HashCode * key,
479                      uint32_t size,
480                      const void *data,
481                      enum GNUNET_BLOCK_Type type,
482                      uint32_t priority,
483                      uint32_t anonymity,
484                      struct GNUNET_TIME_Absolute expiration,
485                      char **msg)
486 {
487   struct Plugin *plugin = cls;
488   GNUNET_HashCode vhash;
489   PGresult *ret;
490   uint32_t bsize = htonl (size);
491   uint32_t btype = htonl (type);
492   uint32_t bprio = htonl (priority);
493   uint32_t banon = htonl (anonymity);
494   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).value__;
495   const char *paramValues[] = {
496     (const char *) &bsize,
497     (const char *) &btype,
498     (const char *) &bprio,
499     (const char *) &banon,
500     (const char *) &bexpi,
501     (const char *) key,
502     (const char *) &vhash,
503     (const char *) data
504   };
505   int paramLengths[] = {
506     sizeof (bsize),
507     sizeof (btype),
508     sizeof (bprio),
509     sizeof (banon),
510     sizeof (bexpi),
511     sizeof (GNUNET_HashCode),
512     sizeof (GNUNET_HashCode),
513     size
514   };
515   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
516
517   GNUNET_CRYPTO_hash (data, size, &vhash);
518   ret = PQexecPrepared (plugin->dbh,
519                         "put", 8, paramValues, paramLengths, paramFormats, 1);
520   if (GNUNET_OK != check_result (plugin, ret,
521                                  PGRES_COMMAND_OK,
522                                  "PQexecPrepared", "put", __LINE__))
523     return GNUNET_SYSERR;
524   PQclear (ret);
525   plugin->payload += size;
526   return GNUNET_OK;
527 }
528
529 /**
530  * Function invoked on behalf of a "PluginIterator"
531  * asking the database plugin to call the iterator
532  * with the next item.
533  *
534  * @param cls the 'struct NextRequestClosure'
535  * @param tc scheduler context
536  */
537 static void 
538 postgres_next_request_cont (void *next_cls,
539                             const struct GNUNET_SCHEDULER_TaskContext *tc)
540 {
541   struct NextRequestClosure *nrc = next_cls;
542   struct Plugin *plugin = nrc->plugin;
543   const int paramFormats[] = { 1, 1, 1, 1, 1 };
544   int iret;
545   PGresult *res;
546   enum GNUNET_BLOCK_Type type;
547   uint32_t anonymity;
548   uint32_t priority;
549   uint32_t size;
550   unsigned int rowid;
551   struct GNUNET_TIME_Absolute expiration_time;
552   GNUNET_HashCode key;
553
554   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
555   plugin->next_task_nc = NULL;
556   if (GNUNET_YES == nrc->end_it) 
557     {
558       nrc->iter (nrc->iter_cls, 
559                  NULL, NULL, 0, NULL, 0, 0, 0, 
560                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
561       GNUNET_free (nrc);
562       return;
563     }
564
565   
566   if (nrc->count == 0)
567     nrc->blimit_off = GNUNET_htonll (nrc->off);
568   else
569     nrc->blimit_off = GNUNET_htonll (0);
570   
571   res = PQexecPrepared (plugin->dbh,
572                         nrc->pname,
573                         nrc->nparams,
574                         nrc->paramValues, 
575                         nrc->paramLengths,
576                         paramFormats, 1);
577   if (GNUNET_OK != check_result (plugin,
578                                  res,
579                                  PGRES_TUPLES_OK,
580                                  "PQexecPrepared",
581                                  nrc->pname,
582                                  __LINE__))
583     {
584       nrc->iter (nrc->iter_cls, 
585                  NULL, NULL, 0, NULL, 0, 0, 0, 
586                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
587       GNUNET_free (nrc);
588       return;
589     }
590
591   if (0 == PQntuples (res))
592     {
593       /* no result */
594       nrc->iter (nrc->iter_cls, 
595                  NULL, NULL, 0, NULL, 0, 0, 0, 
596                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
597       PQclear (res);
598       GNUNET_free (nrc);
599       return; 
600     }
601   if ((1 != PQntuples (res)) ||
602       (8 != PQnfields (res)) ||
603       (sizeof (uint32_t) != PQfsize (res, 0)) ||
604       (sizeof (uint32_t) != PQfsize (res, 7)))
605     {
606       GNUNET_break (0);
607       nrc->iter (nrc->iter_cls, 
608                  NULL, NULL, 0, NULL, 0, 0, 0, 
609                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
610       PQclear (res);
611       GNUNET_free (nrc);
612       return;
613     }
614   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 7));
615   size = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
616   if ((sizeof (uint32_t) != PQfsize (res, 1)) ||
617       (sizeof (uint32_t) != PQfsize (res, 2)) ||
618       (sizeof (uint32_t) != PQfsize (res, 3)) ||
619       (sizeof (uint64_t) != PQfsize (res, 4)) ||
620       (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 5)) ||
621       (size != PQgetlength (res, 0, 6)))
622     {
623       GNUNET_break (0);
624       PQclear (res);
625       delete_by_rowid (plugin, rowid);
626       nrc->iter (nrc->iter_cls, 
627                  NULL, NULL, 0, NULL, 0, 0, 0, 
628                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
629       GNUNET_free (nrc);
630       return;
631     }
632
633   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
634   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
635   anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 3));
636   expiration_time.value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 4));
637   size = PQgetlength (res, 0, 6);
638   memcpy (&key, PQgetvalue (res, 0, 5), sizeof (GNUNET_HashCode));
639
640   nrc->blast_prio = htonl (priority);
641   nrc->blast_expire = GNUNET_htonll (expiration_time.value);
642   nrc->blast_rowid = htonl (rowid + 1);
643   nrc->count++;
644   iret = nrc->iter (nrc->iter_cls,
645                     nrc,
646                     &key,
647                     size,
648                     PQgetvalue (res, 0, 6),
649                     (enum GNUNET_BLOCK_Type) type,
650                     priority,
651                     anonymity,
652                     expiration_time,
653                     rowid);
654   PQclear (res);
655   if (iret == GNUNET_SYSERR)
656     return;
657   if (iret == GNUNET_NO)
658     {
659       plugin->payload -= size;
660       delete_by_rowid (plugin, rowid);
661     }
662 }
663
664
665 /**
666  * Function invoked on behalf of a "PluginIterator"
667  * asking the database plugin to call the iterator
668  * with the next item.
669  *
670  * @param next_cls whatever argument was given
671  *        to the PluginIterator as "next_cls".
672  * @param end_it set to GNUNET_YES if we
673  *        should terminate the iteration early
674  *        (iterator should be still called once more
675  *         to signal the end of the iteration).
676  */
677 static void 
678 postgres_plugin_next_request (void *next_cls,
679                               int end_it)
680 {
681   struct NextRequestClosure *nrc = next_cls;
682
683   if (GNUNET_YES == end_it)
684     nrc->end_it = GNUNET_YES;
685   nrc->plugin->next_task_nc = nrc;
686   nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (nrc->plugin->env->sched,
687                                                      &postgres_next_request_cont,
688                                                      nrc);
689 }
690
691
692 /**
693  * Update the priority for a particular key in the datastore.  If
694  * the expiration time in value is different than the time found in
695  * the datastore, the higher value should be kept.  For the
696  * anonymity level, the lower value is to be used.  The specified
697  * priority should be added to the existing priority, ignoring the
698  * priority in value.
699  *
700  * Note that it is possible for multiple values to match this put.
701  * In that case, all of the respective values are updated.
702  *
703  * @param cls our "struct Plugin*"
704  * @param uid unique identifier of the datum
705  * @param delta by how much should the priority
706  *     change?  If priority + delta < 0 the
707  *     priority should be set to 0 (never go
708  *     negative).
709  * @param expire new expiration time should be the
710  *     MAX of any existing expiration time and
711  *     this value
712  * @param msg set to error message
713  * @return GNUNET_OK on success
714  */
715 static int
716 postgres_plugin_update (void *cls,
717                         uint64_t uid,
718                         int delta, struct GNUNET_TIME_Absolute expire,
719                         char **msg)
720 {
721   struct Plugin *plugin = cls;
722   PGresult *ret;
723   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
724   uint32_t boid = htonl ( (uint32_t) uid);
725   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).value__;
726   const char *paramValues[] = {
727     (const char *) &bdelta,
728     (const char *) &bexpire,
729     (const char *) &boid,
730   };
731   int paramLengths[] = {
732     sizeof (bdelta),
733     sizeof (bexpire),
734     sizeof (boid),
735   };
736   const int paramFormats[] = { 1, 1, 1 };
737
738   ret = PQexecPrepared (plugin->dbh,
739                         "update",
740                         3, paramValues, paramLengths, paramFormats, 1);
741   if (GNUNET_OK != check_result (plugin,
742                                  ret,
743                                  PGRES_COMMAND_OK,
744                                  "PQexecPrepared", "update", __LINE__))
745     return GNUNET_SYSERR;
746   PQclear (ret);
747   return GNUNET_OK;
748 }
749
750
751 /**
752  * Call a method for each key in the database and
753  * call the callback method on it.
754  *
755  * @param type entries of which type should be considered?
756  * @param iter maybe NULL (to just count); iter
757  *     should return GNUNET_SYSERR to abort the
758  *     iteration, GNUNET_NO to delete the entry and
759  *     continue and GNUNET_OK to continue iterating
760  */
761 static void
762 postgres_iterate (struct Plugin *plugin,
763                   unsigned int type,
764                   int is_asc,
765                   unsigned int iter_select,
766                   PluginIterator iter, void *iter_cls)
767 {
768   struct NextRequestClosure *nrc;
769
770   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
771   nrc->plugin = plugin;
772   nrc->iter = iter;
773   nrc->iter_cls = iter_cls;
774   if (is_asc)
775     {
776       nrc->blast_prio = htonl (0);
777       nrc->blast_rowid = htonl (0);
778       nrc->blast_expire = htonl (0);
779     }
780   else
781     {
782       nrc->blast_prio = htonl (0x7FFFFFFFL);
783       nrc->blast_rowid = htonl (0xFFFFFFFF);
784       nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
785     }
786   switch (iter_select)
787     {
788     case 0:
789       nrc->pname = "select_low_priority";
790       nrc->nparams = 2;
791       nrc->paramValues[0] = (const char *) &nrc->blast_prio;
792       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
793       nrc->paramLengths[0] = sizeof (nrc->blast_prio);
794       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
795       break;
796     case 1:
797       nrc->pname = "select_non_anonymous";
798       nrc->nparams = 2;
799       nrc->paramValues[0] = (const char *) &nrc->blast_prio;
800       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
801       nrc->paramLengths[0] = sizeof (nrc->blast_prio);
802       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
803       break;
804     case 2:
805       nrc->pname = "select_expiration_time";
806       nrc->nparams = 2;
807       nrc->paramValues[0] = (const char *) &nrc->blast_expire;
808       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
809       nrc->paramLengths[0] = sizeof (nrc->blast_expire);
810       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
811       break;
812     case 3:
813       nrc->pname = "select_migration_order";
814       nrc->nparams = 3;
815       nrc->paramValues[0] = (const char *) &nrc->blast_expire;
816       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
817       nrc->paramValues[2] = (const char *) &nrc->bnow;
818       nrc->paramLengths[0] = sizeof (nrc->blast_expire);
819       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
820       nrc->paramLengths[2] = sizeof (nrc->bnow);
821       break;
822     default:
823       GNUNET_break (0);
824       iter (iter_cls, 
825             NULL, NULL, 0, NULL, 0, 0, 0, 
826             GNUNET_TIME_UNIT_ZERO_ABS, 0);
827       return;
828     }
829   nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).value__;
830   postgres_plugin_next_request (nrc,
831                                 GNUNET_NO);
832 }
833
834
835 /**
836  * Select a subset of the items in the datastore and call
837  * the given iterator for each of them.
838  *
839  * @param cls our "struct Plugin*"
840  * @param type entries of which type should be considered?
841  *        Use 0 for any type.
842  * @param iter function to call on each matching value;
843  *        will be called once with a NULL value at the end
844  * @param iter_cls closure for iter
845  */
846 static void
847 postgres_plugin_iter_low_priority (void *cls,
848                                    enum GNUNET_BLOCK_Type type,
849                                    PluginIterator iter,
850                                    void *iter_cls)
851 {
852   struct Plugin *plugin = cls;
853   
854   postgres_iterate (plugin,
855                     type, 
856                     GNUNET_YES, 0, 
857                     iter, iter_cls);
858 }
859
860
861
862
863 /**
864  * Iterate over the results for a particular key
865  * in the datastore.
866  *
867  * @param cls closure
868  * @param key maybe NULL (to match all entries)
869  * @param vhash hash of the value, maybe NULL (to
870  *        match all values that have the right key).
871  *        Note that for DBlocks there is no difference
872  *        betwen key and vhash, but for other blocks
873  *        there may be!
874  * @param type entries of which type are relevant?
875  *     Use 0 for any type.
876  * @param iter function to call on each matching value;
877  *        will be called once with a NULL value at the end
878  * @param iter_cls closure for iter
879  */
880 static void
881 postgres_plugin_get (void *cls,
882                      const GNUNET_HashCode * key,
883                      const GNUNET_HashCode * vhash,
884                      enum GNUNET_BLOCK_Type type,
885                      PluginIterator iter, void *iter_cls)
886 {
887   struct Plugin *plugin = cls;
888   struct NextRequestClosure *nrc;
889   const int paramFormats[] = { 1, 1, 1, 1, 1 };
890   PGresult *ret;
891
892   if (key == NULL)
893     {
894       postgres_plugin_iter_low_priority (plugin, type, 
895                                          iter, iter_cls);
896       return;
897     }
898   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
899   nrc->plugin = plugin;
900   nrc->iter = iter;
901   nrc->iter_cls = iter_cls;
902   nrc->key = *key;
903   if (vhash != NULL)
904     nrc->vhash = *vhash;
905   nrc->paramValues[0] = (const char*) &nrc->key;
906   nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
907   nrc->btype = htonl (type);
908   if (type != 0)
909     {
910       if (vhash != NULL)
911         {
912           nrc->paramValues[1] = (const char *) &nrc->vhash;
913           nrc->paramLengths[1] = sizeof (nrc->vhash);
914           nrc->paramValues[2] = (const char *) &nrc->btype;
915           nrc->paramLengths[2] = sizeof (nrc->btype);
916           nrc->paramValues[3] = (const char *) &nrc->blast_rowid;
917           nrc->paramLengths[3] = sizeof (nrc->blast_rowid);
918           nrc->paramValues[4] = (const char *) &nrc->blimit_off;
919           nrc->paramLengths[4] = sizeof (nrc->blimit_off);
920           nrc->nparams = 5;
921           nrc->pname = "getvt";
922           ret = PQexecParams (plugin->dbh,
923                               "SELECT count(*) FROM gn080 WHERE hash=$1 AND vhash=$2 AND type=$3",
924                               3,
925                               NULL,
926                               nrc->paramValues, 
927                               nrc->paramLengths,
928                               paramFormats, 1);
929         }
930       else
931         {
932           nrc->paramValues[1] = (const char *) &nrc->btype;
933           nrc->paramLengths[1] = sizeof (nrc->btype);
934           nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
935           nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
936           nrc->paramValues[3] = (const char *) &nrc->blimit_off;
937           nrc->paramLengths[3] = sizeof (nrc->blimit_off);
938           nrc->nparams = 4;
939           nrc->pname = "gett";
940           ret = PQexecParams (plugin->dbh,
941                               "SELECT count(*) FROM gn080 WHERE hash=$1 AND type=$2",
942                               2,
943                               NULL,
944                               nrc->paramValues, 
945                               nrc->paramLengths, 
946                               paramFormats, 1);
947         }
948     }
949   else
950     {
951       if (vhash != NULL)
952         {
953           nrc->paramValues[1] = (const char *) &nrc->vhash;
954           nrc->paramLengths[1] = sizeof (nrc->vhash);
955           nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
956           nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
957           nrc->paramValues[3] = (const char *) &nrc->blimit_off;
958           nrc->paramLengths[3] = sizeof (nrc->blimit_off);
959           nrc->nparams = 4;
960           nrc->pname = "getv";
961           ret = PQexecParams (plugin->dbh,
962                               "SELECT count(*) FROM gn080 WHERE hash=$1 AND vhash=$2",
963                               2,
964                               NULL,
965                               nrc->paramValues, 
966                               nrc->paramLengths,
967                               paramFormats, 1);
968         }
969       else
970         {
971           nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
972           nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
973           nrc->paramValues[2] = (const char *) &nrc->blimit_off;
974           nrc->paramLengths[2] = sizeof (nrc->blimit_off);
975           nrc->nparams = 3;
976           nrc->pname = "get";
977           ret = PQexecParams (plugin->dbh,
978                               "SELECT count(*) FROM gn080 WHERE hash=$1",
979                               1,
980                               NULL,
981                               nrc->paramValues, 
982                               nrc->paramLengths,
983                               paramFormats, 1);
984         }
985     }
986   if (GNUNET_OK != check_result (plugin,
987                                  ret,
988                                  PGRES_TUPLES_OK,
989                                  "PQexecParams",
990                                  nrc->pname,
991                                  __LINE__))
992     {
993       iter (iter_cls, 
994             NULL, NULL, 0, NULL, 0, 0, 0, 
995             GNUNET_TIME_UNIT_ZERO_ABS, 0);
996       return;
997     }
998   if ((PQntuples (ret) != 1) ||
999       (PQnfields (ret) != 1) ||
1000       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
1001     {
1002       GNUNET_break (0);
1003       PQclear (ret);
1004       iter (iter_cls, 
1005             NULL, NULL, 0, NULL, 0, 0, 0, 
1006             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1007       return;
1008     }
1009   nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
1010   PQclear (ret);
1011   if (nrc->total == 0)
1012     {
1013       iter (iter_cls, 
1014             NULL, NULL, 0, NULL, 0, 0, 0, 
1015             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1016       return;
1017     }
1018   nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1019                                        nrc->total);
1020   postgres_plugin_next_request (nrc,
1021                                 GNUNET_NO);
1022 }
1023
1024
1025 /**
1026  * Select a subset of the items in the datastore and call
1027  * the given iterator for each of them.
1028  *
1029  * @param cls our "struct Plugin*"
1030  * @param type entries of which type should be considered?
1031  *        Use 0 for any type.
1032  * @param iter function to call on each matching value;
1033  *        will be called once with a NULL value at the end
1034  * @param iter_cls closure for iter
1035  */
1036 static void
1037 postgres_plugin_iter_zero_anonymity (void *cls,
1038                                      enum GNUNET_BLOCK_Type type,
1039                                      PluginIterator iter,
1040                                      void *iter_cls)
1041 {
1042   struct Plugin *plugin = cls;
1043
1044   postgres_iterate (plugin, 
1045                     type, GNUNET_NO, 1,
1046                     iter, iter_cls);
1047 }
1048
1049
1050 /**
1051  * Select a subset of the items in the datastore and call
1052  * the given iterator for each of them.
1053  *
1054  * @param cls our "struct Plugin*"
1055  * @param type entries of which type should be considered?
1056  *        Use 0 for any type.
1057  * @param iter function to call on each matching value;
1058  *        will be called once with a NULL value at the end
1059  * @param iter_cls closure for iter
1060  */
1061 static void
1062 postgres_plugin_iter_ascending_expiration (void *cls,
1063                                            enum GNUNET_BLOCK_Type type,
1064                                            PluginIterator iter,
1065                                            void *iter_cls)
1066 {
1067   struct Plugin *plugin = cls;
1068
1069   postgres_iterate (plugin, type, GNUNET_YES, 2,
1070                     iter, iter_cls);
1071 }
1072
1073
1074
1075 /**
1076  * Select a subset of the items in the datastore and call
1077  * the given iterator for each of them.
1078  *
1079  * @param cls our "struct Plugin*"
1080  * @param type entries of which type should be considered?
1081  *        Use 0 for any type.
1082  * @param iter function to call on each matching value;
1083  *        will be called once with a NULL value at the end
1084  * @param iter_cls closure for iter
1085  */
1086 static void
1087 postgres_plugin_iter_migration_order (void *cls,
1088                                       enum GNUNET_BLOCK_Type type,
1089                                       PluginIterator iter,
1090                                       void *iter_cls)
1091 {
1092   struct Plugin *plugin = cls;
1093
1094   postgres_iterate (plugin, 0, GNUNET_NO, 3, 
1095                     iter, iter_cls);
1096 }
1097
1098
1099
1100 /**
1101  * Select a subset of the items in the datastore and call
1102  * the given iterator for each of them.
1103  *
1104  * @param cls our "struct Plugin*"
1105  * @param type entries of which type should be considered?
1106  *        Use 0 for any type.
1107  * @param iter function to call on each matching value;
1108  *        will be called once with a NULL value at the end
1109  * @param iter_cls closure for iter
1110  */
1111 static void
1112 postgres_plugin_iter_all_now (void *cls,
1113                               enum GNUNET_BLOCK_Type type,
1114                               PluginIterator iter,
1115                               void *iter_cls)
1116 {
1117   struct Plugin *plugin = cls;
1118
1119   postgres_iterate (plugin, 
1120                     0, GNUNET_YES, 0, 
1121                     iter, iter_cls);
1122 }
1123
1124
1125 /**
1126  * Drop database.
1127  */
1128 static void 
1129 postgres_plugin_drop (void *cls)
1130 {
1131   struct Plugin *plugin = cls;
1132
1133   pq_exec (plugin, "DROP TABLE gn080", __LINE__);
1134 }
1135
1136
1137 /**
1138  * Entry point for the plugin.
1139  *
1140  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1141  * @return our "struct Plugin*"
1142  */
1143 void *
1144 libgnunet_plugin_datastore_postgres_init (void *cls)
1145 {
1146   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1147   struct GNUNET_DATASTORE_PluginFunctions *api;
1148   struct Plugin *plugin;
1149
1150   plugin = GNUNET_malloc (sizeof (struct Plugin));
1151   plugin->env = env;
1152   if (GNUNET_OK != init_connection (plugin))
1153     {
1154       GNUNET_free (plugin);
1155       return NULL;
1156     }
1157   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1158   api->cls = plugin;
1159   api->get_size = &postgres_plugin_get_size;
1160   api->put = &postgres_plugin_put;
1161   api->next_request = &postgres_plugin_next_request;
1162   api->get = &postgres_plugin_get;
1163   api->update = &postgres_plugin_update;
1164   api->iter_low_priority = &postgres_plugin_iter_low_priority;
1165   api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity;
1166   api->iter_ascending_expiration = &postgres_plugin_iter_ascending_expiration;
1167   api->iter_migration_order = &postgres_plugin_iter_migration_order;
1168   api->iter_all_now = &postgres_plugin_iter_all_now;
1169   api->drop = &postgres_plugin_drop;
1170   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1171                    "postgres", _("Postgres database running\n"));
1172   return api;
1173 }
1174
1175
1176 /**
1177  * Exit point from the plugin.
1178  * @param cls our "struct Plugin*"
1179  * @return always NULL
1180  */
1181 void *
1182 libgnunet_plugin_datastore_postgres_done (void *cls)
1183 {
1184   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1185   struct Plugin *plugin = api->cls;
1186   
1187   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1188     {
1189       GNUNET_SCHEDULER_cancel (plugin->env->sched,
1190                                plugin->next_task);
1191       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1192       GNUNET_free (plugin->next_task_nc);
1193       plugin->next_task_nc = NULL;
1194     }
1195   PQfinish (plugin->dbh);
1196   GNUNET_free (plugin);
1197   GNUNET_free (api);
1198   return NULL;
1199 }
1200
1201 /* end of plugin_datastore_postgres.c */