glitch in the license text detected by hyazinthe, thank you!
[oweals/gnunet.git] / src / datacache / plugin_datacache_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2006, 2009, 2010, 2012, 2015, 2017, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15
16 /**
17  * @file datacache/plugin_datacache_postgres.c
18  * @brief postgres for an implementation of a database backend for the datacache
19  * @author Christian Grothoff
20  */
21 #include "platform.h"
22 #include "gnunet_util_lib.h"
23 #include "gnunet_pq_lib.h"
24 #include "gnunet_datacache_plugin.h"
25
26 #define LOG(kind,...) GNUNET_log_from (kind, "datacache-postgres", __VA_ARGS__)
27
28 /**
29  * Per-entry overhead estimate
30  */
31 #define OVERHEAD (sizeof(struct GNUNET_HashCode) + 24)
32
33 /**
34  * Context for all functions in this plugin.
35  */
36 struct Plugin
37 {
38   /**
39    * Our execution environment.
40    */
41   struct GNUNET_DATACACHE_PluginEnvironment *env;
42
43   /**
44    * Native Postgres database handle.
45    */
46   PGconn *dbh;
47
48   /**
49    * Number of key-value pairs in the database.
50    */
51   unsigned int num_items;
52 };
53
54
55 /**
56  * @brief Get a database handle
57  *
58  * @param plugin global context
59  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
60  */
61 static int
62 init_connection (struct Plugin *plugin)
63 {
64   struct GNUNET_PQ_ExecuteStatement es[] = {
65     GNUNET_PQ_make_execute ("CREATE TEMPORARY TABLE IF NOT EXISTS gn011dc ("
66                             "  type INTEGER NOT NULL,"
67                             "  prox INTEGER NOT NULL,"
68                             "  discard_time BIGINT NOT NULL,"
69                             "  key BYTEA NOT NULL,"
70                             "  value BYTEA NOT NULL,"
71                             "  path BYTEA DEFAULT NULL)"
72                             "WITH OIDS"),
73     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_key ON gn011dc (key)"),
74     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_dt ON gn011dc (discard_time)"),
75     GNUNET_PQ_make_execute ("ALTER TABLE gn011dc ALTER value SET STORAGE EXTERNAL"),
76     GNUNET_PQ_make_execute ("ALTER TABLE gn011dc ALTER key SET STORAGE PLAIN"),
77     GNUNET_PQ_EXECUTE_STATEMENT_END
78   };
79   struct GNUNET_PQ_PreparedStatement ps[] = {
80     GNUNET_PQ_make_prepare ("getkt",
81                             "SELECT discard_time,type,value,path FROM gn011dc "
82                             "WHERE key=$1 AND type=$2",
83                             2),
84     GNUNET_PQ_make_prepare ("getk",
85                             "SELECT discard_time,type,value,path FROM gn011dc "
86                             "WHERE key=$1",
87                             1),
88     GNUNET_PQ_make_prepare ("getex",
89                             "SELECT length(value) AS len,oid,key FROM gn011dc"
90                             " WHERE discard_time < $1"
91                             " ORDER BY discard_time ASC LIMIT 1",
92                             1),
93     GNUNET_PQ_make_prepare ("getm",
94                             "SELECT length(value) AS len,oid,key FROM gn011dc"
95                             " ORDER BY prox ASC, discard_time ASC LIMIT 1",
96                             0),
97     GNUNET_PQ_make_prepare ("getp",
98                             "SELECT length(value) AS len,oid,key FROM gn011dc "
99                             "ORDER BY discard_time ASC LIMIT 1",
100                             0),
101     GNUNET_PQ_make_prepare ("get_random",
102                             "SELECT discard_time,type,value,path,key FROM gn011dc "
103                             "ORDER BY key ASC LIMIT 1 OFFSET $1",
104                             1),
105     GNUNET_PQ_make_prepare ("get_closest",
106                             "SELECT discard_time,type,value,path,key FROM gn011dc "
107                             "WHERE key>=$1 ORDER BY key ASC LIMIT $2",
108                             1),
109     GNUNET_PQ_make_prepare ("delrow",
110                             "DELETE FROM gn011dc WHERE oid=$1",
111                             1),
112     GNUNET_PQ_make_prepare ("put",
113                             "INSERT INTO gn011dc (type, prox, discard_time, key, value, path) "
114                             "VALUES ($1, $2, $3, $4, $5, $6)",
115                             6),
116     GNUNET_PQ_PREPARED_STATEMENT_END
117   };
118
119   plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
120                                             "datacache-postgres");
121   if (NULL == plugin->dbh)
122     return GNUNET_SYSERR;
123   if (GNUNET_OK !=
124       GNUNET_PQ_exec_statements (plugin->dbh,
125                                  es))
126   {
127     PQfinish (plugin->dbh);
128     plugin->dbh = NULL;
129     return GNUNET_SYSERR;
130   }
131
132   if (GNUNET_OK !=
133       GNUNET_PQ_prepare_statements (plugin->dbh,
134                                     ps))
135   {
136     PQfinish (plugin->dbh);
137     plugin->dbh = NULL;
138     return GNUNET_SYSERR;
139   }
140   return GNUNET_OK;
141 }
142
143
144 /**
145  * Store an item in the datastore.
146  *
147  * @param cls closure (our `struct Plugin`)
148  * @param key key to store @a data under
149  * @param prox proximity of @a key to my PID
150  * @param data_size number of bytes in @a data
151  * @param data data to store
152  * @param type type of the value
153  * @param discard_time when to discard the value in any case
154  * @param path_info_len number of entries in @a path_info
155  * @param path_info a path through the network
156  * @return 0 if duplicate, -1 on error, number of bytes used otherwise
157  */
158 static ssize_t
159 postgres_plugin_put (void *cls,
160                      const struct GNUNET_HashCode *key,
161                      uint32_t prox,
162                      size_t data_size,
163                      const char *data,
164                      enum GNUNET_BLOCK_Type type,
165                      struct GNUNET_TIME_Absolute discard_time,
166                      unsigned int path_info_len,
167                      const struct GNUNET_PeerIdentity *path_info)
168 {
169   struct Plugin *plugin = cls;
170   uint32_t type32 = (uint32_t) type;
171   struct GNUNET_PQ_QueryParam params[] = {
172     GNUNET_PQ_query_param_uint32 (&type32),
173     GNUNET_PQ_query_param_uint32 (&prox),
174     GNUNET_PQ_query_param_absolute_time (&discard_time),
175     GNUNET_PQ_query_param_auto_from_type (key),
176     GNUNET_PQ_query_param_fixed_size (data, data_size),
177     GNUNET_PQ_query_param_fixed_size (path_info,
178                                       path_info_len * sizeof (struct GNUNET_PeerIdentity)),
179     GNUNET_PQ_query_param_end
180   };
181   enum GNUNET_DB_QueryStatus ret;
182
183   ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
184                                             "put",
185                                             params);
186   if (0 > ret)
187     return -1;
188   plugin->num_items++;
189   return data_size + OVERHEAD;
190 }
191
192
193 /**
194  * Closure for #handle_results.
195  */
196 struct HandleResultContext
197 {
198
199   /**
200    * Function to call on each result, may be NULL.
201    */
202   GNUNET_DATACACHE_Iterator iter;
203
204   /**
205    * Closure for @e iter.
206    */
207   void *iter_cls;
208
209   /**
210    * Key used.
211    */
212   const struct GNUNET_HashCode *key;
213 };
214
215
216 /**
217  * Function to be called with the results of a SELECT statement
218  * that has returned @a num_results results.  Parse the result
219  * and call the callback given in @a cls
220  *
221  * @param cls closure of type `struct HandleResultContext`
222  * @param result the postgres result
223  * @param num_result the number of results in @a result
224  */
225 static void
226 handle_results (void *cls,
227                 PGresult *result,
228                 unsigned int num_results)
229 {
230   struct HandleResultContext *hrc = cls;
231
232   for (unsigned int i=0;i<num_results;i++)
233   {
234     struct GNUNET_TIME_Absolute expiration_time;
235     uint32_t type;
236     void *data;
237     size_t data_size;
238     struct GNUNET_PeerIdentity *path;
239     size_t path_len;
240     struct GNUNET_PQ_ResultSpec rs[] = {
241       GNUNET_PQ_result_spec_absolute_time ("discard_time",
242                                            &expiration_time),
243       GNUNET_PQ_result_spec_uint32 ("type",
244                                     &type),
245       GNUNET_PQ_result_spec_variable_size ("value",
246                                            &data,
247                                            &data_size),
248       GNUNET_PQ_result_spec_variable_size ("path",
249                                            (void **) &path,
250                                            &path_len),
251       GNUNET_PQ_result_spec_end
252     };
253
254     if (GNUNET_YES !=
255         GNUNET_PQ_extract_result (result,
256                                   rs,
257                                   i))
258     {
259       GNUNET_break (0);
260       return;
261     }
262     if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
263     {
264       GNUNET_break (0);
265       path_len = 0;
266     }
267     path_len %= sizeof (struct GNUNET_PeerIdentity);
268     LOG (GNUNET_ERROR_TYPE_DEBUG,
269          "Found result of size %u bytes and type %u in database\n",
270          (unsigned int) data_size,
271          (unsigned int) type);
272     if ( (NULL != hrc->iter) &&
273          (GNUNET_SYSERR ==
274           hrc->iter (hrc->iter_cls,
275                      hrc->key,
276                      data_size,
277                      data,
278                      (enum GNUNET_BLOCK_Type) type,
279                      expiration_time,
280                      path_len,
281                      path)) )
282     {
283       LOG (GNUNET_ERROR_TYPE_DEBUG,
284            "Ending iteration (client error)\n");
285       GNUNET_PQ_cleanup_result (rs);
286       return;
287     }
288     GNUNET_PQ_cleanup_result (rs);
289   }
290 }
291
292
293 /**
294  * Iterate over the results for a particular key
295  * in the datastore.
296  *
297  * @param cls closure (our `struct Plugin`)
298  * @param key key to look for
299  * @param type entries of which type are relevant?
300  * @param iter maybe NULL (to just count)
301  * @param iter_cls closure for @a iter
302  * @return the number of results found
303  */
304 static unsigned int
305 postgres_plugin_get (void *cls,
306                      const struct GNUNET_HashCode *key,
307                      enum GNUNET_BLOCK_Type type,
308                      GNUNET_DATACACHE_Iterator iter,
309                      void *iter_cls)
310 {
311   struct Plugin *plugin = cls;
312   uint32_t type32 = (uint32_t) type;
313   struct GNUNET_PQ_QueryParam paramk[] = {
314     GNUNET_PQ_query_param_auto_from_type (key),
315     GNUNET_PQ_query_param_end
316   };
317   struct GNUNET_PQ_QueryParam paramkt[] = {
318     GNUNET_PQ_query_param_auto_from_type (key),
319     GNUNET_PQ_query_param_uint32 (&type32),
320     GNUNET_PQ_query_param_end
321   };
322   enum GNUNET_DB_QueryStatus res;
323   struct HandleResultContext hr_ctx;
324
325   hr_ctx.iter = iter;
326   hr_ctx.iter_cls = iter_cls;
327   hr_ctx.key = key;
328   res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
329                                               (0 == type) ? "getk" : "getkt",
330                                               (0 == type) ? paramk : paramkt,
331                                               &handle_results,
332                                               &hr_ctx);
333   if (res < 0)
334     return 0;
335   return res;
336 }
337
338
339 /**
340  * Delete the entry with the lowest expiration value
341  * from the datacache right now.
342  *
343  * @param cls closure (our `struct Plugin`)
344  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
345  */
346 static int
347 postgres_plugin_del (void *cls)
348 {
349   struct Plugin *plugin = cls;
350   struct GNUNET_PQ_QueryParam pempty[] = {
351     GNUNET_PQ_query_param_end
352   };
353   uint32_t size;
354   uint32_t oid;
355   struct GNUNET_HashCode key;
356   struct GNUNET_PQ_ResultSpec rs[] = {
357     GNUNET_PQ_result_spec_uint32 ("len",
358                                   &size),
359     GNUNET_PQ_result_spec_uint32 ("oid",
360                                   &oid),
361     GNUNET_PQ_result_spec_auto_from_type ("key",
362                                           &key),
363     GNUNET_PQ_result_spec_end
364   };
365   enum GNUNET_DB_QueryStatus res;
366   struct GNUNET_PQ_QueryParam dparam[] = {
367     GNUNET_PQ_query_param_uint32 (&oid),
368     GNUNET_PQ_query_param_end
369   };
370   struct GNUNET_TIME_Absolute now;
371   struct GNUNET_PQ_QueryParam xparam[] = {
372     GNUNET_PQ_query_param_absolute_time (&now),
373     GNUNET_PQ_query_param_end
374   };
375
376   now = GNUNET_TIME_absolute_get ();
377   res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
378                                                   "getex",
379                                                   xparam,
380                                                   rs);
381   if (0 >= res)
382     res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
383                                                     "getm",
384                                                     pempty,
385                                                     rs);
386   if (0 > res)
387     return GNUNET_SYSERR;
388   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
389   {
390     /* no result */
391     LOG (GNUNET_ERROR_TYPE_DEBUG,
392          "Ending iteration (no more results)\n");
393     return 0;
394   }
395   res = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
396                                             "delrow",
397                                             dparam);
398   if (0 > res)
399   {
400     GNUNET_PQ_cleanup_result (rs);
401     return GNUNET_SYSERR;
402   }
403   plugin->num_items--;
404   plugin->env->delete_notify (plugin->env->cls,
405                               &key,
406                               size + OVERHEAD);
407   GNUNET_PQ_cleanup_result (rs);
408   return GNUNET_OK;
409 }
410
411
412 /**
413  * Obtain a random key-value pair from the datacache.
414  *
415  * @param cls closure (our `struct Plugin`)
416  * @param iter maybe NULL (to just count)
417  * @param iter_cls closure for @a iter
418  * @return the number of results found, zero (datacache empty) or one
419  */
420 static unsigned int
421 postgres_plugin_get_random (void *cls,
422                             GNUNET_DATACACHE_Iterator iter,
423                             void *iter_cls)
424 {
425   struct Plugin *plugin = cls;
426   uint32_t off;
427   struct GNUNET_TIME_Absolute expiration_time;
428   size_t data_size;
429   void *data;
430   size_t path_len;
431   struct GNUNET_PeerIdentity *path;
432   struct GNUNET_HashCode key;
433   uint32_t type;
434   enum GNUNET_DB_QueryStatus res;
435   struct GNUNET_PQ_QueryParam params[] = {
436     GNUNET_PQ_query_param_uint32 (&off),
437     GNUNET_PQ_query_param_end
438   };
439   struct GNUNET_PQ_ResultSpec rs[] = {
440     GNUNET_PQ_result_spec_absolute_time ("discard_time",
441                                          &expiration_time),
442     GNUNET_PQ_result_spec_uint32 ("type",
443                                   &type),
444     GNUNET_PQ_result_spec_variable_size ("value",
445                                          &data,
446                                          &data_size),
447     GNUNET_PQ_result_spec_variable_size ("path",
448                                          (void **) &path,
449                                          &path_len),
450     GNUNET_PQ_result_spec_auto_from_type ("key",
451                                           &key),
452     GNUNET_PQ_result_spec_end
453   };
454
455   if (0 == plugin->num_items)
456     return 0;
457   if (NULL == iter)
458     return 1;
459   off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
460                                   plugin->num_items);
461   res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
462                                                   "get_random",
463                                                   params,
464                                                   rs);
465   if (0 > res)
466   {
467     GNUNET_break (0);
468     return 0;
469   }
470   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
471   {
472     GNUNET_break (0);
473     return 0;
474   }
475   if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
476   {
477     GNUNET_break (0);
478     path_len = 0;
479   }
480   path_len %= sizeof (struct GNUNET_PeerIdentity);
481   LOG (GNUNET_ERROR_TYPE_DEBUG,
482        "Found random value with key %s of size %u bytes and type %u in database\n",
483        GNUNET_h2s (&key),
484        (unsigned int) data_size,
485        (unsigned int) type);
486   (void) iter (iter_cls,
487                &key,
488                data_size,
489                data,
490                (enum GNUNET_BLOCK_Type) type,
491                expiration_time,
492                path_len,
493                path);
494   GNUNET_PQ_cleanup_result (rs);
495   return 1;
496 }
497
498
499 /**
500  * Closure for #extract_result_cb.
501  */
502 struct ExtractResultContext
503 {
504   /**
505    * Function to call for each result found.
506    */
507   GNUNET_DATACACHE_Iterator iter;
508
509   /**
510    * Closure for @e iter.
511    */
512   void *iter_cls;
513
514 };
515
516
517 /**
518  * Function to be called with the results of a SELECT statement
519  * that has returned @a num_results results.  Calls the `iter`
520  * from @a cls for each result.
521  *
522  * @param cls closure with the `struct ExtractResultContext`
523  * @param result the postgres result
524  * @param num_result the number of results in @a result
525  */
526 static void
527 extract_result_cb (void *cls,
528                    PGresult *result,
529                    unsigned int num_results)
530 {
531   struct ExtractResultContext *erc = cls;
532
533   if (NULL == erc->iter)
534     return;
535   for (unsigned int i=0;i<num_results;i++)
536   {
537     struct GNUNET_TIME_Absolute expiration_time;
538     uint32_t type;
539     void *data;
540     size_t data_size;
541     struct GNUNET_PeerIdentity *path;
542     size_t path_len;
543     struct GNUNET_HashCode key;
544     struct GNUNET_PQ_ResultSpec rs[] = {
545       GNUNET_PQ_result_spec_absolute_time ("",
546                                            &expiration_time),
547       GNUNET_PQ_result_spec_uint32 ("type",
548                                     &type),
549       GNUNET_PQ_result_spec_variable_size ("value",
550                                            &data,
551                                            &data_size),
552       GNUNET_PQ_result_spec_variable_size ("path",
553                                            (void **) &path,
554                                            &path_len),
555       GNUNET_PQ_result_spec_auto_from_type ("key",
556                                             &key),
557       GNUNET_PQ_result_spec_end
558     };
559
560     if (GNUNET_YES !=
561         GNUNET_PQ_extract_result (result,
562                                   rs,
563                                   i))
564     {
565       GNUNET_break (0);
566       return;
567     }
568     if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
569     {
570       GNUNET_break (0);
571       path_len = 0;
572     }
573     path_len %= sizeof (struct GNUNET_PeerIdentity);
574     LOG (GNUNET_ERROR_TYPE_DEBUG,
575          "Found result of size %u bytes and type %u in database\n",
576          (unsigned int) data_size,
577          (unsigned int) type);
578     if (GNUNET_SYSERR ==
579         erc->iter (erc->iter_cls,
580                    &key,
581                    data_size,
582                    data,
583                    (enum GNUNET_BLOCK_Type) type,
584                    expiration_time,
585                    path_len,
586                    path))
587     {
588       LOG (GNUNET_ERROR_TYPE_DEBUG,
589            "Ending iteration (client error)\n");
590       GNUNET_PQ_cleanup_result (rs);
591       break;
592     }
593     GNUNET_PQ_cleanup_result (rs);
594   }
595 }
596
597
598 /**
599  * Iterate over the results that are "close" to a particular key in
600  * the datacache.  "close" is defined as numerically larger than @a
601  * key (when interpreted as a circular address space), with small
602  * distance.
603  *
604  * @param cls closure (internal context for the plugin)
605  * @param key area of the keyspace to look into
606  * @param num_results number of results that should be returned to @a iter
607  * @param iter maybe NULL (to just count)
608  * @param iter_cls closure for @a iter
609  * @return the number of results found
610  */
611 static unsigned int
612 postgres_plugin_get_closest (void *cls,
613                              const struct GNUNET_HashCode *key,
614                              unsigned int num_results,
615                              GNUNET_DATACACHE_Iterator iter,
616                              void *iter_cls)
617 {
618   struct Plugin *plugin = cls;
619   uint32_t num_results32 = (uint32_t) num_results;
620   struct GNUNET_PQ_QueryParam params[] = {
621     GNUNET_PQ_query_param_auto_from_type (key),
622     GNUNET_PQ_query_param_uint32 (&num_results32),
623     GNUNET_PQ_query_param_end
624   };
625   enum GNUNET_DB_QueryStatus res;
626   struct ExtractResultContext erc;
627
628   erc.iter = iter;
629   erc.iter_cls = iter_cls;
630   res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
631                                               "get_closest",
632                                               params,
633                                               &extract_result_cb,
634                                               &erc);
635   if (0 > res)
636   {
637     LOG (GNUNET_ERROR_TYPE_DEBUG,
638          "Ending iteration (postgres error)\n");
639     return 0;
640   }
641   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
642   {
643     /* no result */
644     LOG (GNUNET_ERROR_TYPE_DEBUG,
645          "Ending iteration (no more results)\n");
646     return 0;
647   }
648   return res;
649 }
650
651
652 /**
653  * Entry point for the plugin.
654  *
655  * @param cls closure (the `struct GNUNET_DATACACHE_PluginEnvironmnet`)
656  * @return the plugin's closure (our `struct Plugin`)
657  */
658 void *
659 libgnunet_plugin_datacache_postgres_init (void *cls)
660 {
661   struct GNUNET_DATACACHE_PluginEnvironment *env = cls;
662   struct GNUNET_DATACACHE_PluginFunctions *api;
663   struct Plugin *plugin;
664
665   plugin = GNUNET_new (struct Plugin);
666   plugin->env = env;
667
668   if (GNUNET_OK != init_connection (plugin))
669   {
670     GNUNET_free (plugin);
671     return NULL;
672   }
673
674   api = GNUNET_new (struct GNUNET_DATACACHE_PluginFunctions);
675   api->cls = plugin;
676   api->get = &postgres_plugin_get;
677   api->put = &postgres_plugin_put;
678   api->del = &postgres_plugin_del;
679   api->get_random = &postgres_plugin_get_random;
680   api->get_closest = &postgres_plugin_get_closest;
681   LOG (GNUNET_ERROR_TYPE_INFO,
682        "Postgres datacache running\n");
683   return api;
684 }
685
686
687 /**
688  * Exit point from the plugin.
689  *
690  * @param cls closure (our `struct Plugin`)
691  * @return NULL
692  */
693 void *
694 libgnunet_plugin_datacache_postgres_done (void *cls)
695 {
696   struct GNUNET_DATACACHE_PluginFunctions *api = cls;
697   struct Plugin *plugin = api->cls;
698
699   PQfinish (plugin->dbh);
700   GNUNET_free (plugin);
701   GNUNET_free (api);
702   return NULL;
703 }
704
705
706
707 /* end of plugin_datacache_postgres.c */