implementing postgres version of get_random
[oweals/gnunet.git] / src / datacache / plugin_datacache_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2006, 2009, 2010, 2012, 2015 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datacache/plugin_datacache_postgres.c
23  * @brief postgres for an implementation of a database backend for the datacache
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_postgres_lib.h"
29 #include "gnunet_datacache_plugin.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "datacache-postgres", __VA_ARGS__)
32
33 /**
34  * Per-entry overhead estimate
35  */
36 #define OVERHEAD (sizeof(struct GNUNET_HashCode) + 24)
37
38 /**
39  * Context for all functions in this plugin.
40  */
41 struct Plugin
42 {
43   /**
44    * Our execution environment.
45    */
46   struct GNUNET_DATACACHE_PluginEnvironment *env;
47
48   /**
49    * Native Postgres database handle.
50    */
51   PGconn *dbh;
52
53   /**
54    * Number of key-value pairs in the database.
55    */
56   unsigned int num_items;
57 };
58
59
60 /**
61  * @brief Get a database handle
62  *
63  * @param plugin global context
64  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
65  */
66 static int
67 init_connection (struct Plugin *plugin)
68 {
69   PGresult *ret;
70
71   plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg,
72                                          "datacache-postgres");
73   if (NULL == plugin->dbh)
74     return GNUNET_SYSERR;
75   ret =
76       PQexec (plugin->dbh,
77               "CREATE TEMPORARY TABLE gn090dc ("
78               "  type INTEGER NOT NULL DEFAULT 0,"
79               "  discard_time BIGINT NOT NULL DEFAULT 0,"
80               "  key BYTEA NOT NULL DEFAULT '',"
81               "  value BYTEA NOT NULL DEFAULT '',"
82               "  path BYTEA DEFAULT '')"
83               "WITH OIDS");
84   if ( (ret == NULL) ||
85        ((PQresultStatus (ret) != PGRES_COMMAND_OK) &&
86         (0 != strcmp ("42P07",    /* duplicate table */
87                       PQresultErrorField
88                       (ret,
89                        PG_DIAG_SQLSTATE)))))
90   {
91     (void) GNUNET_POSTGRES_check_result (plugin->dbh, ret,
92                                          PGRES_COMMAND_OK,
93                                          "CREATE TABLE",
94                                          "gn090dc");
95     PQfinish (plugin->dbh);
96     plugin->dbh = NULL;
97     return GNUNET_SYSERR;
98   }
99   if (PQresultStatus (ret) == PGRES_COMMAND_OK)
100   {
101     if ((GNUNET_OK !=
102          GNUNET_POSTGRES_exec (plugin->dbh,
103                                "CREATE INDEX idx_key ON gn090dc (key)")) ||
104         (GNUNET_OK !=
105          GNUNET_POSTGRES_exec (plugin->dbh,
106                                "CREATE INDEX idx_dt ON gn090dc (discard_time)")))
107     {
108       PQclear (ret);
109       PQfinish (plugin->dbh);
110       plugin->dbh = NULL;
111       return GNUNET_SYSERR;
112     }
113   }
114   PQclear (ret);
115   ret =
116       PQexec (plugin->dbh,
117               "ALTER TABLE gn090dc ALTER value SET STORAGE EXTERNAL");
118   if (GNUNET_OK !=
119       GNUNET_POSTGRES_check_result (plugin->dbh,
120                                     ret,
121                                     PGRES_COMMAND_OK,
122                                     "ALTER TABLE",
123                                     "gn090dc"))
124   {
125     PQfinish (plugin->dbh);
126     plugin->dbh = NULL;
127     return GNUNET_SYSERR;
128   }
129   PQclear (ret);
130   ret = PQexec (plugin->dbh,
131                 "ALTER TABLE gn090dc ALTER key SET STORAGE PLAIN");
132   if (GNUNET_OK !=
133       GNUNET_POSTGRES_check_result (plugin->dbh,
134                                     ret,
135                                     PGRES_COMMAND_OK,
136                                     "ALTER TABLE",
137                                     "gn090dc"))
138   {
139     PQfinish (plugin->dbh);
140     plugin->dbh = NULL;
141     return GNUNET_SYSERR;
142   }
143   PQclear (ret);
144   if ((GNUNET_OK !=
145        GNUNET_POSTGRES_prepare (plugin->dbh,
146                                 "getkt",
147                                 "SELECT discard_time,type,value,path FROM gn090dc "
148                                 "WHERE key=$1 AND type=$2 ", 2)) ||
149       (GNUNET_OK !=
150        GNUNET_POSTGRES_prepare (plugin->dbh,
151                                 "getk",
152                                 "SELECT discard_time,type,value,path FROM gn090dc "
153                                 "WHERE key=$1", 1)) ||
154       (GNUNET_OK !=
155        GNUNET_POSTGRES_prepare (plugin->dbh,
156                                 "getm",
157                                 "SELECT length(value),oid,key FROM gn090dc "
158                                 "ORDER BY discard_time ASC LIMIT 1", 0)) ||
159       (GNUNET_OK !=
160        GNUNET_POSTGRES_prepare (plugin->dbh,
161                                 "get_random",
162                                 "SELECT discard_time,type,value,path,key FROM gn090dc "
163                                 "ORDER BY key ASC LIMIT 1 OFFSET $1", 1)) ||
164       (GNUNET_OK !=
165        GNUNET_POSTGRES_prepare (plugin->dbh,
166                                 "delrow",
167                                 "DELETE FROM gn090dc WHERE oid=$1", 1)) ||
168       (GNUNET_OK !=
169        GNUNET_POSTGRES_prepare (plugin->dbh,
170                                 "put",
171                                 "INSERT INTO gn090dc (type, discard_time, key, value, path) "
172                                 "VALUES ($1, $2, $3, $4, $5)", 5)))
173   {
174     PQfinish (plugin->dbh);
175     plugin->dbh = NULL;
176     return GNUNET_SYSERR;
177   }
178   return GNUNET_OK;
179 }
180
181
182 /**
183  * Store an item in the datastore.
184  *
185  * @param cls closure (our `struct Plugin`)
186  * @param key key to store @a data under
187  * @param size number of bytes in @a data
188  * @param data data to store
189  * @param type type of the value
190  * @param discard_time when to discard the value in any case
191  * @param path_info_len number of entries in @a path_info
192  * @param path_info a path through the network
193  * @return 0 if duplicate, -1 on error, number of bytes used otherwise
194  */
195 static ssize_t
196 postgres_plugin_put (void *cls,
197                      const struct GNUNET_HashCode *key,
198                      size_t size,
199                      const char *data,
200                      enum GNUNET_BLOCK_Type type,
201                      struct GNUNET_TIME_Absolute discard_time,
202                      unsigned int path_info_len,
203                      const struct GNUNET_PeerIdentity *path_info)
204 {
205   struct Plugin *plugin = cls;
206   PGresult *ret;
207   uint32_t btype = htonl (type);
208   uint64_t bexpi = GNUNET_TIME_absolute_hton (discard_time).abs_value_us__;
209
210   const char *paramValues[] = {
211     (const char *) &btype,
212     (const char *) &bexpi,
213     (const char *) key,
214     (const char *) data,
215     (const char *) path_info
216   };
217   int paramLengths[] = {
218     sizeof (btype),
219     sizeof (bexpi),
220     sizeof (struct GNUNET_HashCode),
221     size,
222     path_info_len * sizeof (struct GNUNET_PeerIdentity)
223   };
224   const int paramFormats[] = { 1, 1, 1, 1, 1 };
225
226   ret =
227       PQexecPrepared (plugin->dbh, "put", 5, paramValues, paramLengths,
228                       paramFormats, 1);
229   if (GNUNET_OK !=
230       GNUNET_POSTGRES_check_result (plugin->dbh, ret,
231                                     PGRES_COMMAND_OK, "PQexecPrepared", "put"))
232     return -1;
233   plugin->num_items++;
234   PQclear (ret);
235   return size + OVERHEAD;
236 }
237
238
239 /**
240  * Iterate over the results for a particular key
241  * in the datastore.
242  *
243  * @param cls closure (our `struct Plugin`)
244  * @param key key to look for
245  * @param type entries of which type are relevant?
246  * @param iter maybe NULL (to just count)
247  * @param iter_cls closure for @a iter
248  * @return the number of results found
249  */
250 static unsigned int
251 postgres_plugin_get (void *cls,
252                      const struct GNUNET_HashCode *key,
253                      enum GNUNET_BLOCK_Type type,
254                      GNUNET_DATACACHE_Iterator iter,
255                      void *iter_cls)
256 {
257   struct Plugin *plugin = cls;
258   uint32_t btype = htonl (type);
259
260   const char *paramValues[] = {
261     (const char *) key,
262     (const char *) &btype,
263   };
264   int paramLengths[] = {
265     sizeof (struct GNUNET_HashCode),
266     sizeof (btype),
267   };
268   const int paramFormats[] = { 1, 1 };
269   struct GNUNET_TIME_Absolute expiration_time;
270   uint32_t size;
271   unsigned int cnt;
272   unsigned int i;
273   unsigned int path_len;
274   const struct GNUNET_PeerIdentity *path;
275   PGresult *res;
276
277   res =
278       PQexecPrepared (plugin->dbh, (type == 0) ? "getk" : "getkt",
279                       (type == 0) ? 1 : 2, paramValues, paramLengths,
280                       paramFormats, 1);
281   if (GNUNET_OK !=
282       GNUNET_POSTGRES_check_result (plugin->dbh,
283                                     res,
284                                     PGRES_TUPLES_OK,
285                                     "PQexecPrepared",
286                                     (type == 0) ? "getk" : "getkt"))
287   {
288     LOG (GNUNET_ERROR_TYPE_DEBUG,
289          "Ending iteration (postgres error)\n");
290     return 0;
291   }
292
293   if (0 == (cnt = PQntuples (res)))
294   {
295     /* no result */
296     LOG (GNUNET_ERROR_TYPE_DEBUG,
297          "Ending iteration (no more results)\n");
298     PQclear (res);
299     return 0;
300   }
301   if (iter == NULL)
302   {
303     PQclear (res);
304     return cnt;
305   }
306   if ( (4 != PQnfields (res)) ||
307        (sizeof (uint64_t) != PQfsize (res, 0)) ||
308        (sizeof (uint32_t) != PQfsize (res, 1)))
309   {
310     GNUNET_break (0);
311     PQclear (res);
312     return 0;
313   }
314   for (i = 0; i < cnt; i++)
315   {
316     expiration_time.abs_value_us =
317         GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, i, 0));
318     type = ntohl (*(uint32_t *) PQgetvalue (res, i, 1));
319     size = PQgetlength (res, i, 2);
320     path_len = PQgetlength (res, i, 3);
321     if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
322     {
323       GNUNET_break (0);
324       path_len = 0;
325     }
326     path_len %= sizeof (struct GNUNET_PeerIdentity);
327     path = (const struct GNUNET_PeerIdentity *) PQgetvalue (res, i, 3);
328     LOG (GNUNET_ERROR_TYPE_DEBUG,
329          "Found result of size %u bytes and type %u in database\n",
330          (unsigned int) size, (unsigned int) type);
331     if (GNUNET_SYSERR ==
332         iter (iter_cls, key, size, PQgetvalue (res, i, 2),
333               (enum GNUNET_BLOCK_Type) type,
334               expiration_time,
335               path_len,
336               path))
337     {
338       LOG (GNUNET_ERROR_TYPE_DEBUG,
339            "Ending iteration (client error)\n");
340       PQclear (res);
341       return cnt;
342     }
343   }
344   PQclear (res);
345   return cnt;
346 }
347
348
349 /**
350  * Delete the entry with the lowest expiration value
351  * from the datacache right now.
352  *
353  * @param cls closure (our `struct Plugin`)
354  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
355  */
356 static int
357 postgres_plugin_del (void *cls)
358 {
359   struct Plugin *plugin = cls;
360   uint32_t size;
361   uint32_t oid;
362   struct GNUNET_HashCode key;
363   PGresult *res;
364
365   res = PQexecPrepared (plugin->dbh,
366                         "getm",
367                         0, NULL, NULL, NULL, 1);
368   if (GNUNET_OK !=
369       GNUNET_POSTGRES_check_result (plugin->dbh,
370                                     res,
371                                     PGRES_TUPLES_OK,
372                                     "PQexecPrepared",
373                                     "getm"))
374   {
375     LOG (GNUNET_ERROR_TYPE_DEBUG,
376          "Ending iteration (postgres error)\n");
377     return 0;
378   }
379   if (0 == PQntuples (res))
380   {
381     /* no result */
382     LOG (GNUNET_ERROR_TYPE_DEBUG,
383          "Ending iteration (no more results)\n");
384     PQclear (res);
385     return GNUNET_SYSERR;
386   }
387   if ((3 != PQnfields (res)) || (sizeof (size) != PQfsize (res, 0)) ||
388       (sizeof (oid) != PQfsize (res, 1)) ||
389       (sizeof (struct GNUNET_HashCode) != PQgetlength (res, 0, 2)))
390   {
391     GNUNET_break (0);
392     PQclear (res);
393     return 0;
394   }
395   size = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
396   oid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
397   memcpy (&key, PQgetvalue (res, 0, 2), sizeof (struct GNUNET_HashCode));
398   PQclear (res);
399   if (GNUNET_OK !=
400       GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
401                                        "delrow",
402                                        oid))
403     return GNUNET_SYSERR;
404   plugin->num_items--;
405   plugin->env->delete_notify (plugin->env->cls,
406                               &key,
407                               size + OVERHEAD);
408   return GNUNET_OK;
409 }
410
411
412 /**
413  * Obtain a random key-value pair from the datacache.
414  *
415  * @param cls closure (our `struct Plugin`)
416  * @param iter maybe NULL (to just count)
417  * @param iter_cls closure for @a iter
418  * @return the number of results found, zero (datacache empty) or one
419  */
420 static unsigned int
421 postgres_plugin_get_random (void *cls,
422                             GNUNET_DATACACHE_Iterator iter,
423                             void *iter_cls)
424 {
425   struct Plugin *plugin = cls;
426   unsigned int off;
427   uint32_t off_be;
428   struct GNUNET_TIME_Absolute expiration_time;
429   uint32_t size;
430   unsigned int path_len;
431   const struct GNUNET_PeerIdentity *path;
432   const struct GNUNET_HashCode *key;
433   unsigned int type;
434   PGresult *res;
435   const char *paramValues[] = {
436     (const char *) &off_be,
437   };
438   int paramLengths[] = {
439     sizeof (off_be),
440   };
441   const int paramFormats[] = { 1 };
442
443   if (0 == plugin->num_items)
444     return 0;
445   if (NULL == iter)
446     return 1;
447   off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
448                                   plugin->num_items);
449   off_be = htonl (off);
450   res =
451     PQexecPrepared (plugin->dbh, "get_random",
452                     1, paramValues, paramLengths, paramFormats,
453                     1);
454   if (GNUNET_OK !=
455       GNUNET_POSTGRES_check_result (plugin->dbh,
456                                     res,
457                                     PGRES_TUPLES_OK,
458                                     "PQexecPrepared",
459                                     "get_random"))
460   {
461     GNUNET_break (0);
462     return 0;
463   }
464   if (0 == PQntuples (res))
465   {
466     GNUNET_break (0);
467     return 0;
468   }
469   if ( (5 != PQnfields (res)) ||
470        (sizeof (uint64_t) != PQfsize (res, 0)) ||
471        (sizeof (uint32_t) != PQfsize (res, 1)) ||
472        (sizeof (struct GNUNET_HashCode) != PQfsize (res, 4)) )
473   {
474     GNUNET_break (0);
475     PQclear (res);
476     return 0;
477   }
478   expiration_time.abs_value_us =
479     GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 0));
480   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
481   size = PQgetlength (res, 0, 2);
482   path_len = PQgetlength (res, 0, 3);
483   if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
484   {
485     GNUNET_break (0);
486     path_len = 0;
487   }
488   path_len %= sizeof (struct GNUNET_PeerIdentity);
489   path = (const struct GNUNET_PeerIdentity *) PQgetvalue (res, 0, 3);
490   key = (const struct GNUNET_HashCode *) PQgetvalue (res, 0, 4);
491   LOG (GNUNET_ERROR_TYPE_DEBUG,
492        "Found random value with key %s of size %u bytes and type %u in database\n",
493        GNUNET_h2s (key),
494        (unsigned int) size,
495        (unsigned int) type);
496   (void) iter (iter_cls,
497                key,
498                size,
499                PQgetvalue (res, 0, 2),
500                (enum GNUNET_BLOCK_Type) type,
501                expiration_time,
502                path_len,
503                path);
504   PQclear (res);
505   return 1;
506 }
507
508
509 /**
510  * Entry point for the plugin.
511  *
512  * @param cls closure (the `struct GNUNET_DATACACHE_PluginEnvironmnet`)
513  * @return the plugin's closure (our `struct Plugin`)
514  */
515 void *
516 libgnunet_plugin_datacache_postgres_init (void *cls)
517 {
518   struct GNUNET_DATACACHE_PluginEnvironment *env = cls;
519   struct GNUNET_DATACACHE_PluginFunctions *api;
520   struct Plugin *plugin;
521
522   plugin = GNUNET_new (struct Plugin);
523   plugin->env = env;
524
525   if (GNUNET_OK != init_connection (plugin))
526   {
527     GNUNET_free (plugin);
528     return NULL;
529   }
530
531   api = GNUNET_new (struct GNUNET_DATACACHE_PluginFunctions);
532   api->cls = plugin;
533   api->get = &postgres_plugin_get;
534   api->put = &postgres_plugin_put;
535   api->del = &postgres_plugin_del;
536   api->get_random = &postgres_plugin_get_random;
537   LOG (GNUNET_ERROR_TYPE_INFO,
538        "Postgres datacache running\n");
539   return api;
540 }
541
542
543 /**
544  * Exit point from the plugin.
545  *
546  * @param cls closure (our `struct Plugin`)
547  * @return NULL
548  */
549 void *
550 libgnunet_plugin_datacache_postgres_done (void *cls)
551 {
552   struct GNUNET_DATACACHE_PluginFunctions *api = cls;
553   struct Plugin *plugin = api->cls;
554
555   PQfinish (plugin->dbh);
556   GNUNET_free (plugin);
557   GNUNET_free (api);
558   return NULL;
559 }
560
561
562
563 /* end of plugin_datacache_postgres.c */