1067064aa1ca0cfdb824ee49dbf5d918844c1a4a
[oweals/gnunet.git] / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009, 2010, 2011 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  * @author Christophe Genevey
27  *
28  * NOTE: This db module does NOT work with mysql prior to 4.1 since
29  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
30  * in MyISAM that is causing us grief.  At the time of this writing,
31  * that version is yet to be released.  In anticipation, the code
32  * will use MyISAM with 5.0.46 (and higher).  If you run such a
33  * version, please run "make check" to verify that the MySQL bug
34  * was actually fixed in your version (and if not, change the
35  * code below to use MyISAM for gn071).
36  *
37  * HIGHLIGHTS
38  *
39  * Pros
40  * + On up-to-date hardware where mysql can be used comfortably, this
41  *   module will have better performance than the other db choices
42  *   (according to our tests).
43  * + Its often possible to recover the mysql database from internal
44  *   inconsistencies. The other db choices do not support repair!
45  * Cons
46  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
47  * - Manual setup
48  *
49  * MANUAL SETUP INSTRUCTIONS
50  *
51  * 1) in gnunet.conf, set
52  * @verbatim
53        [datastore]
54        DATABASE = "mysql"
55    @endverbatim
56  * 2) Then access mysql as root,
57  * @verbatim
58      $ mysql -u root -p
59    @endverbatim
60  *    and do the following. [You should replace $USER with the username
61  *    that will be running the gnunetd process].
62  * @verbatim
63       CREATE DATABASE gnunet;
64       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
65          ON gnunet.* TO $USER@localhost;
66       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
67       FLUSH PRIVILEGES;
68    @endverbatim
69  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
70  *    with the following lines
71  * @verbatim
72       [client]
73       user=$USER
74       password=$the_password_you_like
75    @endverbatim
76  *
77  * Thats it. Note that .my.cnf file is a security risk unless its on
78  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
79  * link. Even greater security risk can be achieved by setting no
80  * password for $USER.  Luckily $USER has only priviledges to mess
81  * up GNUnet's tables, nothing else (unless you give him more,
82  * of course).<p>
83  *
84  * 4) Still, perhaps you should briefly try if the DB connection
85  *    works. First, login as $USER. Then use,
86  *
87  * @verbatim
88      $ mysql -u $USER -p $the_password_you_like
89      mysql> use gnunet;
90    @endverbatim
91  *
92  *    If you get the message &quot;Database changed&quot; it probably works.
93  *
94  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
95  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
96  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
97  *     so there may be some additional trouble depending on your mysql setup.]
98  *
99  * REPAIRING TABLES
100  *
101  * - Its probably healthy to check your tables for inconsistencies
102  *   every now and then.
103  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
104  *   databases have been corrupted.
105  * - The tables can be verified/fixed in two ways;
106  *   1) by running mysqlcheck -A, or
107  *   2) by executing (inside of mysql using the GNUnet database):
108  * @verbatim
109      mysql> REPAIR TABLE gn090;
110    @endverbatim
111  *
112  * PROBLEMS?
113  *
114  * If you have problems related to the mysql module, your best
115  * friend is probably the mysql manual. The first thing to check
116  * is that mysql is basically operational, that you can connect
117  * to it, create tables, issue queries etc.
118  */
119
120 #include "platform.h"
121 #include "gnunet_datastore_plugin.h"
122 #include "gnunet_util_lib.h"
123 #include "gnunet_mysql_lib.h"
124 #include "gnunet_my_lib.h"
125
126 #define MAX_DATUM_SIZE 65536
127
128
129 /**
130  * Context for all functions in this plugin.
131  */
132 struct Plugin
133 {
134   /**
135    * Our execution environment.
136    */
137   struct GNUNET_DATASTORE_PluginEnvironment *env;
138
139   /**
140    * Handle to talk to MySQL.
141    */
142   struct GNUNET_MYSQL_Context *mc;
143
144   /**
145    * Prepared statements.
146    */
147 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
148   struct GNUNET_MYSQL_StatementHandle *insert_entry;
149
150 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
151   struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
152
153 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
154   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash;
155
156 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
157   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
158
159 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
160   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_vhash;
161
162 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
163   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash;
164
165 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
166   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_type;
167
168 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
169   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
170
171 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
172   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_vhash_and_type;
173
174 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
175   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type;
176
177 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
178   struct GNUNET_MYSQL_StatementHandle *update_entry;
179
180 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
181   struct GNUNET_MYSQL_StatementHandle *dec_repl;
182
183 #define SELECT_SIZE "SELECT SUM(LENGTH(value)+256) FROM gn090"
184   struct GNUNET_MYSQL_StatementHandle *get_size;
185
186 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
187    "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
188    "WHERE anonLevel=0 AND type=? AND "\
189    "(rvalue >= ? OR"\
190    "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
191    "ORDER BY rvalue ASC LIMIT 1"
192   struct GNUNET_MYSQL_StatementHandle *zero_iter;
193
194 #define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
195   struct GNUNET_MYSQL_StatementHandle *select_expiration;
196
197 #define SELECT_IT_PRIORITY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_prio) ORDER BY prio ASC LIMIT 1"
198   struct GNUNET_MYSQL_StatementHandle *select_priority;
199
200 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid "\
201   "FROM gn090 FORCE INDEX (idx_repl_rvalue) "\
202   "WHERE repl=? AND "\
203   " (rvalue>=? OR"\
204   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
205   "ORDER BY rvalue ASC "\
206   "LIMIT 1"
207   struct GNUNET_MYSQL_StatementHandle *select_replication;
208
209 #define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
210   struct GNUNET_MYSQL_StatementHandle *max_repl;
211
212 #define GET_ALL_KEYS "SELECT hash from gn090"
213   struct GNUNET_MYSQL_StatementHandle *get_all_keys;
214
215 };
216
217 #define MAX_PARAM 16
218
219 /**
220  * Delete an entry from the gn090 table.
221  *
222  * @param plugin plugin context
223  * @param uid unique ID of the entry to delete
224  * @return #GNUNET_OK on success, #GNUNET_NO if no such value exists, #GNUNET_SYSERR on error
225  */
226 static int
227 do_delete_entry (struct Plugin *plugin,
228                  unsigned long long uid)
229 {
230   int ret;
231   uint64_t uid64 = (uint64_t) uid;
232   struct GNUNET_MY_QueryParam params_delete[] = {
233     GNUNET_MY_query_param_uint64 (&uid64),
234     GNUNET_MY_query_param_end
235   };
236
237   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
238               "Deleting value %llu from gn090 table\n",
239               uid);
240   ret = GNUNET_MY_exec_prepared (plugin->mc,
241                                  plugin->delete_entry_by_uid,
242                                  params_delete);
243   if (ret >= 0)
244   {
245     return GNUNET_OK;
246   }
247   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
248               "Deleting value %llu from gn090 table failed\n",
249               (unsigned long long) uid);
250   return ret;
251 }
252
253
254 /**
255  * Get an estimate of how much space the database is
256  * currently using.
257  *
258  * @param cls our `struct Plugin *`
259  * @return number of bytes used on disk
260  */
261 static void
262 mysql_plugin_estimate_size (void *cls,
263                             unsigned long long *estimate)
264 {
265   struct Plugin *plugin = cls;
266   uint64_t total;
267   int ret;
268   struct GNUNET_MY_QueryParam params_get[] = {
269     GNUNET_MY_query_param_end
270   };
271   struct GNUNET_MY_ResultSpec results_get[] = {
272     GNUNET_MY_result_spec_uint64 (&total),
273     GNUNET_MY_result_spec_end
274   };
275
276   ret = GNUNET_MY_exec_prepared (plugin->mc,
277                                  plugin->get_size,
278                                  params_get);
279   *estimate = 0;
280   total = UINT64_MAX;
281   if ( (GNUNET_OK == ret) &&
282        (GNUNET_OK ==
283         GNUNET_MY_extract_result (plugin->get_size,
284                                   results_get)) )
285   {
286     *estimate = (unsigned long long) total;
287     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
288                 "Size estimate for MySQL payload is %lld\n",
289                 (long long) total);
290     GNUNET_assert (UINT64_MAX != total);
291     GNUNET_break (GNUNET_NO ==
292                   GNUNET_MY_extract_result (plugin->get_size,
293                                             NULL));
294   }
295 }
296
297
298 /**
299  * Store an item in the datastore.
300  *
301  * @param cls closure
302  * @param key key for the item
303  * @param size number of bytes in @a data
304  * @param data content stored
305  * @param type type of the content
306  * @param priority priority of the content
307  * @param anonymity anonymity-level for the content
308  * @param replication replication-level for the content
309  * @param expiration expiration time for the content
310  * @param cont continuation called with success or failure status
311  * @param cont_cls closure for @a cont
312  */
313 static void
314 mysql_plugin_put (void *cls,
315                   const struct GNUNET_HashCode *key,
316                   uint32_t size,
317                   const void *data,
318                   enum GNUNET_BLOCK_Type type,
319                   uint32_t priority,
320                   uint32_t anonymity,
321                   uint32_t replication,
322                   struct GNUNET_TIME_Absolute expiration,
323                   PluginPutCont cont,
324                   void *cont_cls)
325 {
326   struct Plugin *plugin = cls;
327   uint64_t lexpiration = expiration.abs_value_us;
328   uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
329                                                UINT64_MAX);
330   struct GNUNET_HashCode vhash;
331   struct GNUNET_MY_QueryParam params_insert[] = {
332     GNUNET_MY_query_param_uint32 (&replication),
333     GNUNET_MY_query_param_uint32 (&type),
334     GNUNET_MY_query_param_uint32 (&priority),
335     GNUNET_MY_query_param_uint32 (&anonymity),
336     GNUNET_MY_query_param_uint64 (&lexpiration),
337     GNUNET_MY_query_param_uint64 (&lrvalue),
338     GNUNET_MY_query_param_auto_from_type (key),
339     GNUNET_MY_query_param_auto_from_type (&vhash),
340     GNUNET_MY_query_param_fixed_size (data, size),
341     GNUNET_MY_query_param_end
342   };
343
344   if (size > MAX_DATUM_SIZE)
345   {
346     GNUNET_break (0);
347     cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
348     return;
349   }
350   GNUNET_CRYPTO_hash (data,
351                       size,
352                       &vhash);
353
354   if (GNUNET_OK !=
355       GNUNET_MY_exec_prepared (plugin->mc,
356                                plugin->insert_entry,
357                                params_insert))
358   {
359     cont (cont_cls,
360           key,
361           size,
362           GNUNET_SYSERR,
363           _("MySQL statement run failure"));
364     return;
365   }
366   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367               "Inserted value `%s' with size %u into gn090 table\n",
368               GNUNET_h2s (key),
369               (unsigned int) size);
370   if (size > 0)
371     plugin->env->duc (plugin->env->cls,
372                       size);
373   GNUNET_break (GNUNET_NO ==
374                 GNUNET_MY_extract_result (plugin->insert_entry,
375                                           NULL));
376   cont (cont_cls,
377         key,
378         size,
379         GNUNET_OK,
380         NULL);
381 }
382
383
384 /**
385  * Update the priority for a particular key in the datastore.  If
386  * the expiration time in value is different than the time found in
387  * the datastore, the higher value should be kept.  For the
388  * anonymity level, the lower value is to be used.  The specified
389  * priority should be added to the existing priority, ignoring the
390  * priority in value.
391  *
392  * Note that it is possible for multiple values to match this put.
393  * In that case, all of the respective values are updated.
394  *
395  * @param cls our "struct Plugin*"
396  * @param uid unique identifier of the datum
397  * @param delta by how much should the priority
398  *     change?
399  * @param expire new expiration time should be the
400  *     MAX of any existing expiration time and
401  *     this value
402  * @param cont continuation called with success or failure status
403  * @param cons_cls continuation closure
404  */
405 static void
406 mysql_plugin_update (void *cls,
407                      uint64_t uid,
408                      uint32_t delta,
409                      struct GNUNET_TIME_Absolute expire,
410                      PluginUpdateCont cont,
411                      void *cont_cls)
412 {
413   struct Plugin *plugin = cls;
414   uint64_t lexpire = expire.abs_value_us;
415   int ret;
416
417   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
418               "Updating value %llu adding %d to priority and maxing exp at %s\n",
419               (unsigned long long) uid,
420               delta,
421               GNUNET_STRINGS_absolute_time_to_string (expire));
422
423   struct GNUNET_MY_QueryParam params_update[] = {
424     GNUNET_MY_query_param_uint32 (&delta),
425     GNUNET_MY_query_param_uint64 (&lexpire),
426     GNUNET_MY_query_param_uint64 (&lexpire),
427     GNUNET_MY_query_param_uint64 (&uid),
428     GNUNET_MY_query_param_end
429   };
430
431   ret = GNUNET_MY_exec_prepared (plugin->mc,
432                                  plugin->update_entry,
433                                  params_update);
434
435   if (GNUNET_OK != ret)
436   {
437     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
438                 "Failed to update value %llu\n",
439                 (unsigned long long) uid);
440   }
441   else
442   {
443     GNUNET_break (GNUNET_NO ==
444                   GNUNET_MY_extract_result (plugin->update_entry,
445                                             NULL));
446   }
447   cont (cont_cls,
448         ret,
449         NULL);
450 }
451
452
453 /**
454  * Run the given select statement and call 'proc' on the resulting
455  * values (which must be in particular positions).
456  *
457  * @param plugin the plugin handle
458  * @param stmt select statement to run
459  * @param proc function to call on result
460  * @param proc_cls closure for @a proc
461  * @param params_select arguments to initialize stmt
462  */
463 static void
464 execute_select (struct Plugin *plugin,
465                 struct GNUNET_MYSQL_StatementHandle *stmt,
466                 PluginDatumProcessor proc,
467                 void *proc_cls,
468                 struct GNUNET_MY_QueryParam *params_select)
469 {
470   int ret;
471   uint32_t type;
472   uint32_t priority;
473   uint32_t anonymity;
474   uint64_t uid;
475   size_t value_size;
476   void *value;
477   struct GNUNET_HashCode key;
478   struct GNUNET_TIME_Absolute expiration;
479   struct GNUNET_MY_ResultSpec results_select[] = {
480     GNUNET_MY_result_spec_uint32 (&type),
481     GNUNET_MY_result_spec_uint32 (&priority),
482     GNUNET_MY_result_spec_uint32 (&anonymity),
483     GNUNET_MY_result_spec_absolute_time (&expiration),
484     GNUNET_MY_result_spec_auto_from_type (&key),
485     GNUNET_MY_result_spec_variable_size (&value, &value_size),
486     GNUNET_MY_result_spec_uint64 (&uid),
487     GNUNET_MY_result_spec_end
488   };
489
490   ret = GNUNET_MY_exec_prepared (plugin->mc,
491                                  stmt,
492                                  params_select);
493   if (GNUNET_OK != ret)
494   {
495     proc (proc_cls,
496           NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
497     return;
498   }
499
500   ret = GNUNET_MY_extract_result (stmt,
501                                   results_select);
502   if (GNUNET_OK != ret)
503   {
504     proc (proc_cls,
505           NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
506     return;
507   }
508
509   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
510               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %s selecting from gn090 table\n",
511               (unsigned int) value_size,
512               GNUNET_h2s (&key),
513               (unsigned int) priority,
514               (unsigned int) anonymity,
515               GNUNET_STRINGS_absolute_time_to_string (expiration));
516   GNUNET_assert (value_size < MAX_DATUM_SIZE);
517   GNUNET_break (GNUNET_NO ==
518                 GNUNET_MY_extract_result (stmt,
519                                           NULL));
520   ret = proc (proc_cls,
521               &key,
522               value_size,
523               value,
524               type,
525               priority,
526               anonymity,
527               expiration,
528               uid);
529   GNUNET_MY_cleanup_result (results_select);
530   if (GNUNET_NO == ret)
531   {
532     do_delete_entry (plugin, uid);
533     if (0 != value_size)
534       plugin->env->duc (plugin->env->cls,
535                         - value_size);
536   }
537 }
538
539
540 /**
541  * Get one of the results for a particular key in the datastore.
542  *
543  * @param cls closure
544  * @param offset offset of the result (modulo num-results);
545  *               specific ordering does not matter for the offset
546  * @param key key to match, never NULL
547  * @param vhash hash of the value, maybe NULL (to
548  *        match all values that have the right key).
549  *        Note that for DBlocks there is no difference
550  *        betwen key and vhash, but for other blocks
551  *        there may be!
552  * @param type entries of which type are relevant?
553  *     Use 0 for any type.
554  * @param proc function to call on the matching value,
555  *        with NULL for if no value matches
556  * @param proc_cls closure for @a proc
557  */
558 static void
559 mysql_plugin_get_key (void *cls,
560                       uint64_t offset,
561                       const struct GNUNET_HashCode *key,
562                       const struct GNUNET_HashCode *vhash,
563                       enum GNUNET_BLOCK_Type type,
564                       PluginDatumProcessor proc,
565                       void *proc_cls)
566 {
567   struct Plugin *plugin = cls;
568   int ret;
569   uint64_t total;
570   struct GNUNET_MY_ResultSpec results_get[] = {
571     GNUNET_MY_result_spec_uint64 (&total),
572     GNUNET_MY_result_spec_end
573   };
574
575   total = UINT64_MAX;
576   if (0 != type)
577   {
578     if (NULL != vhash)
579     {
580       struct GNUNET_MY_QueryParam params_get[] = {
581         GNUNET_MY_query_param_auto_from_type (key),
582         GNUNET_MY_query_param_auto_from_type (vhash),
583         GNUNET_MY_query_param_uint32 (&type),
584         GNUNET_MY_query_param_end
585       };
586
587       ret =
588         GNUNET_MY_exec_prepared (plugin->mc,
589                                  plugin->count_entry_by_hash_vhash_and_type,
590                                  params_get);
591       GNUNET_break (GNUNET_OK == ret);
592       if (GNUNET_OK == ret)
593         ret =
594           GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
595                                     results_get);
596       if (GNUNET_OK == ret)
597         GNUNET_break (GNUNET_NO ==
598                       GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
599                                                 NULL));
600     }
601     else
602     {
603       struct GNUNET_MY_QueryParam params_get[] = {
604         GNUNET_MY_query_param_auto_from_type (key),
605         GNUNET_MY_query_param_uint32 (&type),
606         GNUNET_MY_query_param_end
607       };
608
609       ret =
610         GNUNET_MY_exec_prepared (plugin->mc,
611                                  plugin->count_entry_by_hash_and_type,
612                                  params_get);
613       GNUNET_break (GNUNET_OK == ret);
614       if (GNUNET_OK == ret)
615         ret =
616           GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
617                                     results_get);
618       if (GNUNET_OK == ret)
619         GNUNET_break (GNUNET_NO ==
620                       GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
621                                                 NULL));
622     }
623   }
624   else
625   {
626     if (NULL != vhash)
627     {
628       struct GNUNET_MY_QueryParam params_get[] = {
629         GNUNET_MY_query_param_auto_from_type (key),
630         GNUNET_MY_query_param_auto_from_type (vhash),
631         GNUNET_MY_query_param_end
632       };
633
634       ret =
635         GNUNET_MY_exec_prepared (plugin->mc,
636                                  plugin->count_entry_by_hash_and_vhash,
637                                  params_get);
638       GNUNET_break (GNUNET_OK == ret);
639       if (GNUNET_OK == ret)
640         ret =
641           GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
642                                     results_get);
643       if (GNUNET_OK == ret)
644         GNUNET_break (GNUNET_NO ==
645                       GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
646                                                 NULL));
647     }
648     else
649     {
650       struct GNUNET_MY_QueryParam params_get[] = {
651         GNUNET_MY_query_param_auto_from_type (key),
652         GNUNET_MY_query_param_end
653       };
654
655       ret =
656         GNUNET_MY_exec_prepared (plugin->mc,
657                                  plugin->count_entry_by_hash,
658                                  params_get);
659       GNUNET_break (GNUNET_OK == ret);
660       if (GNUNET_OK == ret)
661         ret =
662           GNUNET_MY_extract_result (plugin->count_entry_by_hash,
663                                     results_get);
664       if (GNUNET_OK == ret)
665         GNUNET_break (GNUNET_NO ==
666                       GNUNET_MY_extract_result (plugin->count_entry_by_hash,
667                                                 NULL));
668     }
669   }
670   if ( (GNUNET_OK != ret) ||
671        (0 >= total) )
672   {
673     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
674     return;
675   }
676   offset = offset % total;
677   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
678               "Obtaining %llu/%lld result for GET `%s'\n",
679               (unsigned long long) offset,
680               (unsigned long long) total,
681               GNUNET_h2s (key));
682   if (type != GNUNET_BLOCK_TYPE_ANY)
683   {
684     if (NULL != vhash)
685     {
686       struct GNUNET_MY_QueryParam params_select[] = {
687         GNUNET_MY_query_param_auto_from_type (key),
688         GNUNET_MY_query_param_auto_from_type (vhash),
689         GNUNET_MY_query_param_uint32 (&type),
690         GNUNET_MY_query_param_uint64 (&offset),
691         GNUNET_MY_query_param_end
692       };
693
694       execute_select (plugin,
695                       plugin->select_entry_by_hash_vhash_and_type,
696                       proc,
697                       proc_cls,
698                       params_select);
699     }
700     else
701     {
702       struct GNUNET_MY_QueryParam params_select[] = {
703         GNUNET_MY_query_param_auto_from_type (key),
704         GNUNET_MY_query_param_uint32 (&type),
705         GNUNET_MY_query_param_uint64 (&offset),
706         GNUNET_MY_query_param_end
707       };
708
709       execute_select (plugin,
710                       plugin->select_entry_by_hash_and_type,
711                       proc,
712                       proc_cls,
713                       params_select);
714     }
715   }
716   else
717   {
718     if (NULL != vhash)
719     {
720       struct GNUNET_MY_QueryParam params_select[] = {
721         GNUNET_MY_query_param_auto_from_type (key),
722         GNUNET_MY_query_param_auto_from_type (vhash),
723         GNUNET_MY_query_param_uint64 (&offset),
724         GNUNET_MY_query_param_end
725       };
726
727       execute_select (plugin,
728                       plugin->select_entry_by_hash_and_vhash,
729                       proc,
730                       proc_cls,
731                       params_select);
732     }
733     else
734     {
735       struct GNUNET_MY_QueryParam params_select[] = {
736         GNUNET_MY_query_param_auto_from_type (key),
737         GNUNET_MY_query_param_uint64 (&offset),
738         GNUNET_MY_query_param_end
739       };
740
741       execute_select (plugin,
742                       plugin->select_entry_by_hash,
743                       proc,
744                       proc_cls,
745                       params_select);
746     }
747   }
748
749 }
750
751
752 /**
753  * Get a zero-anonymity datum from the datastore.
754  *
755  * @param cls our `struct Plugin *`
756  * @param offset offset of the result
757  * @param type entries of which type should be considered?
758  *        Use 0 for any type.
759  * @param proc function to call on a matching value or NULL
760  * @param proc_cls closure for @a proc
761  */
762 static void
763 mysql_plugin_get_zero_anonymity (void *cls,
764                                  uint64_t offset,
765                                  enum GNUNET_BLOCK_Type type,
766                                  PluginDatumProcessor proc,
767                                  void *proc_cls)
768 {
769   struct Plugin *plugin = cls;
770   uint32_t typei = (uint32_t) type;
771   uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
772                                               UINT64_MAX);
773   struct GNUNET_MY_QueryParam params_zero_iter[] = {
774     GNUNET_MY_query_param_uint32 (&typei),
775     GNUNET_MY_query_param_uint64 (&rvalue),
776     GNUNET_MY_query_param_uint32 (&typei),
777     GNUNET_MY_query_param_uint64 (&rvalue),
778     GNUNET_MY_query_param_end
779   };
780
781   execute_select (plugin,
782                   plugin->zero_iter,
783                   proc,
784                   proc_cls,
785                   params_zero_iter);
786 }
787
788
789 /**
790  * Context for #repl_proc() function.
791  */
792 struct ReplCtx
793 {
794
795   /**
796    * Plugin handle.
797    */
798   struct Plugin *plugin;
799
800   /**
801    * Function to call for the result (or the NULL).
802    */
803   PluginDatumProcessor proc;
804
805   /**
806    * Closure for @e proc.
807    */
808   void *proc_cls;
809 };
810
811
812 /**
813  * Wrapper for the processor for #mysql_plugin_get_replication().
814  * Decrements the replication counter and calls the original
815  * iterator.
816  *
817  * @param cls closure
818  * @param key key for the content
819  * @param size number of bytes in @a data
820  * @param data content stored
821  * @param type type of the content
822  * @param priority priority of the content
823  * @param anonymity anonymity-level for the content
824  * @param expiration expiration time for the content
825  * @param uid unique identifier for the datum;
826  *        maybe 0 if no unique identifier is available
827  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
828  *         (continue on call to "next", of course),
829  *         #GNUNET_NO to delete the item and continue (if supported)
830  */
831 static int
832 repl_proc (void *cls,
833            const struct GNUNET_HashCode *key,
834            uint32_t size,
835            const void *data,
836            enum GNUNET_BLOCK_Type type,
837            uint32_t priority,
838            uint32_t anonymity,
839            struct GNUNET_TIME_Absolute expiration,
840            uint64_t uid)
841 {
842   struct ReplCtx *rc = cls;
843   struct Plugin *plugin = rc->plugin;
844   int ret;
845   int iret;
846
847   ret = rc->proc (rc->proc_cls,
848                   key,
849                   size,
850                   data,
851                   type,
852                   priority,
853                   anonymity,
854                   expiration,
855                   uid);
856   if (NULL != key)
857   {
858     struct GNUNET_MY_QueryParam params_proc[] = {
859       GNUNET_MY_query_param_uint64 (&uid),
860       GNUNET_MY_query_param_end
861     };
862
863     iret = GNUNET_MY_exec_prepared (plugin->mc,
864                                     plugin->dec_repl,
865                                     params_proc);
866     if (GNUNET_SYSERR == iret)
867     {
868       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
869                   "Failed to reduce replication counter\n");
870       return GNUNET_SYSERR;
871     }
872   }
873   return ret;
874 }
875
876
877 /**
878  * Get a random item for replication.  Returns a single, not expired,
879  * random item from those with the highest replication counters.  The
880  * item's replication counter is decremented by one IF it was positive
881  * before.  Call @a proc with all values ZERO or NULL if the datastore
882  * is empty.
883  *
884  * @param cls closure
885  * @param proc function to call the value (once only).
886  * @param proc_cls closure for @a proc
887  */
888 static void
889 mysql_plugin_get_replication (void *cls,
890                               PluginDatumProcessor proc,
891                               void *proc_cls)
892 {
893   struct Plugin *plugin = cls;
894   uint64_t rvalue;
895   uint32_t repl;
896   struct ReplCtx rc;
897   struct GNUNET_MY_QueryParam params_get[] = {
898     GNUNET_MY_query_param_end
899   };
900   struct GNUNET_MY_ResultSpec results_get[] = {
901     GNUNET_MY_result_spec_uint32 (&repl),
902     GNUNET_MY_result_spec_end
903   };
904   struct GNUNET_MY_QueryParam params_select[] = {
905     GNUNET_MY_query_param_uint32 (&repl),
906     GNUNET_MY_query_param_uint64 (&rvalue),
907     GNUNET_MY_query_param_uint32 (&repl),
908     GNUNET_MY_query_param_uint64 (&rvalue),
909     GNUNET_MY_query_param_end
910   };
911
912   rc.plugin = plugin;
913   rc.proc = proc;
914   rc.proc_cls = proc_cls;
915
916   if (1 !=
917       GNUNET_MY_exec_prepared (plugin->mc,
918                                plugin->max_repl,
919                                params_get))
920   {
921     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
922     return;
923   }
924
925   if (GNUNET_OK !=
926       GNUNET_MY_extract_result (plugin->max_repl,
927                                 results_get))
928   {
929     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
930     return;
931   }
932   GNUNET_break (GNUNET_NO ==
933                 GNUNET_MY_extract_result (plugin->max_repl,
934                                           NULL));
935   rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
936                                      UINT64_MAX);
937
938   execute_select (plugin,
939                   plugin->select_replication,
940                   &repl_proc,
941                   &rc,
942                   params_select);
943 }
944
945
946 /**
947  * Get all of the keys in the datastore.
948  *
949  * @param cls closure
950  * @param proc function to call on each key
951  * @param proc_cls closure for @a proc
952  */
953 static void
954 mysql_plugin_get_keys (void *cls,
955                        PluginKeyProcessor proc,
956                        void *proc_cls)
957 {
958   struct Plugin *plugin = cls;
959   int ret;
960   MYSQL_STMT *statement;
961   unsigned int cnt;
962   struct GNUNET_HashCode key;
963   struct GNUNET_HashCode last;
964   struct GNUNET_MY_QueryParam params_select[] = {
965     GNUNET_MY_query_param_end
966   };
967   struct GNUNET_MY_ResultSpec results_select[] = {
968     GNUNET_MY_result_spec_auto_from_type (&key),
969     GNUNET_MY_result_spec_end
970   };
971
972   GNUNET_assert (NULL != proc);
973   statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys);
974   if (GNUNET_OK !=
975       GNUNET_MY_exec_prepared (plugin->mc,
976                                plugin->get_all_keys,
977                                params_select))
978   {
979     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
980                 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
981                 "mysql_stmt_execute",
982                 GET_ALL_KEYS,
983                 __FILE__,
984                 __LINE__,
985                 mysql_stmt_error (statement));
986     GNUNET_MYSQL_statements_invalidate (plugin->mc);
987     proc (proc_cls, NULL, 0);
988     return;
989   }
990   memset (&last, 0, sizeof (last)); /* make static analysis happy */
991   ret = GNUNET_YES;
992   cnt = 0;
993   while (ret == GNUNET_YES)
994   {
995     ret = GNUNET_MY_extract_result (plugin->get_all_keys,
996                                     results_select);
997     if (0 != memcmp (&last,
998                      &key,
999                      sizeof (key)))
1000     {
1001       if (0 != cnt)
1002         proc (proc_cls,
1003               &last,
1004               cnt);
1005       cnt = 1;
1006       last = key;
1007     }
1008     else
1009     {
1010       cnt++;
1011     }
1012   }
1013   if (0 != cnt)
1014     proc (proc_cls,
1015           &last,
1016           cnt);
1017   /* finally, let app know we are done */
1018   proc (proc_cls,
1019         NULL,
1020         0);
1021   if (GNUNET_SYSERR == ret)
1022   {
1023     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1024                 _("`%s' failed at %s:%d with error: %s\n"),
1025                 "mysql_stmt_fetch",
1026                 __FILE__,
1027                 __LINE__,
1028                 mysql_stmt_error (statement));
1029     GNUNET_MYSQL_statements_invalidate (plugin->mc);
1030     return;
1031   }
1032 }
1033
1034
1035 /**
1036  * Context for #expi_proc() function.
1037  */
1038 struct ExpiCtx
1039 {
1040
1041   /**
1042    * Plugin handle.
1043    */
1044   struct Plugin *plugin;
1045
1046   /**
1047    * Function to call for the result (or the NULL).
1048    */
1049   PluginDatumProcessor proc;
1050
1051   /**
1052    * Closure for @e proc.
1053    */
1054   void *proc_cls;
1055 };
1056
1057
1058
1059 /**
1060  * Wrapper for the processor for #mysql_plugin_get_expiration().
1061  * If no expired value was found, we do a second query for
1062  * low-priority content.
1063  *
1064  * @param cls closure
1065  * @param key key for the content
1066  * @param size number of bytes in data
1067  * @param data content stored
1068  * @param type type of the content
1069  * @param priority priority of the content
1070  * @param anonymity anonymity-level for the content
1071  * @param expiration expiration time for the content
1072  * @param uid unique identifier for the datum;
1073  *        maybe 0 if no unique identifier is available
1074  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
1075  *         (continue on call to "next", of course),
1076  *         #GNUNET_NO to delete the item and continue (if supported)
1077  */
1078 static int
1079 expi_proc (void *cls,
1080            const struct GNUNET_HashCode *key,
1081            uint32_t size,
1082            const void *data,
1083            enum GNUNET_BLOCK_Type type,
1084            uint32_t priority,
1085            uint32_t anonymity,
1086            struct GNUNET_TIME_Absolute expiration,
1087            uint64_t uid)
1088 {
1089   struct ExpiCtx *rc = cls;
1090   struct Plugin *plugin = rc->plugin;
1091   struct GNUNET_MY_QueryParam params_select[] = {
1092     GNUNET_MY_query_param_end
1093   };
1094
1095   if (NULL == key)
1096   {
1097     execute_select (plugin,
1098                     plugin->select_priority,
1099                     rc->proc,
1100                     rc->proc_cls,
1101                     params_select);
1102     return GNUNET_SYSERR;
1103   }
1104   return rc->proc (rc->proc_cls,
1105                    key,
1106                    size,
1107                    data,
1108                    type,
1109                    priority,
1110                    anonymity,
1111                    expiration,
1112                    uid);
1113 }
1114
1115
1116 /**
1117  * Get a random item for expiration.
1118  * Call @a proc with all values ZERO or NULL if the datastore is empty.
1119  *
1120  * @param cls closure
1121  * @param proc function to call the value (once only).
1122  * @param proc_cls closure for @a proc
1123  */
1124 static void
1125 mysql_plugin_get_expiration (void *cls,
1126                              PluginDatumProcessor proc,
1127                              void *proc_cls)
1128 {
1129   struct Plugin *plugin = cls;
1130   struct GNUNET_TIME_Absolute now;
1131   struct GNUNET_MY_QueryParam params_select[] = {
1132     GNUNET_MY_query_param_absolute_time (&now),
1133     GNUNET_MY_query_param_end
1134   };
1135   struct ExpiCtx rc;
1136
1137   rc.plugin = plugin;
1138   rc.proc = proc;
1139   rc.proc_cls = proc_cls;
1140   now = GNUNET_TIME_absolute_get ();
1141   execute_select (plugin,
1142                   plugin->select_expiration,
1143                   expi_proc,
1144                   &rc,
1145                   params_select);
1146 }
1147
1148
1149 /**
1150  * Drop database.
1151  *
1152  * @param cls the `struct Plugin *`
1153  */
1154 static void
1155 mysql_plugin_drop (void *cls)
1156 {
1157   struct Plugin *plugin = cls;
1158
1159   if (GNUNET_OK !=
1160       GNUNET_MYSQL_statement_run (plugin->mc,
1161                                   "DROP TABLE gn090"))
1162     return;                     /* error */
1163   plugin->env->duc (plugin->env->cls, 0);
1164 }
1165
1166
1167 /**
1168  * Entry point for the plugin.
1169  *
1170  * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment *`
1171  * @return our `struct Plugin *`
1172  */
1173 void *
1174 libgnunet_plugin_datastore_mysql_init (void *cls)
1175 {
1176   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1177   struct GNUNET_DATASTORE_PluginFunctions *api;
1178   struct Plugin *plugin;
1179
1180   plugin = GNUNET_new (struct Plugin);
1181   plugin->env = env;
1182   plugin->mc = GNUNET_MYSQL_context_create (env->cfg,
1183                                             "datastore-mysql");
1184   if (NULL == plugin->mc)
1185   {
1186     GNUNET_free (plugin);
1187     return NULL;
1188   }
1189 #define MRUNS(a) (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, a) )
1190 #define PINIT(a,b) (NULL == (a = GNUNET_MYSQL_statement_prepare (plugin->mc, b)))
1191   if (MRUNS
1192       ("CREATE TABLE IF NOT EXISTS gn090 ("
1193        " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1194        " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1195        " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1196        " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1197        " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1198        " rvalue BIGINT UNSIGNED NOT NULL,"
1199        " hash BINARY(64) NOT NULL DEFAULT '',"
1200        " vhash BINARY(64) NOT NULL DEFAULT '',"
1201        " value BLOB NOT NULL DEFAULT ''," " uid BIGINT NOT NULL AUTO_INCREMENT,"
1202        " PRIMARY KEY (uid)," " INDEX idx_hash (hash(64)),"
1203        " INDEX idx_hash_uid (hash(64),uid),"
1204        " INDEX idx_hash_vhash (hash(64),vhash(64)),"
1205        " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
1206        " INDEX idx_prio (prio)," " INDEX idx_repl_rvalue (repl,rvalue),"
1207        " INDEX idx_expire (expire),"
1208        " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
1209        ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
1210       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1211       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1212       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1213       PINIT (plugin->select_entry_by_hash_and_vhash,
1214              SELECT_ENTRY_BY_HASH_AND_VHASH) ||
1215       PINIT (plugin->select_entry_by_hash_and_type,
1216              SELECT_ENTRY_BY_HASH_AND_TYPE) ||
1217       PINIT (plugin->select_entry_by_hash_vhash_and_type,
1218              SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
1219       PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
1220       PINIT (plugin->get_size, SELECT_SIZE) ||
1221       PINIT (plugin->count_entry_by_hash_and_vhash,
1222              COUNT_ENTRY_BY_HASH_AND_VHASH) ||
1223       PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1224       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1225                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
1226       PINIT (plugin->update_entry, UPDATE_ENTRY) ||
1227       PINIT (plugin->dec_repl, DEC_REPL) ||
1228       PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
1229       PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) ||
1230       PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
1231       PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
1232       PINIT (plugin->get_all_keys, GET_ALL_KEYS) ||
1233       PINIT (plugin->select_replication, SELECT_IT_REPLICATION))
1234   {
1235     GNUNET_MYSQL_context_destroy (plugin->mc);
1236     GNUNET_free (plugin);
1237     return NULL;
1238   }
1239 #undef PINIT
1240 #undef MRUNS
1241
1242   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
1243   api->cls = plugin;
1244   api->estimate_size = &mysql_plugin_estimate_size;
1245   api->put = &mysql_plugin_put;
1246   api->update = &mysql_plugin_update;
1247   api->get_key = &mysql_plugin_get_key;
1248   api->get_replication = &mysql_plugin_get_replication;
1249   api->get_expiration = &mysql_plugin_get_expiration;
1250   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1251   api->get_keys = &mysql_plugin_get_keys;
1252   api->drop = &mysql_plugin_drop;
1253   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql",
1254                    _("Mysql database running\n"));
1255   return api;
1256 }
1257
1258
1259 /**
1260  * Exit point from the plugin.
1261  *
1262  * @param cls our `struct Plugin *`
1263  * @return always NULL
1264  */
1265 void *
1266 libgnunet_plugin_datastore_mysql_done (void *cls)
1267 {
1268   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1269   struct Plugin *plugin = api->cls;
1270
1271   GNUNET_MYSQL_context_destroy (plugin->mc);
1272   GNUNET_free (plugin);
1273   GNUNET_free (api);
1274   return NULL;
1275 }
1276
1277 /* end of plugin_datastore_mysql.c */