remove send on connect
[oweals/gnunet.git] / src / datastore / plugin_datastore_postgres.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_postgres.c
23  * @brief postgres-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <postgresql/libpq-fe.h>
30
31 #define DEBUG_POSTGRES GNUNET_NO
32
33 /**
34  * After how many ms "busy" should a DB operation fail for good?
35  * A low value makes sure that we are more responsive to requests
36  * (especially PUTs).  A high value guarantees a higher success
37  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
38  *
39  * The default value of 1s should ensure that users do not experience
40  * huge latencies while at the same time allowing operations to succeed
41  * with reasonable probability.
42  */
43 #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
44
45
46 /**
47  * Context for all functions in this plugin.
48  */
49 struct Plugin 
50 {
51   /**
52    * Our execution environment.
53    */
54   struct GNUNET_DATASTORE_PluginEnvironment *env;
55
56   /**
57    * Native Postgres database handle.
58    */
59   PGconn *dbh;
60
61 };
62
63
64 /**
65  * Check if the result obtained from Postgres has
66  * the desired status code.  If not, log an error, clear the
67  * result and return GNUNET_SYSERR.
68  * 
69  * @param plugin global context
70  * @param ret result to check
71  * @param expected_status expected return value
72  * @param command name of SQL command that was run
73  * @param args arguments to SQL command
74  * @param line line number for error reporting
75  * @return GNUNET_OK if the result is acceptable
76  */
77 static int
78 check_result (struct Plugin *plugin,
79               PGresult * ret,
80               int expected_status,
81               const char *command, const char *args, int line)
82 {
83   if (ret == NULL)
84     {
85       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
86                        "datastore-postgres",
87                        "Postgres failed to allocate result for `%s:%s' at %d\n",
88                        command, args, line);
89       return GNUNET_SYSERR;
90     }
91   if (PQresultStatus (ret) != expected_status)
92     {
93       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
94                        "datastore-postgres",
95                        _("`%s:%s' failed at %s:%d with error: %s"),
96                        command, args, __FILE__, line, PQerrorMessage (plugin->dbh));
97       PQclear (ret);
98       return GNUNET_SYSERR;
99     }
100   return GNUNET_OK;
101 }
102
103 /**
104  * Run simple SQL statement (without results).
105  *
106  * @param plugin global context
107  * @param sql statement to run
108  * @param line code line for error reporting
109  */
110 static int
111 pq_exec (struct Plugin *plugin,
112          const char *sql, int line)
113 {
114   PGresult *ret;
115   ret = PQexec (plugin->dbh, sql);
116   if (GNUNET_OK != check_result (plugin,
117                                  ret, 
118                                  PGRES_COMMAND_OK, "PQexec", sql, line))
119     return GNUNET_SYSERR;
120   PQclear (ret);
121   return GNUNET_OK;
122 }
123
124 /**
125  * Prepare SQL statement.
126  *
127  * @param plugin global context
128  * @param name name for the prepared SQL statement
129  * @param sql SQL code to prepare
130  * @param nparams number of parameters in sql
131  * @param line code line for error reporting
132  * @return GNUNET_OK on success
133  */
134 static int
135 pq_prepare (struct Plugin *plugin,
136             const char *name, const char *sql, int nparams, int line)
137 {
138   PGresult *ret;
139   ret = PQprepare (plugin->dbh, name, sql, nparams, NULL);
140   if (GNUNET_OK !=
141       check_result (plugin, 
142                     ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
143     return GNUNET_SYSERR;
144   PQclear (ret);
145   return GNUNET_OK;
146 }
147
148 /**
149  * @brief Get a database handle
150  *
151  * @param plugin global context
152  * @return GNUNET_OK on success, GNUNET_SYSERR on error
153  */
154 static int
155 init_connection (struct Plugin *plugin)
156 {
157   char *conninfo;
158   PGresult *ret;
159
160   /* Open database and precompile statements */
161   conninfo = NULL;
162   (void) GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
163                                                 "datastore-postgres",
164                                                 "CONFIG",
165                                                 &conninfo);
166   plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
167   if (NULL == plugin->dbh)
168     {
169       /* FIXME: warn about out-of-memory? */
170       GNUNET_free_non_null (conninfo);
171       return GNUNET_SYSERR;
172     }
173   if (PQstatus (plugin->dbh) != CONNECTION_OK)
174     {
175       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
176                        "datastore-postgres",
177                        _("Unable to initialize Postgres with configuration `%s': %s"),
178                        conninfo,
179                        PQerrorMessage (plugin->dbh));
180       PQfinish (plugin->dbh);
181       plugin->dbh = NULL;
182       GNUNET_free_non_null (conninfo);
183       return GNUNET_SYSERR;
184     }
185   GNUNET_free_non_null (conninfo);
186   ret = PQexec (plugin->dbh,
187                 "CREATE TABLE gn090 ("
188                 "  repl INTEGER NOT NULL DEFAULT 0,"
189                 "  type INTEGER NOT NULL DEFAULT 0,"
190                 "  prio INTEGER NOT NULL DEFAULT 0,"
191                 "  anonLevel INTEGER NOT NULL DEFAULT 0,"
192                 "  expire BIGINT NOT NULL DEFAULT 0,"
193                 "  rvalue BIGINT NOT NULL DEFAULT 0,"
194                 "  hash BYTEA NOT NULL DEFAULT '',"
195                 "  vhash BYTEA NOT NULL DEFAULT '',"
196                 "  value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
197   if ( (ret == NULL) || 
198        ( (PQresultStatus (ret) != PGRES_COMMAND_OK) && 
199          (0 != strcmp ("42P07",    /* duplicate table */
200                        PQresultErrorField
201                        (ret,
202                         PG_DIAG_SQLSTATE)))))
203     {
204       (void) check_result (plugin,
205                            ret, PGRES_COMMAND_OK, "CREATE TABLE", "gn090", __LINE__);
206       PQfinish (plugin->dbh);
207       plugin->dbh = NULL;
208       return GNUNET_SYSERR;
209     }
210   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
211     {
212       if ((GNUNET_OK !=
213            pq_exec (plugin, "CREATE INDEX idx_hash ON gn090 (hash)", __LINE__)) ||
214           (GNUNET_OK !=
215            pq_exec (plugin, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)",
216                     __LINE__))
217           || (GNUNET_OK !=
218               pq_exec (plugin, "CREATE INDEX idx_prio ON gn090 (prio)", __LINE__))
219           || (GNUNET_OK !=
220               pq_exec (plugin, "CREATE INDEX idx_expire ON gn090 (expire)", __LINE__))
221           || (GNUNET_OK !=
222               pq_exec (plugin, "CREATE INDEX idx_prio_anon ON gn090 (prio,anonLevel)",
223                        __LINE__))
224           || (GNUNET_OK !=
225               pq_exec
226               (plugin, "CREATE INDEX idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)",
227                __LINE__))
228           || (GNUNET_OK !=
229               pq_exec
230               (plugin, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)",
231                __LINE__))
232           || (GNUNET_OK !=
233               pq_exec (plugin, "CREATE INDEX idx_expire_hash ON gn090 (expire,hash)",
234                        __LINE__)))
235         {
236           PQclear (ret);
237           PQfinish (plugin->dbh);
238           plugin->dbh = NULL;
239           return GNUNET_SYSERR;
240         }
241     }
242   PQclear (ret);
243   ret = PQexec (plugin->dbh,
244                 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
245   if (GNUNET_OK != 
246       check_result (plugin,
247                     ret, PGRES_COMMAND_OK,
248                     "ALTER TABLE", "gn090", __LINE__))
249     {
250       PQfinish (plugin->dbh);
251       plugin->dbh = NULL;
252       return GNUNET_SYSERR;
253     }
254   PQclear (ret);
255   ret = PQexec (plugin->dbh,
256                 "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
257   if (GNUNET_OK !=
258       check_result (plugin,
259                     ret, PGRES_COMMAND_OK,
260                     "ALTER TABLE", "gn090", __LINE__))
261     {
262       PQfinish (plugin->dbh);
263       plugin->dbh = NULL;
264       return GNUNET_SYSERR;
265     }
266   PQclear (ret);
267   ret = PQexec (plugin->dbh,
268                 "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
269   if (GNUNET_OK !=
270       check_result (plugin,
271                     ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090", __LINE__))
272     {
273       PQfinish (plugin->dbh);
274       plugin->dbh = NULL;
275       return GNUNET_SYSERR;
276     }
277   PQclear (ret);
278   if ((GNUNET_OK !=
279        pq_prepare (plugin,
280                    "getvt",
281                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
282                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
283                    "ORDER BY oid ASC LIMIT 1 OFFSET $4",
284                    4,
285                    __LINE__)) ||
286       (GNUNET_OK !=
287        pq_prepare (plugin,
288                    "gett",
289                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
290                    "WHERE hash=$1 AND type=$2 "
291                    "ORDER BY oid ASC LIMIT 1 OFFSET $3",
292                    3,
293                    __LINE__)) ||
294       (GNUNET_OK !=
295        pq_prepare (plugin,
296                    "getv",
297                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
298                    "WHERE hash=$1 AND vhash=$2 "
299                    "ORDER BY oid ASC LIMIT 1 OFFSET $3",
300                    3,
301                    __LINE__)) ||
302       (GNUNET_OK !=
303        pq_prepare (plugin,
304                    "get",
305                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
306                    "WHERE hash=$1 "
307                    "ORDER BY oid ASC LIMIT 1 OFFSET $2",
308                    2,
309                    __LINE__)) ||
310       (GNUNET_OK !=
311        pq_prepare (plugin,
312                    "put",
313                    "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
314                    "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
315                    9,
316                    __LINE__)) ||
317       (GNUNET_OK !=
318        pq_prepare (plugin,
319                    "update",
320                    "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
321                    "WHERE oid = $3",
322                    3,
323                    __LINE__)) ||
324       (GNUNET_OK !=
325        pq_prepare (plugin,
326                    "decrepl",
327                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
328                    "WHERE oid = $1",
329                    1,
330                    __LINE__)) ||
331       (GNUNET_OK !=
332        pq_prepare (plugin,
333                    "select_non_anonymous",
334                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
335                    "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
336                    1,
337                    __LINE__)) ||
338       (GNUNET_OK !=
339        pq_prepare (plugin,
340                    "select_expiration_order",
341                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 
342                    "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "                      
343                    "UNION "                                             
344                    "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 
345                    "ORDER BY prio ASC LIMIT 1) "                        
346                    "ORDER BY expire ASC LIMIT 1",
347                    1,
348                    __LINE__)) ||
349       (GNUNET_OK !=
350        pq_prepare (plugin,
351                    "select_replication_order",
352                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " \
353                    "ORDER BY repl DESC,RANDOM() LIMIT 1",
354                    0,
355                    __LINE__)) ||
356       (GNUNET_OK !=
357        pq_prepare (plugin,
358                    "delrow",
359                    "DELETE FROM gn090 " "WHERE oid=$1", 
360                    1,
361                    __LINE__)))
362     {
363       PQfinish (plugin->dbh);
364       plugin->dbh = NULL;
365       return GNUNET_SYSERR;
366     }
367   return GNUNET_OK;
368 }
369
370
371 /**
372  * Delete the row identified by the given rowid (qid
373  * in postgres).
374  *
375  * @param plugin global context
376  * @param rowid which row to delete
377  * @return GNUNET_OK on success
378  */
379 static int
380 delete_by_rowid (struct Plugin *plugin,
381                  unsigned int rowid)
382 {
383   uint32_t browid;
384   const char *paramValues[] = { (const char *) &browid };
385   int paramLengths[] = { sizeof (browid) };
386   const int paramFormats[] = { 1 };
387   PGresult *ret;
388
389   browid = htonl (rowid);
390   ret = PQexecPrepared (plugin->dbh,
391                         "delrow",
392                         1, paramValues, paramLengths, paramFormats, 1);
393   if (GNUNET_OK !=
394       check_result (plugin,
395                     ret, PGRES_COMMAND_OK, "PQexecPrepared", "delrow",
396                     __LINE__))
397     {
398       return GNUNET_SYSERR;
399     }
400   PQclear (ret);
401   return GNUNET_OK;
402 }
403
404
405 /**
406  * Get an estimate of how much space the database is
407  * currently using.
408  *
409  * @param cls our "struct Plugin*"
410  * @return number of bytes used on disk
411  */
412 static unsigned long long
413 postgres_plugin_estimate_size (void *cls)
414 {
415   struct Plugin *plugin = cls;
416   unsigned long long total;
417   PGresult *ret;
418
419   ret = PQexecParams (plugin->dbh,
420                       "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090",
421                       0, NULL, NULL, NULL, NULL, 1);
422   if (GNUNET_OK != check_result (plugin,
423                                  ret,
424                                  PGRES_TUPLES_OK,
425                                  "PQexecParams",
426                                  "get_size",
427                                  __LINE__))
428     {
429       return 0;
430     }
431   if ((PQntuples (ret) != 1) ||
432       (PQnfields (ret) != 1) ||
433       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
434     {
435       GNUNET_break (0);
436       PQclear (ret);
437       return 0;
438     }
439   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
440   PQclear (ret);
441   return total;
442 }
443
444
445 /**
446  * Store an item in the datastore.
447  *
448  * @param cls closure
449  * @param key key for the item
450  * @param size number of bytes in data
451  * @param data content stored
452  * @param type type of the content
453  * @param priority priority of the content
454  * @param anonymity anonymity-level for the content
455  * @param replication replication-level for the content
456  * @param expiration expiration time for the content
457  * @param msg set to error message
458  * @return GNUNET_OK on success
459  */
460 static int
461 postgres_plugin_put (void *cls,
462                      const GNUNET_HashCode * key,
463                      uint32_t size,
464                      const void *data,
465                      enum GNUNET_BLOCK_Type type,
466                      uint32_t priority,
467                      uint32_t anonymity,
468                      uint32_t replication,
469                      struct GNUNET_TIME_Absolute expiration,
470                      char **msg)
471 {
472   struct Plugin *plugin = cls;
473   GNUNET_HashCode vhash;
474   PGresult *ret;
475   uint32_t btype = htonl (type);
476   uint32_t bprio = htonl (priority);
477   uint32_t banon = htonl (anonymity);
478   uint32_t brepl = htonl (replication);
479   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
480   uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
481   const char *paramValues[] = {
482     (const char *) &brepl,
483     (const char *) &btype,
484     (const char *) &bprio,
485     (const char *) &banon,
486     (const char *) &bexpi,
487     (const char *) &rvalue,
488     (const char *) key,
489     (const char *) &vhash,
490     (const char *) data
491   };
492   int paramLengths[] = {
493     sizeof (brepl),
494     sizeof (btype),
495     sizeof (bprio),
496     sizeof (banon),
497     sizeof (bexpi),
498     sizeof (rvalue),
499     sizeof (GNUNET_HashCode),
500     sizeof (GNUNET_HashCode),
501     size
502   };
503   const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1 };
504
505   GNUNET_CRYPTO_hash (data, size, &vhash);
506   ret = PQexecPrepared (plugin->dbh,
507                         "put", 9, paramValues, paramLengths, paramFormats, 1);
508   if (GNUNET_OK != check_result (plugin, ret,
509                                  PGRES_COMMAND_OK,
510                                  "PQexecPrepared", "put", __LINE__))
511     return GNUNET_SYSERR;
512   PQclear (ret);
513   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
514 #if DEBUG_POSTGRES
515   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
516                    "datastore-postgres",
517                    "Stored %u bytes in database\n",
518                    (unsigned int) size);
519 #endif
520   return GNUNET_OK;
521 }
522
523
524 /**
525  * Function invoked to process the result and call
526  * the processor.
527  *
528  * @param plugin global plugin data
529  * @param proc function to call the value (once only).
530  * @param proc_cls closure for proc
531  * @param res result from exec
532  * @param line line number for error messages
533  */
534 static void 
535 process_result (struct Plugin *plugin,
536                 PluginDatumProcessor proc, void *proc_cls,
537                 PGresult *res,
538                 int line)
539 {
540   int iret;
541   enum GNUNET_BLOCK_Type type;
542   uint32_t anonymity;
543   uint32_t priority;
544   uint32_t size;
545   unsigned int rowid;
546   struct GNUNET_TIME_Absolute expiration_time;
547   GNUNET_HashCode key;
548
549   if (GNUNET_OK != check_result (plugin,
550                                  res,
551                                  PGRES_TUPLES_OK,
552                                  "PQexecPrepared",
553                                  "select",
554                                  line))
555     {
556 #if DEBUG_POSTGRES
557       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
558                        "datastore-postgres",
559                        "Ending iteration (postgres error)\n");
560 #endif
561       proc (proc_cls, 
562             NULL, 0, NULL, 0, 0, 0, 
563             GNUNET_TIME_UNIT_ZERO_ABS, 0);      
564       return;
565     }
566
567   if (0 == PQntuples (res))
568     {
569       /* no result */
570 #if DEBUG_POSTGRES
571       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
572                        "datastore-postgres",
573                        "Ending iteration (no more results)\n");
574 #endif
575       proc (proc_cls, 
576             NULL, 0, NULL, 0, 0, 0, 
577             GNUNET_TIME_UNIT_ZERO_ABS, 0);
578       PQclear (res);
579       return; 
580     }
581   if ((1 != PQntuples (res)) ||
582       (7 != PQnfields (res)) ||
583       (sizeof (uint32_t) != PQfsize (res, 0)) ||
584       (sizeof (uint32_t) != PQfsize (res, 6)))
585     {
586       GNUNET_break (0);
587       proc (proc_cls, 
588             NULL, 0, NULL, 0, 0, 0, 
589             GNUNET_TIME_UNIT_ZERO_ABS, 0);
590       PQclear (res);
591       return;
592     }
593   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
594   if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
595       (sizeof (uint32_t) != PQfsize (res, 1)) ||
596       (sizeof (uint32_t) != PQfsize (res, 2)) ||
597       (sizeof (uint64_t) != PQfsize (res, 3)) ||
598       (sizeof (GNUNET_HashCode) != PQgetlength (res, 0, 4)) )
599     {
600       GNUNET_break (0);
601       PQclear (res);
602       delete_by_rowid (plugin, rowid);
603       proc (proc_cls, 
604             NULL, 0, NULL, 0, 0, 0, 
605             GNUNET_TIME_UNIT_ZERO_ABS, 0);
606       return;
607     }
608
609   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
610   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
611   anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
612   expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
613   memcpy (&key, 
614           PQgetvalue (res, 0, 4), 
615           sizeof (GNUNET_HashCode));
616   size = PQgetlength (res, 0, 5);
617 #if DEBUG_POSTGRES
618   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
619                    "datastore-postgres",
620                    "Found result of size %u bytes and type %u in database\n",
621                    (unsigned int) size,
622                    (unsigned int) type);
623 #endif
624   iret = proc (proc_cls,
625                &key,
626                size,
627                PQgetvalue (res, 0, 5),
628                (enum GNUNET_BLOCK_Type) type,
629                priority,
630                anonymity,
631                expiration_time,
632                rowid);
633   PQclear (res);
634   if (iret == GNUNET_NO)
635     {
636 #if DEBUG_POSTGRES
637       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
638                   "Processor asked for item %u to be removed.\n",
639                   rowid);
640 #endif
641       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
642         {
643 #if DEBUG_POSTGRES
644           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
645                            "datastore-postgres",
646                            "Deleting %u bytes from database\n",
647                            (unsigned int) size);
648 #endif
649           plugin->env->duc (plugin->env->cls,
650                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
651 #if DEBUG_POSTGRES
652           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
653                            "datastore-postgres",
654                            "Deleted %u bytes from database\n",
655                            (unsigned int) size);
656 #endif
657         }
658     }
659 }
660
661
662 /**
663  * Iterate over the results for a particular key
664  * in the datastore.
665  *
666  * @param cls closure
667  * @param offset offset of the result (modulo num-results); 
668  *        specific ordering does not matter for the offset
669  * @param key maybe NULL (to match all entries)
670  * @param vhash hash of the value, maybe NULL (to
671  *        match all values that have the right key).
672  *        Note that for DBlocks there is no difference
673  *        betwen key and vhash, but for other blocks
674  *        there may be!
675  * @param type entries of which type are relevant?
676  *     Use 0 for any type.
677  * @param proc function to call on the matching value;
678  *        will be called once with a NULL if no value matches
679  * @param proc_cls closure for iter
680  */
681 static void
682 postgres_plugin_get_key (void *cls,
683                          uint64_t offset,
684                          const GNUNET_HashCode *key,
685                          const GNUNET_HashCode *vhash,
686                          enum GNUNET_BLOCK_Type type,
687                          PluginDatumProcessor proc, void *proc_cls)
688 {
689   struct Plugin *plugin = cls;
690   const int paramFormats[] = { 1, 1, 1, 1, 1 };
691   int paramLengths[4];
692   const char *paramValues[4];
693   int nparams;
694   const char *pname;
695   PGresult *ret;
696   uint64_t total;
697   uint64_t blimit_off;
698   uint32_t btype;
699
700   GNUNET_assert (key != NULL);
701   paramValues[0] = (const char*) key;
702   paramLengths[0] = sizeof (GNUNET_HashCode);
703   btype = htonl (type);
704   if (type != 0)
705     {
706       if (vhash != NULL)
707         {
708           paramValues[1] = (const char *) vhash;
709           paramLengths[1] = sizeof (GNUNET_HashCode);
710           paramValues[2] = (const char *) &btype;
711           paramLengths[2] = sizeof (btype);
712           paramValues[3] = (const char *) &blimit_off;
713           paramLengths[3] = sizeof (blimit_off);
714           nparams = 4;
715           pname = "getvt";
716           ret = PQexecParams (plugin->dbh,
717                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
718                               3,
719                               NULL,
720                               paramValues, 
721                               paramLengths,
722                               paramFormats, 1);
723         }
724       else
725         {
726           paramValues[1] = (const char *) &btype;
727           paramLengths[1] = sizeof (btype);
728           paramValues[2] = (const char *) &blimit_off;
729           paramLengths[2] = sizeof (blimit_off);
730           nparams = 3;
731           pname = "gett";
732           ret = PQexecParams (plugin->dbh,
733                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
734                               2,
735                               NULL,
736                               paramValues, 
737                               paramLengths, 
738                               paramFormats, 1);
739         }
740     }
741   else
742     {
743       if (vhash != NULL)
744         {
745           paramValues[1] = (const char *) vhash;
746           paramLengths[1] = sizeof (GNUNET_HashCode);
747           paramValues[2] = (const char *) &blimit_off;
748           paramLengths[2] = sizeof (blimit_off);
749           nparams = 3;
750           pname = "getv";
751           ret = PQexecParams (plugin->dbh,
752                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
753                               2,
754                               NULL,
755                               paramValues, 
756                               paramLengths,
757                               paramFormats, 1);
758         }
759       else
760         {
761           paramValues[1] = (const char *) &blimit_off;
762           paramLengths[1] = sizeof (blimit_off);
763           nparams = 2;
764           pname = "get";
765           ret = PQexecParams (plugin->dbh,
766                               "SELECT count(*) FROM gn090 WHERE hash=$1",
767                               1,
768                               NULL,
769                               paramValues, 
770                               paramLengths,
771                               paramFormats, 1);
772         }
773     }
774   if (GNUNET_OK != check_result (plugin,
775                                  ret,
776                                  PGRES_TUPLES_OK,
777                                  "PQexecParams",
778                                  pname,
779                                  __LINE__))
780     {
781       proc (proc_cls, 
782             NULL, 0, NULL, 0, 0, 0, 
783             GNUNET_TIME_UNIT_ZERO_ABS, 0);
784       return;
785     }
786   if ((PQntuples (ret) != 1) ||
787       (PQnfields (ret) != 1) ||
788       (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
789     {
790       GNUNET_break (0);
791       PQclear (ret);
792       proc (proc_cls, 
793             NULL, 0, NULL, 0, 0, 0, 
794             GNUNET_TIME_UNIT_ZERO_ABS, 0);
795       return;
796     }
797   total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
798   PQclear (ret);
799   if (total == 0)
800     {
801       proc (proc_cls, 
802             NULL, 0, NULL, 0, 0, 0, 
803             GNUNET_TIME_UNIT_ZERO_ABS, 0);
804       return;
805     }
806   blimit_off = GNUNET_htonll (offset % total);
807   ret = PQexecPrepared (plugin->dbh,
808                         pname,
809                         nparams,
810                         paramValues, 
811                         paramLengths,
812                         paramFormats, 1);
813   process_result (plugin,
814                   proc, proc_cls,
815                   ret, __LINE__);
816 }
817
818
819 /**
820  * Select a subset of the items in the datastore and call
821  * the given iterator for each of them.
822  *
823  * @param cls our "struct Plugin*"
824  * @param type entries of which type should be considered?
825  *        Use 0 for any type.
826  * @param proc function to call on the matching value;
827  *        will be called with a NULL if no value matches
828  * @param proc_cls closure for proc
829  */
830 static void
831 postgres_plugin_get_zero_anonymity (void *cls,
832                                     uint64_t offset,
833                                     enum GNUNET_BLOCK_Type type,
834                                     PluginDatumProcessor proc, void *proc_cls)
835 {
836   struct Plugin *plugin = cls;
837   uint32_t btype;
838   uint64_t boff;
839   const int paramFormats[] = { 1, 1 };
840   int paramLengths[] = { sizeof (btype), sizeof (boff) };
841   const char *paramValues[] = { (const char*) &btype, (const char*) &boff };
842   PGresult *ret;
843
844   btype = htonl ((uint32_t) type);
845   boff = GNUNET_htonll (offset);
846   ret = PQexecPrepared (plugin->dbh,
847                         "select_non_anonymous",
848                         2,
849                         paramValues, 
850                         paramLengths,
851                         paramFormats, 1);
852   process_result (plugin,
853                   proc, proc_cls,
854                   ret, __LINE__);
855 }
856
857
858 /**
859  * Context for 'repl_iter' function.
860  */
861 struct ReplCtx
862 {
863   
864   /**
865    * Plugin handle.
866    */
867   struct Plugin *plugin;
868   
869   /**
870    * Function to call for the result (or the NULL).
871    */
872   PluginDatumProcessor proc;
873   
874   /**
875    * Closure for proc.
876    */
877   void *proc_cls;
878 };
879
880
881 /**
882  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
883  * Decrements the replication counter and calls the original
884  * iterator.
885  *
886  * @param cls closure
887  * @param next_cls closure to pass to the "next" function.
888  * @param key key for the content
889  * @param size number of bytes in data
890  * @param data content stored
891  * @param type type of the content
892  * @param priority priority of the content
893  * @param anonymity anonymity-level for the content
894  * @param expiration expiration time for the content
895  * @param uid unique identifier for the datum;
896  *        maybe 0 if no unique identifier is available
897  *
898  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
899  *         (continue on call to "next", of course),
900  *         GNUNET_NO to delete the item and continue (if supported)
901  */
902 static int
903 repl_proc (void *cls,
904            const GNUNET_HashCode *key,
905            uint32_t size,
906            const void *data,
907            enum GNUNET_BLOCK_Type type,
908            uint32_t priority,
909            uint32_t anonymity,
910            struct GNUNET_TIME_Absolute expiration, 
911            uint64_t uid)
912 {
913   struct ReplCtx *rc = cls;
914   struct Plugin *plugin = rc->plugin;
915   int ret;
916   PGresult *qret;
917   uint32_t boid;
918
919   ret = rc->proc (rc->proc_cls,
920                   key,
921                   size, data, 
922                   type, priority, anonymity, expiration,
923                   uid);
924   if (NULL != key)
925     {
926       boid = htonl ( (uint32_t) uid);
927       const char *paramValues[] = {
928         (const char *) &boid,
929       };
930       int paramLengths[] = {
931         sizeof (boid),
932       };
933       const int paramFormats[] = { 1 };
934       qret = PQexecPrepared (plugin->dbh,
935                             "decrepl",
936                             1, paramValues, paramLengths, paramFormats, 1);
937       if (GNUNET_OK != check_result (plugin,
938                                      qret,
939                                      PGRES_COMMAND_OK,
940                                      "PQexecPrepared", 
941                                      "decrepl", __LINE__))
942         return GNUNET_SYSERR;
943       PQclear (qret);
944     }
945   return ret;
946 }
947
948
949 /**
950  * Get a random item for replication.  Returns a single, not expired, random item
951  * from those with the highest replication counters.  The item's 
952  * replication counter is decremented by one IF it was positive before.
953  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
954  *
955  * @param cls closure
956  * @param proc function to call the value (once only).
957  * @param proc_cls closure for proc
958  */
959 static void
960 postgres_plugin_get_replication (void *cls,
961                                  PluginDatumProcessor proc, void *proc_cls)
962 {
963   struct Plugin *plugin = cls;
964   struct ReplCtx rc;
965   PGresult *ret;
966
967   rc.plugin = plugin;
968   rc.proc = proc;
969   rc.proc_cls = proc_cls;
970   ret = PQexecPrepared (plugin->dbh,
971                         "select_replication_order",
972                         0,
973                         NULL, NULL, NULL, 1);
974   process_result (plugin,
975                   &repl_proc, &rc,
976                   ret, __LINE__);
977 }
978
979
980 /**
981  * Get a random item for expiration.
982  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
983  *
984  * @param cls closure
985  * @param proc function to call the value (once only).
986  * @param proc_cls closure for proc
987  */
988 static void
989 postgres_plugin_get_expiration (void *cls,
990                                 PluginDatumProcessor proc, void *proc_cls)
991 {
992   struct Plugin *plugin = cls;
993   uint64_t btime;
994   const int paramFormats[] = { 1 };
995   int paramLengths[] = { sizeof (btime) };
996   const char *paramValues[] = { (const char*) &btime };
997   PGresult *ret;
998   
999   btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
1000   ret = PQexecPrepared (plugin->dbh,
1001                         "select_expiration_order",
1002                         1,
1003                         paramValues,
1004                         paramLengths,
1005                         paramFormats, 
1006                         1);
1007   process_result (plugin,
1008                   proc, proc_cls,
1009                   ret, __LINE__);
1010 }
1011
1012
1013 /**
1014  * Update the priority for a particular key in the datastore.  If
1015  * the expiration time in value is different than the time found in
1016  * the datastore, the higher value should be kept.  For the
1017  * anonymity level, the lower value is to be used.  The specified
1018  * priority should be added to the existing priority, ignoring the
1019  * priority in value.
1020  *
1021  * Note that it is possible for multiple values to match this put.
1022  * In that case, all of the respective values are updated.
1023  *
1024  * @param cls our "struct Plugin*"
1025  * @param uid unique identifier of the datum
1026  * @param delta by how much should the priority
1027  *     change?  If priority + delta < 0 the
1028  *     priority should be set to 0 (never go
1029  *     negative).
1030  * @param expire new expiration time should be the
1031  *     MAX of any existing expiration time and
1032  *     this value
1033  * @param msg set to error message
1034  * @return GNUNET_OK on success
1035  */
1036 static int
1037 postgres_plugin_update (void *cls,
1038                         uint64_t uid,
1039                         int delta, struct GNUNET_TIME_Absolute expire,
1040                         char **msg)
1041 {
1042   struct Plugin *plugin = cls;
1043   PGresult *ret;
1044   int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
1045   uint32_t boid = htonl ( (uint32_t) uid);
1046   uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
1047   const char *paramValues[] = {
1048     (const char *) &bdelta,
1049     (const char *) &bexpire,
1050     (const char *) &boid,
1051   };
1052   int paramLengths[] = {
1053     sizeof (bdelta),
1054     sizeof (bexpire),
1055     sizeof (boid),
1056   };
1057   const int paramFormats[] = { 1, 1, 1 };
1058
1059   ret = PQexecPrepared (plugin->dbh,
1060                         "update",
1061                         3, paramValues, paramLengths, paramFormats, 1);
1062   if (GNUNET_OK != check_result (plugin,
1063                                  ret,
1064                                  PGRES_COMMAND_OK,
1065                                  "PQexecPrepared", "update", __LINE__))
1066     return GNUNET_SYSERR;
1067   PQclear (ret);
1068   return GNUNET_OK;
1069 }
1070
1071
1072 /**
1073  * Drop database.
1074  */
1075 static void 
1076 postgres_plugin_drop (void *cls)
1077 {
1078   struct Plugin *plugin = cls;
1079
1080   pq_exec (plugin, "DROP TABLE gn090", __LINE__);
1081 }
1082
1083
1084 /**
1085  * Entry point for the plugin.
1086  *
1087  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1088  * @return our "struct Plugin*"
1089  */
1090 void *
1091 libgnunet_plugin_datastore_postgres_init (void *cls)
1092 {
1093   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1094   struct GNUNET_DATASTORE_PluginFunctions *api;
1095   struct Plugin *plugin;
1096
1097   plugin = GNUNET_malloc (sizeof (struct Plugin));
1098   plugin->env = env;
1099   if (GNUNET_OK != init_connection (plugin))
1100     {
1101       GNUNET_free (plugin);
1102       return NULL;
1103     }
1104   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1105   api->cls = plugin;
1106   api->estimate_size = &postgres_plugin_estimate_size;
1107   api->put = &postgres_plugin_put;
1108   api->update = &postgres_plugin_update;
1109   api->get_key = &postgres_plugin_get_key;
1110   api->get_replication = &postgres_plugin_get_replication;
1111   api->get_expiration = &postgres_plugin_get_expiration;
1112   api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
1113   api->drop = &postgres_plugin_drop;
1114   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1115                    "datastore-postgres",
1116                    _("Postgres database running\n"));
1117   return api;
1118 }
1119
1120
1121 /**
1122  * Exit point from the plugin.
1123  * @param cls our "struct Plugin*"
1124  * @return always NULL
1125  */
1126 void *
1127 libgnunet_plugin_datastore_postgres_done (void *cls)
1128 {
1129   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1130   struct Plugin *plugin = api->cls;
1131   
1132   PQfinish (plugin->dbh);
1133   GNUNET_free (plugin);
1134   GNUNET_free (api);
1135   return NULL;
1136 }
1137
1138 /* end of plugin_datastore_postgres.c */