(no commit message)
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <postgresql/libpq-fe.h>
30
31 #define DEBUG_POSTGRES GNUNET_NO
32
33 #define SELECT_IT_LOW_PRIORITY "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
34                                "WHERE (prio = $1 AND oid > $2) "                        \
35                                "ORDER BY prio ASC,oid ASC LIMIT 1) "\
36                                "UNION "\
37                                "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
38                                "WHERE (prio > $1 AND oid != $2)"\
39                                "ORDER BY prio ASC,oid ASC LIMIT 1)"\
40                                "ORDER BY prio ASC,oid ASC LIMIT 1"
41
42 #define SELECT_IT_NON_ANONYMOUS "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
43                                 "WHERE (prio = $1 AND oid < $2)"\
44                                 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
45                                 "UNION "\
46                                 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
47                                 "WHERE (prio < $1 AND oid != $2)"\
48                                 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
49                                 "ORDER BY prio DESC,oid DESC LIMIT 1"
50
51 #define SELECT_IT_EXPIRATION_TIME "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
52                                   "WHERE (expire = $1 AND oid > $2) "\
53                                   "ORDER BY expire ASC,oid ASC LIMIT 1) "\
54                                   "UNION "\
55                                   "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
56                                   "WHERE (expire > $1 AND oid != $2) "          \
57                                   "ORDER BY expire ASC,oid ASC LIMIT 1)"\
58                                   "ORDER BY expire ASC,oid ASC LIMIT 1"
59
60
61 #define SELECT_IT_MIGRATION_ORDER "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
62                                   "WHERE (expire = $1 AND oid < $2)"\
63                                   " AND expire > $3 AND type!=3"\
64                                   " ORDER BY expire DESC,oid DESC LIMIT 1) "\
65                                   "UNION "\
66                                   "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
67                                   "WHERE (expire < $1 AND oid != $2)"           \
68                                   " AND expire > $3 AND type!=3"\
69                                   " ORDER BY expire DESC,oid DESC LIMIT 1)"\
70                                   "ORDER BY expire DESC,oid DESC LIMIT 1"
71
72 /**
73  * After how many ms "busy" should a DB operation fail for good?
74  * A low value makes sure that we are more responsive to requests
75  * (especially PUTs).  A high value guarantees a higher success
76  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
77  *
78  * The default value of 1s should ensure that users do not experience
79  * huge latencies while at the same time allowing operations to succeed
80  * with reasonable probability.
81  */
82 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
83
84
85 /**
86  * Closure for 'postgres_next_request_cont'.
87  */
88 struct NextRequestClosure
89 {
90   /**
91    * Global plugin data.
92    */
93   struct Plugin *plugin;
94   
95   /**
96    * Function to call for each matching entry.
97    */
98   PluginIterator iter;
99   
100   /**
101    * Closure for 'iter'.
102    */
103   void *iter_cls;
104   
105   /**
106    * Parameters for the prepared statement.
107    */
108   const char *paramValues[5];
109   
110   /**
111    * Name of the prepared statement to run.
112    */
113   const char *pname;
114   
115   /**
116    * Size of values pointed to by paramValues.
117    */
118   int paramLengths[5];
119   
120   /**
121    * Number of paramters in paramValues/paramLengths.
122    */
123   int nparams; 
124   
125   /**
126    * Current time (possible parameter), big-endian.
127    */
128   uint64_t bnow;
129   
130   /**
131    * Key (possible parameter)
132    */
133   GNUNET_HashCode key;
134   
135   /**
136    * Hash of value (possible parameter)
137    */
138   GNUNET_HashCode vhash;
139   
140   /**
141    * Number of entries found so far
142    */
143   long long count;
144   
145   /**
146    * Offset this iteration starts at.
147    */
148   uint64_t off;
149   
150   /**
151    * Current offset to use in query, big-endian.
152    */
153   uint64_t blimit_off;
154   
155   /**
156    *  Overall number of matching entries.
157    */
158   unsigned long long total;
159   
160   /**
161    * Expiration value of previous result (possible parameter), big-endian.
162    */
163   uint64_t blast_expire;
164   
165   /**
166    * Row ID of last result (possible paramter), big-endian.
167    */
168   uint32_t blast_rowid;
169   
170   /**
171    * Priority of last result (possible parameter), big-endian.
172    */
173   uint32_t blast_prio;
174   
175   /**
176    * Type of block (possible paramter), big-endian.
177    */
178   uint32_t btype;
179   
180   /**
181    * Flag set to GNUNET_YES to stop iteration.
182    */
183   int end_it;
184 };
185
186
187 /**
188  * Context for all functions in this plugin.
189  */
190 struct Plugin 
191 {
192   /**
193    * Our execution environment.
194    */
195   struct GNUNET_DATASTORE_PluginEnvironment *env;
196
197   /**
198    * Native Postgres database handle.
199    */
200   PGconn *dbh;
201
202   /**
203    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
204    */
205   struct NextRequestClosure *next_task_nc;
206
207   /**
208    * Pending task with scheduler for running the next request.
209    */
210   GNUNET_SCHEDULER_TaskIdentifier next_task;
211
212 };
213
214
215 /**
216  * Check if the result obtained from Postgres has
217  * the desired status code.  If not, log an error, clear the
218  * result and return GNUNET_SYSERR.
219  * 
220  * @param plugin global context
221  * @param ret result to check
222  * @param expected_status expected return value
223  * @param command name of SQL command that was run
224  * @param args arguments to SQL command
225  * @param line line number for error reporting
226  * @return GNUNET_OK if the result is acceptable
227  */
228 static int
229 check_result (struct Plugin *plugin,
230               PGresult * ret,
231               int expected_status,
232               const char *command, const char *args, int line)
233 {
234   if (ret == NULL)
235     {
236       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
237                        "datastore-postgres",
238                        "Postgres failed to allocate result for `%s:%s' at %d\n",
239                        command, args, line);
240       return GNUNET_SYSERR;
241     }
242   if (PQresultStatus (ret) != expected_status)
243     {
244       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
245                        "datastore-postgres",
246                        _("`%s:%s' failed at %s:%d with error: %s"),
247                        command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
248       PQclear (ret);
249       return GNUNET_SYSERR;
250     }
251   return GNUNET_OK;
252 }
253
254 /**
255  * Run simple SQL statement (without results).
256  *
257  * @param plugin global context
258  * @param sql statement to run
259  * @param line code line for error reporting
260  */
261 static int
262 pq_exec (struct Plugin *plugin,
263          const char *sql, int line)
264 {
265   PGresult *ret;
266   ret = PQexec (plugin->dbh, sql);
267   if (GNUNET_OK != check_result (plugin,
268                                  ret, 
269                                  PGRES_COMMAND_OK, "PQexec", sql, line))
270     return GNUNET_SYSERR;
271   PQclear (ret);
272   return GNUNET_OK;
273 }
274
275 /**
276  * Prepare SQL statement.
277  *
278  * @param plugin global context
279  * @param name name for the prepared SQL statement
280  * @param sql SQL code to prepare
281  * @param nparams number of parameters in sql
282  * @param line code line for error reporting
283  * @return GNUNET_OK on success
284  */
285 static int
286 pq_prepare (struct Plugin *plugin,
287             const char *name, const char *sql, int nparams, int line)
288 {
289   PGresult *ret;
290   ret = PQprepare (plugin->dbh, name, sql, nparams, NULL);
291   if (GNUNET_OK !=
292       check_result (plugin, 
293                     ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
294     return GNUNET_SYSERR;
295   PQclear (ret);
296   return GNUNET_OK;
297 }
298
299 /**
300  * @brief Get a database handle
301  *
302  * @param plugin global context
303  * @return GNUNET_OK on success, GNUNET_SYSERR on error
304  */
305 static int
306 init_connection (struct Plugin *plugin)
307 {
308   char *conninfo;
309   PGresult *ret;
310
311   /* Open database and precompile statements */
312   conninfo = NULL;
313   GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
314                                          "datastore-postgres",
315                                          "CONFIG",
316                                          &conninfo);
317   plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
318   if (NULL == plugin->dbh)
319     {
320       /* FIXME: warn about out-of-memory? */
321       GNUNET_free_non_null (conninfo);
322       return GNUNET_SYSERR;
323     }
324   if (PQstatus (plugin->dbh) != CONNECTION_OK)
325     {
326       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
327                        "datastore-postgres",
328                        _("Unable to initialize Postgres with configuration `%s': %s"),
329                        conninfo,
330                        PQerrorMessage (plugin->dbh));
331       PQfinish (plugin->dbh);
332       plugin->dbh = NULL;
333       GNUNET_free_non_null (conninfo);
334       return GNUNET_SYSERR;
335     }
336   GNUNET_free_non_null (conninfo);
337   ret = PQexec (plugin->dbh,
338                 "CREATE TABLE gn090 ("
339                 "  type INTEGER NOT NULL DEFAULT 0,"
340                 "  prio INTEGER NOT NULL DEFAULT 0,"
341                 "  anonLevel INTEGER NOT NULL DEFAULT 0,"
342                 "  expire BIGINT NOT NULL DEFAULT 0,"
343                 "  hash BYTEA NOT NULL DEFAULT '',"
344                 "  vhash BYTEA NOT NULL DEFAULT '',"
345                 "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
346   if ( (ret == NULL) || 
347        ( (PQresultStatus (ret) != PGRES_COMMAND_OK) && 
348          (0 != strcmp ("42P07",    /* duplicate table */
349                        PQresultErrorField
350                        (ret,
351                         PG_DIAG_SQLSTATE)))))
352     {
353       check_result (plugin,
354                     ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090", __LINE__);
355       PQfinish (plugin->dbh);
356       plugin->dbh = NULL;
357       return GNUNET_SYSERR;
358     }
359   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
360     {
361       if ((GNUNET_OK !=
362            pq_exec (plugin, "CREATE INDEX idx_hash ON gn090 (hash)", __LINE__)) ||
363           (GNUNET_OK !=
364            pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)",
365                     __LINE__))
366           || (GNUNET_OK !=
367               pq_exec (plugin, "CREATE INDEX idx_prio ON gn090 (prio)", __LINE__))
368           || (GNUNET_OK !=
369               pq_exec (plugin, "CREATE INDEX idx_expire ON gn090 (expire)", __LINE__))
370           || (GNUNET_OK !=
371               pq_exec (plugin, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)",
372                        __LINE__))
373           || (GNUNET_OK !=
374               pq_exec
375               (plugin, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)",
376                __LINE__))
377           || (GNUNET_OK !=
378               pq_exec (plugin, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)",
379                        __LINE__)))
380         {
381           PQclear (ret);
382           PQfinish (plugin->dbh);
383           plugin->dbh = NULL;
384           return GNUNET_SYSERR;
385         }
386     }
387   PQclear (ret);
388 #if 1
389   ret = PQexec (plugin->dbh,
390                 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
391   if (GNUNET_OK != 
392       check_result (plugin,
393                     ret, PGRES_COMMAND_OK,
394                     "ALTER TABLE", "gn090", __LINE__))
395     {
396       PQfinish (plugin->dbh);
397       plugin->dbh = NULL;
398       return GNUNET_SYSERR;
399     }
400   PQclear (ret);
401   ret = PQexec (plugin->dbh,
402                 "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
403   if (GNUNET_OK !=
404       check_result (plugin,
405                     ret, PGRES_COMMAND_OK,
406                     "ALTER TABLE", "gn090", __LINE__))
407     {
408       PQfinish (plugin->dbh);
409       plugin->dbh = NULL;
410       return GNUNET_SYSERR;
411     }
412   PQclear (ret);
413   ret = PQexec (plugin->dbh,
414                 "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
415   if (GNUNET_OK !=
416       check_result (plugin,
417                     ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090", __LINE__))
418     {
419       PQfinish (plugin->dbh);
420       plugin->dbh = NULL;
421       return GNUNET_SYSERR;
422     }
423   PQclear (ret);
424 #endif
425   if ((GNUNET_OK !=
426        pq_prepare (plugin,
427                    "getvt",
428                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
429                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
430                    "AND oid > $4 ORDER BY oid ASC LIMIT 1 OFFSET $5",
431                    5,
432                    __LINE__)) ||
433       (GNUNET_OK !=
434        pq_prepare (plugin,
435                    "gett",
436                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
437                    "WHERE hash=$1 AND type=$2"
438                    "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
439                    4,
440                    __LINE__)) ||
441       (GNUNET_OK !=
442        pq_prepare (plugin,
443                    "getv",
444                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
445                    "WHERE hash=$1 AND vhash=$2"
446                    "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
447                    4,
448                    __LINE__)) ||
449       (GNUNET_OK !=
450        pq_prepare (plugin,
451                    "get",
452                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
453                    "WHERE hash=$1"
454                    "AND oid > $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
455                    3,
456                    __LINE__)) ||
457       (GNUNET_OK !=
458        pq_prepare (plugin,
459                    "put",
460                    "INSERT INTO gn090 (type, prio, anonLevel, expire, hash, vhash, value) "
461                    "VALUES ($1, $2, $3, $4, $5, $6, $7)",
462                    8,
463                    __LINE__)) ||
464       (GNUNET_OK !=
465        pq_prepare (plugin,
466                    "update",
467                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
468                    "WHERE oid = $3",
469                    3,
470                    __LINE__)) ||
471       (GNUNET_OK !=
472        pq_prepare (plugin,
473                    "select_low_priority",
474                    SELECT_IT_LOW_PRIORITY,
475                    2,
476                    __LINE__)) ||
477       (GNUNET_OK !=
478        pq_prepare (plugin,
479                    "select_non_anonymous",
480                    SELECT_IT_NON_ANONYMOUS,
481                    2,
482                    __LINE__)) ||
483       (GNUNET_OK !=
484        pq_prepare (plugin,
485                    "select_expiration_time",
486                    SELECT_IT_EXPIRATION_TIME,
487                    2,
488                    __LINE__)) ||
489       (GNUNET_OK !=
490        pq_prepare (plugin,
491                    "select_migration_order",
492                    SELECT_IT_MIGRATION_ORDER,
493                    3,
494                    __LINE__)) ||
495       (GNUNET_OK !=
496        pq_prepare (plugin,
497                    "delrow",
498                    "DELETE FROM gn090 " "WHERE oid=$1", 1, __LINE__)))
499     {
500       PQfinish (plugin->dbh);
501       plugin->dbh = NULL;
502       return GNUNET_SYSERR;
503     }
504   return GNUNET_OK;
505 }
506
507
508 /**
509  * Delete the row identified by the given rowid (qid
510  * in postgres).
511  *
512  * @param plugin global context
513  * @param rowid which row to delete
514  * @return GNUNET_OK on success
515  */
516 static int
517 delete_by_rowid (struct Plugin *plugin,
518                  unsigned int rowid)
519 {
520   const char *paramValues[] = { (const char *) &rowid };
521   int paramLengths[] = { sizeof (rowid) };
522   const int paramFormats[] = { 1 };
523   PGresult *ret;
524
525   ret = PQexecPrepared (plugin->dbh,
526                         "delrow",
527                         1, paramValues, paramLengths, paramFormats, 1);
528   if (GNUNET_OK !=
529       check_result (plugin,
530                     ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
531                     __LINE__))
532     {
533       return GNUNET_SYSERR;
534     }
535   PQclear (ret);
536   return GNUNET_OK;
537 }
538
539
540 /**
541  * Get an estimate of how much space the database is
542  * currently using.
543  *
544  * @param cls our "struct Plugin*"
545  * @return number of bytes used on disk
546  */
547 static unsigned long long
548 postgres_plugin_get_size (void *cls)
549 {
550   struct Plugin *plugin = cls;
551   unsigned long long total;
552   PGresult *ret;
553
554   ret = PQexecParams (plugin->dbh,
555                       "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090",
556                       0, NULL, NULL, NULL, NULL, 1);
557   if (GNUNET_OK != check_result (plugin,
558                                  ret,
559                                  PGRES_TUPLES_OK,
560                                  "PQexecParams",
561                                  "get_size",
562                                  __LINE__))
563     {
564       return 0;
565     }
566   if ((PQntuples (ret) != 1) ||
567       (PQnfields (ret) != 1) ||
568       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
569     {
570       GNUNET_break (0);
571       PQclear (ret);
572       return 0;
573     }
574   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
575   PQclear (ret);
576   return total;
577 }
578
579
580 /**
581  * Store an item in the datastore.
582  *
583  * @param cls closure
584  * @param key key for the item
585  * @param size number of bytes in data
586  * @param data content stored
587  * @param type type of the content
588  * @param priority priority of the content
589  * @param anonymity anonymity-level for the content
590  * @param expiration expiration time for the content
591  * @param msg set to error message
592  * @return GNUNET_OK on success
593  */
594 static int
595 postgres_plugin_put (void *cls,
596                      const GNUNET_HashCode * key,
597                      uint32_t size,
598                      const void *data,
599                      enum GNUNET_BLOCK_Type type,
600                      uint32_t priority,
601                      uint32_t anonymity,
602                      struct GNUNET_TIME_Absolute expiration,
603                      char **msg)
604 {
605   struct Plugin *plugin = cls;
606   GNUNET_HashCode vhash;
607   PGresult *ret;
608   uint32_t btype = htonl (type);
609   uint32_t bprio = htonl (priority);
610   uint32_t banon = htonl (anonymity);
611   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
612   const char *paramValues[] = {
613     (const char *) &btype,
614     (const char *) &bprio,
615     (const char *) &banon,
616     (const char *) &bexpi,
617     (const char *) key,
618     (const char *) &vhash,
619     (const char *) data
620   };
621   int paramLengths[] = {
622     sizeof (btype),
623     sizeof (bprio),
624     sizeof (banon),
625     sizeof (bexpi),
626     sizeof (GNUNET_HashCode),
627     sizeof (GNUNET_HashCode),
628     size
629   };
630   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1 };
631
632   GNUNET_CRYPTO_hash (data, size, &vhash);
633   ret = PQexecPrepared (plugin->dbh,
634                         "put", 7, paramValues, paramLengths, paramFormats, 1);
635   if (GNUNET_OK != check_result (plugin, ret,
636                                  PGRES_COMMAND_OK,
637                                  "PQexecPrepared", "put", __LINE__))
638     return GNUNET_SYSERR;
639   PQclear (ret);
640   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
641 #if DEBUG_POSTGRES
642   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
643                    "datastore-postgres",
644                    "Stored %u bytes in database\n",
645                    (unsigned int) size);
646 #endif
647   return GNUNET_OK;
648 }
649
650 /**
651  * Function invoked on behalf of a "PluginIterator"
652  * asking the database plugin to call the iterator
653  * with the next item.
654  *
655  * @param next_cls the 'struct NextRequestClosure'
656  * @param tc scheduler context
657  */
658 static void 
659 postgres_next_request_cont (void *next_cls,
660                             const struct GNUNET_SCHEDULER_TaskContext *tc)
661 {
662   struct NextRequestClosure *nrc = next_cls;
663   struct Plugin *plugin = nrc->plugin;
664   const int paramFormats[] = { 1, 1, 1, 1, 1 };
665   int iret;
666   PGresult *res;
667   enum GNUNET_BLOCK_Type type;
668   uint32_t anonymity;
669   uint32_t priority;
670   uint32_t size;
671   unsigned int rowid;
672   struct GNUNET_TIME_Absolute expiration_time;
673   GNUNET_HashCode key;
674
675   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
676   plugin->next_task_nc = NULL;
677   if ( (GNUNET_YES == nrc->end_it) ||
678        (nrc->count == nrc->total) )
679     {
680 #if DEBUG_POSTGRES
681       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
682                        "datastore-postgres",
683                        "Ending iteration (%s)\n",
684                        (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set");
685 #endif
686       nrc->iter (nrc->iter_cls, 
687                  NULL, NULL, 0, NULL, 0, 0, 0, 
688                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
689       GNUNET_free (nrc);
690       return;
691     }
692   
693   if (nrc->count == 0)
694     nrc->blimit_off = GNUNET_htonll (nrc->off);
695   else
696     nrc->blimit_off = GNUNET_htonll (0);
697   if (nrc->count + nrc->off == nrc->total)
698     nrc->blast_rowid = htonl (0); /* back to start */
699   
700   res = PQexecPrepared (plugin->dbh,
701                         nrc->pname,
702                         nrc->nparams,
703                         nrc->paramValues, 
704                         nrc->paramLengths,
705                         paramFormats, 1);
706   if (GNUNET_OK != check_result (plugin,
707                                  res,
708                                  PGRES_TUPLES_OK,
709                                  "PQexecPrepared",
710                                  nrc->pname,
711                                  __LINE__))
712     {
713 #if DEBUG_POSTGRES
714       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
715                        "datastore-postgres",
716                        "Ending iteration (postgres error)\n");
717 #endif
718       nrc->iter (nrc->iter_cls, 
719                  NULL, NULL, 0, NULL, 0, 0, 0, 
720                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
721       GNUNET_free (nrc);
722       return;
723     }
724
725   if (0 == PQntuples (res))
726     {
727       /* no result */
728 #if DEBUG_POSTGRES
729       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
730                        "datastore-postgres",
731                        "Ending iteration (no more results)\n");
732 #endif
733       nrc->iter (nrc->iter_cls, 
734                  NULL, NULL, 0, NULL, 0, 0, 0, 
735                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
736       PQclear (res);
737       GNUNET_free (nrc);
738       return; 
739     }
740   if ((1 != PQntuples (res)) ||
741       (7 != PQnfields (res)) ||
742       (sizeof (uint32_t) != PQfsize (res, 0)) ||
743       (sizeof (uint32_t) != PQfsize (res, 6)))
744     {
745       GNUNET_break (0);
746       nrc->iter (nrc->iter_cls, 
747                  NULL, NULL, 0, NULL, 0, 0, 0, 
748                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
749       PQclear (res);
750       GNUNET_free (nrc);
751       return;
752     }
753   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
754   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
755       (sizeof (uint32_t) != PQfsize (res, 1)) ||
756       (sizeof (uint32_t) != PQfsize (res, 2)) ||
757       (sizeof (uint64_t) != PQfsize (res, 3)) ||
758       (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)) )
759     {
760       GNUNET_break (0);
761       PQclear (res);
762       delete_by_rowid (plugin, rowid);
763       nrc->iter (nrc->iter_cls, 
764                  NULL, NULL, 0, NULL, 0, 0, 0, 
765                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
766       GNUNET_free (nrc);
767       return;
768     }
769
770   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
771   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
772   anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
773   expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
774   memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode));
775   size = PQgetlength (res, 0, 5);
776
777   nrc->blast_prio = htonl (priority);
778   nrc->blast_expire = GNUNET_htonll (expiration_time.abs_value);
779   nrc->blast_rowid = htonl (rowid);
780   nrc->count++;
781
782 #if DEBUG_POSTGRES
783   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
784                    "datastore-postgres",
785                    "Found result of size %u bytes and type %u in database\n",
786                    (unsigned int) size,
787                    (unsigned int) type);
788 #endif
789   iret = nrc->iter (nrc->iter_cls,
790                     nrc,
791                     &key,
792                     size,
793                     PQgetvalue (res, 0, 5),
794                     (enum GNUNET_BLOCK_Type) type,
795                     priority,
796                     anonymity,
797                     expiration_time,
798                     rowid);
799   PQclear (res);
800   if (iret == GNUNET_SYSERR)
801     {
802 #if DEBUG_POSTGRES
803       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
804                        "datastore-postgres",
805                        "Ending iteration (client error)\n");
806 #endif
807       return;
808     }
809   if (iret == GNUNET_NO)
810     {
811       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
812         {
813 #if DEBUG_POSTGRES
814           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
815                            "datastore-postgres",
816                            "Deleting %u bytes from database\n",
817                            (unsigned int) size);
818 #endif
819           plugin->env->duc (plugin->env->cls,
820                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
821 #if DEBUG_POSTGRES
822           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
823                            "datastore-postgres",
824                            "Deleted %u bytes from database\n",
825                            (unsigned int) size);
826 #endif
827         }
828     }
829 }
830
831
832 /**
833  * Function invoked on behalf of a "PluginIterator"
834  * asking the database plugin to call the iterator
835  * with the next item.
836  *
837  * @param next_cls whatever argument was given
838  *        to the PluginIterator as "next_cls".
839  * @param end_it set to GNUNET_YES if we
840  *        should terminate the iteration early
841  *        (iterator should be still called once more
842  *         to signal the end of the iteration).
843  */
844 static void 
845 postgres_plugin_next_request (void *next_cls,
846                               int end_it)
847 {
848   struct NextRequestClosure *nrc = next_cls;
849
850   if (GNUNET_YES == end_it)
851     nrc->end_it = GNUNET_YES;
852   nrc->plugin->next_task_nc = nrc;
853   nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&postgres_next_request_cont,
854                                                      nrc);
855 }
856
857
858 /**
859  * Update the priority for a particular key in the datastore.  If
860  * the expiration time in value is different than the time found in
861  * the datastore, the higher value should be kept.  For the
862  * anonymity level, the lower value is to be used.  The specified
863  * priority should be added to the existing priority, ignoring the
864  * priority in value.
865  *
866  * Note that it is possible for multiple values to match this put.
867  * In that case, all of the respective values are updated.
868  *
869  * @param cls our "struct Plugin*"
870  * @param uid unique identifier of the datum
871  * @param delta by how much should the priority
872  *     change?  If priority + delta < 0 the
873  *     priority should be set to 0 (never go
874  *     negative).
875  * @param expire new expiration time should be the
876  *     MAX of any existing expiration time and
877  *     this value
878  * @param msg set to error message
879  * @return GNUNET_OK on success
880  */
881 static int
882 postgres_plugin_update (void *cls,
883                         uint64_t uid,
884                         int delta, struct GNUNET_TIME_Absolute expire,
885                         char **msg)
886 {
887   struct Plugin *plugin = cls;
888   PGresult *ret;
889   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
890   uint32_t boid = htonl ( (uint32_t) uid);
891   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
892   const char *paramValues[] = {
893     (const char *) &bdelta,
894     (const char *) &bexpire,
895     (const char *) &boid,
896   };
897   int paramLengths[] = {
898     sizeof (bdelta),
899     sizeof (bexpire),
900     sizeof (boid),
901   };
902   const int paramFormats[] = { 1, 1, 1 };
903
904   ret = PQexecPrepared (plugin->dbh,
905                         "update",
906                         3, paramValues, paramLengths, paramFormats, 1);
907   if (GNUNET_OK != check_result (plugin,
908                                  ret,
909                                  PGRES_COMMAND_OK,
910                                  "PQexecPrepared", "update", __LINE__))
911     return GNUNET_SYSERR;
912   PQclear (ret);
913   return GNUNET_OK;
914 }
915
916
917 /**
918  * Call a method for each key in the database and
919  * call the callback method on it.
920  *
921  * @param plugin global context
922  * @param type entries of which type should be considered?
923  * @param is_asc ascending or descending iteration?
924  * @param iter_select which SELECT method should be used?
925  * @param iter maybe NULL (to just count); iter
926  *     should return GNUNET_SYSERR to abort the
927  *     iteration, GNUNET_NO to delete the entry and
928  *     continue and GNUNET_OK to continue iterating
929  * @param iter_cls closure for 'iter'
930  */
931 static void
932 postgres_iterate (struct Plugin *plugin,
933                   unsigned int type,
934                   int is_asc,
935                   unsigned int iter_select,
936                   PluginIterator iter, void *iter_cls)
937 {
938   struct NextRequestClosure *nrc;
939
940   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
941   nrc->count = UINT32_MAX;
942   nrc->plugin = plugin;
943   nrc->iter = iter;
944   nrc->iter_cls = iter_cls;
945   if (is_asc)
946     {
947       nrc->blast_prio = htonl (0);
948       nrc->blast_rowid = htonl (0);
949       nrc->blast_expire = htonl (0);
950     }
951   else
952     {
953       nrc->blast_prio = htonl (0x7FFFFFFFL);
954       nrc->blast_rowid = htonl (0xFFFFFFFF);
955       nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
956     }
957   switch (iter_select)
958     {
959     case 0:
960       nrc->pname = "select_low_priority";
961       nrc->nparams = 2;
962       nrc->paramValues[0] = (const char *) &nrc->blast_prio;
963       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
964       nrc->paramLengths[0] = sizeof (nrc->blast_prio);
965       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
966       break;
967     case 1:
968       nrc->pname = "select_non_anonymous";
969       nrc->nparams = 2;
970       nrc->paramValues[0] = (const char *) &nrc->blast_prio;
971       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
972       nrc->paramLengths[0] = sizeof (nrc->blast_prio);
973       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
974       break;
975     case 2:
976       nrc->pname = "select_expiration_time";
977       nrc->nparams = 2;
978       nrc->paramValues[0] = (const char *) &nrc->blast_expire;
979       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
980       nrc->paramLengths[0] = sizeof (nrc->blast_expire);
981       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
982       break;
983     case 3:
984       nrc->pname = "select_migration_order";
985       nrc->nparams = 3;
986       nrc->paramValues[0] = (const char *) &nrc->blast_expire;
987       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
988       nrc->paramValues[2] = (const char *) &nrc->bnow;
989       nrc->paramLengths[0] = sizeof (nrc->blast_expire);
990       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
991       nrc->paramLengths[2] = sizeof (nrc->bnow);
992       break;
993     default:
994       GNUNET_break (0);
995       iter (iter_cls, 
996             NULL, NULL, 0, NULL, 0, 0, 0, 
997             GNUNET_TIME_UNIT_ZERO_ABS, 0);
998       GNUNET_free (nrc);
999       return;
1000     }
1001   nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).abs_value__;
1002   postgres_plugin_next_request (nrc,
1003                                 GNUNET_NO);
1004 }
1005
1006
1007 /**
1008  * Select a subset of the items in the datastore and call
1009  * the given iterator for each of them.
1010  *
1011  * @param cls our "struct Plugin*"
1012  * @param type entries of which type should be considered?
1013  *        Use 0 for any type.
1014  * @param iter function to call on each matching value;
1015  *        will be called once with a NULL value at the end
1016  * @param iter_cls closure for iter
1017  */
1018 static void
1019 postgres_plugin_iter_low_priority (void *cls,
1020                                    enum GNUNET_BLOCK_Type type,
1021                                    PluginIterator iter,
1022                                    void *iter_cls)
1023 {
1024   struct Plugin *plugin = cls;
1025   
1026   postgres_iterate (plugin,
1027                     type, 
1028                     GNUNET_YES, 0, 
1029                     iter, iter_cls);
1030 }
1031
1032
1033
1034
1035 /**
1036  * Iterate over the results for a particular key
1037  * in the datastore.
1038  *
1039  * @param cls closure
1040  * @param key maybe NULL (to match all entries)
1041  * @param vhash hash of the value, maybe NULL (to
1042  *        match all values that have the right key).
1043  *        Note that for DBlocks there is no difference
1044  *        betwen key and vhash, but for other blocks
1045  *        there may be!
1046  * @param type entries of which type are relevant?
1047  *     Use 0 for any type.
1048  * @param iter function to call on each matching value;
1049  *        will be called once with a NULL value at the end
1050  * @param iter_cls closure for iter
1051  */
1052 static void
1053 postgres_plugin_get (void *cls,
1054                      const GNUNET_HashCode * key,
1055                      const GNUNET_HashCode * vhash,
1056                      enum GNUNET_BLOCK_Type type,
1057                      PluginIterator iter, void *iter_cls)
1058 {
1059   struct Plugin *plugin = cls;
1060   struct NextRequestClosure *nrc;
1061   const int paramFormats[] = { 1, 1, 1, 1, 1 };
1062   PGresult *ret;
1063
1064   if (key == NULL)
1065     {
1066       postgres_plugin_iter_low_priority (plugin, type, 
1067                                          iter, iter_cls);
1068       return;
1069     }
1070   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1071   nrc->plugin = plugin;
1072   nrc->iter = iter;
1073   nrc->iter_cls = iter_cls;
1074   nrc->key = *key;
1075   if (vhash != NULL)
1076     nrc->vhash = *vhash;
1077   nrc->paramValues[0] = (const char*) &nrc->key;
1078   nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
1079   nrc->btype = htonl (type);
1080   if (type != 0)
1081     {
1082       if (vhash != NULL)
1083         {
1084           nrc->paramValues[1] = (const char *) &nrc->vhash;
1085           nrc->paramLengths[1] = sizeof (nrc->vhash);
1086           nrc->paramValues[2] = (const char *) &nrc->btype;
1087           nrc->paramLengths[2] = sizeof (nrc->btype);
1088           nrc->paramValues[3] = (const char *) &nrc->blast_rowid;
1089           nrc->paramLengths[3] = sizeof (nrc->blast_rowid);
1090           nrc->paramValues[4] = (const char *) &nrc->blimit_off;
1091           nrc->paramLengths[4] = sizeof (nrc->blimit_off);
1092           nrc->nparams = 5;
1093           nrc->pname = "getvt";
1094           ret = PQexecParams (plugin->dbh,
1095                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
1096                               3,
1097                               NULL,
1098                               nrc->paramValues, 
1099                               nrc->paramLengths,
1100                               paramFormats, 1);
1101         }
1102       else
1103         {
1104           nrc->paramValues[1] = (const char *) &nrc->btype;
1105           nrc->paramLengths[1] = sizeof (nrc->btype);
1106           nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
1107           nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
1108           nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1109           nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1110           nrc->nparams = 4;
1111           nrc->pname = "gett";
1112           ret = PQexecParams (plugin->dbh,
1113                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
1114                               2,
1115                               NULL,
1116                               nrc->paramValues, 
1117                               nrc->paramLengths, 
1118                               paramFormats, 1);
1119         }
1120     }
1121   else
1122     {
1123       if (vhash != NULL)
1124         {
1125           nrc->paramValues[1] = (const char *) &nrc->vhash;
1126           nrc->paramLengths[1] = sizeof (nrc->vhash);
1127           nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
1128           nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
1129           nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1130           nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1131           nrc->nparams = 4;
1132           nrc->pname = "getv";
1133           ret = PQexecParams (plugin->dbh,
1134                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
1135                               2,
1136                               NULL,
1137                               nrc->paramValues, 
1138                               nrc->paramLengths,
1139                               paramFormats, 1);
1140         }
1141       else
1142         {
1143           nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
1144           nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
1145           nrc->paramValues[2] = (const char *) &nrc->blimit_off;
1146           nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1147           nrc->nparams = 3;
1148           nrc->pname = "get";
1149           ret = PQexecParams (plugin->dbh,
1150                               "SELECT count(*) FROM gn090 WHERE hash=$1",
1151                               1,
1152                               NULL,
1153                               nrc->paramValues, 
1154                               nrc->paramLengths,
1155                               paramFormats, 1);
1156         }
1157     }
1158   if (GNUNET_OK != check_result (plugin,
1159                                  ret,
1160                                  PGRES_TUPLES_OK,
1161                                  "PQexecParams",
1162                                  nrc->pname,
1163                                  __LINE__))
1164     {
1165       iter (iter_cls, 
1166             NULL, NULL, 0, NULL, 0, 0, 0, 
1167             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1168       GNUNET_free (nrc);
1169       return;
1170     }
1171   if ((PQntuples (ret) != 1) ||
1172       (PQnfields (ret) != 1) ||
1173       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
1174     {
1175       GNUNET_break (0);
1176       PQclear (ret);
1177       iter (iter_cls, 
1178             NULL, NULL, 0, NULL, 0, 0, 0, 
1179             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1180       GNUNET_free (nrc);
1181       return;
1182     }
1183   nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
1184   PQclear (ret);
1185   if (nrc->total == 0)
1186     {
1187       iter (iter_cls, 
1188             NULL, NULL, 0, NULL, 0, 0, 0, 
1189             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1190       GNUNET_free (nrc);
1191       return;
1192     }
1193   nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1194                                        nrc->total);
1195   postgres_plugin_next_request (nrc,
1196                                 GNUNET_NO);
1197 }
1198
1199
1200 /**
1201  * Select a subset of the items in the datastore and call
1202  * the given iterator for each of them.
1203  *
1204  * @param cls our "struct Plugin*"
1205  * @param type entries of which type should be considered?
1206  *        Use 0 for any type.
1207  * @param iter function to call on each matching value;
1208  *        will be called once with a NULL value at the end
1209  * @param iter_cls closure for iter
1210  */
1211 static void
1212 postgres_plugin_iter_zero_anonymity (void *cls,
1213                                      enum GNUNET_BLOCK_Type type,
1214                                      PluginIterator iter,
1215                                      void *iter_cls)
1216 {
1217   struct Plugin *plugin = cls;
1218
1219   postgres_iterate (plugin, 
1220                     type, GNUNET_NO, 1,
1221                     iter, iter_cls);
1222 }
1223
1224
1225 /**
1226  * Select a subset of the items in the datastore and call
1227  * the given iterator for each of them.
1228  *
1229  * @param cls our "struct Plugin*"
1230  * @param type entries of which type should be considered?
1231  *        Use 0 for any type.
1232  * @param iter function to call on each matching value;
1233  *        will be called once with a NULL value at the end
1234  * @param iter_cls closure for iter
1235  */
1236 static void
1237 postgres_plugin_iter_ascending_expiration (void *cls,
1238                                            enum GNUNET_BLOCK_Type type,
1239                                            PluginIterator iter,
1240                                            void *iter_cls)
1241 {
1242   struct Plugin *plugin = cls;
1243
1244   postgres_iterate (plugin, type, GNUNET_YES, 2,
1245                     iter, iter_cls);
1246 }
1247
1248
1249 /**
1250  * Select a subset of the items in the datastore and call
1251  * the given iterator for each of them.
1252  *
1253  * @param cls our "struct Plugin*"
1254  * @param type entries of which type should be considered?
1255  *        Use 0 for any type.
1256  * @param iter function to call on each matching value;
1257  *        will be called once with a NULL value at the end
1258  * @param iter_cls closure for iter
1259  */
1260 static void
1261 postgres_plugin_iter_migration_order (void *cls,
1262                                       enum GNUNET_BLOCK_Type type,
1263                                       PluginIterator iter,
1264                                       void *iter_cls)
1265 {
1266   struct Plugin *plugin = cls;
1267
1268   postgres_iterate (plugin, 0, GNUNET_NO, 3, 
1269                     iter, iter_cls);
1270 }
1271
1272
1273 /**
1274  * Select a subset of the items in the datastore and call
1275  * the given iterator for each of them.
1276  *
1277  * @param cls our "struct Plugin*"
1278  * @param type entries of which type should be considered?
1279  *        Use 0 for any type.
1280  * @param iter function to call on each matching value;
1281  *        will be called once with a NULL value at the end
1282  * @param iter_cls closure for iter
1283  */
1284 static void
1285 postgres_plugin_iter_all_now (void *cls,
1286                               enum GNUNET_BLOCK_Type type,
1287                               PluginIterator iter,
1288                               void *iter_cls)
1289 {
1290   struct Plugin *plugin = cls;
1291
1292   postgres_iterate (plugin, 
1293                     0, GNUNET_YES, 0, 
1294                     iter, iter_cls);
1295 }
1296
1297
1298 /**
1299  * Drop database.
1300  */
1301 static void 
1302 postgres_plugin_drop (void *cls)
1303 {
1304   struct Plugin *plugin = cls;
1305
1306   pq_exec (plugin, "DROP TABLE gn090", __LINE__);
1307 }
1308
1309
1310 /**
1311  * Entry point for the plugin.
1312  *
1313  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1314  * @return our "struct Plugin*"
1315  */
1316 void *
1317 libgnunet_plugin_datastore_postgres_init (void *cls)
1318 {
1319   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1320   struct GNUNET_DATASTORE_PluginFunctions *api;
1321   struct Plugin *plugin;
1322
1323   plugin = GNUNET_malloc (sizeof (struct Plugin));
1324   plugin->env = env;
1325   if (GNUNET_OK != init_connection (plugin))
1326     {
1327       GNUNET_free (plugin);
1328       return NULL;
1329     }
1330   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1331   api->cls = plugin;
1332   api->get_size = &postgres_plugin_get_size;
1333   api->put = &postgres_plugin_put;
1334   api->next_request = &postgres_plugin_next_request;
1335   api->get = &postgres_plugin_get;
1336   api->update = &postgres_plugin_update;
1337   api->iter_low_priority = &postgres_plugin_iter_low_priority;
1338   api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity;
1339   api->iter_ascending_expiration = &postgres_plugin_iter_ascending_expiration;
1340   api->iter_migration_order = &postgres_plugin_iter_migration_order;
1341   api->iter_all_now = &postgres_plugin_iter_all_now;
1342   api->drop = &postgres_plugin_drop;
1343   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1344                    "datastore-postgres",
1345                    _("Postgres database running\n"));
1346   return api;
1347 }
1348
1349
1350 /**
1351  * Exit point from the plugin.
1352  * @param cls our "struct Plugin*"
1353  * @return always NULL
1354  */
1355 void *
1356 libgnunet_plugin_datastore_postgres_done (void *cls)
1357 {
1358   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1359   struct Plugin *plugin = api->cls;
1360   
1361   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1362     {
1363       GNUNET_SCHEDULER_cancel (plugin->next_task);
1364       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1365       GNUNET_free (plugin->next_task_nc);
1366       plugin->next_task_nc = NULL;
1367     }
1368   PQfinish (plugin->dbh);
1369   GNUNET_free (plugin);
1370   GNUNET_free (api);
1371   return NULL;
1372 }
1373
1374 /* end of plugin_datastore_postgres.c */