glitch in the license text detected by hyazinthe, thank you!
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009, 2010, 2011 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15
16 /**
17  * @file datastore/plugin_datastore_mysql.c
18  * @brief mysql-based datastore backend
19  * @author Igor Wronsky
20  * @author Christian Grothoff
21  * @author Christophe Genevey
22  *
23  * NOTE: This db module does NOT work with mysql prior to 4.1 since
24  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
25  * in MyISAM that is causing us grief.  At the time of this writing,
26  * that version is yet to be released.  In anticipation, the code
27  * will use MyISAM with 5.0.46 (and higher).  If you run such a
28  * version, please run "make check" to verify that the MySQL bug
29  * was actually fixed in your version (and if not, change the
30  * code below to use MyISAM for gn071).
31  *
32  * HIGHLIGHTS
33  *
34  * Pros
35  * + On up-to-date hardware where mysql can be used comfortably, this
36  *   module will have better performance than the other db choices
37  *   (according to our tests).
38  * + Its often possible to recover the mysql database from internal
39  *   inconsistencies. The other db choices do not support repair!
40  * Cons
41  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
42  * - Manual setup
43  *
44  * MANUAL SETUP INSTRUCTIONS
45  *
46  * 1) in gnunet.conf, set
47  * @verbatim
48        [datastore]
49        DATABASE = "mysql"
50    @endverbatim
51  * 2) Then access mysql as root,
52  * @verbatim
53      $ mysql -u root -p
54    @endverbatim
55  *    and do the following. [You should replace $USER with the username
56  *    that will be running the gnunetd process].
57  * @verbatim
58       CREATE DATABASE gnunet;
59       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
60          ON gnunet.* TO $USER@localhost;
61       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
62       FLUSH PRIVILEGES;
63    @endverbatim
64  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
65  *    with the following lines
66  * @verbatim
67       [client]
68       user=$USER
69       password=$the_password_you_like
70    @endverbatim
71  *
72  * Thats it. Note that .my.cnf file is a security risk unless its on
73  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
74  * link. Even greater security risk can be achieved by setting no
75  * password for $USER.  Luckily $USER has only priviledges to mess
76  * up GNUnet's tables, nothing else (unless you give him more,
77  * of course).<p>
78  *
79  * 4) Still, perhaps you should briefly try if the DB connection
80  *    works. First, login as $USER. Then use,
81  *
82  * @verbatim
83      $ mysql -u $USER -p $the_password_you_like
84      mysql> use gnunet;
85    @endverbatim
86  *
87  *    If you get the message &quot;Database changed&quot; it probably works.
88  *
89  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
90  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
91  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
92  *     so there may be some additional trouble depending on your mysql setup.]
93  *
94  * REPAIRING TABLES
95  *
96  * - Its probably healthy to check your tables for inconsistencies
97  *   every now and then.
98  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
99  *   databases have been corrupted.
100  * - The tables can be verified/fixed in two ways;
101  *   1) by running mysqlcheck -A, or
102  *   2) by executing (inside of mysql using the GNUnet database):
103  * @verbatim
104      mysql> REPAIR TABLE gn090;
105    @endverbatim
106  *
107  * PROBLEMS?
108  *
109  * If you have problems related to the mysql module, your best
110  * friend is probably the mysql manual. The first thing to check
111  * is that mysql is basically operational, that you can connect
112  * to it, create tables, issue queries etc.
113  */
114
115 #include "platform.h"
116 #include "gnunet_datastore_plugin.h"
117 #include "gnunet_util_lib.h"
118 #include "gnunet_mysql_lib.h"
119 #include "gnunet_my_lib.h"
120
121 #define MAX_DATUM_SIZE 65536
122
123
124 /**
125  * Context for all functions in this plugin.
126  */
127 struct Plugin
128 {
129   /**
130    * Our execution environment.
131    */
132   struct GNUNET_DATASTORE_PluginEnvironment *env;
133
134   /**
135    * Handle to talk to MySQL.
136    */
137   struct GNUNET_MYSQL_Context *mc;
138
139   /**
140    * Prepared statements.
141    */
142 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
143   struct GNUNET_MYSQL_StatementHandle *insert_entry;
144
145 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
146   struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
147
148 #define DELETE_ENTRY_BY_HASH_VALUE "DELETE FROM gn090 "\
149   "WHERE hash = ? AND "\
150   "value = ? "\
151   "LIMIT 1"
152   struct GNUNET_MYSQL_StatementHandle *delete_entry_by_hash_value;
153
154 #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, uid"
155
156 #define SELECT_ENTRY "SELECT " RESULT_COLUMNS " FROM gn090 "\
157   "WHERE uid >= ? AND "\
158   "(rvalue >= ? OR 0 = ?) "\
159   "ORDER BY uid LIMIT 1"
160   struct GNUNET_MYSQL_StatementHandle *select_entry;
161
162 #define SELECT_ENTRY_BY_HASH "SELECT " RESULT_COLUMNS " FROM gn090 "\
163   "FORCE INDEX (idx_hash_type_uid) "\
164   "WHERE hash=? AND "\
165   "uid >= ? AND "\
166   "(rvalue >= ? OR 0 = ?) "\
167   "ORDER BY uid LIMIT 1"
168   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
169
170 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT " RESULT_COLUMNS " FROM gn090 "\
171   "FORCE INDEX (idx_hash_type_uid) "\
172   "WHERE hash = ? AND "\
173   "type = ? AND "\
174   "uid >= ? AND "\
175   "(rvalue >= ? OR 0 = ?) "\
176   "ORDER BY uid LIMIT 1"
177   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
178
179 #define UPDATE_ENTRY "UPDATE gn090 SET "\
180   "prio = prio + ?, "\
181   "repl = repl + ?, "\
182   "expire = GREATEST(expire, ?) "\
183   "WHERE hash = ? AND vhash = ?"
184   struct GNUNET_MYSQL_StatementHandle *update_entry;
185
186 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
187   struct GNUNET_MYSQL_StatementHandle *dec_repl;
188
189 #define SELECT_SIZE "SELECT SUM(LENGTH(value)+256) FROM gn090"
190   struct GNUNET_MYSQL_StatementHandle *get_size;
191
192 #define SELECT_IT_NON_ANONYMOUS "SELECT " RESULT_COLUMNS " FROM gn090 "\
193   "FORCE INDEX (idx_anonLevel_type_rvalue) "\
194   "WHERE anonLevel=0 AND "\
195   "type=? AND "\
196   "uid >= ? "\
197   "ORDER BY uid LIMIT 1"
198   struct GNUNET_MYSQL_StatementHandle *zero_iter;
199
200 #define SELECT_IT_EXPIRATION "SELECT " RESULT_COLUMNS " FROM gn090 "\
201   "FORCE INDEX (idx_expire) "\
202   "WHERE expire < ? "\
203   "ORDER BY expire ASC LIMIT 1"
204   struct GNUNET_MYSQL_StatementHandle *select_expiration;
205
206 #define SELECT_IT_PRIORITY "SELECT " RESULT_COLUMNS " FROM gn090 "\
207   "FORCE INDEX (idx_prio) "\
208   "ORDER BY prio ASC LIMIT 1"
209   struct GNUNET_MYSQL_StatementHandle *select_priority;
210
211 #define SELECT_IT_REPLICATION "SELECT " RESULT_COLUMNS " FROM gn090 "\
212   "FORCE INDEX (idx_repl_rvalue) "\
213   "WHERE repl=? AND "\
214   " (rvalue>=? OR"\
215   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
216   "ORDER BY rvalue ASC "\
217   "LIMIT 1"
218   struct GNUNET_MYSQL_StatementHandle *select_replication;
219
220 #define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
221   struct GNUNET_MYSQL_StatementHandle *max_repl;
222
223 #define GET_ALL_KEYS "SELECT hash from gn090"
224   struct GNUNET_MYSQL_StatementHandle *get_all_keys;
225
226 };
227
228 #define MAX_PARAM 16
229
230 /**
231  * Delete an entry from the gn090 table.
232  *
233  * @param plugin plugin context
234  * @param uid unique ID of the entry to delete
235  * @return #GNUNET_OK on success, #GNUNET_NO if no such value exists, #GNUNET_SYSERR on error
236  */
237 static int
238 do_delete_entry (struct Plugin *plugin,
239                  unsigned long long uid)
240 {
241   int ret;
242   uint64_t uid64 = (uint64_t) uid;
243   struct GNUNET_MY_QueryParam params_delete[] = {
244     GNUNET_MY_query_param_uint64 (&uid64),
245     GNUNET_MY_query_param_end
246   };
247
248   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
249               "Deleting value %llu from gn090 table\n",
250               uid);
251   ret = GNUNET_MY_exec_prepared (plugin->mc,
252                                  plugin->delete_entry_by_uid,
253                                  params_delete);
254   if (ret >= 0)
255   {
256     return GNUNET_OK;
257   }
258   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
259               "Deleting value %llu from gn090 table failed\n",
260               (unsigned long long) uid);
261   return ret;
262 }
263
264
265 /**
266  * Get an estimate of how much space the database is
267  * currently using.
268  *
269  * @param cls our `struct Plugin *`
270  * @return number of bytes used on disk
271  */
272 static void
273 mysql_plugin_estimate_size (void *cls,
274                             unsigned long long *estimate)
275 {
276   struct Plugin *plugin = cls;
277   uint64_t total;
278   int ret;
279   struct GNUNET_MY_QueryParam params_get[] = {
280     GNUNET_MY_query_param_end
281   };
282   struct GNUNET_MY_ResultSpec results_get[] = {
283     GNUNET_MY_result_spec_uint64 (&total),
284     GNUNET_MY_result_spec_end
285   };
286
287   ret = GNUNET_MY_exec_prepared (plugin->mc,
288                                  plugin->get_size,
289                                  params_get);
290   *estimate = 0;
291   total = UINT64_MAX;
292   if ( (GNUNET_OK == ret) &&
293        (GNUNET_OK ==
294         GNUNET_MY_extract_result (plugin->get_size,
295                                   results_get)) )
296   {
297     *estimate = (unsigned long long) total;
298     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
299                 "Size estimate for MySQL payload is %lld\n",
300                 (long long) total);
301     GNUNET_assert (UINT64_MAX != total);
302     GNUNET_break (GNUNET_NO ==
303                   GNUNET_MY_extract_result (plugin->get_size,
304                                             NULL));
305   }
306 }
307
308
309 /**
310  * Store an item in the datastore.
311  *
312  * @param cls closure
313  * @param key key for the item
314  * @param absent true if the key was not found in the bloom filter
315  * @param size number of bytes in @a data
316  * @param data content stored
317  * @param type type of the content
318  * @param priority priority of the content
319  * @param anonymity anonymity-level for the content
320  * @param replication replication-level for the content
321  * @param expiration expiration time for the content
322  * @param cont continuation called with success or failure status
323  * @param cont_cls closure for @a cont
324  */
325 static void
326 mysql_plugin_put (void *cls,
327                   const struct GNUNET_HashCode *key,
328                   bool absent,
329                   uint32_t size,
330                   const void *data,
331                   enum GNUNET_BLOCK_Type type,
332                   uint32_t priority,
333                   uint32_t anonymity,
334                   uint32_t replication,
335                   struct GNUNET_TIME_Absolute expiration,
336                   PluginPutCont cont,
337                   void *cont_cls)
338 {
339   struct Plugin *plugin = cls;
340   uint64_t lexpiration = expiration.abs_value_us;
341   struct GNUNET_HashCode vhash;
342
343   GNUNET_CRYPTO_hash (data,
344                       size,
345                       &vhash);
346   if (!absent)
347   {
348     struct GNUNET_MY_QueryParam params_update[] = {
349       GNUNET_MY_query_param_uint32 (&priority),
350       GNUNET_MY_query_param_uint32 (&replication),
351       GNUNET_MY_query_param_uint64 (&lexpiration),
352       GNUNET_MY_query_param_auto_from_type (key),
353       GNUNET_MY_query_param_auto_from_type (&vhash),
354       GNUNET_MY_query_param_end
355     };
356
357     if (GNUNET_OK !=
358         GNUNET_MY_exec_prepared (plugin->mc,
359                                  plugin->update_entry,
360                                  params_update))
361     {
362       cont (cont_cls,
363             key,
364             size,
365             GNUNET_SYSERR,
366             _("MySQL statement run failure"));
367       return;
368     }
369
370     MYSQL_STMT *stmt = GNUNET_MYSQL_statement_get_stmt (plugin->update_entry);
371     my_ulonglong rows = mysql_stmt_affected_rows (stmt);
372
373     GNUNET_break (GNUNET_NO ==
374                   GNUNET_MY_extract_result (plugin->update_entry,
375                                             NULL));
376     if (0 != rows)
377     {
378       cont (cont_cls,
379             key,
380             size,
381             GNUNET_NO,
382             NULL);
383       return;
384     }
385   }
386
387   uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
388                                                UINT64_MAX);
389   struct GNUNET_MY_QueryParam params_insert[] = {
390     GNUNET_MY_query_param_uint32 (&replication),
391     GNUNET_MY_query_param_uint32 (&type),
392     GNUNET_MY_query_param_uint32 (&priority),
393     GNUNET_MY_query_param_uint32 (&anonymity),
394     GNUNET_MY_query_param_uint64 (&lexpiration),
395     GNUNET_MY_query_param_uint64 (&lrvalue),
396     GNUNET_MY_query_param_auto_from_type (key),
397     GNUNET_MY_query_param_auto_from_type (&vhash),
398     GNUNET_MY_query_param_fixed_size (data, size),
399     GNUNET_MY_query_param_end
400   };
401
402   if (size > MAX_DATUM_SIZE)
403   {
404     GNUNET_break (0);
405     cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
406     return;
407   }
408
409   if (GNUNET_OK !=
410       GNUNET_MY_exec_prepared (plugin->mc,
411                                plugin->insert_entry,
412                                params_insert))
413   {
414     cont (cont_cls,
415           key,
416           size,
417           GNUNET_SYSERR,
418           _("MySQL statement run failure"));
419     return;
420   }
421   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
422               "Inserted value `%s' with size %u into gn090 table\n",
423               GNUNET_h2s (key),
424               (unsigned int) size);
425   if (size > 0)
426     plugin->env->duc (plugin->env->cls,
427                       size);
428   GNUNET_break (GNUNET_NO ==
429                 GNUNET_MY_extract_result (plugin->insert_entry,
430                                           NULL));
431   cont (cont_cls,
432         key,
433         size,
434         GNUNET_OK,
435         NULL);
436 }
437
438
439 /**
440  * Run the given select statement and call 'proc' on the resulting
441  * values (which must be in particular positions).
442  *
443  * @param plugin the plugin handle
444  * @param stmt select statement to run
445  * @param proc function to call on result
446  * @param proc_cls closure for @a proc
447  * @param params_select arguments to initialize stmt
448  */
449 static void
450 execute_select (struct Plugin *plugin,
451                 struct GNUNET_MYSQL_StatementHandle *stmt,
452                 PluginDatumProcessor proc,
453                 void *proc_cls,
454                 struct GNUNET_MY_QueryParam *params_select)
455 {
456   int ret;
457   uint32_t replication;
458   uint32_t type;
459   uint32_t priority;
460   uint32_t anonymity;
461   uint64_t uid;
462   size_t value_size;
463   void *value;
464   struct GNUNET_HashCode key;
465   struct GNUNET_TIME_Absolute expiration;
466   struct GNUNET_MY_ResultSpec results_select[] = {
467     GNUNET_MY_result_spec_uint32 (&replication),
468     GNUNET_MY_result_spec_uint32 (&type),
469     GNUNET_MY_result_spec_uint32 (&priority),
470     GNUNET_MY_result_spec_uint32 (&anonymity),
471     GNUNET_MY_result_spec_absolute_time (&expiration),
472     GNUNET_MY_result_spec_auto_from_type (&key),
473     GNUNET_MY_result_spec_variable_size (&value, &value_size),
474     GNUNET_MY_result_spec_uint64 (&uid),
475     GNUNET_MY_result_spec_end
476   };
477
478   ret = GNUNET_MY_exec_prepared (plugin->mc,
479                                  stmt,
480                                  params_select);
481   if (GNUNET_OK != ret)
482   {
483     proc (proc_cls,
484           NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
485     return;
486   }
487
488   ret = GNUNET_MY_extract_result (stmt,
489                                   results_select);
490   if (GNUNET_OK != ret)
491   {
492     proc (proc_cls,
493           NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
494     return;
495   }
496
497   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %s selecting from gn090 table\n",
499               (unsigned int) value_size,
500               GNUNET_h2s (&key),
501               (unsigned int) priority,
502               (unsigned int) anonymity,
503               GNUNET_STRINGS_absolute_time_to_string (expiration));
504   GNUNET_assert (value_size < MAX_DATUM_SIZE);
505   GNUNET_break (GNUNET_NO ==
506                 GNUNET_MY_extract_result (stmt,
507                                           NULL));
508   ret = proc (proc_cls,
509               &key,
510               value_size,
511               value,
512               type,
513               priority,
514               anonymity,
515               replication,
516               expiration,
517               uid);
518   GNUNET_MY_cleanup_result (results_select);
519   if (GNUNET_NO == ret)
520   {
521     do_delete_entry (plugin, uid);
522     if (0 != value_size)
523       plugin->env->duc (plugin->env->cls,
524                         - value_size);
525   }
526 }
527
528
529 /**
530  * Get one of the results for a particular key in the datastore.
531  *
532  * @param cls closure
533  * @param next_uid return the result with lowest uid >= next_uid
534  * @param random if true, return a random result instead of using next_uid
535  * @param key key to match, never NULL
536  * @param type entries of which type are relevant?
537  *     Use 0 for any type.
538  * @param proc function to call on the matching value,
539  *        with NULL for if no value matches
540  * @param proc_cls closure for @a proc
541  */
542 static void
543 mysql_plugin_get_key (void *cls,
544                       uint64_t next_uid,
545                       bool random,
546                       const struct GNUNET_HashCode *key,
547                       enum GNUNET_BLOCK_Type type,
548                       PluginDatumProcessor proc,
549                       void *proc_cls)
550 {
551   struct Plugin *plugin = cls;
552   uint64_t rvalue;
553
554   if (random)
555   {
556     rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
557                                        UINT64_MAX);
558     next_uid = 0;
559   }
560   else
561     rvalue = 0;
562
563   if (NULL == key)
564   {
565     struct GNUNET_MY_QueryParam params_select[] = {
566       GNUNET_MY_query_param_uint64 (&next_uid),
567       GNUNET_MY_query_param_uint64 (&rvalue),
568       GNUNET_MY_query_param_uint64 (&rvalue),
569       GNUNET_MY_query_param_end
570     };
571
572     execute_select (plugin,
573                     plugin->select_entry,
574                     proc,
575                     proc_cls,
576                     params_select);
577   }
578   else if (type != GNUNET_BLOCK_TYPE_ANY)
579   {
580     struct GNUNET_MY_QueryParam params_select[] = {
581       GNUNET_MY_query_param_auto_from_type (key),
582       GNUNET_MY_query_param_uint32 (&type),
583       GNUNET_MY_query_param_uint64 (&next_uid),
584       GNUNET_MY_query_param_uint64 (&rvalue),
585       GNUNET_MY_query_param_uint64 (&rvalue),
586       GNUNET_MY_query_param_end
587     };
588
589     execute_select (plugin,
590                     plugin->select_entry_by_hash_and_type,
591                     proc,
592                     proc_cls,
593                     params_select);
594   }
595   else
596   {
597     struct GNUNET_MY_QueryParam params_select[] = {
598       GNUNET_MY_query_param_auto_from_type (key),
599       GNUNET_MY_query_param_uint64 (&next_uid),
600       GNUNET_MY_query_param_uint64 (&rvalue),
601       GNUNET_MY_query_param_uint64 (&rvalue),
602       GNUNET_MY_query_param_end
603     };
604
605     execute_select (plugin,
606                     plugin->select_entry_by_hash,
607                     proc,
608                     proc_cls,
609                     params_select);
610   }
611 }
612
613
614 /**
615  * Get a zero-anonymity datum from the datastore.
616  *
617  * @param cls our `struct Plugin *`
618  * @param next_uid return the result with lowest uid >= next_uid
619  * @param type entries of which type should be considered?
620  *        Must not be zero (ANY).
621  * @param proc function to call on a matching value;
622  *        will be called with NULL if no value matches
623  * @param proc_cls closure for @a proc
624  */
625 static void
626 mysql_plugin_get_zero_anonymity (void *cls,
627                                  uint64_t next_uid,
628                                  enum GNUNET_BLOCK_Type type,
629                                  PluginDatumProcessor proc,
630                                  void *proc_cls)
631 {
632   struct Plugin *plugin = cls;
633   uint32_t typei = (uint32_t) type;
634
635   struct GNUNET_MY_QueryParam params_zero_iter[] = {
636     GNUNET_MY_query_param_uint32 (&typei),
637     GNUNET_MY_query_param_uint64 (&next_uid),
638     GNUNET_MY_query_param_end
639   };
640
641   execute_select (plugin,
642                   plugin->zero_iter,
643                   proc,
644                   proc_cls,
645                   params_zero_iter);
646 }
647
648
649 /**
650  * Context for #repl_proc() function.
651  */
652 struct ReplCtx
653 {
654
655   /**
656    * Plugin handle.
657    */
658   struct Plugin *plugin;
659
660   /**
661    * Function to call for the result (or the NULL).
662    */
663   PluginDatumProcessor proc;
664
665   /**
666    * Closure for @e proc.
667    */
668   void *proc_cls;
669 };
670
671
672 /**
673  * Wrapper for the processor for #mysql_plugin_get_replication().
674  * Decrements the replication counter and calls the original
675  * iterator.
676  *
677  * @param cls closure
678  * @param key key for the content
679  * @param size number of bytes in @a data
680  * @param data content stored
681  * @param type type of the content
682  * @param priority priority of the content
683  * @param anonymity anonymity-level for the content
684  * @param replication replication-level for the content
685  * @param expiration expiration time for the content
686  * @param uid unique identifier for the datum;
687  *        maybe 0 if no unique identifier is available
688  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
689  *         (continue on call to "next", of course),
690  *         #GNUNET_NO to delete the item and continue (if supported)
691  */
692 static int
693 repl_proc (void *cls,
694            const struct GNUNET_HashCode *key,
695            uint32_t size,
696            const void *data,
697            enum GNUNET_BLOCK_Type type,
698            uint32_t priority,
699            uint32_t anonymity,
700            uint32_t replication,
701            struct GNUNET_TIME_Absolute expiration,
702            uint64_t uid)
703 {
704   struct ReplCtx *rc = cls;
705   struct Plugin *plugin = rc->plugin;
706   int ret;
707   int iret;
708
709   ret = rc->proc (rc->proc_cls,
710                   key,
711                   size,
712                   data,
713                   type,
714                   priority,
715                   anonymity,
716                   replication,
717                   expiration,
718                   uid);
719   if (NULL != key)
720   {
721     struct GNUNET_MY_QueryParam params_proc[] = {
722       GNUNET_MY_query_param_uint64 (&uid),
723       GNUNET_MY_query_param_end
724     };
725
726     iret = GNUNET_MY_exec_prepared (plugin->mc,
727                                     plugin->dec_repl,
728                                     params_proc);
729     if (GNUNET_SYSERR == iret)
730     {
731       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
732                   "Failed to reduce replication counter\n");
733       return GNUNET_SYSERR;
734     }
735   }
736   return ret;
737 }
738
739
740 /**
741  * Get a random item for replication.  Returns a single, not expired,
742  * random item from those with the highest replication counters.  The
743  * item's replication counter is decremented by one IF it was positive
744  * before.  Call @a proc with all values ZERO or NULL if the datastore
745  * is empty.
746  *
747  * @param cls closure
748  * @param proc function to call the value (once only).
749  * @param proc_cls closure for @a proc
750  */
751 static void
752 mysql_plugin_get_replication (void *cls,
753                               PluginDatumProcessor proc,
754                               void *proc_cls)
755 {
756   struct Plugin *plugin = cls;
757   uint64_t rvalue;
758   uint32_t repl;
759   struct ReplCtx rc;
760   struct GNUNET_MY_QueryParam params_get[] = {
761     GNUNET_MY_query_param_end
762   };
763   struct GNUNET_MY_ResultSpec results_get[] = {
764     GNUNET_MY_result_spec_uint32 (&repl),
765     GNUNET_MY_result_spec_end
766   };
767   struct GNUNET_MY_QueryParam params_select[] = {
768     GNUNET_MY_query_param_uint32 (&repl),
769     GNUNET_MY_query_param_uint64 (&rvalue),
770     GNUNET_MY_query_param_uint32 (&repl),
771     GNUNET_MY_query_param_uint64 (&rvalue),
772     GNUNET_MY_query_param_end
773   };
774
775   rc.plugin = plugin;
776   rc.proc = proc;
777   rc.proc_cls = proc_cls;
778
779   if (1 !=
780       GNUNET_MY_exec_prepared (plugin->mc,
781                                plugin->max_repl,
782                                params_get))
783   {
784     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
785     return;
786   }
787
788   if (GNUNET_OK !=
789       GNUNET_MY_extract_result (plugin->max_repl,
790                                 results_get))
791   {
792     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
793     return;
794   }
795   GNUNET_break (GNUNET_NO ==
796                 GNUNET_MY_extract_result (plugin->max_repl,
797                                           NULL));
798   rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
799                                      UINT64_MAX);
800
801   execute_select (plugin,
802                   plugin->select_replication,
803                   &repl_proc,
804                   &rc,
805                   params_select);
806 }
807
808
809 /**
810  * Get all of the keys in the datastore.
811  *
812  * @param cls closure
813  * @param proc function to call on each key
814  * @param proc_cls closure for @a proc
815  */
816 static void
817 mysql_plugin_get_keys (void *cls,
818                        PluginKeyProcessor proc,
819                        void *proc_cls)
820 {
821   struct Plugin *plugin = cls;
822   int ret;
823   MYSQL_STMT *statement;
824   unsigned int cnt;
825   struct GNUNET_HashCode key;
826   struct GNUNET_HashCode last;
827   struct GNUNET_MY_QueryParam params_select[] = {
828     GNUNET_MY_query_param_end
829   };
830   struct GNUNET_MY_ResultSpec results_select[] = {
831     GNUNET_MY_result_spec_auto_from_type (&key),
832     GNUNET_MY_result_spec_end
833   };
834
835   GNUNET_assert (NULL != proc);
836   statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys);
837   if (GNUNET_OK !=
838       GNUNET_MY_exec_prepared (plugin->mc,
839                                plugin->get_all_keys,
840                                params_select))
841   {
842     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
843                 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
844                 "mysql_stmt_execute",
845                 GET_ALL_KEYS,
846                 __FILE__,
847                 __LINE__,
848                 mysql_stmt_error (statement));
849     GNUNET_MYSQL_statements_invalidate (plugin->mc);
850     proc (proc_cls, NULL, 0);
851     return;
852   }
853   memset (&last, 0, sizeof (last)); /* make static analysis happy */
854   ret = GNUNET_YES;
855   cnt = 0;
856   while (ret == GNUNET_YES)
857   {
858     ret = GNUNET_MY_extract_result (plugin->get_all_keys,
859                                     results_select);
860     if (0 != memcmp (&last,
861                      &key,
862                      sizeof (key)))
863     {
864       if (0 != cnt)
865         proc (proc_cls,
866               &last,
867               cnt);
868       cnt = 1;
869       last = key;
870     }
871     else
872     {
873       cnt++;
874     }
875   }
876   if (0 != cnt)
877     proc (proc_cls,
878           &last,
879           cnt);
880   /* finally, let app know we are done */
881   proc (proc_cls,
882         NULL,
883         0);
884   if (GNUNET_SYSERR == ret)
885   {
886     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
887                 _("`%s' failed at %s:%d with error: %s\n"),
888                 "mysql_stmt_fetch",
889                 __FILE__,
890                 __LINE__,
891                 mysql_stmt_error (statement));
892     GNUNET_MYSQL_statements_invalidate (plugin->mc);
893     return;
894   }
895 }
896
897
898 /**
899  * Context for #expi_proc() function.
900  */
901 struct ExpiCtx
902 {
903
904   /**
905    * Plugin handle.
906    */
907   struct Plugin *plugin;
908
909   /**
910    * Function to call for the result (or the NULL).
911    */
912   PluginDatumProcessor proc;
913
914   /**
915    * Closure for @e proc.
916    */
917   void *proc_cls;
918 };
919
920
921
922 /**
923  * Wrapper for the processor for #mysql_plugin_get_expiration().
924  * If no expired value was found, we do a second query for
925  * low-priority content.
926  *
927  * @param cls closure
928  * @param key key for the content
929  * @param size number of bytes in data
930  * @param data content stored
931  * @param type type of the content
932  * @param priority priority of the content
933  * @param anonymity anonymity-level for the content
934  * @param replication replication-level for the content
935  * @param expiration expiration time for the content
936  * @param uid unique identifier for the datum;
937  *        maybe 0 if no unique identifier is available
938  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
939  *         (continue on call to "next", of course),
940  *         #GNUNET_NO to delete the item and continue (if supported)
941  */
942 static int
943 expi_proc (void *cls,
944            const struct GNUNET_HashCode *key,
945            uint32_t size,
946            const void *data,
947            enum GNUNET_BLOCK_Type type,
948            uint32_t priority,
949            uint32_t anonymity,
950            uint32_t replication,
951            struct GNUNET_TIME_Absolute expiration,
952            uint64_t uid)
953 {
954   struct ExpiCtx *rc = cls;
955   struct Plugin *plugin = rc->plugin;
956   struct GNUNET_MY_QueryParam params_select[] = {
957     GNUNET_MY_query_param_end
958   };
959
960   if (NULL == key)
961   {
962     execute_select (plugin,
963                     plugin->select_priority,
964                     rc->proc,
965                     rc->proc_cls,
966                     params_select);
967     return GNUNET_SYSERR;
968   }
969   return rc->proc (rc->proc_cls,
970                    key,
971                    size,
972                    data,
973                    type,
974                    priority,
975                    anonymity,
976                    replication,
977                    expiration,
978                    uid);
979 }
980
981
982 /**
983  * Get a random item for expiration.
984  * Call @a proc with all values ZERO or NULL if the datastore is empty.
985  *
986  * @param cls closure
987  * @param proc function to call the value (once only).
988  * @param proc_cls closure for @a proc
989  */
990 static void
991 mysql_plugin_get_expiration (void *cls,
992                              PluginDatumProcessor proc,
993                              void *proc_cls)
994 {
995   struct Plugin *plugin = cls;
996   struct GNUNET_TIME_Absolute now;
997   struct GNUNET_MY_QueryParam params_select[] = {
998     GNUNET_MY_query_param_absolute_time (&now),
999     GNUNET_MY_query_param_end
1000   };
1001   struct ExpiCtx rc;
1002
1003   rc.plugin = plugin;
1004   rc.proc = proc;
1005   rc.proc_cls = proc_cls;
1006   now = GNUNET_TIME_absolute_get ();
1007   execute_select (plugin,
1008                   plugin->select_expiration,
1009                   expi_proc,
1010                   &rc,
1011                   params_select);
1012 }
1013
1014
1015 /**
1016  * Drop database.
1017  *
1018  * @param cls the `struct Plugin *`
1019  */
1020 static void
1021 mysql_plugin_drop (void *cls)
1022 {
1023   struct Plugin *plugin = cls;
1024
1025   if (GNUNET_OK !=
1026       GNUNET_MYSQL_statement_run (plugin->mc,
1027                                   "DROP TABLE gn090"))
1028     return;                     /* error */
1029   plugin->env->duc (plugin->env->cls, 0);
1030 }
1031
1032
1033 /**
1034  * Remove a particular key in the datastore.
1035  *
1036  * @param cls closure
1037  * @param key key for the content
1038  * @param size number of bytes in data
1039  * @param data content stored
1040  * @param cont continuation called with success or failure status
1041  * @param cont_cls continuation closure for @a cont
1042  */
1043 static void
1044 mysql_plugin_remove_key (void *cls,
1045                          const struct GNUNET_HashCode *key,
1046                          uint32_t size,
1047                          const void *data,
1048                          PluginRemoveCont cont,
1049                          void *cont_cls)
1050 {
1051   struct Plugin *plugin = cls;
1052   struct GNUNET_MY_QueryParam params_delete[] = {
1053     GNUNET_MY_query_param_auto_from_type (key),
1054     GNUNET_MY_query_param_fixed_size (data, size),
1055     GNUNET_MY_query_param_end
1056   };
1057
1058   if (GNUNET_OK !=
1059       GNUNET_MY_exec_prepared (plugin->mc,
1060                                plugin->delete_entry_by_hash_value,
1061                                params_delete))
1062   {
1063     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1064                 "Removing key `%s' from gn090 table failed\n",
1065                 GNUNET_h2s (key));
1066     cont (cont_cls,
1067           key,
1068           size,
1069           GNUNET_SYSERR,
1070           _("MySQL statement run failure"));
1071     return;
1072   }
1073
1074   MYSQL_STMT *stmt = GNUNET_MYSQL_statement_get_stmt (plugin->delete_entry_by_hash_value);
1075   my_ulonglong rows = mysql_stmt_affected_rows (stmt);
1076
1077   if (0 == rows)
1078   {
1079     cont (cont_cls,
1080           key,
1081           size,
1082           GNUNET_NO,
1083           NULL);
1084     return;
1085   }
1086   plugin->env->duc (plugin->env->cls,
1087                     -size);
1088   cont (cont_cls,
1089         key,
1090         size,
1091         GNUNET_OK,
1092         NULL);
1093 }
1094
1095
1096 /**
1097  * Entry point for the plugin.
1098  *
1099  * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment *`
1100  * @return our `struct Plugin *`
1101  */
1102 void *
1103 libgnunet_plugin_datastore_mysql_init (void *cls)
1104 {
1105   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1106   struct GNUNET_DATASTORE_PluginFunctions *api;
1107   struct Plugin *plugin;
1108
1109   plugin = GNUNET_new (struct Plugin);
1110   plugin->env = env;
1111   plugin->mc = GNUNET_MYSQL_context_create (env->cfg,
1112                                             "datastore-mysql");
1113   if (NULL == plugin->mc)
1114   {
1115     GNUNET_free (plugin);
1116     return NULL;
1117   }
1118 #define MRUNS(a) (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, a) )
1119 #define PINIT(a,b) (NULL == (a = GNUNET_MYSQL_statement_prepare (plugin->mc, b)))
1120   if (MRUNS
1121       ("CREATE TABLE IF NOT EXISTS gn090 ("
1122        " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1123        " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1124        " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1125        " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1126        " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1127        " rvalue BIGINT UNSIGNED NOT NULL,"
1128        " hash BINARY(64) NOT NULL DEFAULT '',"
1129        " vhash BINARY(64) NOT NULL DEFAULT '',"
1130        " value BLOB NOT NULL DEFAULT ''," " uid BIGINT NOT NULL AUTO_INCREMENT,"
1131        " PRIMARY KEY (uid),"
1132        " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
1133        " INDEX idx_prio (prio),"
1134        " INDEX idx_repl_rvalue (repl,rvalue),"
1135        " INDEX idx_expire (expire),"
1136        " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
1137        ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
1138       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1139       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1140       PINIT (plugin->delete_entry_by_hash_value, DELETE_ENTRY_BY_HASH_VALUE) ||
1141       PINIT (plugin->select_entry, SELECT_ENTRY) ||
1142       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1143       PINIT (plugin->select_entry_by_hash_and_type,
1144              SELECT_ENTRY_BY_HASH_AND_TYPE) ||
1145       PINIT (plugin->get_size, SELECT_SIZE) ||
1146       PINIT (plugin->update_entry, UPDATE_ENTRY) ||
1147       PINIT (plugin->dec_repl, DEC_REPL) ||
1148       PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
1149       PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) ||
1150       PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
1151       PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
1152       PINIT (plugin->get_all_keys, GET_ALL_KEYS) ||
1153       PINIT (plugin->select_replication, SELECT_IT_REPLICATION) ||
1154       false)
1155   {
1156     GNUNET_MYSQL_context_destroy (plugin->mc);
1157     GNUNET_free (plugin);
1158     return NULL;
1159   }
1160 #undef PINIT
1161 #undef MRUNS
1162
1163   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
1164   api->cls = plugin;
1165   api->estimate_size = &mysql_plugin_estimate_size;
1166   api->put = &mysql_plugin_put;
1167   api->get_key = &mysql_plugin_get_key;
1168   api->get_replication = &mysql_plugin_get_replication;
1169   api->get_expiration = &mysql_plugin_get_expiration;
1170   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1171   api->get_keys = &mysql_plugin_get_keys;
1172   api->drop = &mysql_plugin_drop;
1173   api->remove_key = &mysql_plugin_remove_key;
1174   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql",
1175                    _("Mysql database running\n"));
1176   return api;
1177 }
1178
1179
1180 /**
1181  * Exit point from the plugin.
1182  *
1183  * @param cls our `struct Plugin *`
1184  * @return always NULL
1185  */
1186 void *
1187 libgnunet_plugin_datastore_mysql_done (void *cls)
1188 {
1189   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1190   struct Plugin *plugin = api->cls;
1191
1192   GNUNET_MYSQL_context_destroy (plugin->mc);
1193   GNUNET_free (plugin);
1194   GNUNET_free (api);
1195   return NULL;
1196 }
1197
1198 /* end of plugin_datastore_mysql.c */