fixes
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "plugin_datastore.h"
29 #include <postgresql/libpq-fe.h>
30
31 #define DEBUG_POSTGRES GNUNET_NO
32
33 #define SELECT_IT_LOW_PRIORITY "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
34                                "WHERE (prio = $1 AND oid > $2) "                        \
35                                "ORDER BY prio ASC,oid ASC LIMIT 1) "\
36                                "UNION "\
37                                "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
38                                "WHERE (prio > $1 AND oid != $2)"\
39                                "ORDER BY prio ASC,oid ASC LIMIT 1)"\
40                                "ORDER BY prio ASC,oid ASC LIMIT 1"
41
42 #define SELECT_IT_NON_ANONYMOUS "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
43                                 "WHERE (prio = $1 AND oid < $2)"\
44                                 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
45                                 "UNION "\
46                                 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
47                                 "WHERE (prio < $1 AND oid != $2)"\
48                                 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
49                                 "ORDER BY prio DESC,oid DESC LIMIT 1"
50
51 #define SELECT_IT_EXPIRATION_TIME "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
52                                   "WHERE (expire = $1 AND oid > $2) "\
53                                   "ORDER BY expire ASC,oid ASC LIMIT 1) "\
54                                   "UNION "\
55                                   "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
56                                   "WHERE (expire > $1 AND oid != $2) "          \
57                                   "ORDER BY expire ASC,oid ASC LIMIT 1)"\
58                                   "ORDER BY expire ASC,oid ASC LIMIT 1"
59
60
61 #define SELECT_IT_MIGRATION_ORDER "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
62                                   "WHERE (expire = $1 AND oid < $2)"\
63                                   " AND expire > $3 AND type!=3"\
64                                   " ORDER BY expire DESC,oid DESC LIMIT 1) "\
65                                   "UNION "\
66                                   "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
67                                   "WHERE (expire < $1 AND oid != $2)"           \
68                                   " AND expire > $3 AND type!=3"\
69                                   " ORDER BY expire DESC,oid DESC LIMIT 1)"\
70                                   "ORDER BY expire DESC,oid DESC LIMIT 1"
71
72 /**
73  * After how many ms "busy" should a DB operation fail for good?
74  * A low value makes sure that we are more responsive to requests
75  * (especially PUTs).  A high value guarantees a higher success
76  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
77  *
78  * The default value of 1s should ensure that users do not experience
79  * huge latencies while at the same time allowing operations to succeed
80  * with reasonable probability.
81  */
82 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
83
84
85 /**
86  * Closure for 'postgres_next_request_cont'.
87  */
88 struct NextRequestClosure
89 {
90   /**
91    * Global plugin data.
92    */
93   struct Plugin *plugin;
94   
95   /**
96    * Function to call for each matching entry.
97    */
98   PluginIterator iter;
99   
100   /**
101    * Closure for 'iter'.
102    */
103   void *iter_cls;
104   
105   /**
106    * Parameters for the prepared statement.
107    */
108   const char *paramValues[5];
109   
110   /**
111    * Name of the prepared statement to run.
112    */
113   const char *pname;
114   
115   /**
116    * Size of values pointed to by paramValues.
117    */
118   int paramLengths[5];
119   
120   /**
121    * Number of paramters in paramValues/paramLengths.
122    */
123   int nparams; 
124   
125   /**
126    * Current time (possible parameter), big-endian.
127    */
128   uint64_t bnow;
129   
130   /**
131    * Key (possible parameter)
132    */
133   GNUNET_HashCode key;
134   
135   /**
136    * Hash of value (possible parameter)
137    */
138   GNUNET_HashCode vhash;
139   
140   /**
141    * Number of entries found so far
142    */
143   long long count;
144   
145   /**
146    * Offset this iteration starts at.
147    */
148   uint64_t off;
149   
150   /**
151    * Current offset to use in query, big-endian.
152    */
153   uint64_t blimit_off;
154   
155   /**
156    *  Overall number of matching entries.
157    */
158   unsigned long long total;
159   
160   /**
161    * Expiration value of previous result (possible parameter), big-endian.
162    */
163   uint64_t blast_expire;
164   
165   /**
166    * Row ID of last result (possible paramter), big-endian.
167    */
168   uint32_t blast_rowid;
169   
170   /**
171    * Priority of last result (possible parameter), big-endian.
172    */
173   uint32_t blast_prio;
174   
175   /**
176    * Type of block (possible paramter), big-endian.
177    */
178   uint32_t btype;
179   
180   /**
181    * Flag set to GNUNET_YES to stop iteration.
182    */
183   int end_it;
184 };
185
186
187 /**
188  * Context for all functions in this plugin.
189  */
190 struct Plugin 
191 {
192   /**
193    * Our execution environment.
194    */
195   struct GNUNET_DATASTORE_PluginEnvironment *env;
196
197   /**
198    * Native Postgres database handle.
199    */
200   PGconn *dbh;
201
202   /**
203    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
204    */
205   struct NextRequestClosure *next_task_nc;
206
207   /**
208    * Pending task with scheduler for running the next request.
209    */
210   GNUNET_SCHEDULER_TaskIdentifier next_task;
211
212 };
213
214
215 /**
216  * Check if the result obtained from Postgres has
217  * the desired status code.  If not, log an error, clear the
218  * result and return GNUNET_SYSERR.
219  * 
220  * @param plugin global context
221  * @param ret result to check
222  * @param expected_status expected return value
223  * @param command name of SQL command that was run
224  * @param args arguments to SQL command
225  * @param line line number for error reporting
226  * @return GNUNET_OK if the result is acceptable
227  */
228 static int
229 check_result (struct Plugin *plugin,
230               PGresult * ret,
231               int expected_status,
232               const char *command, const char *args, int line)
233 {
234   if (ret == NULL)
235     {
236       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
237                        "datastore-postgres",
238                        "Postgres failed to allocate result for `%s:%s' at %d\n",
239                        command, args, line);
240       return GNUNET_SYSERR;
241     }
242   if (PQresultStatus (ret) != expected_status)
243     {
244       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
245                        "datastore-postgres",
246                        _("`%s:%s' failed at %s:%d with error: %s"),
247                        command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
248       PQclear (ret);
249       return GNUNET_SYSERR;
250     }
251   return GNUNET_OK;
252 }
253
254 /**
255  * Run simple SQL statement (without results).
256  *
257  * @param plugin global context
258  * @param sql statement to run
259  * @param line code line for error reporting
260  */
261 static int
262 pq_exec (struct Plugin *plugin,
263          const char *sql, int line)
264 {
265   PGresult *ret;
266   ret = PQexec (plugin->dbh, sql);
267   if (GNUNET_OK != check_result (plugin,
268                                  ret, 
269                                  PGRES_COMMAND_OK, "PQexec", sql, line))
270     return GNUNET_SYSERR;
271   PQclear (ret);
272   return GNUNET_OK;
273 }
274
275 /**
276  * Prepare SQL statement.
277  *
278  * @param plugin global context
279  * @param sql SQL code to prepare
280  * @param nparams number of parameters in sql
281  * @param line code line for error reporting
282  * @return GNUNET_OK on success
283  */
284 static int
285 pq_prepare (struct Plugin *plugin,
286             const char *name, const char *sql, int nparms, int line)
287 {
288   PGresult *ret;
289   ret = PQprepare (plugin->dbh, name, sql, nparms, NULL);
290   if (GNUNET_OK !=
291       check_result (plugin, 
292                     ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
293     return GNUNET_SYSERR;
294   PQclear (ret);
295   return GNUNET_OK;
296 }
297
298 /**
299  * @brief Get a database handle
300  *
301  * @param plugin global context
302  * @return GNUNET_OK on success, GNUNET_SYSERR on error
303  */
304 static int
305 init_connection (struct Plugin *plugin)
306 {
307   char *conninfo;
308   PGresult *ret;
309
310   /* Open database and precompile statements */
311   conninfo = NULL;
312   GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
313                                          "datastore-postgres",
314                                          "CONFIG",
315                                          &conninfo);
316   plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
317   GNUNET_free_non_null (conninfo);
318   if (NULL == plugin->dbh)
319     {
320       /* FIXME: warn about out-of-memory? */
321       return GNUNET_SYSERR;
322     }
323   if (PQstatus (plugin->dbh) != CONNECTION_OK)
324     {
325       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
326                        "datastore-postgres",
327                        _("Unable to initialize Postgres: %s"),
328                        PQerrorMessage (plugin->dbh));
329       PQfinish (plugin->dbh);
330       plugin->dbh = NULL;
331       return GNUNET_SYSERR;
332     }
333   ret = PQexec (plugin->dbh,
334                 "CREATE TABLE gn090 ("
335                 "  type INTEGER NOT NULL DEFAULT 0,"
336                 "  prio INTEGER NOT NULL DEFAULT 0,"
337                 "  anonLevel INTEGER NOT NULL DEFAULT 0,"
338                 "  expire BIGINT NOT NULL DEFAULT 0,"
339                 "  hash BYTEA NOT NULL DEFAULT '',"
340                 "  vhash BYTEA NOT NULL DEFAULT '',"
341                 "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
342   if ( (ret == NULL) || 
343        ( (PQresultStatus (ret) != PGRES_COMMAND_OK) && 
344          (0 != strcmp ("42P07",    /* duplicate table */
345                        PQresultErrorField
346                        (ret,
347                         PG_DIAG_SQLSTATE)))))
348     {
349       check_result (plugin,
350                     ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090", __LINE__);
351       PQfinish (plugin->dbh);
352       plugin->dbh = NULL;
353       return GNUNET_SYSERR;
354     }
355   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
356     {
357       if ((GNUNET_OK !=
358            pq_exec (plugin, "CREATE INDEX idx_hash ON gn090 (hash)", __LINE__)) ||
359           (GNUNET_OK !=
360            pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)",
361                     __LINE__))
362           || (GNUNET_OK !=
363               pq_exec (plugin, "CREATE INDEX idx_prio ON gn090 (prio)", __LINE__))
364           || (GNUNET_OK !=
365               pq_exec (plugin, "CREATE INDEX idx_expire ON gn090 (expire)", __LINE__))
366           || (GNUNET_OK !=
367               pq_exec (plugin, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)",
368                        __LINE__))
369           || (GNUNET_OK !=
370               pq_exec
371               (plugin, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)",
372                __LINE__))
373           || (GNUNET_OK !=
374               pq_exec (plugin, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)",
375                        __LINE__)))
376         {
377           PQclear (ret);
378           PQfinish (plugin->dbh);
379           plugin->dbh = NULL;
380           return GNUNET_SYSERR;
381         }
382     }
383   PQclear (ret);
384 #if 1
385   ret = PQexec (plugin->dbh,
386                 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
387   if (GNUNET_OK != 
388       check_result (plugin,
389                     ret, PGRES_COMMAND_OK,
390                     "ALTER TABLE", "gn090", __LINE__))
391     {
392       PQfinish (plugin->dbh);
393       plugin->dbh = NULL;
394       return GNUNET_SYSERR;
395     }
396   PQclear (ret);
397   ret = PQexec (plugin->dbh,
398                 "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
399   if (GNUNET_OK !=
400       check_result (plugin,
401                     ret, PGRES_COMMAND_OK,
402                     "ALTER TABLE", "gn090", __LINE__))
403     {
404       PQfinish (plugin->dbh);
405       plugin->dbh = NULL;
406       return GNUNET_SYSERR;
407     }
408   PQclear (ret);
409   ret = PQexec (plugin->dbh,
410                 "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
411   if (GNUNET_OK !=
412       check_result (plugin,
413                     ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090", __LINE__))
414     {
415       PQfinish (plugin->dbh);
416       plugin->dbh = NULL;
417       return GNUNET_SYSERR;
418     }
419   PQclear (ret);
420 #endif
421   if ((GNUNET_OK !=
422        pq_prepare (plugin,
423                    "getvt",
424                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
425                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
426                    "AND oid > $4 ORDER BY oid ASC LIMIT 1 OFFSET $5",
427                    5,
428                    __LINE__)) ||
429       (GNUNET_OK !=
430        pq_prepare (plugin,
431                    "gett",
432                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
433                    "WHERE hash=$1 AND type=$2"
434                    "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
435                    4,
436                    __LINE__)) ||
437       (GNUNET_OK !=
438        pq_prepare (plugin,
439                    "getv",
440                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
441                    "WHERE hash=$1 AND vhash=$2"
442                    "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
443                    4,
444                    __LINE__)) ||
445       (GNUNET_OK !=
446        pq_prepare (plugin,
447                    "get",
448                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
449                    "WHERE hash=$1"
450                    "AND oid > $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
451                    3,
452                    __LINE__)) ||
453       (GNUNET_OK !=
454        pq_prepare (plugin,
455                    "put",
456                    "INSERT INTO gn090 (type, prio, anonLevel, expire, hash, vhash, value) "
457                    "VALUES ($1, $2, $3, $4, $5, $6, $7)",
458                    8,
459                    __LINE__)) ||
460       (GNUNET_OK !=
461        pq_prepare (plugin,
462                    "update",
463                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
464                    "WHERE oid = $3",
465                    3,
466                    __LINE__)) ||
467       (GNUNET_OK !=
468        pq_prepare (plugin,
469                    "select_low_priority",
470                    SELECT_IT_LOW_PRIORITY,
471                    2,
472                    __LINE__)) ||
473       (GNUNET_OK !=
474        pq_prepare (plugin,
475                    "select_non_anonymous",
476                    SELECT_IT_NON_ANONYMOUS,
477                    2,
478                    __LINE__)) ||
479       (GNUNET_OK !=
480        pq_prepare (plugin,
481                    "select_expiration_time",
482                    SELECT_IT_EXPIRATION_TIME,
483                    2,
484                    __LINE__)) ||
485       (GNUNET_OK !=
486        pq_prepare (plugin,
487                    "select_migration_order",
488                    SELECT_IT_MIGRATION_ORDER,
489                    3,
490                    __LINE__)) ||
491       (GNUNET_OK !=
492        pq_prepare (plugin,
493                    "delrow",
494                    "DELETE FROM gn090 " "WHERE oid=$1", 1, __LINE__)))
495     {
496       PQfinish (plugin->dbh);
497       plugin->dbh = NULL;
498       return GNUNET_SYSERR;
499     }
500   return GNUNET_OK;
501 }
502
503
504 /**
505  * Delete the row identified by the given rowid (qid
506  * in postgres).
507  *
508  * @param plugin global context
509  * @param rowid which row to delete
510  * @return GNUNET_OK on success
511  */
512 static int
513 delete_by_rowid (struct Plugin *plugin,
514                  unsigned int rowid)
515 {
516   const char *paramValues[] = { (const char *) &rowid };
517   int paramLengths[] = { sizeof (rowid) };
518   const int paramFormats[] = { 1 };
519   PGresult *ret;
520
521   ret = PQexecPrepared (plugin->dbh,
522                         "delrow",
523                         1, paramValues, paramLengths, paramFormats, 1);
524   if (GNUNET_OK !=
525       check_result (plugin,
526                     ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
527                     __LINE__))
528     {
529       return GNUNET_SYSERR;
530     }
531   PQclear (ret);
532   return GNUNET_OK;
533 }
534
535
536 /**
537  * Get an estimate of how much space the database is
538  * currently using.
539  *
540  * @param cls our "struct Plugin*"
541  * @return number of bytes used on disk
542  */
543 static unsigned long long
544 postgres_plugin_get_size (void *cls)
545 {
546   struct Plugin *plugin = cls;
547   unsigned long long total;
548   PGresult *ret;
549
550   ret = PQexecParams (plugin->dbh,
551                       "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090",
552                       0, NULL, NULL, NULL, NULL, 1);
553   if (GNUNET_OK != check_result (plugin,
554                                  ret,
555                                  PGRES_TUPLES_OK,
556                                  "PQexecParams",
557                                  "get_size",
558                                  __LINE__))
559     {
560       return 0;
561     }
562   if ((PQntuples (ret) != 1) ||
563       (PQnfields (ret) != 1) ||
564       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
565     {
566       GNUNET_break (0);
567       PQclear (ret);
568       return 0;
569     }
570   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
571   PQclear (ret);
572   return total;
573 }
574
575
576 /**
577  * Store an item in the datastore.
578  *
579  * @param cls closure
580  * @param key key for the item
581  * @param size number of bytes in data
582  * @param data content stored
583  * @param type type of the content
584  * @param priority priority of the content
585  * @param anonymity anonymity-level for the content
586  * @param expiration expiration time for the content
587  * @param msg set to error message
588  * @return GNUNET_OK on success
589  */
590 static int
591 postgres_plugin_put (void *cls,
592                      const GNUNET_HashCode * key,
593                      uint32_t size,
594                      const void *data,
595                      enum GNUNET_BLOCK_Type type,
596                      uint32_t priority,
597                      uint32_t anonymity,
598                      struct GNUNET_TIME_Absolute expiration,
599                      char **msg)
600 {
601   struct Plugin *plugin = cls;
602   GNUNET_HashCode vhash;
603   PGresult *ret;
604   uint32_t btype = htonl (type);
605   uint32_t bprio = htonl (priority);
606   uint32_t banon = htonl (anonymity);
607   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).value__;
608   const char *paramValues[] = {
609     (const char *) &btype,
610     (const char *) &bprio,
611     (const char *) &banon,
612     (const char *) &bexpi,
613     (const char *) key,
614     (const char *) &vhash,
615     (const char *) data
616   };
617   int paramLengths[] = {
618     sizeof (btype),
619     sizeof (bprio),
620     sizeof (banon),
621     sizeof (bexpi),
622     sizeof (GNUNET_HashCode),
623     sizeof (GNUNET_HashCode),
624     size
625   };
626   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1 };
627
628   GNUNET_CRYPTO_hash (data, size, &vhash);
629   ret = PQexecPrepared (plugin->dbh,
630                         "put", 7, paramValues, paramLengths, paramFormats, 1);
631   if (GNUNET_OK != check_result (plugin, ret,
632                                  PGRES_COMMAND_OK,
633                                  "PQexecPrepared", "put", __LINE__))
634     return GNUNET_SYSERR;
635   PQclear (ret);
636   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
637 #if DEBUG_POSTGRES
638   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
639                    "datastore-postgres",
640                    "Stored %u bytes in database\n",
641                    (unsigned int) size);
642 #endif
643   return GNUNET_OK;
644 }
645
646 /**
647  * Function invoked on behalf of a "PluginIterator"
648  * asking the database plugin to call the iterator
649  * with the next item.
650  *
651  * @param cls the 'struct NextRequestClosure'
652  * @param tc scheduler context
653  */
654 static void 
655 postgres_next_request_cont (void *next_cls,
656                             const struct GNUNET_SCHEDULER_TaskContext *tc)
657 {
658   struct NextRequestClosure *nrc = next_cls;
659   struct Plugin *plugin = nrc->plugin;
660   const int paramFormats[] = { 1, 1, 1, 1, 1 };
661   int iret;
662   PGresult *res;
663   enum GNUNET_BLOCK_Type type;
664   uint32_t anonymity;
665   uint32_t priority;
666   uint32_t size;
667   unsigned int rowid;
668   struct GNUNET_TIME_Absolute expiration_time;
669   GNUNET_HashCode key;
670
671   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
672   plugin->next_task_nc = NULL;
673   if ( (GNUNET_YES == nrc->end_it) ||
674        (nrc->count == nrc->total) )
675     {
676 #if DEBUG_POSTGRES
677       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
678                        "datastore-postgres",
679                        "Ending iteration (%s)\n",
680                        (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set");
681 #endif
682       nrc->iter (nrc->iter_cls, 
683                  NULL, NULL, 0, NULL, 0, 0, 0, 
684                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
685       GNUNET_free (nrc);
686       return;
687     }
688   
689   if (nrc->count == 0)
690     nrc->blimit_off = GNUNET_htonll (nrc->off);
691   else
692     nrc->blimit_off = GNUNET_htonll (0);
693   if (nrc->count + nrc->off == nrc->total)
694     nrc->blast_rowid = htonl (0); /* back to start */
695   
696   res = PQexecPrepared (plugin->dbh,
697                         nrc->pname,
698                         nrc->nparams,
699                         nrc->paramValues, 
700                         nrc->paramLengths,
701                         paramFormats, 1);
702   if (GNUNET_OK != check_result (plugin,
703                                  res,
704                                  PGRES_TUPLES_OK,
705                                  "PQexecPrepared",
706                                  nrc->pname,
707                                  __LINE__))
708     {
709 #if DEBUG_POSTGRES
710       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
711                        "datastore-postgres",
712                        "Ending iteration (postgres error)\n");
713 #endif
714       nrc->iter (nrc->iter_cls, 
715                  NULL, NULL, 0, NULL, 0, 0, 0, 
716                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
717       GNUNET_free (nrc);
718       return;
719     }
720
721   if (0 == PQntuples (res))
722     {
723       /* no result */
724 #if DEBUG_POSTGRES
725       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
726                        "datastore-postgres",
727                        "Ending iteration (no more results)\n");
728 #endif
729       nrc->iter (nrc->iter_cls, 
730                  NULL, NULL, 0, NULL, 0, 0, 0, 
731                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
732       PQclear (res);
733       GNUNET_free (nrc);
734       return; 
735     }
736   if ((1 != PQntuples (res)) ||
737       (7 != PQnfields (res)) ||
738       (sizeof (uint32_t) != PQfsize (res, 0)) ||
739       (sizeof (uint32_t) != PQfsize (res, 6)))
740     {
741       GNUNET_break (0);
742       nrc->iter (nrc->iter_cls, 
743                  NULL, NULL, 0, NULL, 0, 0, 0, 
744                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
745       PQclear (res);
746       GNUNET_free (nrc);
747       return;
748     }
749   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
750   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
751       (sizeof (uint32_t) != PQfsize (res, 1)) ||
752       (sizeof (uint32_t) != PQfsize (res, 2)) ||
753       (sizeof (uint64_t) != PQfsize (res, 3)) ||
754       (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)) )
755     {
756       GNUNET_break (0);
757       PQclear (res);
758       delete_by_rowid (plugin, rowid);
759       nrc->iter (nrc->iter_cls, 
760                  NULL, NULL, 0, NULL, 0, 0, 0, 
761                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
762       GNUNET_free (nrc);
763       return;
764     }
765
766   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
767   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
768   anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
769   expiration_time.value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
770   memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode));
771   size = PQgetlength (res, 0, 5);
772
773   nrc->blast_prio = htonl (priority);
774   nrc->blast_expire = GNUNET_htonll (expiration_time.value);
775   nrc->blast_rowid = htonl (rowid);
776   nrc->count++;
777
778 #if DEBUG_POSTGRES
779   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
780                    "datastore-postgres",
781                    "Found result of size %u bytes and type %u in database\n",
782                    (unsigned int) size,
783                    (unsigned int) type);
784 #endif
785   iret = nrc->iter (nrc->iter_cls,
786                     nrc,
787                     &key,
788                     size,
789                     PQgetvalue (res, 0, 5),
790                     (enum GNUNET_BLOCK_Type) type,
791                     priority,
792                     anonymity,
793                     expiration_time,
794                     rowid);
795   PQclear (res);
796   if (iret == GNUNET_SYSERR)
797     {
798 #if DEBUG_POSTGRES
799       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
800                        "datastore-postgres",
801                        "Ending iteration (client error)\n");
802 #endif
803       return;
804     }
805   if (iret == GNUNET_NO)
806     {
807       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
808         {
809 #if DEBUG_POSTGRES
810           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
811                            "datastore-postgres",
812                            "Deleting %u bytes from database\n",
813                            (unsigned int) size);
814 #endif
815           plugin->env->duc (plugin->env->cls,
816                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
817 #if DEBUG_POSTGRES
818           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
819                            "datastore-postgres",
820                            "Deleted %u bytes from database\n",
821                            (unsigned int) size);
822 #endif
823         }
824     }
825 }
826
827
828 /**
829  * Function invoked on behalf of a "PluginIterator"
830  * asking the database plugin to call the iterator
831  * with the next item.
832  *
833  * @param next_cls whatever argument was given
834  *        to the PluginIterator as "next_cls".
835  * @param end_it set to GNUNET_YES if we
836  *        should terminate the iteration early
837  *        (iterator should be still called once more
838  *         to signal the end of the iteration).
839  */
840 static void 
841 postgres_plugin_next_request (void *next_cls,
842                               int end_it)
843 {
844   struct NextRequestClosure *nrc = next_cls;
845
846   if (GNUNET_YES == end_it)
847     nrc->end_it = GNUNET_YES;
848   nrc->plugin->next_task_nc = nrc;
849   nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (nrc->plugin->env->sched,
850                                                      &postgres_next_request_cont,
851                                                      nrc);
852 }
853
854
855 /**
856  * Update the priority for a particular key in the datastore.  If
857  * the expiration time in value is different than the time found in
858  * the datastore, the higher value should be kept.  For the
859  * anonymity level, the lower value is to be used.  The specified
860  * priority should be added to the existing priority, ignoring the
861  * priority in value.
862  *
863  * Note that it is possible for multiple values to match this put.
864  * In that case, all of the respective values are updated.
865  *
866  * @param cls our "struct Plugin*"
867  * @param uid unique identifier of the datum
868  * @param delta by how much should the priority
869  *     change?  If priority + delta < 0 the
870  *     priority should be set to 0 (never go
871  *     negative).
872  * @param expire new expiration time should be the
873  *     MAX of any existing expiration time and
874  *     this value
875  * @param msg set to error message
876  * @return GNUNET_OK on success
877  */
878 static int
879 postgres_plugin_update (void *cls,
880                         uint64_t uid,
881                         int delta, struct GNUNET_TIME_Absolute expire,
882                         char **msg)
883 {
884   struct Plugin *plugin = cls;
885   PGresult *ret;
886   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
887   uint32_t boid = htonl ( (uint32_t) uid);
888   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).value__;
889   const char *paramValues[] = {
890     (const char *) &bdelta,
891     (const char *) &bexpire,
892     (const char *) &boid,
893   };
894   int paramLengths[] = {
895     sizeof (bdelta),
896     sizeof (bexpire),
897     sizeof (boid),
898   };
899   const int paramFormats[] = { 1, 1, 1 };
900
901   ret = PQexecPrepared (plugin->dbh,
902                         "update",
903                         3, paramValues, paramLengths, paramFormats, 1);
904   if (GNUNET_OK != check_result (plugin,
905                                  ret,
906                                  PGRES_COMMAND_OK,
907                                  "PQexecPrepared", "update", __LINE__))
908     return GNUNET_SYSERR;
909   PQclear (ret);
910   return GNUNET_OK;
911 }
912
913
914 /**
915  * Call a method for each key in the database and
916  * call the callback method on it.
917  *
918  * @param plugin global context
919  * @param type entries of which type should be considered?
920  * @param is_asc ascending or descending iteration?
921  * @param iter_select which SELECT method should be used?
922  * @param iter maybe NULL (to just count); iter
923  *     should return GNUNET_SYSERR to abort the
924  *     iteration, GNUNET_NO to delete the entry and
925  *     continue and GNUNET_OK to continue iterating
926  * @param iter_cls closure for 'iter'
927  */
928 static void
929 postgres_iterate (struct Plugin *plugin,
930                   unsigned int type,
931                   int is_asc,
932                   unsigned int iter_select,
933                   PluginIterator iter, void *iter_cls)
934 {
935   struct NextRequestClosure *nrc;
936
937   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
938   nrc->count = UINT32_MAX;
939   nrc->plugin = plugin;
940   nrc->iter = iter;
941   nrc->iter_cls = iter_cls;
942   if (is_asc)
943     {
944       nrc->blast_prio = htonl (0);
945       nrc->blast_rowid = htonl (0);
946       nrc->blast_expire = htonl (0);
947     }
948   else
949     {
950       nrc->blast_prio = htonl (0x7FFFFFFFL);
951       nrc->blast_rowid = htonl (0xFFFFFFFF);
952       nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
953     }
954   switch (iter_select)
955     {
956     case 0:
957       nrc->pname = "select_low_priority";
958       nrc->nparams = 2;
959       nrc->paramValues[0] = (const char *) &nrc->blast_prio;
960       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
961       nrc->paramLengths[0] = sizeof (nrc->blast_prio);
962       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
963       break;
964     case 1:
965       nrc->pname = "select_non_anonymous";
966       nrc->nparams = 2;
967       nrc->paramValues[0] = (const char *) &nrc->blast_prio;
968       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
969       nrc->paramLengths[0] = sizeof (nrc->blast_prio);
970       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
971       break;
972     case 2:
973       nrc->pname = "select_expiration_time";
974       nrc->nparams = 2;
975       nrc->paramValues[0] = (const char *) &nrc->blast_expire;
976       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
977       nrc->paramLengths[0] = sizeof (nrc->blast_expire);
978       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
979       break;
980     case 3:
981       nrc->pname = "select_migration_order";
982       nrc->nparams = 3;
983       nrc->paramValues[0] = (const char *) &nrc->blast_expire;
984       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
985       nrc->paramValues[2] = (const char *) &nrc->bnow;
986       nrc->paramLengths[0] = sizeof (nrc->blast_expire);
987       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
988       nrc->paramLengths[2] = sizeof (nrc->bnow);
989       break;
990     default:
991       GNUNET_break (0);
992       iter (iter_cls, 
993             NULL, NULL, 0, NULL, 0, 0, 0, 
994             GNUNET_TIME_UNIT_ZERO_ABS, 0);
995       GNUNET_free (nrc);
996       return;
997     }
998   nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).value__;
999   postgres_plugin_next_request (nrc,
1000                                 GNUNET_NO);
1001 }
1002
1003
1004 /**
1005  * Select a subset of the items in the datastore and call
1006  * the given iterator for each of them.
1007  *
1008  * @param cls our "struct Plugin*"
1009  * @param type entries of which type should be considered?
1010  *        Use 0 for any type.
1011  * @param iter function to call on each matching value;
1012  *        will be called once with a NULL value at the end
1013  * @param iter_cls closure for iter
1014  */
1015 static void
1016 postgres_plugin_iter_low_priority (void *cls,
1017                                    enum GNUNET_BLOCK_Type type,
1018                                    PluginIterator iter,
1019                                    void *iter_cls)
1020 {
1021   struct Plugin *plugin = cls;
1022   
1023   postgres_iterate (plugin,
1024                     type, 
1025                     GNUNET_YES, 0, 
1026                     iter, iter_cls);
1027 }
1028
1029
1030
1031
1032 /**
1033  * Iterate over the results for a particular key
1034  * in the datastore.
1035  *
1036  * @param cls closure
1037  * @param key maybe NULL (to match all entries)
1038  * @param vhash hash of the value, maybe NULL (to
1039  *        match all values that have the right key).
1040  *        Note that for DBlocks there is no difference
1041  *        betwen key and vhash, but for other blocks
1042  *        there may be!
1043  * @param type entries of which type are relevant?
1044  *     Use 0 for any type.
1045  * @param iter function to call on each matching value;
1046  *        will be called once with a NULL value at the end
1047  * @param iter_cls closure for iter
1048  */
1049 static void
1050 postgres_plugin_get (void *cls,
1051                      const GNUNET_HashCode * key,
1052                      const GNUNET_HashCode * vhash,
1053                      enum GNUNET_BLOCK_Type type,
1054                      PluginIterator iter, void *iter_cls)
1055 {
1056   struct Plugin *plugin = cls;
1057   struct NextRequestClosure *nrc;
1058   const int paramFormats[] = { 1, 1, 1, 1, 1 };
1059   PGresult *ret;
1060
1061   if (key == NULL)
1062     {
1063       postgres_plugin_iter_low_priority (plugin, type, 
1064                                          iter, iter_cls);
1065       return;
1066     }
1067   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1068   nrc->plugin = plugin;
1069   nrc->iter = iter;
1070   nrc->iter_cls = iter_cls;
1071   nrc->key = *key;
1072   if (vhash != NULL)
1073     nrc->vhash = *vhash;
1074   nrc->paramValues[0] = (const char*) &nrc->key;
1075   nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
1076   nrc->btype = htonl (type);
1077   if (type != 0)
1078     {
1079       if (vhash != NULL)
1080         {
1081           nrc->paramValues[1] = (const char *) &nrc->vhash;
1082           nrc->paramLengths[1] = sizeof (nrc->vhash);
1083           nrc->paramValues[2] = (const char *) &nrc->btype;
1084           nrc->paramLengths[2] = sizeof (nrc->btype);
1085           nrc->paramValues[3] = (const char *) &nrc->blast_rowid;
1086           nrc->paramLengths[3] = sizeof (nrc->blast_rowid);
1087           nrc->paramValues[4] = (const char *) &nrc->blimit_off;
1088           nrc->paramLengths[4] = sizeof (nrc->blimit_off);
1089           nrc->nparams = 5;
1090           nrc->pname = "getvt";
1091           ret = PQexecParams (plugin->dbh,
1092                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
1093                               3,
1094                               NULL,
1095                               nrc->paramValues, 
1096                               nrc->paramLengths,
1097                               paramFormats, 1);
1098         }
1099       else
1100         {
1101           nrc->paramValues[1] = (const char *) &nrc->btype;
1102           nrc->paramLengths[1] = sizeof (nrc->btype);
1103           nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
1104           nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
1105           nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1106           nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1107           nrc->nparams = 4;
1108           nrc->pname = "gett";
1109           ret = PQexecParams (plugin->dbh,
1110                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
1111                               2,
1112                               NULL,
1113                               nrc->paramValues, 
1114                               nrc->paramLengths, 
1115                               paramFormats, 1);
1116         }
1117     }
1118   else
1119     {
1120       if (vhash != NULL)
1121         {
1122           nrc->paramValues[1] = (const char *) &nrc->vhash;
1123           nrc->paramLengths[1] = sizeof (nrc->vhash);
1124           nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
1125           nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
1126           nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1127           nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1128           nrc->nparams = 4;
1129           nrc->pname = "getv";
1130           ret = PQexecParams (plugin->dbh,
1131                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
1132                               2,
1133                               NULL,
1134                               nrc->paramValues, 
1135                               nrc->paramLengths,
1136                               paramFormats, 1);
1137         }
1138       else
1139         {
1140           nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
1141           nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
1142           nrc->paramValues[2] = (const char *) &nrc->blimit_off;
1143           nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1144           nrc->nparams = 3;
1145           nrc->pname = "get";
1146           ret = PQexecParams (plugin->dbh,
1147                               "SELECT count(*) FROM gn090 WHERE hash=$1",
1148                               1,
1149                               NULL,
1150                               nrc->paramValues, 
1151                               nrc->paramLengths,
1152                               paramFormats, 1);
1153         }
1154     }
1155   if (GNUNET_OK != check_result (plugin,
1156                                  ret,
1157                                  PGRES_TUPLES_OK,
1158                                  "PQexecParams",
1159                                  nrc->pname,
1160                                  __LINE__))
1161     {
1162       iter (iter_cls, 
1163             NULL, NULL, 0, NULL, 0, 0, 0, 
1164             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1165       GNUNET_free (nrc);
1166       return;
1167     }
1168   if ((PQntuples (ret) != 1) ||
1169       (PQnfields (ret) != 1) ||
1170       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
1171     {
1172       GNUNET_break (0);
1173       PQclear (ret);
1174       iter (iter_cls, 
1175             NULL, NULL, 0, NULL, 0, 0, 0, 
1176             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1177       GNUNET_free (nrc);
1178       return;
1179     }
1180   nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
1181   PQclear (ret);
1182   if (nrc->total == 0)
1183     {
1184       iter (iter_cls, 
1185             NULL, NULL, 0, NULL, 0, 0, 0, 
1186             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1187       GNUNET_free (nrc);
1188       return;
1189     }
1190   nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1191                                        nrc->total);
1192   postgres_plugin_next_request (nrc,
1193                                 GNUNET_NO);
1194 }
1195
1196
1197 /**
1198  * Select a subset of the items in the datastore and call
1199  * the given iterator for each of them.
1200  *
1201  * @param cls our "struct Plugin*"
1202  * @param type entries of which type should be considered?
1203  *        Use 0 for any type.
1204  * @param iter function to call on each matching value;
1205  *        will be called once with a NULL value at the end
1206  * @param iter_cls closure for iter
1207  */
1208 static void
1209 postgres_plugin_iter_zero_anonymity (void *cls,
1210                                      enum GNUNET_BLOCK_Type type,
1211                                      PluginIterator iter,
1212                                      void *iter_cls)
1213 {
1214   struct Plugin *plugin = cls;
1215
1216   postgres_iterate (plugin, 
1217                     type, GNUNET_NO, 1,
1218                     iter, iter_cls);
1219 }
1220
1221
1222 /**
1223  * Select a subset of the items in the datastore and call
1224  * the given iterator for each of them.
1225  *
1226  * @param cls our "struct Plugin*"
1227  * @param type entries of which type should be considered?
1228  *        Use 0 for any type.
1229  * @param iter function to call on each matching value;
1230  *        will be called once with a NULL value at the end
1231  * @param iter_cls closure for iter
1232  */
1233 static void
1234 postgres_plugin_iter_ascending_expiration (void *cls,
1235                                            enum GNUNET_BLOCK_Type type,
1236                                            PluginIterator iter,
1237                                            void *iter_cls)
1238 {
1239   struct Plugin *plugin = cls;
1240
1241   postgres_iterate (plugin, type, GNUNET_YES, 2,
1242                     iter, iter_cls);
1243 }
1244
1245
1246 /**
1247  * Select a subset of the items in the datastore and call
1248  * the given iterator for each of them.
1249  *
1250  * @param cls our "struct Plugin*"
1251  * @param type entries of which type should be considered?
1252  *        Use 0 for any type.
1253  * @param iter function to call on each matching value;
1254  *        will be called once with a NULL value at the end
1255  * @param iter_cls closure for iter
1256  */
1257 static void
1258 postgres_plugin_iter_migration_order (void *cls,
1259                                       enum GNUNET_BLOCK_Type type,
1260                                       PluginIterator iter,
1261                                       void *iter_cls)
1262 {
1263   struct Plugin *plugin = cls;
1264
1265   postgres_iterate (plugin, 0, GNUNET_NO, 3, 
1266                     iter, iter_cls);
1267 }
1268
1269
1270 /**
1271  * Select a subset of the items in the datastore and call
1272  * the given iterator for each of them.
1273  *
1274  * @param cls our "struct Plugin*"
1275  * @param type entries of which type should be considered?
1276  *        Use 0 for any type.
1277  * @param iter function to call on each matching value;
1278  *        will be called once with a NULL value at the end
1279  * @param iter_cls closure for iter
1280  */
1281 static void
1282 postgres_plugin_iter_all_now (void *cls,
1283                               enum GNUNET_BLOCK_Type type,
1284                               PluginIterator iter,
1285                               void *iter_cls)
1286 {
1287   struct Plugin *plugin = cls;
1288
1289   postgres_iterate (plugin, 
1290                     0, GNUNET_YES, 0, 
1291                     iter, iter_cls);
1292 }
1293
1294
1295 /**
1296  * Drop database.
1297  */
1298 static void 
1299 postgres_plugin_drop (void *cls)
1300 {
1301   struct Plugin *plugin = cls;
1302
1303   pq_exec (plugin, "DROP TABLE gn090", __LINE__);
1304 }
1305
1306
1307 /**
1308  * Entry point for the plugin.
1309  *
1310  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1311  * @return our "struct Plugin*"
1312  */
1313 void *
1314 libgnunet_plugin_datastore_postgres_init (void *cls)
1315 {
1316   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1317   struct GNUNET_DATASTORE_PluginFunctions *api;
1318   struct Plugin *plugin;
1319
1320   plugin = GNUNET_malloc (sizeof (struct Plugin));
1321   plugin->env = env;
1322   if (GNUNET_OK != init_connection (plugin))
1323     {
1324       GNUNET_free (plugin);
1325       return NULL;
1326     }
1327   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1328   api->cls = plugin;
1329   api->get_size = &postgres_plugin_get_size;
1330   api->put = &postgres_plugin_put;
1331   api->next_request = &postgres_plugin_next_request;
1332   api->get = &postgres_plugin_get;
1333   api->update = &postgres_plugin_update;
1334   api->iter_low_priority = &postgres_plugin_iter_low_priority;
1335   api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity;
1336   api->iter_ascending_expiration = &postgres_plugin_iter_ascending_expiration;
1337   api->iter_migration_order = &postgres_plugin_iter_migration_order;
1338   api->iter_all_now = &postgres_plugin_iter_all_now;
1339   api->drop = &postgres_plugin_drop;
1340   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1341                    "datastore-postgres",
1342                    _("Postgres database running\n"));
1343   return api;
1344 }
1345
1346
1347 /**
1348  * Exit point from the plugin.
1349  * @param cls our "struct Plugin*"
1350  * @return always NULL
1351  */
1352 void *
1353 libgnunet_plugin_datastore_postgres_done (void *cls)
1354 {
1355   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1356   struct Plugin *plugin = api->cls;
1357   
1358   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1359     {
1360       GNUNET_SCHEDULER_cancel (plugin->env->sched,
1361                                plugin->next_task);
1362       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1363       GNUNET_free (plugin->next_task_nc);
1364       plugin->next_task_nc = NULL;
1365     }
1366   PQfinish (plugin->dbh);
1367   GNUNET_free (plugin);
1368   GNUNET_free (api);
1369   return NULL;
1370 }
1371
1372 /* end of plugin_datastore_postgres.c */