migrate another function to libgnunetpq
[oweals/gnunet.git] / src / datacache / plugin_datacache_postgres.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2006, 2009, 2010, 2012, 2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datacache/plugin_datacache_postgres.c
23  * @brief postgres for an implementation of a database backend for the datacache
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_postgres_lib.h"
29 #include "gnunet_pq_lib.h"
30 #include "gnunet_datacache_plugin.h"
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "datacache-postgres", __VA_ARGS__)
33
34 /**
35  * Per-entry overhead estimate
36  */
37 #define OVERHEAD (sizeof(struct GNUNET_HashCode) + 24)
38
39 /**
40  * Context for all functions in this plugin.
41  */
42 struct Plugin
43 {
44   /**
45    * Our execution environment.
46    */
47   struct GNUNET_DATACACHE_PluginEnvironment *env;
48
49   /**
50    * Native Postgres database handle.
51    */
52   PGconn *dbh;
53
54   /**
55    * Number of key-value pairs in the database.
56    */
57   unsigned int num_items;
58 };
59
60
61 /**
62  * @brief Get a database handle
63  *
64  * @param plugin global context
65  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
66  */
67 static int
68 init_connection (struct Plugin *plugin)
69 {
70   struct GNUNET_PQ_ExecuteStatement es[] = {
71     GNUNET_PQ_make_execute ("CREATE TEMPORARY TABLE IF NOT EXISTS gn090dc ("
72                             "  type INTEGER NOT NULL DEFAULT 0,"
73                             "  discard_time BIGINT NOT NULL DEFAULT 0,"
74                             "  key BYTEA NOT NULL DEFAULT '',"
75                             "  value BYTEA NOT NULL DEFAULT '',"
76                             "  path BYTEA DEFAULT '')"
77                             "WITH OIDS"),
78     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_key ON gn090dc (key)"),
79     GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_dt ON gn090dc (discard_time)"),
80     GNUNET_PQ_make_execute ("ALTER TABLE gn090dc ALTER value SET STORAGE EXTERNAL"),
81     GNUNET_PQ_make_execute ("ALTER TABLE gn090dc ALTER key SET STORAGE PLAIN"),
82     GNUNET_PQ_EXECUTE_STATEMENT_END
83   };
84   struct GNUNET_PQ_PreparedStatement ps[] = {
85     GNUNET_PQ_make_prepare ("getkt",
86                             "SELECT discard_time,type,value,path FROM gn090dc "
87                             "WHERE key=$1 AND type=$2",
88                             2),
89     GNUNET_PQ_make_prepare ("getk",
90                             "SELECT discard_time,type,value,path FROM gn090dc "
91                             "WHERE key=$1",
92                             1),
93     GNUNET_PQ_make_prepare ("getm",
94                             "SELECT length(value) AS len,oid,key FROM gn090dc "
95                             "ORDER BY discard_time ASC LIMIT 1",
96                             0),
97     GNUNET_PQ_make_prepare ("get_random",
98                             "SELECT discard_time,type,value,path,key FROM gn090dc "
99                             "ORDER BY key ASC LIMIT 1 OFFSET $1",
100                             1),
101     GNUNET_PQ_make_prepare ("get_closest",
102                             "SELECT discard_time,type,value,path,key FROM gn090dc "
103                             "WHERE key>=$1 ORDER BY key ASC LIMIT $2",
104                             1),
105     GNUNET_PQ_make_prepare ("delrow",
106                             "DELETE FROM gn090dc WHERE oid=$1",
107                             1),
108     GNUNET_PQ_make_prepare ("put",
109                             "INSERT INTO gn090dc (type, discard_time, key, value, path) "
110                             "VALUES ($1, $2, $3, $4, $5)",
111                             5),
112     GNUNET_PQ_PREPARED_STATEMENT_END
113   };
114
115   plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
116                                             "datacache-postgres");
117   if (NULL == plugin->dbh)
118     return GNUNET_SYSERR;
119   if (GNUNET_OK !=
120       GNUNET_PQ_exec_statements (plugin->dbh,
121                                  es))
122   {
123     PQfinish (plugin->dbh);
124     plugin->dbh = NULL;
125     return GNUNET_SYSERR;
126   }
127
128   if (GNUNET_OK !=
129       GNUNET_PQ_prepare_statements (plugin->dbh,
130                                     ps))
131   {
132     PQfinish (plugin->dbh);
133     plugin->dbh = NULL;
134     return GNUNET_SYSERR;
135   }
136   return GNUNET_OK;
137 }
138
139
140 /**
141  * Store an item in the datastore.
142  *
143  * @param cls closure (our `struct Plugin`)
144  * @param key key to store @a data under
145  * @param data_size number of bytes in @a data
146  * @param data data to store
147  * @param type type of the value
148  * @param discard_time when to discard the value in any case
149  * @param path_info_len number of entries in @a path_info
150  * @param path_info a path through the network
151  * @return 0 if duplicate, -1 on error, number of bytes used otherwise
152  */
153 static ssize_t
154 postgres_plugin_put (void *cls,
155                      const struct GNUNET_HashCode *key,
156                      size_t data_size,
157                      const char *data,
158                      enum GNUNET_BLOCK_Type type,
159                      struct GNUNET_TIME_Absolute discard_time,
160                      unsigned int path_info_len,
161                      const struct GNUNET_PeerIdentity *path_info)
162 {
163   struct Plugin *plugin = cls;
164   uint32_t type32 = (uint32_t) type;
165   struct GNUNET_PQ_QueryParam params[] = {
166     GNUNET_PQ_query_param_uint32 (&type32),
167     GNUNET_PQ_query_param_absolute_time (&discard_time),
168     GNUNET_PQ_query_param_auto_from_type (key),
169     GNUNET_PQ_query_param_fixed_size (data, data_size),
170     GNUNET_PQ_query_param_fixed_size (path_info,
171                                       path_info_len * sizeof (struct GNUNET_PeerIdentity)),
172     GNUNET_PQ_query_param_end
173   };
174   enum GNUNET_PQ_QueryStatus ret;
175
176   ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
177                                             "put",
178                                             params);
179   if (0 > ret)
180     return -1;
181   plugin->num_items++;
182   return data_size + OVERHEAD;
183 }
184
185
186 /**
187  * Closure for #handle_results.
188  */
189 struct HandleResultContext
190 {
191
192   /**
193    * Function to call on each result, may be NULL.
194    */
195   GNUNET_DATACACHE_Iterator iter;
196
197   /**
198    * Closure for @e iter.
199    */
200   void *iter_cls;
201
202   /**
203    * Key used.
204    */
205   const struct GNUNET_HashCode *key;
206 };
207
208
209 /**
210  * Function to be called with the results of a SELECT statement
211  * that has returned @a num_results results.  Parse the result
212  * and call the callback given in @a cls
213  *
214  * @param cls closure of type `struct HandleResultContext`
215  * @param result the postgres result
216  * @param num_result the number of results in @a result
217  */
218 static void
219 handle_results (void *cls,
220                 PGresult *result,
221                 unsigned int num_results)
222 {
223   struct HandleResultContext *hrc = cls;
224
225   for (unsigned int i=0;i<num_results;i++)
226   {
227     struct GNUNET_TIME_Absolute expiration_time;
228     uint32_t type;
229     void *data;
230     size_t data_size;
231     struct GNUNET_PeerIdentity *path;
232     size_t path_len;
233     struct GNUNET_PQ_ResultSpec rs[] = {
234       GNUNET_PQ_result_spec_absolute_time ("discard_time",
235                                            &expiration_time),
236       GNUNET_PQ_result_spec_uint32 ("type",
237                                     &type),
238       GNUNET_PQ_result_spec_variable_size ("value",
239                                            &data,
240                                            &data_size),
241       GNUNET_PQ_result_spec_variable_size ("path",
242                                            (void **) &path,
243                                            &path_len),
244       GNUNET_PQ_result_spec_end
245     };
246
247     if (GNUNET_YES !=
248         GNUNET_PQ_extract_result (result,
249                                   rs,
250                                   i))
251     {
252       GNUNET_break (0);
253       return;
254     }
255     if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
256     {
257       GNUNET_break (0);
258       path_len = 0;
259     }
260     path_len %= sizeof (struct GNUNET_PeerIdentity);
261     LOG (GNUNET_ERROR_TYPE_DEBUG,
262          "Found result of size %u bytes and type %u in database\n",
263          (unsigned int) data_size,
264          (unsigned int) type);
265     if ( (NULL != hrc->iter) &&
266          (GNUNET_SYSERR ==
267           hrc->iter (hrc->iter_cls,
268                      hrc->key,
269                      data_size,
270                      data,
271                      (enum GNUNET_BLOCK_Type) type,
272                      expiration_time,
273                      path_len,
274                      path)) )
275     {
276       LOG (GNUNET_ERROR_TYPE_DEBUG,
277            "Ending iteration (client error)\n");
278       GNUNET_PQ_cleanup_result (rs);
279       return;
280     }
281     GNUNET_PQ_cleanup_result (rs);
282   }
283 }
284
285
286 /**
287  * Iterate over the results for a particular key
288  * in the datastore.
289  *
290  * @param cls closure (our `struct Plugin`)
291  * @param key key to look for
292  * @param type entries of which type are relevant?
293  * @param iter maybe NULL (to just count)
294  * @param iter_cls closure for @a iter
295  * @return the number of results found
296  */
297 static unsigned int
298 postgres_plugin_get (void *cls,
299                      const struct GNUNET_HashCode *key,
300                      enum GNUNET_BLOCK_Type type,
301                      GNUNET_DATACACHE_Iterator iter,
302                      void *iter_cls)
303 {
304   struct Plugin *plugin = cls;
305   uint32_t type32 = (uint32_t) type;
306   struct GNUNET_PQ_QueryParam paramk[] = {
307     GNUNET_PQ_query_param_auto_from_type (key),
308     GNUNET_PQ_query_param_end
309   };
310   struct GNUNET_PQ_QueryParam paramkt[] = {
311     GNUNET_PQ_query_param_auto_from_type (key),
312     GNUNET_PQ_query_param_uint32 (&type32),
313     GNUNET_PQ_query_param_end
314   };
315   enum GNUNET_PQ_QueryStatus res;
316   struct HandleResultContext hr_ctx;
317
318   hr_ctx.iter = iter;
319   hr_ctx.iter_cls = iter_cls;
320   hr_ctx.key = key;
321   res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
322                                               (0 == type) ? "getk" : "getkt",
323                                               (0 == type) ? paramk : paramkt,
324                                               &handle_results,
325                                               &hr_ctx);
326   if (res < 0)
327     return 0;
328   return res;
329 }
330
331
332 /**
333  * Delete the entry with the lowest expiration value
334  * from the datacache right now.
335  *
336  * @param cls closure (our `struct Plugin`)
337  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
338  */
339 static int
340 postgres_plugin_del (void *cls)
341 {
342   struct Plugin *plugin = cls;
343   struct GNUNET_PQ_QueryParam pempty[] = {
344     GNUNET_PQ_query_param_end
345   };
346   uint32_t size;
347   uint32_t oid;
348   struct GNUNET_HashCode key;
349   struct GNUNET_PQ_ResultSpec rs[] = {
350     GNUNET_PQ_result_spec_uint32 ("len",
351                                   &size),
352     GNUNET_PQ_result_spec_uint32 ("oid",
353                                   &oid),
354     GNUNET_PQ_result_spec_auto_from_type ("key",
355                                           &key),
356     GNUNET_PQ_result_spec_end
357   };
358   enum GNUNET_PQ_QueryStatus res;
359   struct GNUNET_PQ_QueryParam dparam[] = {
360     GNUNET_PQ_query_param_uint32 (&oid),
361     GNUNET_PQ_query_param_end
362   };
363
364   res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
365                                                   "getm",
366                                                   pempty,
367                                                   rs);
368   if (0 > res)
369     return GNUNET_SYSERR;
370   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS == res)
371   {
372     /* no result */
373     LOG (GNUNET_ERROR_TYPE_DEBUG,
374          "Ending iteration (no more results)\n");
375     return 0;
376   }
377   res = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
378                                             "delrow",
379                                             dparam);
380   if (0 > res)
381   {
382     GNUNET_PQ_cleanup_result (rs);
383     return GNUNET_SYSERR;
384   }
385   plugin->num_items--;
386   plugin->env->delete_notify (plugin->env->cls,
387                               &key,
388                               size + OVERHEAD);
389   GNUNET_PQ_cleanup_result (rs);
390   return GNUNET_OK;
391 }
392
393
394 /**
395  * Obtain a random key-value pair from the datacache.
396  *
397  * @param cls closure (our `struct Plugin`)
398  * @param iter maybe NULL (to just count)
399  * @param iter_cls closure for @a iter
400  * @return the number of results found, zero (datacache empty) or one
401  */
402 static unsigned int
403 postgres_plugin_get_random (void *cls,
404                             GNUNET_DATACACHE_Iterator iter,
405                             void *iter_cls)
406 {
407   struct Plugin *plugin = cls;
408   uint32_t off;
409   struct GNUNET_TIME_Absolute expiration_time;
410   size_t data_size;
411   void *data;
412   size_t path_len;
413   struct GNUNET_PeerIdentity *path;
414   struct GNUNET_HashCode key;
415   uint32_t type;
416   enum GNUNET_PQ_QueryStatus res;
417   struct GNUNET_PQ_QueryParam params[] = {
418     GNUNET_PQ_query_param_uint32 (&off),
419     GNUNET_PQ_query_param_end
420   };
421   struct GNUNET_PQ_ResultSpec rs[] = {
422     GNUNET_PQ_result_spec_absolute_time ("discard_time",
423                                          &expiration_time),
424     GNUNET_PQ_result_spec_uint32 ("type",
425                                   &type),
426     GNUNET_PQ_result_spec_variable_size ("value",
427                                          &data,
428                                          &data_size),
429     GNUNET_PQ_result_spec_variable_size ("path",
430                                          (void **) &path,
431                                          &path_len),
432     GNUNET_PQ_result_spec_auto_from_type ("key",
433                                           &key),
434     GNUNET_PQ_result_spec_end
435   };
436
437   if (0 == plugin->num_items)
438     return 0;
439   if (NULL == iter)
440     return 1;
441   off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
442                                   plugin->num_items);
443   res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
444                                                   "get_random",
445                                                   params,
446                                                   rs);
447   if (0 > res)
448   {
449     GNUNET_break (0);
450     return 0;
451   }
452   if (GNUNET_PQ_STATUS_SUCCESS_NO_RESULTS == res)
453   {
454     GNUNET_break (0);
455     return 0;
456   }
457   if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
458   {
459     GNUNET_break (0);
460     path_len = 0;
461   }
462   path_len %= sizeof (struct GNUNET_PeerIdentity);
463   LOG (GNUNET_ERROR_TYPE_DEBUG,
464        "Found random value with key %s of size %u bytes and type %u in database\n",
465        GNUNET_h2s (&key),
466        (unsigned int) data_size,
467        (unsigned int) type);
468   (void) iter (iter_cls,
469                &key,
470                data_size,
471                data,
472                (enum GNUNET_BLOCK_Type) type,
473                expiration_time,
474                path_len,
475                path);
476   GNUNET_PQ_cleanup_result (rs);
477   return 1;
478 }
479
480
481 /**
482  * Iterate over the results that are "close" to a particular key in
483  * the datacache.  "close" is defined as numerically larger than @a
484  * key (when interpreted as a circular address space), with small
485  * distance.
486  *
487  * @param cls closure (internal context for the plugin)
488  * @param key area of the keyspace to look into
489  * @param num_results number of results that should be returned to @a iter
490  * @param iter maybe NULL (to just count)
491  * @param iter_cls closure for @a iter
492  * @return the number of results found
493  */
494 static unsigned int
495 postgres_plugin_get_closest (void *cls,
496                              const struct GNUNET_HashCode *key,
497                              unsigned int num_results,
498                              GNUNET_DATACACHE_Iterator iter,
499                              void *iter_cls)
500 {
501   struct Plugin *plugin = cls;
502   uint32_t nbo_limit = htonl (num_results);
503   const char *paramValues[] = {
504     (const char *) key,
505     (const char *) &nbo_limit,
506   };
507   int paramLengths[] = {
508     sizeof (struct GNUNET_HashCode),
509     sizeof (nbo_limit)
510
511   };
512   const int paramFormats[] = { 1, 1 };
513   struct GNUNET_TIME_Absolute expiration_time;
514   uint32_t size;
515   unsigned int type;
516   unsigned int cnt;
517   unsigned int i;
518   unsigned int path_len;
519   const struct GNUNET_PeerIdentity *path;
520   PGresult *res;
521
522   res =
523       PQexecPrepared (plugin->dbh,
524                       "get_closest",
525                       2,
526                       paramValues,
527                       paramLengths,
528                       paramFormats,
529                       1);
530   if (GNUNET_OK !=
531       GNUNET_POSTGRES_check_result (plugin->dbh,
532                                     res,
533                                     PGRES_TUPLES_OK,
534                                     "PQexecPrepared",
535                                     "get_closest"))
536   {
537     LOG (GNUNET_ERROR_TYPE_DEBUG,
538          "Ending iteration (postgres error)\n");
539     return 0;
540   }
541
542   if (0 == (cnt = PQntuples (res)))
543   {
544     /* no result */
545     LOG (GNUNET_ERROR_TYPE_DEBUG,
546          "Ending iteration (no more results)\n");
547     PQclear (res);
548     return 0;
549   }
550   if (NULL == iter)
551   {
552     PQclear (res);
553     return cnt;
554   }
555   if ( (5 != PQnfields (res)) ||
556        (sizeof (uint64_t) != PQfsize (res, 0)) ||
557        (sizeof (uint32_t) != PQfsize (res, 1)) ||
558        (sizeof (struct GNUNET_HashCode) != PQfsize (res, 4)) )
559   {
560     GNUNET_break (0);
561     PQclear (res);
562     return 0;
563   }
564   for (i = 0; i < cnt; i++)
565   {
566     expiration_time.abs_value_us =
567         GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, i, 0));
568     type = ntohl (*(uint32_t *) PQgetvalue (res, i, 1));
569     size = PQgetlength (res, i, 2);
570     path_len = PQgetlength (res, i, 3);
571     if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
572     {
573       GNUNET_break (0);
574       path_len = 0;
575     }
576     path_len %= sizeof (struct GNUNET_PeerIdentity);
577     path = (const struct GNUNET_PeerIdentity *) PQgetvalue (res, i, 3);
578     key = (const struct GNUNET_HashCode *) PQgetvalue (res, i, 4);
579     LOG (GNUNET_ERROR_TYPE_DEBUG,
580          "Found result of size %u bytes and type %u in database\n",
581          (unsigned int) size,
582          (unsigned int) type);
583     if (GNUNET_SYSERR ==
584         iter (iter_cls,
585               key,
586               size,
587               PQgetvalue (res, i, 2),
588               (enum GNUNET_BLOCK_Type) type,
589               expiration_time,
590               path_len,
591               path))
592     {
593       LOG (GNUNET_ERROR_TYPE_DEBUG,
594            "Ending iteration (client error)\n");
595       PQclear (res);
596       return cnt;
597     }
598   }
599   PQclear (res);
600   return cnt;
601 }
602
603
604 /**
605  * Entry point for the plugin.
606  *
607  * @param cls closure (the `struct GNUNET_DATACACHE_PluginEnvironmnet`)
608  * @return the plugin's closure (our `struct Plugin`)
609  */
610 void *
611 libgnunet_plugin_datacache_postgres_init (void *cls)
612 {
613   struct GNUNET_DATACACHE_PluginEnvironment *env = cls;
614   struct GNUNET_DATACACHE_PluginFunctions *api;
615   struct Plugin *plugin;
616
617   plugin = GNUNET_new (struct Plugin);
618   plugin->env = env;
619
620   if (GNUNET_OK != init_connection (plugin))
621   {
622     GNUNET_free (plugin);
623     return NULL;
624   }
625
626   api = GNUNET_new (struct GNUNET_DATACACHE_PluginFunctions);
627   api->cls = plugin;
628   api->get = &postgres_plugin_get;
629   api->put = &postgres_plugin_put;
630   api->del = &postgres_plugin_del;
631   api->get_random = &postgres_plugin_get_random;
632   api->get_closest = &postgres_plugin_get_closest;
633   LOG (GNUNET_ERROR_TYPE_INFO,
634        "Postgres datacache running\n");
635   return api;
636 }
637
638
639 /**
640  * Exit point from the plugin.
641  *
642  * @param cls closure (our `struct Plugin`)
643  * @return NULL
644  */
645 void *
646 libgnunet_plugin_datacache_postgres_done (void *cls)
647 {
648   struct GNUNET_DATACACHE_PluginFunctions *api = cls;
649   struct Plugin *plugin = api->cls;
650
651   PQfinish (plugin->dbh);
652   GNUNET_free (plugin);
653   GNUNET_free (api);
654   return NULL;
655 }
656
657
658
659 /* end of plugin_datacache_postgres.c */