e69f2b2c6b85045a008ef58fbbc9df0a3a742a73
[oweals/gnunet.git] / src / datacache / plugin_datacache_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2006, 2009, 2010, 2012, 2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datacache/plugin_datacache_postgres.c
23  * @brief postgres for an implementation of a database backend for the datacache
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_postgres_lib.h"
29 #include "gnunet_pq_lib.h"
30 #include "gnunet_datacache_plugin.h"
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "datacache-postgres", __VA_ARGS__)
33
34 /**
35  * Per-entry overhead estimate
36  */
37 #define OVERHEAD (sizeof(struct GNUNET_HashCode) + 24)
38
39 /**
40  * Context for all functions in this plugin.
41  */
42 struct Plugin
43 {
44   /**
45    * Our execution environment.
46    */
47   struct GNUNET_DATACACHE_PluginEnvironment *env;
48
49   /**
50    * Native Postgres database handle.
51    */
52   PGconn *dbh;
53
54   /**
55    * Number of key-value pairs in the database.
56    */
57   unsigned int num_items;
58 };
59
60
61 /**
62  * @brief Get a database handle
63  *
64  * @param plugin global context
65  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
66  */
67 static int
68 init_connection (struct Plugin *plugin)
69 {
70   struct GNUNET_PQ_ExecuteStatement es[] = {
71     GNUNET_PQ_make_execute ("CREATE TEMPORARY TABLE IF NOT EXISTS gn090dc ("
72                             "  type INTEGER NOT NULL DEFAULT 0,"
73                             "  discard_time BIGINT NOT NULL DEFAULT 0,"
74                             "  key BYTEA NOT NULL DEFAULT '',"
75                             "  value BYTEA NOT NULL DEFAULT '',"
76                             "  path BYTEA DEFAULT '')"
77                             "WITH OIDS"),
78     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_key ON gn090dc (key)"),
79     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_dt ON gn090dc (discard_time)"),
80     GNUNET_PQ_make_execute ("ALTER TABLE gn090dc ALTER value SET STORAGE EXTERNAL"),
81     GNUNET_PQ_make_execute ("ALTER TABLE gn090dc ALTER key SET STORAGE PLAIN"),
82     GNUNET_PQ_EXECUTE_STATEMENT_END
83   };
84   struct GNUNET_PQ_PreparedStatement ps[] = {
85     GNUNET_PQ_make_prepare ("getkt",
86                             "SELECT discard_time,type,value,path FROM gn090dc "
87                             "WHERE key=$1 AND type=$2",
88                             2),
89     GNUNET_PQ_make_prepare ("getk",
90                             "SELECT discard_time,type,value,path FROM gn090dc "
91                             "WHERE key=$1",
92                             1),
93     GNUNET_PQ_make_prepare ("getm",
94                             "SELECT length(value) AS len,oid,key FROM gn090dc "
95                             "ORDER BY discard_time ASC LIMIT 1",
96                             0),
97     GNUNET_PQ_make_prepare ("get_random",
98                             "SELECT discard_time,type,value,path,key FROM gn090dc "
99                             "ORDER BY key ASC LIMIT 1 OFFSET $1",
100                             1),
101     GNUNET_PQ_make_prepare ("get_closest",
102                             "SELECT discard_time,type,value,path,key FROM gn090dc "
103                             "WHERE key>=$1 ORDER BY key ASC LIMIT $2",
104                             1),
105     GNUNET_PQ_make_prepare ("delrow",
106                             "DELETE FROM gn090dc WHERE oid=$1",
107                             1),
108     GNUNET_PQ_make_prepare ("put",
109                             "INSERT INTO gn090dc (type, discard_time, key, value, path) "
110                             "VALUES ($1, $2, $3, $4, $5)",
111                             5),
112     GNUNET_PQ_PREPARED_STATEMENT_END
113   };
114
115   plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
116                                             "datacache-postgres");
117   if (NULL == plugin->dbh)
118     return GNUNET_SYSERR;
119   if (GNUNET_OK !=
120       GNUNET_PQ_exec_statements (plugin->dbh,
121                                  es))
122   {
123     PQfinish (plugin->dbh);
124     plugin->dbh = NULL;
125     return GNUNET_SYSERR;
126   }
127
128   if (GNUNET_OK !=
129       GNUNET_PQ_prepare_statements (plugin->dbh,
130                                     ps))
131   {
132     PQfinish (plugin->dbh);
133     plugin->dbh = NULL;
134     return GNUNET_SYSERR;
135   }
136   return GNUNET_OK;
137 }
138
139
140 /**
141  * Store an item in the datastore.
142  *
143  * @param cls closure (our `struct Plugin`)
144  * @param key key to store @a data under
145  * @param data_size number of bytes in @a data
146  * @param data data to store
147  * @param type type of the value
148  * @param discard_time when to discard the value in any case
149  * @param path_info_len number of entries in @a path_info
150  * @param path_info a path through the network
151  * @return 0 if duplicate, -1 on error, number of bytes used otherwise
152  */
153 static ssize_t
154 postgres_plugin_put (void *cls,
155                      const struct GNUNET_HashCode *key,
156                      size_t data_size,
157                      const char *data,
158                      enum GNUNET_BLOCK_Type type,
159                      struct GNUNET_TIME_Absolute discard_time,
160                      unsigned int path_info_len,
161                      const struct GNUNET_PeerIdentity *path_info)
162 {
163   struct Plugin *plugin = cls;
164   uint32_t type32 = (uint32_t) type;
165   struct GNUNET_PQ_QueryParam params[] = {
166     GNUNET_PQ_query_param_uint32 (&type32),
167     GNUNET_PQ_query_param_absolute_time (&discard_time),
168     GNUNET_PQ_query_param_auto_from_type (key),
169     GNUNET_PQ_query_param_fixed_size (data, data_size),
170     GNUNET_PQ_query_param_fixed_size (path_info,
171                                       path_info_len * sizeof (struct GNUNET_PeerIdentity)),
172     GNUNET_PQ_query_param_end
173   };
174   enum GNUNET_PQ_QueryStatus ret;
175
176   ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
177                                             "put",
178                                             params);
179   if (0 > ret)
180     return -1;
181   plugin->num_items++;
182   return data_size + OVERHEAD;
183 }
184
185
186 /**
187  * Closure for #handle_results.
188  */
189 struct HandleResultContext
190 {
191
192   /**
193    * Function to call on each result, may be NULL.
194    */
195   GNUNET_DATACACHE_Iterator iter;
196
197   /**
198    * Closure for @e iter.
199    */
200   void *iter_cls;
201
202   /**
203    * Key used.
204    */
205   const struct GNUNET_HashCode *key;
206 };
207
208
209 /**
210  * Function to be called with the results of a SELECT statement
211  * that has returned @a num_results results.  Parse the result
212  * and call the callback given in @a cls
213  *
214  * @param cls closure of type `struct HandleResultContext`
215  * @param result the postgres result
216  * @param num_result the number of results in @a result
217  */
218 static void
219 handle_results (void *cls,
220                 PGresult *result,
221                 unsigned int num_results)
222 {
223   struct HandleResultContext *hrc = cls;
224
225   for (unsigned int i=0;i<num_results;i++)
226   {
227     struct GNUNET_TIME_Absolute expiration_time;
228     uint32_t type;
229     void *data;
230     size_t data_size;
231     struct GNUNET_PeerIdentity *path;
232     size_t path_len;
233     struct GNUNET_PQ_ResultSpec rs[] = {
234       GNUNET_PQ_result_spec_absolute_time ("discard_time",
235                                            &expiration_time),
236       GNUNET_PQ_result_spec_uint32 ("type",
237                                     &type),
238       GNUNET_PQ_result_spec_variable_size ("value",
239                                            &data,
240                                            &data_size),
241       GNUNET_PQ_result_spec_variable_size ("path",
242                                            (void **) &path,
243                                            &path_len),
244       GNUNET_PQ_result_spec_end
245     };
246
247     if (GNUNET_YES !=
248         GNUNET_PQ_extract_result (result,
249                                   rs,
250                                   i))
251     {
252       GNUNET_break (0);
253       return;
254     }
255     if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
256     {
257       GNUNET_break (0);
258       path_len = 0;
259     }
260     path_len %= sizeof (struct GNUNET_PeerIdentity);
261     LOG (GNUNET_ERROR_TYPE_DEBUG,
262          "Found result of size %u bytes and type %u in database\n",
263          (unsigned int) data_size,
264          (unsigned int) type);
265     if ( (NULL != hrc->iter) &&
266          (GNUNET_SYSERR ==
267           hrc->iter (hrc->iter_cls,
268                      hrc->key,
269                      data_size,
270                      data,
271                      (enum GNUNET_BLOCK_Type) type,
272                      expiration_time,
273                      path_len,
274                      path)) )
275     {
276       LOG (GNUNET_ERROR_TYPE_DEBUG,
277            "Ending iteration (client error)\n");
278       GNUNET_PQ_cleanup_result (rs);
279       return;
280     }
281     GNUNET_PQ_cleanup_result (rs);
282   }
283 }
284
285
286 /**
287  * Iterate over the results for a particular key
288  * in the datastore.
289  *
290  * @param cls closure (our `struct Plugin`)
291  * @param key key to look for
292  * @param type entries of which type are relevant?
293  * @param iter maybe NULL (to just count)
294  * @param iter_cls closure for @a iter
295  * @return the number of results found
296  */
297 static unsigned int
298 postgres_plugin_get (void *cls,
299                      const struct GNUNET_HashCode *key,
300                      enum GNUNET_BLOCK_Type type,
301                      GNUNET_DATACACHE_Iterator iter,
302                      void *iter_cls)
303 {
304   struct Plugin *plugin = cls;
305   uint32_t type32 = (uint32_t) type;
306   struct GNUNET_PQ_QueryParam paramk[] = {
307     GNUNET_PQ_query_param_auto_from_type (key),
308     GNUNET_PQ_query_param_end
309   };
310   struct GNUNET_PQ_QueryParam paramkt[] = {
311     GNUNET_PQ_query_param_auto_from_type (key),
312     GNUNET_PQ_query_param_uint32 (&type32),
313     GNUNET_PQ_query_param_end
314   };
315   enum GNUNET_PQ_QueryStatus res;
316   struct HandleResultContext hr_ctx;
317
318   hr_ctx.iter = iter;
319   hr_ctx.iter_cls = iter_cls;
320   hr_ctx.key = key;
321   res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
322                                               (0 == type) ? "getk" : "getkt",
323                                               (0 == type) ? paramk : paramkt,
324                                               &handle_results,
325                                               &hr_ctx);
326   if (res < 0)
327     return 0;
328   return res;
329 }
330
331
332 /**
333  * Delete the entry with the lowest expiration value
334  * from the datacache right now.
335  *
336  * @param cls closure (our `struct Plugin`)
337  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
338  */
339 static int
340 postgres_plugin_del (void *cls)
341 {
342   struct Plugin *plugin = cls;
343   struct GNUNET_PQ_QueryParam pempty[] = {
344     GNUNET_PQ_query_param_end
345   };
346   uint32_t size;
347   uint32_t oid;
348   struct GNUNET_HashCode key;
349   struct GNUNET_PQ_ResultSpec rs[] = {
350     GNUNET_PQ_result_spec_uint32 ("len",
351                                   &size),
352     GNUNET_PQ_result_spec_uint32 ("oid",
353                                   &oid),
354     GNUNET_PQ_result_spec_auto_from_type ("key",
355                                           &key),
356     GNUNET_PQ_result_spec_end
357   };
358   enum GNUNET_PQ_QueryStatus res;
359   struct GNUNET_PQ_QueryParam dparam[] = {
360     GNUNET_PQ_query_param_uint32 (&oid),
361     GNUNET_PQ_query_param_end
362   };
363
364   res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
365                                                   "getm",
366                                                   pempty,
367                                                   rs);
368   if (0 > res)
369     return GNUNET_SYSERR;
370   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS == res)
371   {
372     /* no result */
373     LOG (GNUNET_ERROR_TYPE_DEBUG,
374          "Ending iteration (no more results)\n");
375     return 0;
376   }
377   res = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
378                                             "delrow",
379                                             dparam);
380   if (0 > res)
381   {
382     GNUNET_PQ_cleanup_result (rs);
383     return GNUNET_SYSERR;
384   }
385   plugin->num_items--;
386   plugin->env->delete_notify (plugin->env->cls,
387                               &key,
388                               size + OVERHEAD);
389   GNUNET_PQ_cleanup_result (rs);
390   return GNUNET_OK;
391 }
392
393
394 /**
395  * Obtain a random key-value pair from the datacache.
396  *
397  * @param cls closure (our `struct Plugin`)
398  * @param iter maybe NULL (to just count)
399  * @param iter_cls closure for @a iter
400  * @return the number of results found, zero (datacache empty) or one
401  */
402 static unsigned int
403 postgres_plugin_get_random (void *cls,
404                             GNUNET_DATACACHE_Iterator iter,
405                             void *iter_cls)
406 {
407   struct Plugin *plugin = cls;
408   unsigned int off;
409   uint32_t off_be;
410   struct GNUNET_TIME_Absolute expiration_time;
411   uint32_t size;
412   unsigned int path_len;
413   const struct GNUNET_PeerIdentity *path;
414   const struct GNUNET_HashCode *key;
415   unsigned int type;
416   PGresult *res;
417   const char *paramValues[] = {
418     (const char *) &off_be
419   };
420   int paramLengths[] = {
421     sizeof (off_be)
422   };
423   const int paramFormats[] = { 1 };
424
425   if (0 == plugin->num_items)
426     return 0;
427   if (NULL == iter)
428     return 1;
429   off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
430                                   plugin->num_items);
431   off_be = htonl (off);
432   res =
433     PQexecPrepared (plugin->dbh, "get_random",
434                     1, paramValues, paramLengths, paramFormats,
435                     1);
436   if (GNUNET_OK !=
437       GNUNET_POSTGRES_check_result (plugin->dbh,
438                                     res,
439                                     PGRES_TUPLES_OK,
440                                     "PQexecPrepared",
441                                     "get_random"))
442   {
443     GNUNET_break (0);
444     return 0;
445   }
446   if (0 == PQntuples (res))
447   {
448     GNUNET_break (0);
449     return 0;
450   }
451   if ( (5 != PQnfields (res)) ||
452        (sizeof (uint64_t) != PQfsize (res, 0)) ||
453        (sizeof (uint32_t) != PQfsize (res, 1)) ||
454        (sizeof (struct GNUNET_HashCode) != PQfsize (res, 4)) )
455   {
456     GNUNET_break (0);
457     PQclear (res);
458     return 0;
459   }
460   expiration_time.abs_value_us =
461     GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 0));
462   type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
463   size = PQgetlength (res, 0, 2);
464   path_len = PQgetlength (res, 0, 3);
465   if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
466   {
467     GNUNET_break (0);
468     path_len = 0;
469   }
470   path_len %= sizeof (struct GNUNET_PeerIdentity);
471   path = (const struct GNUNET_PeerIdentity *) PQgetvalue (res, 0, 3);
472   key = (const struct GNUNET_HashCode *) PQgetvalue (res, 0, 4);
473   LOG (GNUNET_ERROR_TYPE_DEBUG,
474        "Found random value with key %s of size %u bytes and type %u in database\n",
475        GNUNET_h2s (key),
476        (unsigned int) size,
477        (unsigned int) type);
478   (void) iter (iter_cls,
479                key,
480                size,
481                PQgetvalue (res, 0, 2),
482                (enum GNUNET_BLOCK_Type) type,
483                expiration_time,
484                path_len,
485                path);
486   PQclear (res);
487   return 1;
488 }
489
490
491 /**
492  * Iterate over the results that are "close" to a particular key in
493  * the datacache.  "close" is defined as numerically larger than @a
494  * key (when interpreted as a circular address space), with small
495  * distance.
496  *
497  * @param cls closure (internal context for the plugin)
498  * @param key area of the keyspace to look into
499  * @param num_results number of results that should be returned to @a iter
500  * @param iter maybe NULL (to just count)
501  * @param iter_cls closure for @a iter
502  * @return the number of results found
503  */
504 static unsigned int
505 postgres_plugin_get_closest (void *cls,
506                              const struct GNUNET_HashCode *key,
507                              unsigned int num_results,
508                              GNUNET_DATACACHE_Iterator iter,
509                              void *iter_cls)
510 {
511   struct Plugin *plugin = cls;
512   uint32_t nbo_limit = htonl (num_results);
513   const char *paramValues[] = {
514     (const char *) key,
515     (const char *) &nbo_limit,
516   };
517   int paramLengths[] = {
518     sizeof (struct GNUNET_HashCode),
519     sizeof (nbo_limit)
520
521   };
522   const int paramFormats[] = { 1, 1 };
523   struct GNUNET_TIME_Absolute expiration_time;
524   uint32_t size;
525   unsigned int type;
526   unsigned int cnt;
527   unsigned int i;
528   unsigned int path_len;
529   const struct GNUNET_PeerIdentity *path;
530   PGresult *res;
531
532   res =
533       PQexecPrepared (plugin->dbh,
534                       "get_closest",
535                       2,
536                       paramValues,
537                       paramLengths,
538                       paramFormats,
539                       1);
540   if (GNUNET_OK !=
541       GNUNET_POSTGRES_check_result (plugin->dbh,
542                                     res,
543                                     PGRES_TUPLES_OK,
544                                     "PQexecPrepared",
545                                     "get_closest"))
546   {
547     LOG (GNUNET_ERROR_TYPE_DEBUG,
548          "Ending iteration (postgres error)\n");
549     return 0;
550   }
551
552   if (0 == (cnt = PQntuples (res)))
553   {
554     /* no result */
555     LOG (GNUNET_ERROR_TYPE_DEBUG,
556          "Ending iteration (no more results)\n");
557     PQclear (res);
558     return 0;
559   }
560   if (NULL == iter)
561   {
562     PQclear (res);
563     return cnt;
564   }
565   if ( (5 != PQnfields (res)) ||
566        (sizeof (uint64_t) != PQfsize (res, 0)) ||
567        (sizeof (uint32_t) != PQfsize (res, 1)) ||
568        (sizeof (struct GNUNET_HashCode) != PQfsize (res, 4)) )
569   {
570     GNUNET_break (0);
571     PQclear (res);
572     return 0;
573   }
574   for (i = 0; i < cnt; i++)
575   {
576     expiration_time.abs_value_us =
577         GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, i, 0));
578     type = ntohl (*(uint32_t *) PQgetvalue (res, i, 1));
579     size = PQgetlength (res, i, 2);
580     path_len = PQgetlength (res, i, 3);
581     if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
582     {
583       GNUNET_break (0);
584       path_len = 0;
585     }
586     path_len %= sizeof (struct GNUNET_PeerIdentity);
587     path = (const struct GNUNET_PeerIdentity *) PQgetvalue (res, i, 3);
588     key = (const struct GNUNET_HashCode *) PQgetvalue (res, i, 4);
589     LOG (GNUNET_ERROR_TYPE_DEBUG,
590          "Found result of size %u bytes and type %u in database\n",
591          (unsigned int) size,
592          (unsigned int) type);
593     if (GNUNET_SYSERR ==
594         iter (iter_cls,
595               key,
596               size,
597               PQgetvalue (res, i, 2),
598               (enum GNUNET_BLOCK_Type) type,
599               expiration_time,
600               path_len,
601               path))
602     {
603       LOG (GNUNET_ERROR_TYPE_DEBUG,
604            "Ending iteration (client error)\n");
605       PQclear (res);
606       return cnt;
607     }
608   }
609   PQclear (res);
610   return cnt;
611 }
612
613
614 /**
615  * Entry point for the plugin.
616  *
617  * @param cls closure (the `struct GNUNET_DATACACHE_PluginEnvironmnet`)
618  * @return the plugin's closure (our `struct Plugin`)
619  */
620 void *
621 libgnunet_plugin_datacache_postgres_init (void *cls)
622 {
623   struct GNUNET_DATACACHE_PluginEnvironment *env = cls;
624   struct GNUNET_DATACACHE_PluginFunctions *api;
625   struct Plugin *plugin;
626
627   plugin = GNUNET_new (struct Plugin);
628   plugin->env = env;
629
630   if (GNUNET_OK != init_connection (plugin))
631   {
632     GNUNET_free (plugin);
633     return NULL;
634   }
635
636   api = GNUNET_new (struct GNUNET_DATACACHE_PluginFunctions);
637   api->cls = plugin;
638   api->get = &postgres_plugin_get;
639   api->put = &postgres_plugin_put;
640   api->del = &postgres_plugin_del;
641   api->get_random = &postgres_plugin_get_random;
642   api->get_closest = &postgres_plugin_get_closest;
643   LOG (GNUNET_ERROR_TYPE_INFO,
644        "Postgres datacache running\n");
645   return api;
646 }
647
648
649 /**
650  * Exit point from the plugin.
651  *
652  * @param cls closure (our `struct Plugin`)
653  * @return NULL
654  */
655 void *
656 libgnunet_plugin_datacache_postgres_done (void *cls)
657 {
658   struct GNUNET_DATACACHE_PluginFunctions *api = cls;
659   struct Plugin *plugin = api->cls;
660
661   PQfinish (plugin->dbh);
662   GNUNET_free (plugin);
663   GNUNET_free (api);
664   return NULL;
665 }
666
667
668
669 /* end of plugin_datacache_postgres.c */