avoid disconnect on cancel
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "plugin_datastore.h"
29 #include <postgresql/libpq-fe.h>
30
31 #define DEBUG_POSTGRES GNUNET_NO
32
33 #define SELECT_IT_LOW_PRIORITY "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
34                                "WHERE (prio = $1 AND oid > $2) "                        \
35                                "ORDER BY prio ASC,oid ASC LIMIT 1) "\
36                                "UNION "\
37                                "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
38                                "WHERE (prio > $1 AND oid != $2)"\
39                                "ORDER BY prio ASC,oid ASC LIMIT 1)"\
40                                "ORDER BY prio ASC,oid ASC LIMIT 1"
41
42 #define SELECT_IT_NON_ANONYMOUS "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
43                                 "WHERE (prio = $1 AND oid < $2)"\
44                                 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
45                                 "UNION "\
46                                 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
47                                 "WHERE (prio < $1 AND oid != $2)"\
48                                 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
49                                 "ORDER BY prio DESC,oid DESC LIMIT 1"
50
51 #define SELECT_IT_EXPIRATION_TIME "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
52                                   "WHERE (expire = $1 AND oid > $2) "\
53                                   "ORDER BY expire ASC,oid ASC LIMIT 1) "\
54                                   "UNION "\
55                                   "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
56                                   "WHERE (expire > $1 AND oid != $2) "          \
57                                   "ORDER BY expire ASC,oid ASC LIMIT 1)"\
58                                   "ORDER BY expire ASC,oid ASC LIMIT 1"
59
60
61 #define SELECT_IT_MIGRATION_ORDER "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
62                                   "WHERE (expire = $1 AND oid < $2)"\
63                                   " AND expire > $3 AND type!=3"\
64                                   " ORDER BY expire DESC,oid DESC LIMIT 1) "\
65                                   "UNION "\
66                                   "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
67                                   "WHERE (expire < $1 AND oid != $2)"           \
68                                   " AND expire > $3 AND type!=3"\
69                                   " ORDER BY expire DESC,oid DESC LIMIT 1)"\
70                                   "ORDER BY expire DESC,oid DESC LIMIT 1"
71
72 /**
73  * After how many ms "busy" should a DB operation fail for good?
74  * A low value makes sure that we are more responsive to requests
75  * (especially PUTs).  A high value guarantees a higher success
76  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
77  *
78  * The default value of 1s should ensure that users do not experience
79  * huge latencies while at the same time allowing operations to succeed
80  * with reasonable probability.
81  */
82 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
83
84
85 /**
86  * Closure for 'postgres_next_request_cont'.
87  */
88 struct NextRequestClosure
89 {
90   /**
91    * Global plugin data.
92    */
93   struct Plugin *plugin;
94   
95   /**
96    * Function to call for each matching entry.
97    */
98   PluginIterator iter;
99   
100   /**
101    * Closure for 'iter'.
102    */
103   void *iter_cls;
104   
105   /**
106    * Parameters for the prepared statement.
107    */
108   const char *paramValues[5];
109   
110   /**
111    * Name of the prepared statement to run.
112    */
113   const char *pname;
114   
115   /**
116    * Size of values pointed to by paramValues.
117    */
118   int paramLengths[5];
119   
120   /**
121    * Number of paramters in paramValues/paramLengths.
122    */
123   int nparams; 
124   
125   /**
126    * Current time (possible parameter), big-endian.
127    */
128   uint64_t bnow;
129   
130   /**
131    * Key (possible parameter)
132    */
133   GNUNET_HashCode key;
134   
135   /**
136    * Hash of value (possible parameter)
137    */
138   GNUNET_HashCode vhash;
139   
140   /**
141    * Number of entries found so far
142    */
143   long long count;
144   
145   /**
146    * Offset this iteration starts at.
147    */
148   uint64_t off;
149   
150   /**
151    * Current offset to use in query, big-endian.
152    */
153   uint64_t blimit_off;
154   
155   /**
156    *  Overall number of matching entries.
157    */
158   unsigned long long total;
159   
160   /**
161    * Expiration value of previous result (possible parameter), big-endian.
162    */
163   uint64_t blast_expire;
164   
165   /**
166    * Row ID of last result (possible paramter), big-endian.
167    */
168   uint32_t blast_rowid;
169   
170   /**
171    * Priority of last result (possible parameter), big-endian.
172    */
173   uint32_t blast_prio;
174   
175   /**
176    * Type of block (possible paramter), big-endian.
177    */
178   uint32_t btype;
179   
180   /**
181    * Flag set to GNUNET_YES to stop iteration.
182    */
183   int end_it;
184 };
185
186
187 /**
188  * Context for all functions in this plugin.
189  */
190 struct Plugin 
191 {
192   /**
193    * Our execution environment.
194    */
195   struct GNUNET_DATASTORE_PluginEnvironment *env;
196
197   /**
198    * Native Postgres database handle.
199    */
200   PGconn *dbh;
201
202   /**
203    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
204    */
205   struct NextRequestClosure *next_task_nc;
206
207   /**
208    * Pending task with scheduler for running the next request.
209    */
210   GNUNET_SCHEDULER_TaskIdentifier next_task;
211
212 };
213
214
215 /**
216  * Check if the result obtained from Postgres has
217  * the desired status code.  If not, log an error, clear the
218  * result and return GNUNET_SYSERR.
219  * 
220  * @param plugin global context
221  * @param ret result to check
222  * @param expected_status expected return value
223  * @param command name of SQL command that was run
224  * @param args arguments to SQL command
225  * @param line line number for error reporting
226  * @return GNUNET_OK if the result is acceptable
227  */
228 static int
229 check_result (struct Plugin *plugin,
230               PGresult * ret,
231               int expected_status,
232               const char *command, const char *args, int line)
233 {
234   if (ret == NULL)
235     {
236       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
237                        "datastore-postgres",
238                        "Postgres failed to allocate result for `%s:%s' at %d\n",
239                        command, args, line);
240       return GNUNET_SYSERR;
241     }
242   if (PQresultStatus (ret) != expected_status)
243     {
244       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
245                        "datastore-postgres",
246                        _("`%s:%s' failed at %s:%d with error: %s"),
247                        command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
248       PQclear (ret);
249       return GNUNET_SYSERR;
250     }
251   return GNUNET_OK;
252 }
253
254 /**
255  * Run simple SQL statement (without results).
256  *
257  * @param plugin global context
258  * @param sql statement to run
259  * @param line code line for error reporting
260  */
261 static int
262 pq_exec (struct Plugin *plugin,
263          const char *sql, int line)
264 {
265   PGresult *ret;
266   ret = PQexec (plugin->dbh, sql);
267   if (GNUNET_OK != check_result (plugin,
268                                  ret, 
269                                  PGRES_COMMAND_OK, "PQexec", sql, line))
270     return GNUNET_SYSERR;
271   PQclear (ret);
272   return GNUNET_OK;
273 }
274
275 /**
276  * Prepare SQL statement.
277  *
278  * @param plugin global context
279  * @param name name for the prepared SQL statement
280  * @param sql SQL code to prepare
281  * @param nparams number of parameters in sql
282  * @param line code line for error reporting
283  * @return GNUNET_OK on success
284  */
285 static int
286 pq_prepare (struct Plugin *plugin,
287             const char *name, const char *sql, int nparams, int line)
288 {
289   PGresult *ret;
290   ret = PQprepare (plugin->dbh, name, sql, nparams, NULL);
291   if (GNUNET_OK !=
292       check_result (plugin, 
293                     ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
294     return GNUNET_SYSERR;
295   PQclear (ret);
296   return GNUNET_OK;
297 }
298
299 /**
300  * @brief Get a database handle
301  *
302  * @param plugin global context
303  * @return GNUNET_OK on success, GNUNET_SYSERR on error
304  */
305 static int
306 init_connection (struct Plugin *plugin)
307 {
308   char *conninfo;
309   PGresult *ret;
310
311   /* Open database and precompile statements */
312   conninfo = NULL;
313   GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
314                                          "datastore-postgres",
315                                          "CONFIG",
316                                          &conninfo);
317   plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
318   GNUNET_free_non_null (conninfo);
319   if (NULL == plugin->dbh)
320     {
321       /* FIXME: warn about out-of-memory? */
322       return GNUNET_SYSERR;
323     }
324   if (PQstatus (plugin->dbh) != CONNECTION_OK)
325     {
326       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
327                        "datastore-postgres",
328                        _("Unable to initialize Postgres: %s"),
329                        PQerrorMessage (plugin->dbh));
330       PQfinish (plugin->dbh);
331       plugin->dbh = NULL;
332       return GNUNET_SYSERR;
333     }
334   ret = PQexec (plugin->dbh,
335                 "CREATE TABLE gn090 ("
336                 "  type INTEGER NOT NULL DEFAULT 0,"
337                 "  prio INTEGER NOT NULL DEFAULT 0,"
338                 "  anonLevel INTEGER NOT NULL DEFAULT 0,"
339                 "  expire BIGINT NOT NULL DEFAULT 0,"
340                 "  hash BYTEA NOT NULL DEFAULT '',"
341                 "  vhash BYTEA NOT NULL DEFAULT '',"
342                 "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
343   if ( (ret == NULL) || 
344        ( (PQresultStatus (ret) != PGRES_COMMAND_OK) && 
345          (0 != strcmp ("42P07",    /* duplicate table */
346                        PQresultErrorField
347                        (ret,
348                         PG_DIAG_SQLSTATE)))))
349     {
350       check_result (plugin,
351                     ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090", __LINE__);
352       PQfinish (plugin->dbh);
353       plugin->dbh = NULL;
354       return GNUNET_SYSERR;
355     }
356   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
357     {
358       if ((GNUNET_OK !=
359            pq_exec (plugin, "CREATE INDEX idx_hash ON gn090 (hash)", __LINE__)) ||
360           (GNUNET_OK !=
361            pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)",
362                     __LINE__))
363           || (GNUNET_OK !=
364               pq_exec (plugin, "CREATE INDEX idx_prio ON gn090 (prio)", __LINE__))
365           || (GNUNET_OK !=
366               pq_exec (plugin, "CREATE INDEX idx_expire ON gn090 (expire)", __LINE__))
367           || (GNUNET_OK !=
368               pq_exec (plugin, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)",
369                        __LINE__))
370           || (GNUNET_OK !=
371               pq_exec
372               (plugin, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)",
373                __LINE__))
374           || (GNUNET_OK !=
375               pq_exec (plugin, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)",
376                        __LINE__)))
377         {
378           PQclear (ret);
379           PQfinish (plugin->dbh);
380           plugin->dbh = NULL;
381           return GNUNET_SYSERR;
382         }
383     }
384   PQclear (ret);
385 #if 1
386   ret = PQexec (plugin->dbh,
387                 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
388   if (GNUNET_OK != 
389       check_result (plugin,
390                     ret, PGRES_COMMAND_OK,
391                     "ALTER TABLE", "gn090", __LINE__))
392     {
393       PQfinish (plugin->dbh);
394       plugin->dbh = NULL;
395       return GNUNET_SYSERR;
396     }
397   PQclear (ret);
398   ret = PQexec (plugin->dbh,
399                 "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
400   if (GNUNET_OK !=
401       check_result (plugin,
402                     ret, PGRES_COMMAND_OK,
403                     "ALTER TABLE", "gn090", __LINE__))
404     {
405       PQfinish (plugin->dbh);
406       plugin->dbh = NULL;
407       return GNUNET_SYSERR;
408     }
409   PQclear (ret);
410   ret = PQexec (plugin->dbh,
411                 "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
412   if (GNUNET_OK !=
413       check_result (plugin,
414                     ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090", __LINE__))
415     {
416       PQfinish (plugin->dbh);
417       plugin->dbh = NULL;
418       return GNUNET_SYSERR;
419     }
420   PQclear (ret);
421 #endif
422   if ((GNUNET_OK !=
423        pq_prepare (plugin,
424                    "getvt",
425                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
426                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
427                    "AND oid > $4 ORDER BY oid ASC LIMIT 1 OFFSET $5",
428                    5,
429                    __LINE__)) ||
430       (GNUNET_OK !=
431        pq_prepare (plugin,
432                    "gett",
433                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
434                    "WHERE hash=$1 AND type=$2"
435                    "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
436                    4,
437                    __LINE__)) ||
438       (GNUNET_OK !=
439        pq_prepare (plugin,
440                    "getv",
441                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
442                    "WHERE hash=$1 AND vhash=$2"
443                    "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
444                    4,
445                    __LINE__)) ||
446       (GNUNET_OK !=
447        pq_prepare (plugin,
448                    "get",
449                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
450                    "WHERE hash=$1"
451                    "AND oid > $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
452                    3,
453                    __LINE__)) ||
454       (GNUNET_OK !=
455        pq_prepare (plugin,
456                    "put",
457                    "INSERT INTO gn090 (type, prio, anonLevel, expire, hash, vhash, value) "
458                    "VALUES ($1, $2, $3, $4, $5, $6, $7)",
459                    8,
460                    __LINE__)) ||
461       (GNUNET_OK !=
462        pq_prepare (plugin,
463                    "update",
464                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
465                    "WHERE oid = $3",
466                    3,
467                    __LINE__)) ||
468       (GNUNET_OK !=
469        pq_prepare (plugin,
470                    "select_low_priority",
471                    SELECT_IT_LOW_PRIORITY,
472                    2,
473                    __LINE__)) ||
474       (GNUNET_OK !=
475        pq_prepare (plugin,
476                    "select_non_anonymous",
477                    SELECT_IT_NON_ANONYMOUS,
478                    2,
479                    __LINE__)) ||
480       (GNUNET_OK !=
481        pq_prepare (plugin,
482                    "select_expiration_time",
483                    SELECT_IT_EXPIRATION_TIME,
484                    2,
485                    __LINE__)) ||
486       (GNUNET_OK !=
487        pq_prepare (plugin,
488                    "select_migration_order",
489                    SELECT_IT_MIGRATION_ORDER,
490                    3,
491                    __LINE__)) ||
492       (GNUNET_OK !=
493        pq_prepare (plugin,
494                    "delrow",
495                    "DELETE FROM gn090 " "WHERE oid=$1", 1, __LINE__)))
496     {
497       PQfinish (plugin->dbh);
498       plugin->dbh = NULL;
499       return GNUNET_SYSERR;
500     }
501   return GNUNET_OK;
502 }
503
504
505 /**
506  * Delete the row identified by the given rowid (qid
507  * in postgres).
508  *
509  * @param plugin global context
510  * @param rowid which row to delete
511  * @return GNUNET_OK on success
512  */
513 static int
514 delete_by_rowid (struct Plugin *plugin,
515                  unsigned int rowid)
516 {
517   const char *paramValues[] = { (const char *) &rowid };
518   int paramLengths[] = { sizeof (rowid) };
519   const int paramFormats[] = { 1 };
520   PGresult *ret;
521
522   ret = PQexecPrepared (plugin->dbh,
523                         "delrow",
524                         1, paramValues, paramLengths, paramFormats, 1);
525   if (GNUNET_OK !=
526       check_result (plugin,
527                     ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
528                     __LINE__))
529     {
530       return GNUNET_SYSERR;
531     }
532   PQclear (ret);
533   return GNUNET_OK;
534 }
535
536
537 /**
538  * Get an estimate of how much space the database is
539  * currently using.
540  *
541  * @param cls our "struct Plugin*"
542  * @return number of bytes used on disk
543  */
544 static unsigned long long
545 postgres_plugin_get_size (void *cls)
546 {
547   struct Plugin *plugin = cls;
548   unsigned long long total;
549   PGresult *ret;
550
551   ret = PQexecParams (plugin->dbh,
552                       "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090",
553                       0, NULL, NULL, NULL, NULL, 1);
554   if (GNUNET_OK != check_result (plugin,
555                                  ret,
556                                  PGRES_TUPLES_OK,
557                                  "PQexecParams",
558                                  "get_size",
559                                  __LINE__))
560     {
561       return 0;
562     }
563   if ((PQntuples (ret) != 1) ||
564       (PQnfields (ret) != 1) ||
565       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
566     {
567       GNUNET_break (0);
568       PQclear (ret);
569       return 0;
570     }
571   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
572   PQclear (ret);
573   return total;
574 }
575
576
577 /**
578  * Store an item in the datastore.
579  *
580  * @param cls closure
581  * @param key key for the item
582  * @param size number of bytes in data
583  * @param data content stored
584  * @param type type of the content
585  * @param priority priority of the content
586  * @param anonymity anonymity-level for the content
587  * @param expiration expiration time for the content
588  * @param msg set to error message
589  * @return GNUNET_OK on success
590  */
591 static int
592 postgres_plugin_put (void *cls,
593                      const GNUNET_HashCode * key,
594                      uint32_t size,
595                      const void *data,
596                      enum GNUNET_BLOCK_Type type,
597                      uint32_t priority,
598                      uint32_t anonymity,
599                      struct GNUNET_TIME_Absolute expiration,
600                      char **msg)
601 {
602   struct Plugin *plugin = cls;
603   GNUNET_HashCode vhash;
604   PGresult *ret;
605   uint32_t btype = htonl (type);
606   uint32_t bprio = htonl (priority);
607   uint32_t banon = htonl (anonymity);
608   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).value__;
609   const char *paramValues[] = {
610     (const char *) &btype,
611     (const char *) &bprio,
612     (const char *) &banon,
613     (const char *) &bexpi,
614     (const char *) key,
615     (const char *) &vhash,
616     (const char *) data
617   };
618   int paramLengths[] = {
619     sizeof (btype),
620     sizeof (bprio),
621     sizeof (banon),
622     sizeof (bexpi),
623     sizeof (GNUNET_HashCode),
624     sizeof (GNUNET_HashCode),
625     size
626   };
627   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1 };
628
629   GNUNET_CRYPTO_hash (data, size, &vhash);
630   ret = PQexecPrepared (plugin->dbh,
631                         "put", 7, paramValues, paramLengths, paramFormats, 1);
632   if (GNUNET_OK != check_result (plugin, ret,
633                                  PGRES_COMMAND_OK,
634                                  "PQexecPrepared", "put", __LINE__))
635     return GNUNET_SYSERR;
636   PQclear (ret);
637   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
638 #if DEBUG_POSTGRES
639   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
640                    "datastore-postgres",
641                    "Stored %u bytes in database\n",
642                    (unsigned int) size);
643 #endif
644   return GNUNET_OK;
645 }
646
647 /**
648  * Function invoked on behalf of a "PluginIterator"
649  * asking the database plugin to call the iterator
650  * with the next item.
651  *
652  * @param next_cls the 'struct NextRequestClosure'
653  * @param tc scheduler context
654  */
655 static void 
656 postgres_next_request_cont (void *next_cls,
657                             const struct GNUNET_SCHEDULER_TaskContext *tc)
658 {
659   struct NextRequestClosure *nrc = next_cls;
660   struct Plugin *plugin = nrc->plugin;
661   const int paramFormats[] = { 1, 1, 1, 1, 1 };
662   int iret;
663   PGresult *res;
664   enum GNUNET_BLOCK_Type type;
665   uint32_t anonymity;
666   uint32_t priority;
667   uint32_t size;
668   unsigned int rowid;
669   struct GNUNET_TIME_Absolute expiration_time;
670   GNUNET_HashCode key;
671
672   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
673   plugin->next_task_nc = NULL;
674   if ( (GNUNET_YES == nrc->end_it) ||
675        (nrc->count == nrc->total) )
676     {
677 #if DEBUG_POSTGRES
678       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
679                        "datastore-postgres",
680                        "Ending iteration (%s)\n",
681                        (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set");
682 #endif
683       nrc->iter (nrc->iter_cls, 
684                  NULL, NULL, 0, NULL, 0, 0, 0, 
685                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
686       GNUNET_free (nrc);
687       return;
688     }
689   
690   if (nrc->count == 0)
691     nrc->blimit_off = GNUNET_htonll (nrc->off);
692   else
693     nrc->blimit_off = GNUNET_htonll (0);
694   if (nrc->count + nrc->off == nrc->total)
695     nrc->blast_rowid = htonl (0); /* back to start */
696   
697   res = PQexecPrepared (plugin->dbh,
698                         nrc->pname,
699                         nrc->nparams,
700                         nrc->paramValues, 
701                         nrc->paramLengths,
702                         paramFormats, 1);
703   if (GNUNET_OK != check_result (plugin,
704                                  res,
705                                  PGRES_TUPLES_OK,
706                                  "PQexecPrepared",
707                                  nrc->pname,
708                                  __LINE__))
709     {
710 #if DEBUG_POSTGRES
711       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
712                        "datastore-postgres",
713                        "Ending iteration (postgres error)\n");
714 #endif
715       nrc->iter (nrc->iter_cls, 
716                  NULL, NULL, 0, NULL, 0, 0, 0, 
717                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
718       GNUNET_free (nrc);
719       return;
720     }
721
722   if (0 == PQntuples (res))
723     {
724       /* no result */
725 #if DEBUG_POSTGRES
726       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
727                        "datastore-postgres",
728                        "Ending iteration (no more results)\n");
729 #endif
730       nrc->iter (nrc->iter_cls, 
731                  NULL, NULL, 0, NULL, 0, 0, 0, 
732                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
733       PQclear (res);
734       GNUNET_free (nrc);
735       return; 
736     }
737   if ((1 != PQntuples (res)) ||
738       (7 != PQnfields (res)) ||
739       (sizeof (uint32_t) != PQfsize (res, 0)) ||
740       (sizeof (uint32_t) != PQfsize (res, 6)))
741     {
742       GNUNET_break (0);
743       nrc->iter (nrc->iter_cls, 
744                  NULL, NULL, 0, NULL, 0, 0, 0, 
745                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
746       PQclear (res);
747       GNUNET_free (nrc);
748       return;
749     }
750   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
751   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
752       (sizeof (uint32_t) != PQfsize (res, 1)) ||
753       (sizeof (uint32_t) != PQfsize (res, 2)) ||
754       (sizeof (uint64_t) != PQfsize (res, 3)) ||
755       (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)) )
756     {
757       GNUNET_break (0);
758       PQclear (res);
759       delete_by_rowid (plugin, rowid);
760       nrc->iter (nrc->iter_cls, 
761                  NULL, NULL, 0, NULL, 0, 0, 0, 
762                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
763       GNUNET_free (nrc);
764       return;
765     }
766
767   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
768   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
769   anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
770   expiration_time.value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
771   memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode));
772   size = PQgetlength (res, 0, 5);
773
774   nrc->blast_prio = htonl (priority);
775   nrc->blast_expire = GNUNET_htonll (expiration_time.value);
776   nrc->blast_rowid = htonl (rowid);
777   nrc->count++;
778
779 #if DEBUG_POSTGRES
780   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
781                    "datastore-postgres",
782                    "Found result of size %u bytes and type %u in database\n",
783                    (unsigned int) size,
784                    (unsigned int) type);
785 #endif
786   iret = nrc->iter (nrc->iter_cls,
787                     nrc,
788                     &key,
789                     size,
790                     PQgetvalue (res, 0, 5),
791                     (enum GNUNET_BLOCK_Type) type,
792                     priority,
793                     anonymity,
794                     expiration_time,
795                     rowid);
796   PQclear (res);
797   if (iret == GNUNET_SYSERR)
798     {
799 #if DEBUG_POSTGRES
800       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
801                        "datastore-postgres",
802                        "Ending iteration (client error)\n");
803 #endif
804       return;
805     }
806   if (iret == GNUNET_NO)
807     {
808       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
809         {
810 #if DEBUG_POSTGRES
811           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
812                            "datastore-postgres",
813                            "Deleting %u bytes from database\n",
814                            (unsigned int) size);
815 #endif
816           plugin->env->duc (plugin->env->cls,
817                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
818 #if DEBUG_POSTGRES
819           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
820                            "datastore-postgres",
821                            "Deleted %u bytes from database\n",
822                            (unsigned int) size);
823 #endif
824         }
825     }
826 }
827
828
829 /**
830  * Function invoked on behalf of a "PluginIterator"
831  * asking the database plugin to call the iterator
832  * with the next item.
833  *
834  * @param next_cls whatever argument was given
835  *        to the PluginIterator as "next_cls".
836  * @param end_it set to GNUNET_YES if we
837  *        should terminate the iteration early
838  *        (iterator should be still called once more
839  *         to signal the end of the iteration).
840  */
841 static void 
842 postgres_plugin_next_request (void *next_cls,
843                               int end_it)
844 {
845   struct NextRequestClosure *nrc = next_cls;
846
847   if (GNUNET_YES == end_it)
848     nrc->end_it = GNUNET_YES;
849   nrc->plugin->next_task_nc = nrc;
850   nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (nrc->plugin->env->sched,
851                                                      &postgres_next_request_cont,
852                                                      nrc);
853 }
854
855
856 /**
857  * Update the priority for a particular key in the datastore.  If
858  * the expiration time in value is different than the time found in
859  * the datastore, the higher value should be kept.  For the
860  * anonymity level, the lower value is to be used.  The specified
861  * priority should be added to the existing priority, ignoring the
862  * priority in value.
863  *
864  * Note that it is possible for multiple values to match this put.
865  * In that case, all of the respective values are updated.
866  *
867  * @param cls our "struct Plugin*"
868  * @param uid unique identifier of the datum
869  * @param delta by how much should the priority
870  *     change?  If priority + delta < 0 the
871  *     priority should be set to 0 (never go
872  *     negative).
873  * @param expire new expiration time should be the
874  *     MAX of any existing expiration time and
875  *     this value
876  * @param msg set to error message
877  * @return GNUNET_OK on success
878  */
879 static int
880 postgres_plugin_update (void *cls,
881                         uint64_t uid,
882                         int delta, struct GNUNET_TIME_Absolute expire,
883                         char **msg)
884 {
885   struct Plugin *plugin = cls;
886   PGresult *ret;
887   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
888   uint32_t boid = htonl ( (uint32_t) uid);
889   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).value__;
890   const char *paramValues[] = {
891     (const char *) &bdelta,
892     (const char *) &bexpire,
893     (const char *) &boid,
894   };
895   int paramLengths[] = {
896     sizeof (bdelta),
897     sizeof (bexpire),
898     sizeof (boid),
899   };
900   const int paramFormats[] = { 1, 1, 1 };
901
902   ret = PQexecPrepared (plugin->dbh,
903                         "update",
904                         3, paramValues, paramLengths, paramFormats, 1);
905   if (GNUNET_OK != check_result (plugin,
906                                  ret,
907                                  PGRES_COMMAND_OK,
908                                  "PQexecPrepared", "update", __LINE__))
909     return GNUNET_SYSERR;
910   PQclear (ret);
911   return GNUNET_OK;
912 }
913
914
915 /**
916  * Call a method for each key in the database and
917  * call the callback method on it.
918  *
919  * @param plugin global context
920  * @param type entries of which type should be considered?
921  * @param is_asc ascending or descending iteration?
922  * @param iter_select which SELECT method should be used?
923  * @param iter maybe NULL (to just count); iter
924  *     should return GNUNET_SYSERR to abort the
925  *     iteration, GNUNET_NO to delete the entry and
926  *     continue and GNUNET_OK to continue iterating
927  * @param iter_cls closure for 'iter'
928  */
929 static void
930 postgres_iterate (struct Plugin *plugin,
931                   unsigned int type,
932                   int is_asc,
933                   unsigned int iter_select,
934                   PluginIterator iter, void *iter_cls)
935 {
936   struct NextRequestClosure *nrc;
937
938   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
939   nrc->count = UINT32_MAX;
940   nrc->plugin = plugin;
941   nrc->iter = iter;
942   nrc->iter_cls = iter_cls;
943   if (is_asc)
944     {
945       nrc->blast_prio = htonl (0);
946       nrc->blast_rowid = htonl (0);
947       nrc->blast_expire = htonl (0);
948     }
949   else
950     {
951       nrc->blast_prio = htonl (0x7FFFFFFFL);
952       nrc->blast_rowid = htonl (0xFFFFFFFF);
953       nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
954     }
955   switch (iter_select)
956     {
957     case 0:
958       nrc->pname = "select_low_priority";
959       nrc->nparams = 2;
960       nrc->paramValues[0] = (const char *) &nrc->blast_prio;
961       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
962       nrc->paramLengths[0] = sizeof (nrc->blast_prio);
963       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
964       break;
965     case 1:
966       nrc->pname = "select_non_anonymous";
967       nrc->nparams = 2;
968       nrc->paramValues[0] = (const char *) &nrc->blast_prio;
969       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
970       nrc->paramLengths[0] = sizeof (nrc->blast_prio);
971       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
972       break;
973     case 2:
974       nrc->pname = "select_expiration_time";
975       nrc->nparams = 2;
976       nrc->paramValues[0] = (const char *) &nrc->blast_expire;
977       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
978       nrc->paramLengths[0] = sizeof (nrc->blast_expire);
979       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
980       break;
981     case 3:
982       nrc->pname = "select_migration_order";
983       nrc->nparams = 3;
984       nrc->paramValues[0] = (const char *) &nrc->blast_expire;
985       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
986       nrc->paramValues[2] = (const char *) &nrc->bnow;
987       nrc->paramLengths[0] = sizeof (nrc->blast_expire);
988       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
989       nrc->paramLengths[2] = sizeof (nrc->bnow);
990       break;
991     default:
992       GNUNET_break (0);
993       iter (iter_cls, 
994             NULL, NULL, 0, NULL, 0, 0, 0, 
995             GNUNET_TIME_UNIT_ZERO_ABS, 0);
996       GNUNET_free (nrc);
997       return;
998     }
999   nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).value__;
1000   postgres_plugin_next_request (nrc,
1001                                 GNUNET_NO);
1002 }
1003
1004
1005 /**
1006  * Select a subset of the items in the datastore and call
1007  * the given iterator for each of them.
1008  *
1009  * @param cls our "struct Plugin*"
1010  * @param type entries of which type should be considered?
1011  *        Use 0 for any type.
1012  * @param iter function to call on each matching value;
1013  *        will be called once with a NULL value at the end
1014  * @param iter_cls closure for iter
1015  */
1016 static void
1017 postgres_plugin_iter_low_priority (void *cls,
1018                                    enum GNUNET_BLOCK_Type type,
1019                                    PluginIterator iter,
1020                                    void *iter_cls)
1021 {
1022   struct Plugin *plugin = cls;
1023   
1024   postgres_iterate (plugin,
1025                     type, 
1026                     GNUNET_YES, 0, 
1027                     iter, iter_cls);
1028 }
1029
1030
1031
1032
1033 /**
1034  * Iterate over the results for a particular key
1035  * in the datastore.
1036  *
1037  * @param cls closure
1038  * @param key maybe NULL (to match all entries)
1039  * @param vhash hash of the value, maybe NULL (to
1040  *        match all values that have the right key).
1041  *        Note that for DBlocks there is no difference
1042  *        betwen key and vhash, but for other blocks
1043  *        there may be!
1044  * @param type entries of which type are relevant?
1045  *     Use 0 for any type.
1046  * @param iter function to call on each matching value;
1047  *        will be called once with a NULL value at the end
1048  * @param iter_cls closure for iter
1049  */
1050 static void
1051 postgres_plugin_get (void *cls,
1052                      const GNUNET_HashCode * key,
1053                      const GNUNET_HashCode * vhash,
1054                      enum GNUNET_BLOCK_Type type,
1055                      PluginIterator iter, void *iter_cls)
1056 {
1057   struct Plugin *plugin = cls;
1058   struct NextRequestClosure *nrc;
1059   const int paramFormats[] = { 1, 1, 1, 1, 1 };
1060   PGresult *ret;
1061
1062   if (key == NULL)
1063     {
1064       postgres_plugin_iter_low_priority (plugin, type, 
1065                                          iter, iter_cls);
1066       return;
1067     }
1068   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1069   nrc->plugin = plugin;
1070   nrc->iter = iter;
1071   nrc->iter_cls = iter_cls;
1072   nrc->key = *key;
1073   if (vhash != NULL)
1074     nrc->vhash = *vhash;
1075   nrc->paramValues[0] = (const char*) &nrc->key;
1076   nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
1077   nrc->btype = htonl (type);
1078   if (type != 0)
1079     {
1080       if (vhash != NULL)
1081         {
1082           nrc->paramValues[1] = (const char *) &nrc->vhash;
1083           nrc->paramLengths[1] = sizeof (nrc->vhash);
1084           nrc->paramValues[2] = (const char *) &nrc->btype;
1085           nrc->paramLengths[2] = sizeof (nrc->btype);
1086           nrc->paramValues[3] = (const char *) &nrc->blast_rowid;
1087           nrc->paramLengths[3] = sizeof (nrc->blast_rowid);
1088           nrc->paramValues[4] = (const char *) &nrc->blimit_off;
1089           nrc->paramLengths[4] = sizeof (nrc->blimit_off);
1090           nrc->nparams = 5;
1091           nrc->pname = "getvt";
1092           ret = PQexecParams (plugin->dbh,
1093                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
1094                               3,
1095                               NULL,
1096                               nrc->paramValues, 
1097                               nrc->paramLengths,
1098                               paramFormats, 1);
1099         }
1100       else
1101         {
1102           nrc->paramValues[1] = (const char *) &nrc->btype;
1103           nrc->paramLengths[1] = sizeof (nrc->btype);
1104           nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
1105           nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
1106           nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1107           nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1108           nrc->nparams = 4;
1109           nrc->pname = "gett";
1110           ret = PQexecParams (plugin->dbh,
1111                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
1112                               2,
1113                               NULL,
1114                               nrc->paramValues, 
1115                               nrc->paramLengths, 
1116                               paramFormats, 1);
1117         }
1118     }
1119   else
1120     {
1121       if (vhash != NULL)
1122         {
1123           nrc->paramValues[1] = (const char *) &nrc->vhash;
1124           nrc->paramLengths[1] = sizeof (nrc->vhash);
1125           nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
1126           nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
1127           nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1128           nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1129           nrc->nparams = 4;
1130           nrc->pname = "getv";
1131           ret = PQexecParams (plugin->dbh,
1132                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
1133                               2,
1134                               NULL,
1135                               nrc->paramValues, 
1136                               nrc->paramLengths,
1137                               paramFormats, 1);
1138         }
1139       else
1140         {
1141           nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
1142           nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
1143           nrc->paramValues[2] = (const char *) &nrc->blimit_off;
1144           nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1145           nrc->nparams = 3;
1146           nrc->pname = "get";
1147           ret = PQexecParams (plugin->dbh,
1148                               "SELECT count(*) FROM gn090 WHERE hash=$1",
1149                               1,
1150                               NULL,
1151                               nrc->paramValues, 
1152                               nrc->paramLengths,
1153                               paramFormats, 1);
1154         }
1155     }
1156   if (GNUNET_OK != check_result (plugin,
1157                                  ret,
1158                                  PGRES_TUPLES_OK,
1159                                  "PQexecParams",
1160                                  nrc->pname,
1161                                  __LINE__))
1162     {
1163       iter (iter_cls, 
1164             NULL, NULL, 0, NULL, 0, 0, 0, 
1165             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1166       GNUNET_free (nrc);
1167       return;
1168     }
1169   if ((PQntuples (ret) != 1) ||
1170       (PQnfields (ret) != 1) ||
1171       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
1172     {
1173       GNUNET_break (0);
1174       PQclear (ret);
1175       iter (iter_cls, 
1176             NULL, NULL, 0, NULL, 0, 0, 0, 
1177             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1178       GNUNET_free (nrc);
1179       return;
1180     }
1181   nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
1182   PQclear (ret);
1183   if (nrc->total == 0)
1184     {
1185       iter (iter_cls, 
1186             NULL, NULL, 0, NULL, 0, 0, 0, 
1187             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1188       GNUNET_free (nrc);
1189       return;
1190     }
1191   nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1192                                        nrc->total);
1193   postgres_plugin_next_request (nrc,
1194                                 GNUNET_NO);
1195 }
1196
1197
1198 /**
1199  * Select a subset of the items in the datastore and call
1200  * the given iterator for each of them.
1201  *
1202  * @param cls our "struct Plugin*"
1203  * @param type entries of which type should be considered?
1204  *        Use 0 for any type.
1205  * @param iter function to call on each matching value;
1206  *        will be called once with a NULL value at the end
1207  * @param iter_cls closure for iter
1208  */
1209 static void
1210 postgres_plugin_iter_zero_anonymity (void *cls,
1211                                      enum GNUNET_BLOCK_Type type,
1212                                      PluginIterator iter,
1213                                      void *iter_cls)
1214 {
1215   struct Plugin *plugin = cls;
1216
1217   postgres_iterate (plugin, 
1218                     type, GNUNET_NO, 1,
1219                     iter, iter_cls);
1220 }
1221
1222
1223 /**
1224  * Select a subset of the items in the datastore and call
1225  * the given iterator for each of them.
1226  *
1227  * @param cls our "struct Plugin*"
1228  * @param type entries of which type should be considered?
1229  *        Use 0 for any type.
1230  * @param iter function to call on each matching value;
1231  *        will be called once with a NULL value at the end
1232  * @param iter_cls closure for iter
1233  */
1234 static void
1235 postgres_plugin_iter_ascending_expiration (void *cls,
1236                                            enum GNUNET_BLOCK_Type type,
1237                                            PluginIterator iter,
1238                                            void *iter_cls)
1239 {
1240   struct Plugin *plugin = cls;
1241
1242   postgres_iterate (plugin, type, GNUNET_YES, 2,
1243                     iter, iter_cls);
1244 }
1245
1246
1247 /**
1248  * Select a subset of the items in the datastore and call
1249  * the given iterator for each of them.
1250  *
1251  * @param cls our "struct Plugin*"
1252  * @param type entries of which type should be considered?
1253  *        Use 0 for any type.
1254  * @param iter function to call on each matching value;
1255  *        will be called once with a NULL value at the end
1256  * @param iter_cls closure for iter
1257  */
1258 static void
1259 postgres_plugin_iter_migration_order (void *cls,
1260                                       enum GNUNET_BLOCK_Type type,
1261                                       PluginIterator iter,
1262                                       void *iter_cls)
1263 {
1264   struct Plugin *plugin = cls;
1265
1266   postgres_iterate (plugin, 0, GNUNET_NO, 3, 
1267                     iter, iter_cls);
1268 }
1269
1270
1271 /**
1272  * Select a subset of the items in the datastore and call
1273  * the given iterator for each of them.
1274  *
1275  * @param cls our "struct Plugin*"
1276  * @param type entries of which type should be considered?
1277  *        Use 0 for any type.
1278  * @param iter function to call on each matching value;
1279  *        will be called once with a NULL value at the end
1280  * @param iter_cls closure for iter
1281  */
1282 static void
1283 postgres_plugin_iter_all_now (void *cls,
1284                               enum GNUNET_BLOCK_Type type,
1285                               PluginIterator iter,
1286                               void *iter_cls)
1287 {
1288   struct Plugin *plugin = cls;
1289
1290   postgres_iterate (plugin, 
1291                     0, GNUNET_YES, 0, 
1292                     iter, iter_cls);
1293 }
1294
1295
1296 /**
1297  * Drop database.
1298  */
1299 static void 
1300 postgres_plugin_drop (void *cls)
1301 {
1302   struct Plugin *plugin = cls;
1303
1304   pq_exec (plugin, "DROP TABLE gn090", __LINE__);
1305 }
1306
1307
1308 /**
1309  * Entry point for the plugin.
1310  *
1311  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1312  * @return our "struct Plugin*"
1313  */
1314 void *
1315 libgnunet_plugin_datastore_postgres_init (void *cls)
1316 {
1317   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1318   struct GNUNET_DATASTORE_PluginFunctions *api;
1319   struct Plugin *plugin;
1320
1321   plugin = GNUNET_malloc (sizeof (struct Plugin));
1322   plugin->env = env;
1323   if (GNUNET_OK != init_connection (plugin))
1324     {
1325       GNUNET_free (plugin);
1326       return NULL;
1327     }
1328   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1329   api->cls = plugin;
1330   api->get_size = &postgres_plugin_get_size;
1331   api->put = &postgres_plugin_put;
1332   api->next_request = &postgres_plugin_next_request;
1333   api->get = &postgres_plugin_get;
1334   api->update = &postgres_plugin_update;
1335   api->iter_low_priority = &postgres_plugin_iter_low_priority;
1336   api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity;
1337   api->iter_ascending_expiration = &postgres_plugin_iter_ascending_expiration;
1338   api->iter_migration_order = &postgres_plugin_iter_migration_order;
1339   api->iter_all_now = &postgres_plugin_iter_all_now;
1340   api->drop = &postgres_plugin_drop;
1341   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1342                    "datastore-postgres",
1343                    _("Postgres database running\n"));
1344   return api;
1345 }
1346
1347
1348 /**
1349  * Exit point from the plugin.
1350  * @param cls our "struct Plugin*"
1351  * @return always NULL
1352  */
1353 void *
1354 libgnunet_plugin_datastore_postgres_done (void *cls)
1355 {
1356   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1357   struct Plugin *plugin = api->cls;
1358   
1359   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1360     {
1361       GNUNET_SCHEDULER_cancel (plugin->env->sched,
1362                                plugin->next_task);
1363       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1364       GNUNET_free (plugin->next_task_nc);
1365       plugin->next_task_nc = NULL;
1366     }
1367   PQfinish (plugin->dbh);
1368   GNUNET_free (plugin);
1369   GNUNET_free (api);
1370   return NULL;
1371 }
1372
1373 /* end of plugin_datastore_postgres.c */