insanity
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <postgresql/libpq-fe.h>
30
31 #define DEBUG_POSTGRES GNUNET_NO
32
33 #define SELECT_IT_LOW_PRIORITY "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
34                                "WHERE (prio = $1 AND oid > $2) "                        \
35                                "ORDER BY prio ASC,oid ASC LIMIT 1) "\
36                                "UNION "\
37                                "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
38                                "WHERE (prio > $1 AND oid != $2)"\
39                                "ORDER BY prio ASC,oid ASC LIMIT 1)"\
40                                "ORDER BY prio ASC,oid ASC LIMIT 1"
41
42 #define SELECT_IT_NON_ANONYMOUS "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
43                                 "WHERE (prio = $1 AND oid < $2)"\
44                                 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
45                                 "UNION "\
46                                 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
47                                 "WHERE (prio < $1 AND oid != $2)"\
48                                 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
49                                 "ORDER BY prio DESC,oid DESC LIMIT 1"
50
51 #define SELECT_IT_EXPIRATION_TIME "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
52                                   "WHERE (expire = $1 AND oid > $2) "\
53                                   "ORDER BY expire ASC,oid ASC LIMIT 1) "\
54                                   "UNION "\
55                                   "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
56                                   "WHERE (expire > $1 AND oid != $2) "          \
57                                   "ORDER BY expire ASC,oid ASC LIMIT 1)"\
58                                   "ORDER BY expire ASC,oid ASC LIMIT 1"
59
60
61 #define SELECT_IT_MIGRATION_ORDER "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
62                                   "WHERE (expire = $1 AND oid < $2)"\
63                                   " AND expire > $3 AND type!=3"\
64                                   " ORDER BY expire DESC,oid DESC LIMIT 1) "\
65                                   "UNION "\
66                                   "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
67                                   "WHERE (expire < $1 AND oid != $2)"           \
68                                   " AND expire > $3 AND type!=3"\
69                                   " ORDER BY expire DESC,oid DESC LIMIT 1)"\
70                                   "ORDER BY expire DESC,oid DESC LIMIT 1"
71
72 /**
73  * After how many ms "busy" should a DB operation fail for good?
74  * A low value makes sure that we are more responsive to requests
75  * (especially PUTs).  A high value guarantees a higher success
76  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
77  *
78  * The default value of 1s should ensure that users do not experience
79  * huge latencies while at the same time allowing operations to succeed
80  * with reasonable probability.
81  */
82 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
83
84
85 /**
86  * Closure for 'postgres_next_request_cont'.
87  */
88 struct NextRequestClosure
89 {
90   /**
91    * Global plugin data.
92    */
93   struct Plugin *plugin;
94   
95   /**
96    * Function to call for each matching entry.
97    */
98   PluginIterator iter;
99   
100   /**
101    * Closure for 'iter'.
102    */
103   void *iter_cls;
104   
105   /**
106    * Parameters for the prepared statement.
107    */
108   const char *paramValues[5];
109   
110   /**
111    * Name of the prepared statement to run.
112    */
113   const char *pname;
114   
115   /**
116    * Size of values pointed to by paramValues.
117    */
118   int paramLengths[5];
119   
120   /**
121    * Number of paramters in paramValues/paramLengths.
122    */
123   int nparams; 
124   
125   /**
126    * Current time (possible parameter), big-endian.
127    */
128   uint64_t bnow;
129   
130   /**
131    * Key (possible parameter)
132    */
133   GNUNET_HashCode key;
134   
135   /**
136    * Hash of value (possible parameter)
137    */
138   GNUNET_HashCode vhash;
139   
140   /**
141    * Number of entries found so far
142    */
143   long long count;
144   
145   /**
146    * Offset this iteration starts at.
147    */
148   uint64_t off;
149   
150   /**
151    * Current offset to use in query, big-endian.
152    */
153   uint64_t blimit_off;
154   
155   /**
156    *  Overall number of matching entries.
157    */
158   unsigned long long total;
159   
160   /**
161    * Expiration value of previous result (possible parameter), big-endian.
162    */
163   uint64_t blast_expire;
164   
165   /**
166    * Row ID of last result (possible paramter), big-endian.
167    */
168   uint32_t blast_rowid;
169   
170   /**
171    * Priority of last result (possible parameter), big-endian.
172    */
173   uint32_t blast_prio;
174   
175   /**
176    * Type of block (possible paramter), big-endian.
177    */
178   uint32_t btype;
179   
180   /**
181    * Flag set to GNUNET_YES to stop iteration.
182    */
183   int end_it;
184 };
185
186
187 /**
188  * Context for all functions in this plugin.
189  */
190 struct Plugin 
191 {
192   /**
193    * Our execution environment.
194    */
195   struct GNUNET_DATASTORE_PluginEnvironment *env;
196
197   /**
198    * Native Postgres database handle.
199    */
200   PGconn *dbh;
201
202   /**
203    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
204    */
205   struct NextRequestClosure *next_task_nc;
206
207   /**
208    * Pending task with scheduler for running the next request.
209    */
210   GNUNET_SCHEDULER_TaskIdentifier next_task;
211
212 };
213
214
215 /**
216  * Check if the result obtained from Postgres has
217  * the desired status code.  If not, log an error, clear the
218  * result and return GNUNET_SYSERR.
219  * 
220  * @param plugin global context
221  * @param ret result to check
222  * @param expected_status expected return value
223  * @param command name of SQL command that was run
224  * @param args arguments to SQL command
225  * @param line line number for error reporting
226  * @return GNUNET_OK if the result is acceptable
227  */
228 static int
229 check_result (struct Plugin *plugin,
230               PGresult * ret,
231               int expected_status,
232               const char *command, const char *args, int line)
233 {
234   if (ret == NULL)
235     {
236       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
237                        "datastore-postgres",
238                        "Postgres failed to allocate result for `%s:%s' at %d\n",
239                        command, args, line);
240       return GNUNET_SYSERR;
241     }
242   if (PQresultStatus (ret) != expected_status)
243     {
244       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
245                        "datastore-postgres",
246                        _("`%s:%s' failed at %s:%d with error: %s"),
247                        command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
248       PQclear (ret);
249       return GNUNET_SYSERR;
250     }
251   return GNUNET_OK;
252 }
253
254 /**
255  * Run simple SQL statement (without results).
256  *
257  * @param plugin global context
258  * @param sql statement to run
259  * @param line code line for error reporting
260  */
261 static int
262 pq_exec (struct Plugin *plugin,
263          const char *sql, int line)
264 {
265   PGresult *ret;
266   ret = PQexec (plugin->dbh, sql);
267   if (GNUNET_OK != check_result (plugin,
268                                  ret, 
269                                  PGRES_COMMAND_OK, "PQexec", sql, line))
270     return GNUNET_SYSERR;
271   PQclear (ret);
272   return GNUNET_OK;
273 }
274
275 /**
276  * Prepare SQL statement.
277  *
278  * @param plugin global context
279  * @param name name for the prepared SQL statement
280  * @param sql SQL code to prepare
281  * @param nparams number of parameters in sql
282  * @param line code line for error reporting
283  * @return GNUNET_OK on success
284  */
285 static int
286 pq_prepare (struct Plugin *plugin,
287             const char *name, const char *sql, int nparams, int line)
288 {
289   PGresult *ret;
290   ret = PQprepare (plugin->dbh, name, sql, nparams, NULL);
291   if (GNUNET_OK !=
292       check_result (plugin, 
293                     ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
294     return GNUNET_SYSERR;
295   PQclear (ret);
296   return GNUNET_OK;
297 }
298
299 /**
300  * @brief Get a database handle
301  *
302  * @param plugin global context
303  * @return GNUNET_OK on success, GNUNET_SYSERR on error
304  */
305 static int
306 init_connection (struct Plugin *plugin)
307 {
308   char *conninfo;
309   PGresult *ret;
310
311   /* Open database and precompile statements */
312   conninfo = NULL;
313   GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
314                                          "datastore-postgres",
315                                          "CONFIG",
316                                          &conninfo);
317   plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
318   if (NULL == plugin->dbh)
319     {
320       /* FIXME: warn about out-of-memory? */
321       GNUNET_free_non_null (conninfo);
322       return GNUNET_SYSERR;
323     }
324   if (PQstatus (plugin->dbh) != CONNECTION_OK)
325     {
326       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
327                        "datastore-postgres",
328                        _("Unable to initialize Postgres with configuration `%s': %s"),
329                        conninfo,
330                        PQerrorMessage (plugin->dbh));
331       PQfinish (plugin->dbh);
332       plugin->dbh = NULL;
333       GNUNET_free_non_null (conninfo);
334       return GNUNET_SYSERR;
335     }
336   GNUNET_free_non_null (conninfo);
337   ret = PQexec (plugin->dbh,
338                 "CREATE TABLE gn090 ("
339                 "  type INTEGER NOT NULL DEFAULT 0,"
340                 "  prio INTEGER NOT NULL DEFAULT 0,"
341                 "  anonLevel INTEGER NOT NULL DEFAULT 0,"
342                 "  expire BIGINT NOT NULL DEFAULT 0,"
343                 "  hash BYTEA NOT NULL DEFAULT '',"
344                 "  vhash BYTEA NOT NULL DEFAULT '',"
345                 "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
346   if ( (ret == NULL) || 
347        ( (PQresultStatus (ret) != PGRES_COMMAND_OK) && 
348          (0 != strcmp ("42P07",    /* duplicate table */
349                        PQresultErrorField
350                        (ret,
351                         PG_DIAG_SQLSTATE)))))
352     {
353       (void) check_result (plugin,
354                            ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090", __LINE__);
355       PQfinish (plugin->dbh);
356       plugin->dbh = NULL;
357       return GNUNET_SYSERR;
358     }
359   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
360     {
361       if ((GNUNET_OK !=
362            pq_exec (plugin, "CREATE INDEX idx_hash ON gn090 (hash)", __LINE__)) ||
363           (GNUNET_OK !=
364            pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)",
365                     __LINE__))
366           || (GNUNET_OK !=
367               pq_exec (plugin, "CREATE INDEX idx_prio ON gn090 (prio)", __LINE__))
368           || (GNUNET_OK !=
369               pq_exec (plugin, "CREATE INDEX idx_expire ON gn090 (expire)", __LINE__))
370           || (GNUNET_OK !=
371               pq_exec (plugin, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)",
372                        __LINE__))
373           || (GNUNET_OK !=
374               pq_exec
375               (plugin, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)",
376                __LINE__))
377           || (GNUNET_OK !=
378               pq_exec (plugin, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)",
379                        __LINE__)))
380         {
381           PQclear (ret);
382           PQfinish (plugin->dbh);
383           plugin->dbh = NULL;
384           return GNUNET_SYSERR;
385         }
386     }
387   PQclear (ret);
388 #if 1
389   ret = PQexec (plugin->dbh,
390                 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
391   if (GNUNET_OK != 
392       check_result (plugin,
393                     ret, PGRES_COMMAND_OK,
394                     "ALTER TABLE", "gn090", __LINE__))
395     {
396       PQfinish (plugin->dbh);
397       plugin->dbh = NULL;
398       return GNUNET_SYSERR;
399     }
400   PQclear (ret);
401   ret = PQexec (plugin->dbh,
402                 "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
403   if (GNUNET_OK !=
404       check_result (plugin,
405                     ret, PGRES_COMMAND_OK,
406                     "ALTER TABLE", "gn090", __LINE__))
407     {
408       PQfinish (plugin->dbh);
409       plugin->dbh = NULL;
410       return GNUNET_SYSERR;
411     }
412   PQclear (ret);
413   ret = PQexec (plugin->dbh,
414                 "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
415   if (GNUNET_OK !=
416       check_result (plugin,
417                     ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090", __LINE__))
418     {
419       PQfinish (plugin->dbh);
420       plugin->dbh = NULL;
421       return GNUNET_SYSERR;
422     }
423   PQclear (ret);
424 #endif
425   if ((GNUNET_OK !=
426        pq_prepare (plugin,
427                    "getvt",
428                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
429                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
430                    "AND oid > $4 ORDER BY oid ASC LIMIT 1 OFFSET $5",
431                    5,
432                    __LINE__)) ||
433       (GNUNET_OK !=
434        pq_prepare (plugin,
435                    "gett",
436                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
437                    "WHERE hash=$1 AND type=$2"
438                    "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
439                    4,
440                    __LINE__)) ||
441       (GNUNET_OK !=
442        pq_prepare (plugin,
443                    "getv",
444                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
445                    "WHERE hash=$1 AND vhash=$2"
446                    "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
447                    4,
448                    __LINE__)) ||
449       (GNUNET_OK !=
450        pq_prepare (plugin,
451                    "get",
452                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
453                    "WHERE hash=$1"
454                    "AND oid > $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
455                    3,
456                    __LINE__)) ||
457       (GNUNET_OK !=
458        pq_prepare (plugin,
459                    "put",
460                    "INSERT INTO gn090 (type, prio, anonLevel, expire, hash, vhash, value) "
461                    "VALUES ($1, $2, $3, $4, $5, $6, $7)",
462                    8,
463                    __LINE__)) ||
464       (GNUNET_OK !=
465        pq_prepare (plugin,
466                    "update",
467                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
468                    "WHERE oid = $3",
469                    3,
470                    __LINE__)) ||
471       (GNUNET_OK !=
472        pq_prepare (plugin,
473                    "select_low_priority",
474                    SELECT_IT_LOW_PRIORITY,
475                    2,
476                    __LINE__)) ||
477       (GNUNET_OK !=
478        pq_prepare (plugin,
479                    "select_non_anonymous",
480                    SELECT_IT_NON_ANONYMOUS,
481                    2,
482                    __LINE__)) ||
483       (GNUNET_OK !=
484        pq_prepare (plugin,
485                    "select_expiration_time",
486                    SELECT_IT_EXPIRATION_TIME,
487                    2,
488                    __LINE__)) ||
489       (GNUNET_OK !=
490        pq_prepare (plugin,
491                    "select_migration_order",
492                    SELECT_IT_MIGRATION_ORDER,
493                    3,
494                    __LINE__)) ||
495       (GNUNET_OK !=
496        pq_prepare (plugin,
497                    "delrow",
498                    "DELETE FROM gn090 " "WHERE oid=$1", 1, __LINE__)))
499     {
500       PQfinish (plugin->dbh);
501       plugin->dbh = NULL;
502       return GNUNET_SYSERR;
503     }
504   return GNUNET_OK;
505 }
506
507
508 /**
509  * Delete the row identified by the given rowid (qid
510  * in postgres).
511  *
512  * @param plugin global context
513  * @param rowid which row to delete
514  * @return GNUNET_OK on success
515  */
516 static int
517 delete_by_rowid (struct Plugin *plugin,
518                  unsigned int rowid)
519 {
520   const char *paramValues[] = { (const char *) &rowid };
521   int paramLengths[] = { sizeof (rowid) };
522   const int paramFormats[] = { 1 };
523   PGresult *ret;
524
525   ret = PQexecPrepared (plugin->dbh,
526                         "delrow",
527                         1, paramValues, paramLengths, paramFormats, 1);
528   if (GNUNET_OK !=
529       check_result (plugin,
530                     ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
531                     __LINE__))
532     {
533       return GNUNET_SYSERR;
534     }
535   PQclear (ret);
536   return GNUNET_OK;
537 }
538
539
540 /**
541  * Get an estimate of how much space the database is
542  * currently using.
543  *
544  * @param cls our "struct Plugin*"
545  * @return number of bytes used on disk
546  */
547 static unsigned long long
548 postgres_plugin_get_size (void *cls)
549 {
550   struct Plugin *plugin = cls;
551   unsigned long long total;
552   PGresult *ret;
553
554   ret = PQexecParams (plugin->dbh,
555                       "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090",
556                       0, NULL, NULL, NULL, NULL, 1);
557   if (GNUNET_OK != check_result (plugin,
558                                  ret,
559                                  PGRES_TUPLES_OK,
560                                  "PQexecParams",
561                                  "get_size",
562                                  __LINE__))
563     {
564       return 0;
565     }
566   if ((PQntuples (ret) != 1) ||
567       (PQnfields (ret) != 1) ||
568       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
569     {
570       GNUNET_break (0);
571       PQclear (ret);
572       return 0;
573     }
574   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
575   PQclear (ret);
576   return total;
577 }
578
579
580 /**
581  * Store an item in the datastore.
582  *
583  * @param cls closure
584  * @param key key for the item
585  * @param size number of bytes in data
586  * @param data content stored
587  * @param type type of the content
588  * @param priority priority of the content
589  * @param anonymity anonymity-level for the content
590  * @param replication replication-level for the content
591  * @param expiration expiration time for the content
592  * @param msg set to error message
593  * @return GNUNET_OK on success
594  */
595 static int
596 postgres_plugin_put (void *cls,
597                      const GNUNET_HashCode * key,
598                      uint32_t size,
599                      const void *data,
600                      enum GNUNET_BLOCK_Type type,
601                      uint32_t priority,
602                      uint32_t anonymity,
603                      uint32_t replication,
604                      struct GNUNET_TIME_Absolute expiration,
605                      char **msg)
606 {
607   struct Plugin *plugin = cls;
608   GNUNET_HashCode vhash;
609   PGresult *ret;
610   uint32_t btype = htonl (type);
611   uint32_t bprio = htonl (priority);
612   uint32_t banon = htonl (anonymity);
613   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
614   const char *paramValues[] = {
615     (const char *) &btype,
616     (const char *) &bprio,
617     (const char *) &banon,
618     (const char *) &bexpi,
619     (const char *) key,
620     (const char *) &vhash,
621     (const char *) data
622   };
623   int paramLengths[] = {
624     sizeof (btype),
625     sizeof (bprio),
626     sizeof (banon),
627     sizeof (bexpi),
628     sizeof (GNUNET_HashCode),
629     sizeof (GNUNET_HashCode),
630     size
631   };
632   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1 };
633
634   GNUNET_CRYPTO_hash (data, size, &vhash);
635   ret = PQexecPrepared (plugin->dbh,
636                         "put", 7, paramValues, paramLengths, paramFormats, 1);
637   if (GNUNET_OK != check_result (plugin, ret,
638                                  PGRES_COMMAND_OK,
639                                  "PQexecPrepared", "put", __LINE__))
640     return GNUNET_SYSERR;
641   PQclear (ret);
642   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
643 #if DEBUG_POSTGRES
644   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
645                    "datastore-postgres",
646                    "Stored %u bytes in database\n",
647                    (unsigned int) size);
648 #endif
649   return GNUNET_OK;
650 }
651
652 /**
653  * Function invoked on behalf of a "PluginIterator"
654  * asking the database plugin to call the iterator
655  * with the next item.
656  *
657  * @param next_cls the 'struct NextRequestClosure'
658  * @param tc scheduler context
659  */
660 static void 
661 postgres_next_request_cont (void *next_cls,
662                             const struct GNUNET_SCHEDULER_TaskContext *tc)
663 {
664   struct NextRequestClosure *nrc = next_cls;
665   struct Plugin *plugin = nrc->plugin;
666   const int paramFormats[] = { 1, 1, 1, 1, 1 };
667   int iret;
668   PGresult *res;
669   enum GNUNET_BLOCK_Type type;
670   uint32_t anonymity;
671   uint32_t priority;
672   uint32_t size;
673   unsigned int rowid;
674   struct GNUNET_TIME_Absolute expiration_time;
675   GNUNET_HashCode key;
676
677   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
678   plugin->next_task_nc = NULL;
679   if ( (GNUNET_YES == nrc->end_it) ||
680        (nrc->count == nrc->total) )
681     {
682 #if DEBUG_POSTGRES
683       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
684                        "datastore-postgres",
685                        "Ending iteration (%s)\n",
686                        (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set");
687 #endif
688       nrc->iter (nrc->iter_cls, 
689                  NULL, NULL, 0, NULL, 0, 0, 0, 
690                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
691       GNUNET_free (nrc);
692       return;
693     }
694   
695   if (nrc->count == 0)
696     nrc->blimit_off = GNUNET_htonll (nrc->off);
697   else
698     nrc->blimit_off = GNUNET_htonll (0);
699   if (nrc->count + nrc->off == nrc->total)
700     nrc->blast_rowid = htonl (0); /* back to start */
701   
702   res = PQexecPrepared (plugin->dbh,
703                         nrc->pname,
704                         nrc->nparams,
705                         nrc->paramValues, 
706                         nrc->paramLengths,
707                         paramFormats, 1);
708   if (GNUNET_OK != check_result (plugin,
709                                  res,
710                                  PGRES_TUPLES_OK,
711                                  "PQexecPrepared",
712                                  nrc->pname,
713                                  __LINE__))
714     {
715 #if DEBUG_POSTGRES
716       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
717                        "datastore-postgres",
718                        "Ending iteration (postgres error)\n");
719 #endif
720       nrc->iter (nrc->iter_cls, 
721                  NULL, NULL, 0, NULL, 0, 0, 0, 
722                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
723       GNUNET_free (nrc);
724       return;
725     }
726
727   if (0 == PQntuples (res))
728     {
729       /* no result */
730 #if DEBUG_POSTGRES
731       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
732                        "datastore-postgres",
733                        "Ending iteration (no more results)\n");
734 #endif
735       nrc->iter (nrc->iter_cls, 
736                  NULL, NULL, 0, NULL, 0, 0, 0, 
737                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
738       PQclear (res);
739       GNUNET_free (nrc);
740       return; 
741     }
742   if ((1 != PQntuples (res)) ||
743       (7 != PQnfields (res)) ||
744       (sizeof (uint32_t) != PQfsize (res, 0)) ||
745       (sizeof (uint32_t) != PQfsize (res, 6)))
746     {
747       GNUNET_break (0);
748       nrc->iter (nrc->iter_cls, 
749                  NULL, NULL, 0, NULL, 0, 0, 0, 
750                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
751       PQclear (res);
752       GNUNET_free (nrc);
753       return;
754     }
755   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
756   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
757       (sizeof (uint32_t) != PQfsize (res, 1)) ||
758       (sizeof (uint32_t) != PQfsize (res, 2)) ||
759       (sizeof (uint64_t) != PQfsize (res, 3)) ||
760       (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)) )
761     {
762       GNUNET_break (0);
763       PQclear (res);
764       delete_by_rowid (plugin, rowid);
765       nrc->iter (nrc->iter_cls, 
766                  NULL, NULL, 0, NULL, 0, 0, 0, 
767                  GNUNET_TIME_UNIT_ZERO_ABS, 0);
768       GNUNET_free (nrc);
769       return;
770     }
771
772   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
773   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
774   anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
775   expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
776   memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode));
777   size = PQgetlength (res, 0, 5);
778
779   nrc->blast_prio = htonl (priority);
780   nrc->blast_expire = GNUNET_htonll (expiration_time.abs_value);
781   nrc->blast_rowid = htonl (rowid);
782   nrc->count++;
783
784 #if DEBUG_POSTGRES
785   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
786                    "datastore-postgres",
787                    "Found result of size %u bytes and type %u in database\n",
788                    (unsigned int) size,
789                    (unsigned int) type);
790 #endif
791   iret = nrc->iter (nrc->iter_cls,
792                     nrc,
793                     &key,
794                     size,
795                     PQgetvalue (res, 0, 5),
796                     (enum GNUNET_BLOCK_Type) type,
797                     priority,
798                     anonymity,
799                     expiration_time,
800                     rowid);
801   PQclear (res);
802   if (iret == GNUNET_SYSERR)
803     {
804 #if DEBUG_POSTGRES
805       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
806                        "datastore-postgres",
807                        "Ending iteration (client error)\n");
808 #endif
809       return;
810     }
811   if (iret == GNUNET_NO)
812     {
813       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
814         {
815 #if DEBUG_POSTGRES
816           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
817                            "datastore-postgres",
818                            "Deleting %u bytes from database\n",
819                            (unsigned int) size);
820 #endif
821           plugin->env->duc (plugin->env->cls,
822                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
823 #if DEBUG_POSTGRES
824           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
825                            "datastore-postgres",
826                            "Deleted %u bytes from database\n",
827                            (unsigned int) size);
828 #endif
829         }
830     }
831 }
832
833
834 /**
835  * Function invoked on behalf of a "PluginIterator"
836  * asking the database plugin to call the iterator
837  * with the next item.
838  *
839  * @param next_cls whatever argument was given
840  *        to the PluginIterator as "next_cls".
841  * @param end_it set to GNUNET_YES if we
842  *        should terminate the iteration early
843  *        (iterator should be still called once more
844  *         to signal the end of the iteration).
845  */
846 static void 
847 postgres_plugin_next_request (void *next_cls,
848                               int end_it)
849 {
850   struct NextRequestClosure *nrc = next_cls;
851
852   if (GNUNET_YES == end_it)
853     nrc->end_it = GNUNET_YES;
854   nrc->plugin->next_task_nc = nrc;
855   nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&postgres_next_request_cont,
856                                                      nrc);
857 }
858
859
860 /**
861  * Update the priority for a particular key in the datastore.  If
862  * the expiration time in value is different than the time found in
863  * the datastore, the higher value should be kept.  For the
864  * anonymity level, the lower value is to be used.  The specified
865  * priority should be added to the existing priority, ignoring the
866  * priority in value.
867  *
868  * Note that it is possible for multiple values to match this put.
869  * In that case, all of the respective values are updated.
870  *
871  * @param cls our "struct Plugin*"
872  * @param uid unique identifier of the datum
873  * @param delta by how much should the priority
874  *     change?  If priority + delta < 0 the
875  *     priority should be set to 0 (never go
876  *     negative).
877  * @param expire new expiration time should be the
878  *     MAX of any existing expiration time and
879  *     this value
880  * @param msg set to error message
881  * @return GNUNET_OK on success
882  */
883 static int
884 postgres_plugin_update (void *cls,
885                         uint64_t uid,
886                         int delta, struct GNUNET_TIME_Absolute expire,
887                         char **msg)
888 {
889   struct Plugin *plugin = cls;
890   PGresult *ret;
891   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
892   uint32_t boid = htonl ( (uint32_t) uid);
893   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
894   const char *paramValues[] = {
895     (const char *) &bdelta,
896     (const char *) &bexpire,
897     (const char *) &boid,
898   };
899   int paramLengths[] = {
900     sizeof (bdelta),
901     sizeof (bexpire),
902     sizeof (boid),
903   };
904   const int paramFormats[] = { 1, 1, 1 };
905
906   ret = PQexecPrepared (plugin->dbh,
907                         "update",
908                         3, paramValues, paramLengths, paramFormats, 1);
909   if (GNUNET_OK != check_result (plugin,
910                                  ret,
911                                  PGRES_COMMAND_OK,
912                                  "PQexecPrepared", "update", __LINE__))
913     return GNUNET_SYSERR;
914   PQclear (ret);
915   return GNUNET_OK;
916 }
917
918
919 /**
920  * Call a method for each key in the database and
921  * call the callback method on it.
922  *
923  * @param plugin global context
924  * @param type entries of which type should be considered?
925  * @param is_asc ascending or descending iteration?
926  * @param iter_select which SELECT method should be used?
927  * @param iter maybe NULL (to just count); iter
928  *     should return GNUNET_SYSERR to abort the
929  *     iteration, GNUNET_NO to delete the entry and
930  *     continue and GNUNET_OK to continue iterating
931  * @param iter_cls closure for 'iter'
932  */
933 static void
934 postgres_iterate (struct Plugin *plugin,
935                   unsigned int type,
936                   int is_asc,
937                   unsigned int iter_select,
938                   PluginIterator iter, void *iter_cls)
939 {
940   struct NextRequestClosure *nrc;
941
942   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
943   nrc->count = UINT32_MAX;
944   nrc->plugin = plugin;
945   nrc->iter = iter;
946   nrc->iter_cls = iter_cls;
947   if (is_asc)
948     {
949       nrc->blast_prio = htonl (0);
950       nrc->blast_rowid = htonl (0);
951       nrc->blast_expire = htonl (0);
952     }
953   else
954     {
955       nrc->blast_prio = htonl (0x7FFFFFFFL);
956       nrc->blast_rowid = htonl (0xFFFFFFFF);
957       nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
958     }
959   switch (iter_select)
960     {
961     case 0:
962       nrc->pname = "select_low_priority";
963       nrc->nparams = 2;
964       nrc->paramValues[0] = (const char *) &nrc->blast_prio;
965       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
966       nrc->paramLengths[0] = sizeof (nrc->blast_prio);
967       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
968       break;
969     case 1:
970       nrc->pname = "select_non_anonymous";
971       nrc->nparams = 2;
972       nrc->paramValues[0] = (const char *) &nrc->blast_prio;
973       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
974       nrc->paramLengths[0] = sizeof (nrc->blast_prio);
975       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
976       break;
977     case 2:
978       nrc->pname = "select_expiration_time";
979       nrc->nparams = 2;
980       nrc->paramValues[0] = (const char *) &nrc->blast_expire;
981       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
982       nrc->paramLengths[0] = sizeof (nrc->blast_expire);
983       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
984       break;
985     case 3:
986       nrc->pname = "select_migration_order";
987       nrc->nparams = 3;
988       nrc->paramValues[0] = (const char *) &nrc->blast_expire;
989       nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
990       nrc->paramValues[2] = (const char *) &nrc->bnow;
991       nrc->paramLengths[0] = sizeof (nrc->blast_expire);
992       nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
993       nrc->paramLengths[2] = sizeof (nrc->bnow);
994       break;
995     default:
996       GNUNET_break (0);
997       iter (iter_cls, 
998             NULL, NULL, 0, NULL, 0, 0, 0, 
999             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1000       GNUNET_free (nrc);
1001       return;
1002     }
1003   nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).abs_value__;
1004   postgres_plugin_next_request (nrc,
1005                                 GNUNET_NO);
1006 }
1007
1008
1009 /**
1010  * Select a subset of the items in the datastore and call
1011  * the given iterator for each of them.
1012  *
1013  * @param cls our "struct Plugin*"
1014  * @param type entries of which type should be considered?
1015  *        Use 0 for any type.
1016  * @param iter function to call on each matching value;
1017  *        will be called once with a NULL value at the end
1018  * @param iter_cls closure for iter
1019  */
1020 static void
1021 postgres_plugin_iter_low_priority (void *cls,
1022                                    enum GNUNET_BLOCK_Type type,
1023                                    PluginIterator iter,
1024                                    void *iter_cls)
1025 {
1026   struct Plugin *plugin = cls;
1027   
1028   postgres_iterate (plugin,
1029                     type, 
1030                     GNUNET_YES, 0, 
1031                     iter, iter_cls);
1032 }
1033
1034
1035
1036
1037 /**
1038  * Iterate over the results for a particular key
1039  * in the datastore.
1040  *
1041  * @param cls closure
1042  * @param key maybe NULL (to match all entries)
1043  * @param vhash hash of the value, maybe NULL (to
1044  *        match all values that have the right key).
1045  *        Note that for DBlocks there is no difference
1046  *        betwen key and vhash, but for other blocks
1047  *        there may be!
1048  * @param type entries of which type are relevant?
1049  *     Use 0 for any type.
1050  * @param iter function to call on each matching value;
1051  *        will be called once with a NULL value at the end
1052  * @param iter_cls closure for iter
1053  */
1054 static void
1055 postgres_plugin_get (void *cls,
1056                      const GNUNET_HashCode * key,
1057                      const GNUNET_HashCode * vhash,
1058                      enum GNUNET_BLOCK_Type type,
1059                      PluginIterator iter, void *iter_cls)
1060 {
1061   struct Plugin *plugin = cls;
1062   struct NextRequestClosure *nrc;
1063   const int paramFormats[] = { 1, 1, 1, 1, 1 };
1064   PGresult *ret;
1065
1066   if (key == NULL)
1067     {
1068       postgres_plugin_iter_low_priority (plugin, type, 
1069                                          iter, iter_cls);
1070       return;
1071     }
1072   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1073   nrc->plugin = plugin;
1074   nrc->iter = iter;
1075   nrc->iter_cls = iter_cls;
1076   nrc->key = *key;
1077   if (vhash != NULL)
1078     nrc->vhash = *vhash;
1079   nrc->paramValues[0] = (const char*) &nrc->key;
1080   nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
1081   nrc->btype = htonl (type);
1082   if (type != 0)
1083     {
1084       if (vhash != NULL)
1085         {
1086           nrc->paramValues[1] = (const char *) &nrc->vhash;
1087           nrc->paramLengths[1] = sizeof (nrc->vhash);
1088           nrc->paramValues[2] = (const char *) &nrc->btype;
1089           nrc->paramLengths[2] = sizeof (nrc->btype);
1090           nrc->paramValues[3] = (const char *) &nrc->blast_rowid;
1091           nrc->paramLengths[3] = sizeof (nrc->blast_rowid);
1092           nrc->paramValues[4] = (const char *) &nrc->blimit_off;
1093           nrc->paramLengths[4] = sizeof (nrc->blimit_off);
1094           nrc->nparams = 5;
1095           nrc->pname = "getvt";
1096           ret = PQexecParams (plugin->dbh,
1097                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
1098                               3,
1099                               NULL,
1100                               nrc->paramValues, 
1101                               nrc->paramLengths,
1102                               paramFormats, 1);
1103         }
1104       else
1105         {
1106           nrc->paramValues[1] = (const char *) &nrc->btype;
1107           nrc->paramLengths[1] = sizeof (nrc->btype);
1108           nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
1109           nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
1110           nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1111           nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1112           nrc->nparams = 4;
1113           nrc->pname = "gett";
1114           ret = PQexecParams (plugin->dbh,
1115                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
1116                               2,
1117                               NULL,
1118                               nrc->paramValues, 
1119                               nrc->paramLengths, 
1120                               paramFormats, 1);
1121         }
1122     }
1123   else
1124     {
1125       if (vhash != NULL)
1126         {
1127           nrc->paramValues[1] = (const char *) &nrc->vhash;
1128           nrc->paramLengths[1] = sizeof (nrc->vhash);
1129           nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
1130           nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
1131           nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1132           nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1133           nrc->nparams = 4;
1134           nrc->pname = "getv";
1135           ret = PQexecParams (plugin->dbh,
1136                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
1137                               2,
1138                               NULL,
1139                               nrc->paramValues, 
1140                               nrc->paramLengths,
1141                               paramFormats, 1);
1142         }
1143       else
1144         {
1145           nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
1146           nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
1147           nrc->paramValues[2] = (const char *) &nrc->blimit_off;
1148           nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1149           nrc->nparams = 3;
1150           nrc->pname = "get";
1151           ret = PQexecParams (plugin->dbh,
1152                               "SELECT count(*) FROM gn090 WHERE hash=$1",
1153                               1,
1154                               NULL,
1155                               nrc->paramValues, 
1156                               nrc->paramLengths,
1157                               paramFormats, 1);
1158         }
1159     }
1160   if (GNUNET_OK != check_result (plugin,
1161                                  ret,
1162                                  PGRES_TUPLES_OK,
1163                                  "PQexecParams",
1164                                  nrc->pname,
1165                                  __LINE__))
1166     {
1167       iter (iter_cls, 
1168             NULL, NULL, 0, NULL, 0, 0, 0, 
1169             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1170       GNUNET_free (nrc);
1171       return;
1172     }
1173   if ((PQntuples (ret) != 1) ||
1174       (PQnfields (ret) != 1) ||
1175       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
1176     {
1177       GNUNET_break (0);
1178       PQclear (ret);
1179       iter (iter_cls, 
1180             NULL, NULL, 0, NULL, 0, 0, 0, 
1181             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1182       GNUNET_free (nrc);
1183       return;
1184     }
1185   nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
1186   PQclear (ret);
1187   if (nrc->total == 0)
1188     {
1189       iter (iter_cls, 
1190             NULL, NULL, 0, NULL, 0, 0, 0, 
1191             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1192       GNUNET_free (nrc);
1193       return;
1194     }
1195   nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1196                                        nrc->total);
1197   postgres_plugin_next_request (nrc,
1198                                 GNUNET_NO);
1199 }
1200
1201
1202 /**
1203  * Get a random item for replication.  Returns a single, not expired, random item
1204  * from those with the highest replication counters.  The item's 
1205  * replication counter is decremented by one IF it was positive before.
1206  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1207  *
1208  * @param cls closure
1209  * @param iter function to call the value (once only).
1210  * @param iter_cls closure for iter
1211  */
1212 static void
1213 postgres_plugin_replication_get (void *cls,
1214                                  PluginIterator iter, void *iter_cls)
1215 {
1216   /* FIXME: not implemented! */
1217   iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
1218         GNUNET_TIME_UNIT_ZERO_ABS, 0);
1219 }
1220
1221
1222 /**
1223  * Get a random item for expiration.
1224  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1225  *
1226  * @param cls closure
1227  * @param iter function to call the value (once only).
1228  * @param iter_cls closure for iter
1229  */
1230 static void
1231 postgres_plugin_expiration_get (void *cls,
1232                                 PluginIterator iter, void *iter_cls)
1233 {
1234   /* FIXME: not implemented! */
1235   iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
1236         GNUNET_TIME_UNIT_ZERO_ABS, 0);
1237 }
1238
1239
1240 /**
1241  * Select a subset of the items in the datastore and call
1242  * the given iterator for each of them.
1243  *
1244  * @param cls our "struct Plugin*"
1245  * @param type entries of which type should be considered?
1246  *        Use 0 for any type.
1247  * @param iter function to call on each matching value;
1248  *        will be called once with a NULL value at the end
1249  * @param iter_cls closure for iter
1250  */
1251 static void
1252 postgres_plugin_iter_zero_anonymity (void *cls,
1253                                      enum GNUNET_BLOCK_Type type,
1254                                      PluginIterator iter,
1255                                      void *iter_cls)
1256 {
1257   struct Plugin *plugin = cls;
1258
1259   postgres_iterate (plugin, 
1260                     type, GNUNET_NO, 1,
1261                     iter, iter_cls);
1262 }
1263
1264
1265 /**
1266  * Drop database.
1267  */
1268 static void 
1269 postgres_plugin_drop (void *cls)
1270 {
1271   struct Plugin *plugin = cls;
1272
1273   pq_exec (plugin, "DROP TABLE gn090", __LINE__);
1274 }
1275
1276
1277 /**
1278  * Entry point for the plugin.
1279  *
1280  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1281  * @return our "struct Plugin*"
1282  */
1283 void *
1284 libgnunet_plugin_datastore_postgres_init (void *cls)
1285 {
1286   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1287   struct GNUNET_DATASTORE_PluginFunctions *api;
1288   struct Plugin *plugin;
1289
1290   plugin = GNUNET_malloc (sizeof (struct Plugin));
1291   plugin->env = env;
1292   if (GNUNET_OK != init_connection (plugin))
1293     {
1294       GNUNET_free (plugin);
1295       return NULL;
1296     }
1297   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1298   api->cls = plugin;
1299   api->get_size = &postgres_plugin_get_size;
1300   api->put = &postgres_plugin_put;
1301   api->next_request = &postgres_plugin_next_request;
1302   api->get = &postgres_plugin_get;
1303   api->replication_get = &postgres_plugin_replication_get;
1304   api->expiration_get = &postgres_plugin_expiration_get;
1305   api->update = &postgres_plugin_update;
1306   api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity;
1307   api->drop = &postgres_plugin_drop;
1308   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1309                    "datastore-postgres",
1310                    _("Postgres database running\n"));
1311   return api;
1312 }
1313
1314
1315 /**
1316  * Exit point from the plugin.
1317  * @param cls our "struct Plugin*"
1318  * @return always NULL
1319  */
1320 void *
1321 libgnunet_plugin_datastore_postgres_done (void *cls)
1322 {
1323   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1324   struct Plugin *plugin = api->cls;
1325   
1326   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1327     {
1328       GNUNET_SCHEDULER_cancel (plugin->next_task);
1329       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1330       GNUNET_free (plugin->next_task_nc);
1331       plugin->next_task_nc = NULL;
1332     }
1333   PQfinish (plugin->dbh);
1334   GNUNET_free (plugin);
1335   GNUNET_free (api);
1336   return NULL;
1337 }
1338
1339 /* end of plugin_datastore_postgres.c */