error handling
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009, 2010, 2011 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  * @author Christophe Genevey
27  *
28  * NOTE: This db module does NOT work with mysql prior to 4.1 since
29  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
30  * in MyISAM that is causing us grief.  At the time of this writing,
31  * that version is yet to be released.  In anticipation, the code
32  * will use MyISAM with 5.0.46 (and higher).  If you run such a
33  * version, please run "make check" to verify that the MySQL bug
34  * was actually fixed in your version (and if not, change the
35  * code below to use MyISAM for gn071).
36  *
37  * HIGHLIGHTS
38  *
39  * Pros
40  * + On up-to-date hardware where mysql can be used comfortably, this
41  *   module will have better performance than the other db choices
42  *   (according to our tests).
43  * + Its often possible to recover the mysql database from internal
44  *   inconsistencies. The other db choices do not support repair!
45  * Cons
46  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
47  * - Manual setup
48  *
49  * MANUAL SETUP INSTRUCTIONS
50  *
51  * 1) in gnunet.conf, set
52  * @verbatim
53        [datastore]
54        DATABASE = "mysql"
55    @endverbatim
56  * 2) Then access mysql as root,
57  * @verbatim
58      $ mysql -u root -p
59    @endverbatim
60  *    and do the following. [You should replace $USER with the username
61  *    that will be running the gnunetd process].
62  * @verbatim
63       CREATE DATABASE gnunet;
64       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
65          ON gnunet.* TO $USER@localhost;
66       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
67       FLUSH PRIVILEGES;
68    @endverbatim
69  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
70  *    with the following lines
71  * @verbatim
72       [client]
73       user=$USER
74       password=$the_password_you_like
75    @endverbatim
76  *
77  * Thats it. Note that .my.cnf file is a security risk unless its on
78  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
79  * link. Even greater security risk can be achieved by setting no
80  * password for $USER.  Luckily $USER has only privileges to mess
81  * up GNUnet's tables, nothing else (unless you give them more,
82  * of course).<p>
83  *
84  * 4) Still, perhaps you should briefly try if the DB connection
85  *    works. First, login as $USER. Then use,
86  *
87  * @verbatim
88      $ mysql -u $USER -p $the_password_you_like
89      mysql> use gnunet;
90    @endverbatim
91  *
92  *    If you get the message &quot;Database changed&quot; it probably works.
93  *
94  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
95  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
96  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
97  *     so there may be some additional trouble depending on your mysql setup.]
98  *
99  * REPAIRING TABLES
100  *
101  * - Its probably healthy to check your tables for inconsistencies
102  *   every now and then.
103  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
104  *   databases have been corrupted.
105  * - The tables can be verified/fixed in two ways;
106  *   1) by running mysqlcheck -A, or
107  *   2) by executing (inside of mysql using the GNUnet database):
108  * @verbatim
109      mysql> REPAIR TABLE gn090;
110    @endverbatim
111  *
112  * PROBLEMS?
113  *
114  * If you have problems related to the mysql module, your best
115  * friend is probably the mysql manual. The first thing to check
116  * is that mysql is basically operational, that you can connect
117  * to it, create tables, issue queries etc.
118  */
119
120 #include "platform.h"
121 #include "gnunet_datastore_plugin.h"
122 #include "gnunet_util_lib.h"
123 #include "gnunet_mysql_lib.h"
124 #include "gnunet_my_lib.h"
125
126 #define MAX_DATUM_SIZE 65536
127
128
129 /**
130  * Context for all functions in this plugin.
131  */
132 struct Plugin
133 {
134   /**
135    * Our execution environment.
136    */
137   struct GNUNET_DATASTORE_PluginEnvironment *env;
138
139   /**
140    * Handle to talk to MySQL.
141    */
142   struct GNUNET_MYSQL_Context *mc;
143
144   /**
145    * Prepared statements.
146    */
147 #define INSERT_ENTRY \
148   "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
149   struct GNUNET_MYSQL_StatementHandle *insert_entry;
150
151 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
152   struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
153
154 #define DELETE_ENTRY_BY_HASH_VALUE "DELETE FROM gn090 " \
155   "WHERE hash = ? AND " \
156   "value = ? " \
157   "LIMIT 1"
158   struct GNUNET_MYSQL_StatementHandle *delete_entry_by_hash_value;
159
160 #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, uid"
161
162 #define SELECT_ENTRY "SELECT " RESULT_COLUMNS " FROM gn090 " \
163   "WHERE uid >= ? AND " \
164   "(rvalue >= ? OR 0 = ?) " \
165   "ORDER BY uid LIMIT 1"
166   struct GNUNET_MYSQL_StatementHandle *select_entry;
167
168 #define SELECT_ENTRY_BY_HASH "SELECT " RESULT_COLUMNS " FROM gn090 " \
169   "FORCE INDEX (idx_hash_type_uid) " \
170   "WHERE hash=? AND " \
171   "uid >= ? AND " \
172   "(rvalue >= ? OR 0 = ?) " \
173   "ORDER BY uid LIMIT 1"
174   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
175
176 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT " RESULT_COLUMNS " FROM gn090 " \
177   "FORCE INDEX (idx_hash_type_uid) " \
178   "WHERE hash = ? AND " \
179   "type = ? AND " \
180   "uid >= ? AND " \
181   "(rvalue >= ? OR 0 = ?) " \
182   "ORDER BY uid LIMIT 1"
183   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
184
185 #define UPDATE_ENTRY "UPDATE gn090 SET " \
186   "prio = prio + ?, " \
187   "repl = repl + ?, " \
188   "expire = GREATEST(expire, ?) " \
189   "WHERE hash = ? AND vhash = ?"
190   struct GNUNET_MYSQL_StatementHandle *update_entry;
191
192 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
193   struct GNUNET_MYSQL_StatementHandle *dec_repl;
194
195 #define SELECT_SIZE "SELECT SUM(LENGTH(value)+256) FROM gn090"
196   struct GNUNET_MYSQL_StatementHandle *get_size;
197
198 #define SELECT_IT_NON_ANONYMOUS "SELECT " RESULT_COLUMNS " FROM gn090 " \
199   "FORCE INDEX (idx_anonLevel_type_rvalue) " \
200   "WHERE anonLevel=0 AND " \
201   "type=? AND " \
202   "uid >= ? " \
203   "ORDER BY uid LIMIT 1"
204   struct GNUNET_MYSQL_StatementHandle *zero_iter;
205
206 #define SELECT_IT_EXPIRATION "SELECT " RESULT_COLUMNS " FROM gn090 " \
207   "FORCE INDEX (idx_expire) " \
208   "WHERE expire < ? " \
209   "ORDER BY expire ASC LIMIT 1"
210   struct GNUNET_MYSQL_StatementHandle *select_expiration;
211
212 #define SELECT_IT_PRIORITY "SELECT " RESULT_COLUMNS " FROM gn090 " \
213   "FORCE INDEX (idx_prio) " \
214   "ORDER BY prio ASC LIMIT 1"
215   struct GNUNET_MYSQL_StatementHandle *select_priority;
216
217 #define SELECT_IT_REPLICATION "SELECT " RESULT_COLUMNS " FROM gn090 " \
218   "FORCE INDEX (idx_repl_rvalue) " \
219   "WHERE repl=? AND " \
220   " (rvalue>=? OR" \
221   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) " \
222   "ORDER BY rvalue ASC " \
223   "LIMIT 1"
224   struct GNUNET_MYSQL_StatementHandle *select_replication;
225
226 #define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
227   struct GNUNET_MYSQL_StatementHandle *max_repl;
228
229 #define GET_ALL_KEYS "SELECT hash from gn090"
230   struct GNUNET_MYSQL_StatementHandle *get_all_keys;
231 };
232
233 #define MAX_PARAM 16
234
235 /**
236  * Delete an entry from the gn090 table.
237  *
238  * @param plugin plugin context
239  * @param uid unique ID of the entry to delete
240  * @return #GNUNET_OK on success, #GNUNET_NO if no such value exists, #GNUNET_SYSERR on error
241  */
242 static int
243 do_delete_entry (struct Plugin *plugin,
244                  unsigned long long uid)
245 {
246   int ret;
247   uint64_t uid64 = (uint64_t) uid;
248   struct GNUNET_MY_QueryParam params_delete[] = {
249     GNUNET_MY_query_param_uint64 (&uid64),
250     GNUNET_MY_query_param_end
251   };
252
253   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
254               "Deleting value %llu from gn090 table\n",
255               uid);
256   ret = GNUNET_MY_exec_prepared (plugin->mc,
257                                  plugin->delete_entry_by_uid,
258                                  params_delete);
259   if (ret >= 0)
260   {
261     return GNUNET_OK;
262   }
263   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
264               "Deleting value %llu from gn090 table failed\n",
265               (unsigned long long) uid);
266   return ret;
267 }
268
269
270 /**
271  * Get an estimate of how much space the database is
272  * currently using.
273  *
274  * @param cls our `struct Plugin *`
275  * @return number of bytes used on disk
276  */
277 static void
278 mysql_plugin_estimate_size (void *cls,
279                             unsigned long long *estimate)
280 {
281   struct Plugin *plugin = cls;
282   uint64_t total;
283   int ret;
284   struct GNUNET_MY_QueryParam params_get[] = {
285     GNUNET_MY_query_param_end
286   };
287   struct GNUNET_MY_ResultSpec results_get[] = {
288     GNUNET_MY_result_spec_uint64 (&total),
289     GNUNET_MY_result_spec_end
290   };
291
292   ret = GNUNET_MY_exec_prepared (plugin->mc,
293                                  plugin->get_size,
294                                  params_get);
295   *estimate = 0;
296   total = UINT64_MAX;
297   if ((GNUNET_OK == ret) &&
298       (GNUNET_OK ==
299        GNUNET_MY_extract_result (plugin->get_size,
300                                  results_get)))
301   {
302     *estimate = (unsigned long long) total;
303     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
304                 "Size estimate for MySQL payload is %lld\n",
305                 (long long) total);
306     GNUNET_assert (UINT64_MAX != total);
307     GNUNET_break (GNUNET_NO ==
308                   GNUNET_MY_extract_result (plugin->get_size,
309                                             NULL));
310   }
311 }
312
313
314 /**
315  * Store an item in the datastore.
316  *
317  * @param cls closure
318  * @param key key for the item
319  * @param absent true if the key was not found in the bloom filter
320  * @param size number of bytes in @a data
321  * @param data content stored
322  * @param type type of the content
323  * @param priority priority of the content
324  * @param anonymity anonymity-level for the content
325  * @param replication replication-level for the content
326  * @param expiration expiration time for the content
327  * @param cont continuation called with success or failure status
328  * @param cont_cls closure for @a cont
329  */
330 static void
331 mysql_plugin_put (void *cls,
332                   const struct GNUNET_HashCode *key,
333                   bool absent,
334                   uint32_t size,
335                   const void *data,
336                   enum GNUNET_BLOCK_Type type,
337                   uint32_t priority,
338                   uint32_t anonymity,
339                   uint32_t replication,
340                   struct GNUNET_TIME_Absolute expiration,
341                   PluginPutCont cont,
342                   void *cont_cls)
343 {
344   struct Plugin *plugin = cls;
345   uint64_t lexpiration = expiration.abs_value_us;
346   struct GNUNET_HashCode vhash;
347
348   GNUNET_CRYPTO_hash (data,
349                       size,
350                       &vhash);
351   if (! absent)
352   {
353     struct GNUNET_MY_QueryParam params_update[] = {
354       GNUNET_MY_query_param_uint32 (&priority),
355       GNUNET_MY_query_param_uint32 (&replication),
356       GNUNET_MY_query_param_uint64 (&lexpiration),
357       GNUNET_MY_query_param_auto_from_type (key),
358       GNUNET_MY_query_param_auto_from_type (&vhash),
359       GNUNET_MY_query_param_end
360     };
361
362     if (GNUNET_OK !=
363         GNUNET_MY_exec_prepared (plugin->mc,
364                                  plugin->update_entry,
365                                  params_update))
366     {
367       cont (cont_cls,
368             key,
369             size,
370             GNUNET_SYSERR,
371             _ ("MySQL statement run failure"));
372       return;
373     }
374
375     MYSQL_STMT *stmt = GNUNET_MYSQL_statement_get_stmt (plugin->update_entry);
376     my_ulonglong rows = mysql_stmt_affected_rows (stmt);
377
378     GNUNET_break (GNUNET_NO ==
379                   GNUNET_MY_extract_result (plugin->update_entry,
380                                             NULL));
381     if (0 != rows)
382     {
383       cont (cont_cls,
384             key,
385             size,
386             GNUNET_NO,
387             NULL);
388       return;
389     }
390   }
391
392   uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
393                                                UINT64_MAX);
394   struct GNUNET_MY_QueryParam params_insert[] = {
395     GNUNET_MY_query_param_uint32 (&replication),
396     GNUNET_MY_query_param_uint32 (&type),
397     GNUNET_MY_query_param_uint32 (&priority),
398     GNUNET_MY_query_param_uint32 (&anonymity),
399     GNUNET_MY_query_param_uint64 (&lexpiration),
400     GNUNET_MY_query_param_uint64 (&lrvalue),
401     GNUNET_MY_query_param_auto_from_type (key),
402     GNUNET_MY_query_param_auto_from_type (&vhash),
403     GNUNET_MY_query_param_fixed_size (data, size),
404     GNUNET_MY_query_param_end
405   };
406
407   if (size > MAX_DATUM_SIZE)
408   {
409     GNUNET_break (0);
410     cont (cont_cls, key, size, GNUNET_SYSERR, _ ("Data too large"));
411     return;
412   }
413
414   if (GNUNET_OK !=
415       GNUNET_MY_exec_prepared (plugin->mc,
416                                plugin->insert_entry,
417                                params_insert))
418   {
419     cont (cont_cls,
420           key,
421           size,
422           GNUNET_SYSERR,
423           _ ("MySQL statement run failure"));
424     return;
425   }
426   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427               "Inserted value `%s' with size %u into gn090 table\n",
428               GNUNET_h2s (key),
429               (unsigned int) size);
430   if (size > 0)
431     plugin->env->duc (plugin->env->cls,
432                       size);
433   GNUNET_break (GNUNET_NO ==
434                 GNUNET_MY_extract_result (plugin->insert_entry,
435                                           NULL));
436   cont (cont_cls,
437         key,
438         size,
439         GNUNET_OK,
440         NULL);
441 }
442
443
444 /**
445  * Run the given select statement and call 'proc' on the resulting
446  * values (which must be in particular positions).
447  *
448  * @param plugin the plugin handle
449  * @param stmt select statement to run
450  * @param proc function to call on result
451  * @param proc_cls closure for @a proc
452  * @param params_select arguments to initialize stmt
453  */
454 static void
455 execute_select (struct Plugin *plugin,
456                 struct GNUNET_MYSQL_StatementHandle *stmt,
457                 PluginDatumProcessor proc,
458                 void *proc_cls,
459                 struct GNUNET_MY_QueryParam *params_select)
460 {
461   int ret;
462   uint32_t replication;
463   uint32_t type;
464   uint32_t priority;
465   uint32_t anonymity;
466   uint64_t uid;
467   size_t value_size;
468   void *value;
469   struct GNUNET_HashCode key;
470   struct GNUNET_TIME_Absolute expiration;
471   struct GNUNET_MY_ResultSpec results_select[] = {
472     GNUNET_MY_result_spec_uint32 (&replication),
473     GNUNET_MY_result_spec_uint32 (&type),
474     GNUNET_MY_result_spec_uint32 (&priority),
475     GNUNET_MY_result_spec_uint32 (&anonymity),
476     GNUNET_MY_result_spec_absolute_time (&expiration),
477     GNUNET_MY_result_spec_auto_from_type (&key),
478     GNUNET_MY_result_spec_variable_size (&value, &value_size),
479     GNUNET_MY_result_spec_uint64 (&uid),
480     GNUNET_MY_result_spec_end
481   };
482
483   ret = GNUNET_MY_exec_prepared (plugin->mc,
484                                  stmt,
485                                  params_select);
486   if (GNUNET_OK != ret)
487   {
488     proc (proc_cls,
489           NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
490     return;
491   }
492
493   ret = GNUNET_MY_extract_result (stmt,
494                                   results_select);
495   if (GNUNET_OK != ret)
496   {
497     proc (proc_cls,
498           NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
499     return;
500   }
501
502   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
503               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %s selecting from gn090 table\n",
504               (unsigned int) value_size,
505               GNUNET_h2s (&key),
506               (unsigned int) priority,
507               (unsigned int) anonymity,
508               GNUNET_STRINGS_absolute_time_to_string (expiration));
509   GNUNET_assert (value_size < MAX_DATUM_SIZE);
510   GNUNET_break (GNUNET_NO ==
511                 GNUNET_MY_extract_result (stmt,
512                                           NULL));
513   ret = proc (proc_cls,
514               &key,
515               value_size,
516               value,
517               type,
518               priority,
519               anonymity,
520               replication,
521               expiration,
522               uid);
523   GNUNET_MY_cleanup_result (results_select);
524   if (GNUNET_NO == ret)
525   {
526     do_delete_entry (plugin, uid);
527     if (0 != value_size)
528       plugin->env->duc (plugin->env->cls,
529                         -value_size);
530   }
531 }
532
533
534 /**
535  * Get one of the results for a particular key in the datastore.
536  *
537  * @param cls closure
538  * @param next_uid return the result with lowest uid >= next_uid
539  * @param random if true, return a random result instead of using next_uid
540  * @param key key to match, never NULL
541  * @param type entries of which type are relevant?
542  *     Use 0 for any type.
543  * @param proc function to call on the matching value,
544  *        with NULL for if no value matches
545  * @param proc_cls closure for @a proc
546  */
547 static void
548 mysql_plugin_get_key (void *cls,
549                       uint64_t next_uid,
550                       bool random,
551                       const struct GNUNET_HashCode *key,
552                       enum GNUNET_BLOCK_Type type,
553                       PluginDatumProcessor proc,
554                       void *proc_cls)
555 {
556   struct Plugin *plugin = cls;
557   uint64_t rvalue;
558
559   if (random)
560   {
561     rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
562                                        UINT64_MAX);
563     next_uid = 0;
564   }
565   else
566     rvalue = 0;
567
568   if (NULL == key)
569   {
570     struct GNUNET_MY_QueryParam params_select[] = {
571       GNUNET_MY_query_param_uint64 (&next_uid),
572       GNUNET_MY_query_param_uint64 (&rvalue),
573       GNUNET_MY_query_param_uint64 (&rvalue),
574       GNUNET_MY_query_param_end
575     };
576
577     execute_select (plugin,
578                     plugin->select_entry,
579                     proc,
580                     proc_cls,
581                     params_select);
582   }
583   else if (type != GNUNET_BLOCK_TYPE_ANY)
584   {
585     struct GNUNET_MY_QueryParam params_select[] = {
586       GNUNET_MY_query_param_auto_from_type (key),
587       GNUNET_MY_query_param_uint32 (&type),
588       GNUNET_MY_query_param_uint64 (&next_uid),
589       GNUNET_MY_query_param_uint64 (&rvalue),
590       GNUNET_MY_query_param_uint64 (&rvalue),
591       GNUNET_MY_query_param_end
592     };
593
594     execute_select (plugin,
595                     plugin->select_entry_by_hash_and_type,
596                     proc,
597                     proc_cls,
598                     params_select);
599   }
600   else
601   {
602     struct GNUNET_MY_QueryParam params_select[] = {
603       GNUNET_MY_query_param_auto_from_type (key),
604       GNUNET_MY_query_param_uint64 (&next_uid),
605       GNUNET_MY_query_param_uint64 (&rvalue),
606       GNUNET_MY_query_param_uint64 (&rvalue),
607       GNUNET_MY_query_param_end
608     };
609
610     execute_select (plugin,
611                     plugin->select_entry_by_hash,
612                     proc,
613                     proc_cls,
614                     params_select);
615   }
616 }
617
618
619 /**
620  * Get a zero-anonymity datum from the datastore.
621  *
622  * @param cls our `struct Plugin *`
623  * @param next_uid return the result with lowest uid >= next_uid
624  * @param type entries of which type should be considered?
625  *        Must not be zero (ANY).
626  * @param proc function to call on a matching value;
627  *        will be called with NULL if no value matches
628  * @param proc_cls closure for @a proc
629  */
630 static void
631 mysql_plugin_get_zero_anonymity (void *cls,
632                                  uint64_t next_uid,
633                                  enum GNUNET_BLOCK_Type type,
634                                  PluginDatumProcessor proc,
635                                  void *proc_cls)
636 {
637   struct Plugin *plugin = cls;
638   uint32_t typei = (uint32_t) type;
639
640   struct GNUNET_MY_QueryParam params_zero_iter[] = {
641     GNUNET_MY_query_param_uint32 (&typei),
642     GNUNET_MY_query_param_uint64 (&next_uid),
643     GNUNET_MY_query_param_end
644   };
645
646   execute_select (plugin,
647                   plugin->zero_iter,
648                   proc,
649                   proc_cls,
650                   params_zero_iter);
651 }
652
653
654 /**
655  * Context for #repl_proc() function.
656  */
657 struct ReplCtx
658 {
659   /**
660    * Plugin handle.
661    */
662   struct Plugin *plugin;
663
664   /**
665    * Function to call for the result (or the NULL).
666    */
667   PluginDatumProcessor proc;
668
669   /**
670    * Closure for @e proc.
671    */
672   void *proc_cls;
673 };
674
675
676 /**
677  * Wrapper for the processor for #mysql_plugin_get_replication().
678  * Decrements the replication counter and calls the original
679  * iterator.
680  *
681  * @param cls closure
682  * @param key key for the content
683  * @param size number of bytes in @a data
684  * @param data content stored
685  * @param type type of the content
686  * @param priority priority of the content
687  * @param anonymity anonymity-level for the content
688  * @param replication replication-level for the content
689  * @param expiration expiration time for the content
690  * @param uid unique identifier for the datum;
691  *        maybe 0 if no unique identifier is available
692  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
693  *         (continue on call to "next", of course),
694  *         #GNUNET_NO to delete the item and continue (if supported)
695  */
696 static int
697 repl_proc (void *cls,
698            const struct GNUNET_HashCode *key,
699            uint32_t size,
700            const void *data,
701            enum GNUNET_BLOCK_Type type,
702            uint32_t priority,
703            uint32_t anonymity,
704            uint32_t replication,
705            struct GNUNET_TIME_Absolute expiration,
706            uint64_t uid)
707 {
708   struct ReplCtx *rc = cls;
709   struct Plugin *plugin = rc->plugin;
710   int ret;
711   int iret;
712
713   ret = rc->proc (rc->proc_cls,
714                   key,
715                   size,
716                   data,
717                   type,
718                   priority,
719                   anonymity,
720                   replication,
721                   expiration,
722                   uid);
723   if (NULL != key)
724   {
725     struct GNUNET_MY_QueryParam params_proc[] = {
726       GNUNET_MY_query_param_uint64 (&uid),
727       GNUNET_MY_query_param_end
728     };
729
730     iret = GNUNET_MY_exec_prepared (plugin->mc,
731                                     plugin->dec_repl,
732                                     params_proc);
733     if (GNUNET_SYSERR == iret)
734     {
735       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
736                   "Failed to reduce replication counter\n");
737       return GNUNET_SYSERR;
738     }
739   }
740   return ret;
741 }
742
743
744 /**
745  * Get a random item for replication.  Returns a single, not expired,
746  * random item from those with the highest replication counters.  The
747  * item's replication counter is decremented by one IF it was positive
748  * before.  Call @a proc with all values ZERO or NULL if the datastore
749  * is empty.
750  *
751  * @param cls closure
752  * @param proc function to call the value (once only).
753  * @param proc_cls closure for @a proc
754  */
755 static void
756 mysql_plugin_get_replication (void *cls,
757                               PluginDatumProcessor proc,
758                               void *proc_cls)
759 {
760   struct Plugin *plugin = cls;
761   uint64_t rvalue;
762   uint32_t repl;
763   struct ReplCtx rc;
764   struct GNUNET_MY_QueryParam params_get[] = {
765     GNUNET_MY_query_param_end
766   };
767   struct GNUNET_MY_ResultSpec results_get[] = {
768     GNUNET_MY_result_spec_uint32 (&repl),
769     GNUNET_MY_result_spec_end
770   };
771   struct GNUNET_MY_QueryParam params_select[] = {
772     GNUNET_MY_query_param_uint32 (&repl),
773     GNUNET_MY_query_param_uint64 (&rvalue),
774     GNUNET_MY_query_param_uint32 (&repl),
775     GNUNET_MY_query_param_uint64 (&rvalue),
776     GNUNET_MY_query_param_end
777   };
778
779   rc.plugin = plugin;
780   rc.proc = proc;
781   rc.proc_cls = proc_cls;
782
783   if (1 !=
784       GNUNET_MY_exec_prepared (plugin->mc,
785                                plugin->max_repl,
786                                params_get))
787   {
788     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
789     return;
790   }
791
792   if (GNUNET_OK !=
793       GNUNET_MY_extract_result (plugin->max_repl,
794                                 results_get))
795   {
796     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
797     return;
798   }
799   GNUNET_break (GNUNET_NO ==
800                 GNUNET_MY_extract_result (plugin->max_repl,
801                                           NULL));
802   rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
803                                      UINT64_MAX);
804
805   execute_select (plugin,
806                   plugin->select_replication,
807                   &repl_proc,
808                   &rc,
809                   params_select);
810 }
811
812
813 /**
814  * Get all of the keys in the datastore.
815  *
816  * @param cls closure
817  * @param proc function to call on each key
818  * @param proc_cls closure for @a proc
819  */
820 static void
821 mysql_plugin_get_keys (void *cls,
822                        PluginKeyProcessor proc,
823                        void *proc_cls)
824 {
825   struct Plugin *plugin = cls;
826   int ret;
827   MYSQL_STMT *statement;
828   unsigned int cnt;
829   struct GNUNET_HashCode key;
830   struct GNUNET_HashCode last;
831   struct GNUNET_MY_QueryParam params_select[] = {
832     GNUNET_MY_query_param_end
833   };
834   struct GNUNET_MY_ResultSpec results_select[] = {
835     GNUNET_MY_result_spec_auto_from_type (&key),
836     GNUNET_MY_result_spec_end
837   };
838
839   GNUNET_assert (NULL != proc);
840   statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys);
841   if (GNUNET_OK !=
842       GNUNET_MY_exec_prepared (plugin->mc,
843                                plugin->get_all_keys,
844                                params_select))
845   {
846     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
847                 _ ("`%s' for `%s' failed at %s:%d with error: %s\n"),
848                 "mysql_stmt_execute",
849                 GET_ALL_KEYS,
850                 __FILE__,
851                 __LINE__,
852                 mysql_stmt_error (statement));
853     GNUNET_MYSQL_statements_invalidate (plugin->mc);
854     proc (proc_cls, NULL, 0);
855     return;
856   }
857   memset (&last, 0, sizeof(last));   /* make static analysis happy */
858   ret = GNUNET_YES;
859   cnt = 0;
860   while (ret == GNUNET_YES)
861   {
862     ret = GNUNET_MY_extract_result (plugin->get_all_keys,
863                                     results_select);
864     if (0 != GNUNET_memcmp (&last,
865                             &key))
866     {
867       if (0 != cnt)
868         proc (proc_cls,
869               &last,
870               cnt);
871       cnt = 1;
872       last = key;
873     }
874     else
875     {
876       cnt++;
877     }
878   }
879   if (0 != cnt)
880     proc (proc_cls,
881           &last,
882           cnt);
883   /* finally, let app know we are done */
884   proc (proc_cls,
885         NULL,
886         0);
887   if (GNUNET_SYSERR == ret)
888   {
889     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
890                 _ ("`%s' failed at %s:%d with error: %s\n"),
891                 "mysql_stmt_fetch",
892                 __FILE__,
893                 __LINE__,
894                 mysql_stmt_error (statement));
895     GNUNET_MYSQL_statements_invalidate (plugin->mc);
896     return;
897   }
898 }
899
900
901 /**
902  * Context for #expi_proc() function.
903  */
904 struct ExpiCtx
905 {
906   /**
907    * Plugin handle.
908    */
909   struct Plugin *plugin;
910
911   /**
912    * Function to call for the result (or the NULL).
913    */
914   PluginDatumProcessor proc;
915
916   /**
917    * Closure for @e proc.
918    */
919   void *proc_cls;
920 };
921
922
923 /**
924  * Wrapper for the processor for #mysql_plugin_get_expiration().
925  * If no expired value was found, we do a second query for
926  * low-priority content.
927  *
928  * @param cls closure
929  * @param key key for the content
930  * @param size number of bytes in data
931  * @param data content stored
932  * @param type type of the content
933  * @param priority priority of the content
934  * @param anonymity anonymity-level for the content
935  * @param replication replication-level for the content
936  * @param expiration expiration time for the content
937  * @param uid unique identifier for the datum;
938  *        maybe 0 if no unique identifier is available
939  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
940  *         (continue on call to "next", of course),
941  *         #GNUNET_NO to delete the item and continue (if supported)
942  */
943 static int
944 expi_proc (void *cls,
945            const struct GNUNET_HashCode *key,
946            uint32_t size,
947            const void *data,
948            enum GNUNET_BLOCK_Type type,
949            uint32_t priority,
950            uint32_t anonymity,
951            uint32_t replication,
952            struct GNUNET_TIME_Absolute expiration,
953            uint64_t uid)
954 {
955   struct ExpiCtx *rc = cls;
956   struct Plugin *plugin = rc->plugin;
957   struct GNUNET_MY_QueryParam params_select[] = {
958     GNUNET_MY_query_param_end
959   };
960
961   if (NULL == key)
962   {
963     execute_select (plugin,
964                     plugin->select_priority,
965                     rc->proc,
966                     rc->proc_cls,
967                     params_select);
968     return GNUNET_SYSERR;
969   }
970   return rc->proc (rc->proc_cls,
971                    key,
972                    size,
973                    data,
974                    type,
975                    priority,
976                    anonymity,
977                    replication,
978                    expiration,
979                    uid);
980 }
981
982
983 /**
984  * Get a random item for expiration.
985  * Call @a proc with all values ZERO or NULL if the datastore is empty.
986  *
987  * @param cls closure
988  * @param proc function to call the value (once only).
989  * @param proc_cls closure for @a proc
990  */
991 static void
992 mysql_plugin_get_expiration (void *cls,
993                              PluginDatumProcessor proc,
994                              void *proc_cls)
995 {
996   struct Plugin *plugin = cls;
997   struct GNUNET_TIME_Absolute now;
998   struct GNUNET_MY_QueryParam params_select[] = {
999     GNUNET_MY_query_param_absolute_time (&now),
1000     GNUNET_MY_query_param_end
1001   };
1002   struct ExpiCtx rc;
1003
1004   rc.plugin = plugin;
1005   rc.proc = proc;
1006   rc.proc_cls = proc_cls;
1007   now = GNUNET_TIME_absolute_get ();
1008   execute_select (plugin,
1009                   plugin->select_expiration,
1010                   expi_proc,
1011                   &rc,
1012                   params_select);
1013 }
1014
1015
1016 /**
1017  * Drop database.
1018  *
1019  * @param cls the `struct Plugin *`
1020  */
1021 static void
1022 mysql_plugin_drop (void *cls)
1023 {
1024   struct Plugin *plugin = cls;
1025
1026   if (GNUNET_OK !=
1027       GNUNET_MYSQL_statement_run (plugin->mc,
1028                                   "DROP TABLE gn090"))
1029     return;                     /* error */
1030   plugin->env->duc (plugin->env->cls, 0);
1031 }
1032
1033
1034 /**
1035  * Remove a particular key in the datastore.
1036  *
1037  * @param cls closure
1038  * @param key key for the content
1039  * @param size number of bytes in data
1040  * @param data content stored
1041  * @param cont continuation called with success or failure status
1042  * @param cont_cls continuation closure for @a cont
1043  */
1044 static void
1045 mysql_plugin_remove_key (void *cls,
1046                          const struct GNUNET_HashCode *key,
1047                          uint32_t size,
1048                          const void *data,
1049                          PluginRemoveCont cont,
1050                          void *cont_cls)
1051 {
1052   struct Plugin *plugin = cls;
1053   struct GNUNET_MY_QueryParam params_delete[] = {
1054     GNUNET_MY_query_param_auto_from_type (key),
1055     GNUNET_MY_query_param_fixed_size (data, size),
1056     GNUNET_MY_query_param_end
1057   };
1058
1059   if (GNUNET_OK !=
1060       GNUNET_MY_exec_prepared (plugin->mc,
1061                                plugin->delete_entry_by_hash_value,
1062                                params_delete))
1063   {
1064     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1065                 "Removing key `%s' from gn090 table failed\n",
1066                 GNUNET_h2s (key));
1067     cont (cont_cls,
1068           key,
1069           size,
1070           GNUNET_SYSERR,
1071           _ ("MySQL statement run failure"));
1072     return;
1073   }
1074
1075   MYSQL_STMT *stmt = GNUNET_MYSQL_statement_get_stmt (
1076     plugin->delete_entry_by_hash_value);
1077   my_ulonglong rows = mysql_stmt_affected_rows (stmt);
1078
1079   if (0 == rows)
1080   {
1081     cont (cont_cls,
1082           key,
1083           size,
1084           GNUNET_NO,
1085           NULL);
1086     return;
1087   }
1088   plugin->env->duc (plugin->env->cls,
1089                     -size);
1090   cont (cont_cls,
1091         key,
1092         size,
1093         GNUNET_OK,
1094         NULL);
1095 }
1096
1097
1098 /**
1099  * Entry point for the plugin.
1100  *
1101  * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment *`
1102  * @return our `struct Plugin *`
1103  */
1104 void *
1105 libgnunet_plugin_datastore_mysql_init (void *cls)
1106 {
1107   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1108   struct GNUNET_DATASTORE_PluginFunctions *api;
1109   struct Plugin *plugin;
1110
1111   plugin = GNUNET_new (struct Plugin);
1112   plugin->env = env;
1113   plugin->mc = GNUNET_MYSQL_context_create (env->cfg,
1114                                             "datastore-mysql");
1115   if (NULL == plugin->mc)
1116   {
1117     GNUNET_free (plugin);
1118     return NULL;
1119   }
1120 #define MRUNS(a) (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, a))
1121 #define PINIT(a, b) (NULL == (a = GNUNET_MYSQL_statement_prepare (plugin->mc, \
1122                                                                   b)))
1123   if (MRUNS
1124         ("CREATE TABLE IF NOT EXISTS gn090 ("
1125         " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1126         " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1127         " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1128         " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1129         " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1130         " rvalue BIGINT UNSIGNED NOT NULL,"
1131         " hash BINARY(64) NOT NULL DEFAULT '',"
1132         " vhash BINARY(64) NOT NULL DEFAULT '',"
1133         " value BLOB NOT NULL DEFAULT '',"
1134         " uid BIGINT NOT NULL AUTO_INCREMENT,"
1135         " PRIMARY KEY (uid),"
1136         " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
1137         " INDEX idx_prio (prio),"
1138         " INDEX idx_repl_rvalue (repl,rvalue),"
1139         " INDEX idx_expire (expire),"
1140         " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
1141         ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
1142       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1143       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1144       PINIT (plugin->delete_entry_by_hash_value, DELETE_ENTRY_BY_HASH_VALUE) ||
1145       PINIT (plugin->select_entry, SELECT_ENTRY) ||
1146       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1147       PINIT (plugin->select_entry_by_hash_and_type,
1148              SELECT_ENTRY_BY_HASH_AND_TYPE) ||
1149       PINIT (plugin->get_size, SELECT_SIZE) ||
1150       PINIT (plugin->update_entry, UPDATE_ENTRY) ||
1151       PINIT (plugin->dec_repl, DEC_REPL) ||
1152       PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
1153       PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) ||
1154       PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
1155       PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
1156       PINIT (plugin->get_all_keys, GET_ALL_KEYS) ||
1157       PINIT (plugin->select_replication, SELECT_IT_REPLICATION) ||
1158       false)
1159   {
1160     GNUNET_MYSQL_context_destroy (plugin->mc);
1161     GNUNET_free (plugin);
1162     return NULL;
1163   }
1164 #undef PINIT
1165 #undef MRUNS
1166
1167   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
1168   api->cls = plugin;
1169   api->estimate_size = &mysql_plugin_estimate_size;
1170   api->put = &mysql_plugin_put;
1171   api->get_key = &mysql_plugin_get_key;
1172   api->get_replication = &mysql_plugin_get_replication;
1173   api->get_expiration = &mysql_plugin_get_expiration;
1174   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1175   api->get_keys = &mysql_plugin_get_keys;
1176   api->drop = &mysql_plugin_drop;
1177   api->remove_key = &mysql_plugin_remove_key;
1178   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql",
1179                    _ ("Mysql database running\n"));
1180   return api;
1181 }
1182
1183
1184 /**
1185  * Exit point from the plugin.
1186  *
1187  * @param cls our `struct Plugin *`
1188  * @return always NULL
1189  */
1190 void *
1191 libgnunet_plugin_datastore_mysql_done (void *cls)
1192 {
1193   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1194   struct Plugin *plugin = api->cls;
1195
1196   GNUNET_MYSQL_context_destroy (plugin->mc);
1197   GNUNET_free (plugin);
1198   GNUNET_free (api);
1199   return NULL;
1200 }
1201
1202
1203 /* end of plugin_datastore_mysql.c */