Merge branch 'master' of git+ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009, 2010, 2011 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  * @author Christophe Genevey
27  *
28  * NOTE: This db module does NOT work with mysql prior to 4.1 since
29  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
30  * in MyISAM that is causing us grief.  At the time of this writing,
31  * that version is yet to be released.  In anticipation, the code
32  * will use MyISAM with 5.0.46 (and higher).  If you run such a
33  * version, please run "make check" to verify that the MySQL bug
34  * was actually fixed in your version (and if not, change the
35  * code below to use MyISAM for gn071).
36  *
37  * HIGHLIGHTS
38  *
39  * Pros
40  * + On up-to-date hardware where mysql can be used comfortably, this
41  *   module will have better performance than the other db choices
42  *   (according to our tests).
43  * + Its often possible to recover the mysql database from internal
44  *   inconsistencies. The other db choices do not support repair!
45  * Cons
46  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
47  * - Manual setup
48  *
49  * MANUAL SETUP INSTRUCTIONS
50  *
51  * 1) in gnunet.conf, set
52  * @verbatim
53        [datastore]
54        DATABASE = "mysql"
55    @endverbatim
56  * 2) Then access mysql as root,
57  * @verbatim
58      $ mysql -u root -p
59    @endverbatim
60  *    and do the following. [You should replace $USER with the username
61  *    that will be running the gnunetd process].
62  * @verbatim
63       CREATE DATABASE gnunet;
64       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
65          ON gnunet.* TO $USER@localhost;
66       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
67       FLUSH PRIVILEGES;
68    @endverbatim
69  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
70  *    with the following lines
71  * @verbatim
72       [client]
73       user=$USER
74       password=$the_password_you_like
75    @endverbatim
76  *
77  * Thats it. Note that .my.cnf file is a security risk unless its on
78  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
79  * link. Even greater security risk can be achieved by setting no
80  * password for $USER.  Luckily $USER has only priviledges to mess
81  * up GNUnet's tables, nothing else (unless you give him more,
82  * of course).<p>
83  *
84  * 4) Still, perhaps you should briefly try if the DB connection
85  *    works. First, login as $USER. Then use,
86  *
87  * @verbatim
88      $ mysql -u $USER -p $the_password_you_like
89      mysql> use gnunet;
90    @endverbatim
91  *
92  *    If you get the message &quot;Database changed&quot; it probably works.
93  *
94  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
95  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
96  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
97  *     so there may be some additional trouble depending on your mysql setup.]
98  *
99  * REPAIRING TABLES
100  *
101  * - Its probably healthy to check your tables for inconsistencies
102  *   every now and then.
103  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
104  *   databases have been corrupted.
105  * - The tables can be verified/fixed in two ways;
106  *   1) by running mysqlcheck -A, or
107  *   2) by executing (inside of mysql using the GNUnet database):
108  * @verbatim
109      mysql> REPAIR TABLE gn090;
110    @endverbatim
111  *
112  * PROBLEMS?
113  *
114  * If you have problems related to the mysql module, your best
115  * friend is probably the mysql manual. The first thing to check
116  * is that mysql is basically operational, that you can connect
117  * to it, create tables, issue queries etc.
118  */
119
120 #include "platform.h"
121 #include "gnunet_datastore_plugin.h"
122 #include "gnunet_util_lib.h"
123 #include "gnunet_mysql_lib.h"
124 #include "gnunet_my_lib.h"
125
126 #define MAX_DATUM_SIZE 65536
127
128
129 /**
130  * Context for all functions in this plugin.
131  */
132 struct Plugin
133 {
134   /**
135    * Our execution environment.
136    */
137   struct GNUNET_DATASTORE_PluginEnvironment *env;
138
139   /**
140    * Handle to talk to MySQL.
141    */
142   struct GNUNET_MYSQL_Context *mc;
143
144   /**
145    * Prepared statements.
146    */
147 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
148   struct GNUNET_MYSQL_StatementHandle *insert_entry;
149
150 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
151   struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
152
153 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
154   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash;
155
156 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
157   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
158
159 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
160   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_vhash;
161
162 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
163   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash;
164
165 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
166   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_type;
167
168 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
169   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
170
171 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
172   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_vhash_and_type;
173
174 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
175   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type;
176
177 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
178   struct GNUNET_MYSQL_StatementHandle *update_entry;
179
180 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
181   struct GNUNET_MYSQL_StatementHandle *dec_repl;
182
183 #define SELECT_SIZE "SELECT SUM(LENGTH(value)+256) FROM gn090"
184   struct GNUNET_MYSQL_StatementHandle *get_size;
185
186 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
187    "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
188    "WHERE anonLevel=0 AND type=? AND "\
189    "(rvalue >= ? OR"\
190    "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
191    "ORDER BY rvalue ASC LIMIT 1"
192   struct GNUNET_MYSQL_StatementHandle *zero_iter;
193
194 #define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
195   struct GNUNET_MYSQL_StatementHandle *select_expiration;
196
197 #define SELECT_IT_PRIORITY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_prio) ORDER BY prio ASC LIMIT 1"
198   struct GNUNET_MYSQL_StatementHandle *select_priority;
199
200 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid "\
201   "FROM gn090 FORCE INDEX (idx_repl_rvalue) "\
202   "WHERE repl=? AND "\
203   " (rvalue>=? OR"\
204   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
205   "ORDER BY rvalue ASC "\
206   "LIMIT 1"
207   struct GNUNET_MYSQL_StatementHandle *select_replication;
208
209 #define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
210   struct GNUNET_MYSQL_StatementHandle *max_repl;
211
212 #define GET_ALL_KEYS "SELECT hash from gn090"
213   struct GNUNET_MYSQL_StatementHandle *get_all_keys;
214
215 };
216
217 #define MAX_PARAM 16
218
219 /**
220  * Delete an entry from the gn090 table.
221  *
222  * @param plugin plugin context
223  * @param uid unique ID of the entry to delete
224  * @return #GNUNET_OK on success, #GNUNET_NO if no such value exists, #GNUNET_SYSERR on error
225  */
226 static int
227 do_delete_entry (struct Plugin *plugin,
228                  unsigned long long uid)
229 {
230   int ret;
231   uint64_t uid64 = (uint64_t) uid;
232   struct GNUNET_MY_QueryParam params_delete[] = {
233     GNUNET_MY_query_param_uint64 (&uid64),
234     GNUNET_MY_query_param_end
235   };
236
237   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
238               "Deleting value %llu from gn090 table\n",
239               uid);
240   ret = GNUNET_MY_exec_prepared (plugin->mc,
241                                  plugin->delete_entry_by_uid,
242                                  params_delete);
243   if (ret >= 0)
244   {
245     return GNUNET_OK;
246   }
247   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
248               "Deleting value %llu from gn090 table failed\n",
249               (unsigned long long) uid);
250   return ret;
251 }
252
253
254 /**
255  * Get an estimate of how much space the database is
256  * currently using.
257  *
258  * @param cls our `struct Plugin *`
259  * @return number of bytes used on disk
260  */
261 static void
262 mysql_plugin_estimate_size (void *cls,
263                             unsigned long long *estimate)
264 {
265   struct Plugin *plugin = cls;
266   uint64_t total;
267   int ret;
268   struct GNUNET_MY_QueryParam params_get[] = {
269     GNUNET_MY_query_param_end
270   };
271   struct GNUNET_MY_ResultSpec results_get[] = {
272     GNUNET_MY_result_spec_uint64 (&total),
273     GNUNET_MY_result_spec_end
274   };
275
276   ret = GNUNET_MY_exec_prepared (plugin->mc,
277                                  plugin->get_size,
278                                  params_get);
279   *estimate = 0;
280   total = UINT64_MAX;
281   if ( (GNUNET_OK == ret) &&
282        (GNUNET_OK ==
283         GNUNET_MY_extract_result (plugin->get_size,
284                                   results_get)) )
285   {
286     *estimate = (unsigned long long) total;
287     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
288                 "Size estimate for MySQL payload is %lld\n",
289                 (long long) total);
290     GNUNET_assert (UINT64_MAX != total);
291     GNUNET_break (GNUNET_NO ==
292                   GNUNET_MY_extract_result (plugin->get_size,
293                                             NULL));
294   }
295 }
296
297
298 /**
299  * Store an item in the datastore.
300  *
301  * @param cls closure
302  * @param key key for the item
303  * @param size number of bytes in @a data
304  * @param data content stored
305  * @param type type of the content
306  * @param priority priority of the content
307  * @param anonymity anonymity-level for the content
308  * @param replication replication-level for the content
309  * @param expiration expiration time for the content
310  * @param cont continuation called with success or failure status
311  * @param cont_cls closure for @a cont
312  */
313 static void
314 mysql_plugin_put (void *cls,
315                   const struct GNUNET_HashCode *key,
316                   uint32_t size,
317                   const void *data,
318                   enum GNUNET_BLOCK_Type type,
319                   uint32_t priority,
320                   uint32_t anonymity,
321                   uint32_t replication,
322                   struct GNUNET_TIME_Absolute expiration,
323                   PluginPutCont cont,
324                   void *cont_cls)
325 {
326   struct Plugin *plugin = cls;
327   uint64_t lexpiration = expiration.abs_value_us;
328   uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
329                                                UINT64_MAX);
330   struct GNUNET_HashCode vhash;
331   struct GNUNET_MY_QueryParam params_insert[] = {
332     GNUNET_MY_query_param_uint32 (&replication),
333     GNUNET_MY_query_param_uint32 (&type),
334     GNUNET_MY_query_param_uint32 (&priority),
335     GNUNET_MY_query_param_uint32 (&anonymity),
336     GNUNET_MY_query_param_uint64 (&lexpiration),
337     GNUNET_MY_query_param_uint64 (&lrvalue),
338     GNUNET_MY_query_param_auto_from_type (key),
339     GNUNET_MY_query_param_auto_from_type (&vhash),
340     GNUNET_MY_query_param_fixed_size (data, size),
341     GNUNET_MY_query_param_end
342   };
343
344   if (size > MAX_DATUM_SIZE)
345   {
346     GNUNET_break (0);
347     cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
348     return;
349   }
350   GNUNET_CRYPTO_hash (data,
351                       size,
352                       &vhash);
353
354   if (GNUNET_OK !=
355       GNUNET_MY_exec_prepared (plugin->mc,
356                                plugin->insert_entry,
357                                params_insert))
358   {
359     cont (cont_cls,
360           key,
361           size,
362           GNUNET_SYSERR,
363           _("MySQL statement run failure"));
364     return;
365   }
366   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367               "Inserted value `%s' with size %u into gn090 table\n",
368               GNUNET_h2s (key),
369               (unsigned int) size);
370   if (size > 0)
371     plugin->env->duc (plugin->env->cls,
372                       size);
373   GNUNET_break (GNUNET_NO ==
374                 GNUNET_MY_extract_result (plugin->insert_entry,
375                                           NULL));
376   cont (cont_cls,
377         key,
378         size,
379         GNUNET_OK,
380         NULL);
381 }
382
383
384 /**
385  * Update the priority for a particular key in the datastore.  If
386  * the expiration time in value is different than the time found in
387  * the datastore, the higher value should be kept.  For the
388  * anonymity level, the lower value is to be used.  The specified
389  * priority should be added to the existing priority, ignoring the
390  * priority in value.
391  *
392  * Note that it is possible for multiple values to match this put.
393  * In that case, all of the respective values are updated.
394  *
395  * @param cls our "struct Plugin*"
396  * @param uid unique identifier of the datum
397  * @param delta by how much should the priority
398  *     change?  If priority + delta < 0 the
399  *     priority should be set to 0 (never go
400  *     negative).
401  * @param expire new expiration time should be the
402  *     MAX of any existing expiration time and
403  *     this value
404  * @param cont continuation called with success or failure status
405  * @param cons_cls continuation closure
406  */
407 static void
408 mysql_plugin_update (void *cls,
409                      uint64_t uid,
410                      int delta,
411                      struct GNUNET_TIME_Absolute expire,
412                      PluginUpdateCont cont,
413                      void *cont_cls)
414 {
415   struct Plugin *plugin = cls;
416   uint32_t idelta = (uint32_t) delta;
417   uint64_t lexpire = expire.abs_value_us;
418   int ret;
419
420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
421               "Updating value %llu adding %d to priority and maxing exp at %s\n",
422               (unsigned long long) uid,
423               delta,
424               GNUNET_STRINGS_absolute_time_to_string (expire));
425
426   struct GNUNET_MY_QueryParam params_update[] = {
427     GNUNET_MY_query_param_uint32 (&idelta),
428     GNUNET_MY_query_param_uint64 (&lexpire),
429     GNUNET_MY_query_param_uint64 (&lexpire),
430     GNUNET_MY_query_param_uint64 (&uid),
431     GNUNET_MY_query_param_end
432   };
433
434   ret = GNUNET_MY_exec_prepared (plugin->mc,
435                                  plugin->update_entry,
436                                  params_update);
437
438   if (GNUNET_OK != ret)
439   {
440     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
441                 "Failed to update value %llu\n",
442                 (unsigned long long) uid);
443   }
444   else
445   {
446     GNUNET_break (GNUNET_NO ==
447                   GNUNET_MY_extract_result (plugin->update_entry,
448                                             NULL));
449   }
450   cont (cont_cls,
451         ret,
452         NULL);
453 }
454
455
456 /**
457  * Run the given select statement and call 'proc' on the resulting
458  * values (which must be in particular positions).
459  *
460  * @param plugin the plugin handle
461  * @param stmt select statement to run
462  * @param proc function to call on result
463  * @param proc_cls closure for @a proc
464  * @param params_select arguments to initialize stmt
465  */
466 static void
467 execute_select (struct Plugin *plugin,
468                 struct GNUNET_MYSQL_StatementHandle *stmt,
469                 PluginDatumProcessor proc,
470                 void *proc_cls,
471                 struct GNUNET_MY_QueryParam *params_select)
472 {
473   int ret;
474   uint32_t type;
475   uint32_t priority;
476   uint32_t anonymity;
477   uint64_t uid;
478   size_t value_size;
479   void *value;
480   struct GNUNET_HashCode key;
481   struct GNUNET_TIME_Absolute expiration;
482   struct GNUNET_MY_ResultSpec results_select[] = {
483     GNUNET_MY_result_spec_uint32 (&type),
484     GNUNET_MY_result_spec_uint32 (&priority),
485     GNUNET_MY_result_spec_uint32 (&anonymity),
486     GNUNET_MY_result_spec_absolute_time (&expiration),
487     GNUNET_MY_result_spec_auto_from_type (&key),
488     GNUNET_MY_result_spec_variable_size (&value, &value_size),
489     GNUNET_MY_result_spec_uint64 (&uid),
490     GNUNET_MY_result_spec_end
491   };
492
493   ret = GNUNET_MY_exec_prepared (plugin->mc,
494                                  stmt,
495                                  params_select);
496   if (GNUNET_OK != ret)
497   {
498     proc (proc_cls,
499           NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
500     return;
501   }
502
503   ret = GNUNET_MY_extract_result (stmt,
504                                   results_select);
505   if (GNUNET_OK != ret)
506   {
507     proc (proc_cls,
508           NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
509     return;
510   }
511
512   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %s selecting from gn090 table\n",
514               (unsigned int) value_size,
515               GNUNET_h2s (&key),
516               (unsigned int) priority,
517               (unsigned int) anonymity,
518               GNUNET_STRINGS_absolute_time_to_string (expiration));
519   GNUNET_assert (value_size < MAX_DATUM_SIZE);
520   GNUNET_break (GNUNET_NO ==
521                 GNUNET_MY_extract_result (stmt,
522                                           NULL));
523   ret = proc (proc_cls,
524               &key,
525               value_size,
526               value,
527               type,
528               priority,
529               anonymity,
530               expiration,
531               uid);
532   GNUNET_MY_cleanup_result (results_select);
533   if (GNUNET_NO == ret)
534   {
535     do_delete_entry (plugin, uid);
536     if (0 != value_size)
537       plugin->env->duc (plugin->env->cls,
538                         - value_size);
539   }
540 }
541
542
543 /**
544  * Get one of the results for a particular key in the datastore.
545  *
546  * @param cls closure
547  * @param offset offset of the result (modulo num-results);
548  *               specific ordering does not matter for the offset
549  * @param key key to match, never NULL
550  * @param vhash hash of the value, maybe NULL (to
551  *        match all values that have the right key).
552  *        Note that for DBlocks there is no difference
553  *        betwen key and vhash, but for other blocks
554  *        there may be!
555  * @param type entries of which type are relevant?
556  *     Use 0 for any type.
557  * @param proc function to call on the matching value,
558  *        with NULL for if no value matches
559  * @param proc_cls closure for @a proc
560  */
561 static void
562 mysql_plugin_get_key (void *cls,
563                       uint64_t offset,
564                       const struct GNUNET_HashCode *key,
565                       const struct GNUNET_HashCode *vhash,
566                       enum GNUNET_BLOCK_Type type,
567                       PluginDatumProcessor proc,
568                       void *proc_cls)
569 {
570   struct Plugin *plugin = cls;
571   int ret;
572   uint64_t total;
573   struct GNUNET_MY_ResultSpec results_get[] = {
574     GNUNET_MY_result_spec_uint64 (&total),
575     GNUNET_MY_result_spec_end
576   };
577
578   total = UINT64_MAX;
579   if (0 != type)
580   {
581     if (NULL != vhash)
582     {
583       struct GNUNET_MY_QueryParam params_get[] = {
584         GNUNET_MY_query_param_auto_from_type (key),
585         GNUNET_MY_query_param_auto_from_type (vhash),
586         GNUNET_MY_query_param_uint32 (&type),
587         GNUNET_MY_query_param_end
588       };
589
590       ret =
591         GNUNET_MY_exec_prepared (plugin->mc,
592                                  plugin->count_entry_by_hash_vhash_and_type,
593                                  params_get);
594       GNUNET_break (GNUNET_OK == ret);
595       if (GNUNET_OK == ret)
596         ret =
597           GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
598                                     results_get);
599       if (GNUNET_OK == ret)
600         GNUNET_break (GNUNET_NO ==
601                       GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
602                                                 NULL));
603     }
604     else
605     {
606       struct GNUNET_MY_QueryParam params_get[] = {
607         GNUNET_MY_query_param_auto_from_type (key),
608         GNUNET_MY_query_param_uint32 (&type),
609         GNUNET_MY_query_param_end
610       };
611
612       ret =
613         GNUNET_MY_exec_prepared (plugin->mc,
614                                  plugin->count_entry_by_hash_and_type,
615                                  params_get);
616       GNUNET_break (GNUNET_OK == ret);
617       if (GNUNET_OK == ret)
618         ret =
619           GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
620                                     results_get);
621       if (GNUNET_OK == ret)
622         GNUNET_break (GNUNET_NO ==
623                       GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
624                                                 NULL));
625     }
626   }
627   else
628   {
629     if (NULL != vhash)
630     {
631       struct GNUNET_MY_QueryParam params_get[] = {
632         GNUNET_MY_query_param_auto_from_type (key),
633         GNUNET_MY_query_param_auto_from_type (vhash),
634         GNUNET_MY_query_param_end
635       };
636
637       ret =
638         GNUNET_MY_exec_prepared (plugin->mc,
639                                  plugin->count_entry_by_hash_and_vhash,
640                                  params_get);
641       GNUNET_break (GNUNET_OK == ret);
642       if (GNUNET_OK == ret)
643         ret =
644           GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
645                                     results_get);
646       if (GNUNET_OK == ret)
647         GNUNET_break (GNUNET_NO ==
648                       GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
649                                                 NULL));
650     }
651     else
652     {
653       struct GNUNET_MY_QueryParam params_get[] = {
654         GNUNET_MY_query_param_auto_from_type (key),
655         GNUNET_MY_query_param_end
656       };
657
658       ret =
659         GNUNET_MY_exec_prepared (plugin->mc,
660                                  plugin->count_entry_by_hash,
661                                  params_get);
662       GNUNET_break (GNUNET_OK == ret);
663       if (GNUNET_OK == ret)
664         ret =
665           GNUNET_MY_extract_result (plugin->count_entry_by_hash,
666                                     results_get);
667       if (GNUNET_OK == ret)
668         GNUNET_break (GNUNET_NO ==
669                       GNUNET_MY_extract_result (plugin->count_entry_by_hash,
670                                                 NULL));
671     }
672   }
673   if ( (GNUNET_OK != ret) ||
674        (0 >= total) )
675   {
676     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
677     return;
678   }
679   offset = offset % total;
680   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
681               "Obtaining %llu/%lld result for GET `%s'\n",
682               (unsigned long long) offset,
683               (unsigned long long) total,
684               GNUNET_h2s (key));
685   if (type != GNUNET_BLOCK_TYPE_ANY)
686   {
687     if (NULL != vhash)
688     {
689       struct GNUNET_MY_QueryParam params_select[] = {
690         GNUNET_MY_query_param_auto_from_type (key),
691         GNUNET_MY_query_param_auto_from_type (vhash),
692         GNUNET_MY_query_param_uint32 (&type),
693         GNUNET_MY_query_param_uint64 (&offset),
694         GNUNET_MY_query_param_end
695       };
696
697       execute_select (plugin,
698                       plugin->select_entry_by_hash_vhash_and_type,
699                       proc,
700                       proc_cls,
701                       params_select);
702     }
703     else
704     {
705       struct GNUNET_MY_QueryParam params_select[] = {
706         GNUNET_MY_query_param_auto_from_type (key),
707         GNUNET_MY_query_param_uint32 (&type),
708         GNUNET_MY_query_param_uint64 (&offset),
709         GNUNET_MY_query_param_end
710       };
711
712       execute_select (plugin,
713                       plugin->select_entry_by_hash_and_type,
714                       proc,
715                       proc_cls,
716                       params_select);
717     }
718   }
719   else
720   {
721     if (NULL != vhash)
722     {
723       struct GNUNET_MY_QueryParam params_select[] = {
724         GNUNET_MY_query_param_auto_from_type (key),
725         GNUNET_MY_query_param_auto_from_type (vhash),
726         GNUNET_MY_query_param_uint64 (&offset),
727         GNUNET_MY_query_param_end
728       };
729
730       execute_select (plugin,
731                       plugin->select_entry_by_hash_and_vhash,
732                       proc,
733                       proc_cls,
734                       params_select);
735     }
736     else
737     {
738       struct GNUNET_MY_QueryParam params_select[] = {
739         GNUNET_MY_query_param_auto_from_type (key),
740         GNUNET_MY_query_param_uint64 (&offset),
741         GNUNET_MY_query_param_end
742       };
743
744       execute_select (plugin,
745                       plugin->select_entry_by_hash,
746                       proc,
747                       proc_cls,
748                       params_select);
749     }
750   }
751
752 }
753
754
755 /**
756  * Get a zero-anonymity datum from the datastore.
757  *
758  * @param cls our `struct Plugin *`
759  * @param offset offset of the result
760  * @param type entries of which type should be considered?
761  *        Use 0 for any type.
762  * @param proc function to call on a matching value or NULL
763  * @param proc_cls closure for @a proc
764  */
765 static void
766 mysql_plugin_get_zero_anonymity (void *cls,
767                                  uint64_t offset,
768                                  enum GNUNET_BLOCK_Type type,
769                                  PluginDatumProcessor proc,
770                                  void *proc_cls)
771 {
772   struct Plugin *plugin = cls;
773   uint32_t typei = (uint32_t) type;
774   uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
775                                               UINT64_MAX);
776   struct GNUNET_MY_QueryParam params_zero_iter[] = {
777     GNUNET_MY_query_param_uint32 (&typei),
778     GNUNET_MY_query_param_uint64 (&rvalue),
779     GNUNET_MY_query_param_uint32 (&typei),
780     GNUNET_MY_query_param_uint64 (&rvalue),
781     GNUNET_MY_query_param_end
782   };
783
784   execute_select (plugin,
785                   plugin->zero_iter,
786                   proc,
787                   proc_cls,
788                   params_zero_iter);
789 }
790
791
792 /**
793  * Context for #repl_proc() function.
794  */
795 struct ReplCtx
796 {
797
798   /**
799    * Plugin handle.
800    */
801   struct Plugin *plugin;
802
803   /**
804    * Function to call for the result (or the NULL).
805    */
806   PluginDatumProcessor proc;
807
808   /**
809    * Closure for @e proc.
810    */
811   void *proc_cls;
812 };
813
814
815 /**
816  * Wrapper for the processor for #mysql_plugin_get_replication().
817  * Decrements the replication counter and calls the original
818  * iterator.
819  *
820  * @param cls closure
821  * @param key key for the content
822  * @param size number of bytes in @a data
823  * @param data content stored
824  * @param type type of the content
825  * @param priority priority of the content
826  * @param anonymity anonymity-level for the content
827  * @param expiration expiration time for the content
828  * @param uid unique identifier for the datum;
829  *        maybe 0 if no unique identifier is available
830  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
831  *         (continue on call to "next", of course),
832  *         #GNUNET_NO to delete the item and continue (if supported)
833  */
834 static int
835 repl_proc (void *cls,
836            const struct GNUNET_HashCode *key,
837            uint32_t size,
838            const void *data,
839            enum GNUNET_BLOCK_Type type,
840            uint32_t priority,
841            uint32_t anonymity,
842            struct GNUNET_TIME_Absolute expiration,
843            uint64_t uid)
844 {
845   struct ReplCtx *rc = cls;
846   struct Plugin *plugin = rc->plugin;
847   int ret;
848   int iret;
849
850   ret = rc->proc (rc->proc_cls,
851                   key,
852                   size,
853                   data,
854                   type,
855                   priority,
856                   anonymity,
857                   expiration,
858                   uid);
859   if (NULL != key)
860   {
861     struct GNUNET_MY_QueryParam params_proc[] = {
862       GNUNET_MY_query_param_uint64 (&uid),
863       GNUNET_MY_query_param_end
864     };
865
866     iret = GNUNET_MY_exec_prepared (plugin->mc,
867                                     plugin->dec_repl,
868                                     params_proc);
869     if (GNUNET_SYSERR == iret)
870     {
871       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
872                   "Failed to reduce replication counter\n");
873       return GNUNET_SYSERR;
874     }
875   }
876   return ret;
877 }
878
879
880 /**
881  * Get a random item for replication.  Returns a single, not expired,
882  * random item from those with the highest replication counters.  The
883  * item's replication counter is decremented by one IF it was positive
884  * before.  Call @a proc with all values ZERO or NULL if the datastore
885  * is empty.
886  *
887  * @param cls closure
888  * @param proc function to call the value (once only).
889  * @param proc_cls closure for @a proc
890  */
891 static void
892 mysql_plugin_get_replication (void *cls,
893                               PluginDatumProcessor proc,
894                               void *proc_cls)
895 {
896   struct Plugin *plugin = cls;
897   uint64_t rvalue;
898   uint32_t repl;
899   struct ReplCtx rc;
900   struct GNUNET_MY_QueryParam params_get[] = {
901     GNUNET_MY_query_param_end
902   };
903   struct GNUNET_MY_ResultSpec results_get[] = {
904     GNUNET_MY_result_spec_uint32 (&repl),
905     GNUNET_MY_result_spec_end
906   };
907   struct GNUNET_MY_QueryParam params_select[] = {
908     GNUNET_MY_query_param_uint32 (&repl),
909     GNUNET_MY_query_param_uint64 (&rvalue),
910     GNUNET_MY_query_param_uint32 (&repl),
911     GNUNET_MY_query_param_uint64 (&rvalue),
912     GNUNET_MY_query_param_end
913   };
914
915   rc.plugin = plugin;
916   rc.proc = proc;
917   rc.proc_cls = proc_cls;
918
919   if (1 !=
920       GNUNET_MY_exec_prepared (plugin->mc,
921                                plugin->max_repl,
922                                params_get))
923   {
924     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
925     return;
926   }
927
928   if (GNUNET_OK !=
929       GNUNET_MY_extract_result (plugin->max_repl,
930                                 results_get))
931   {
932     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
933     return;
934   }
935   GNUNET_break (GNUNET_NO ==
936                 GNUNET_MY_extract_result (plugin->max_repl,
937                                           NULL));
938   rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
939                                      UINT64_MAX);
940
941   execute_select (plugin,
942                   plugin->select_replication,
943                   &repl_proc,
944                   &rc,
945                   params_select);
946 }
947
948
949 /**
950  * Get all of the keys in the datastore.
951  *
952  * @param cls closure
953  * @param proc function to call on each key
954  * @param proc_cls closure for @a proc
955  */
956 static void
957 mysql_plugin_get_keys (void *cls,
958                        PluginKeyProcessor proc,
959                        void *proc_cls)
960 {
961   struct Plugin *plugin = cls;
962   int ret;
963   MYSQL_STMT *statement;
964   unsigned int cnt;
965   struct GNUNET_HashCode key;
966   struct GNUNET_HashCode last;
967   struct GNUNET_MY_QueryParam params_select[] = {
968     GNUNET_MY_query_param_end
969   };
970   struct GNUNET_MY_ResultSpec results_select[] = {
971     GNUNET_MY_result_spec_auto_from_type (&key),
972     GNUNET_MY_result_spec_end
973   };
974
975   GNUNET_assert (NULL != proc);
976   statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys);
977   if (GNUNET_OK !=
978       GNUNET_MY_exec_prepared (plugin->mc,
979                                plugin->get_all_keys,
980                                params_select))
981   {
982     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
983                 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
984                 "mysql_stmt_execute",
985                 GET_ALL_KEYS,
986                 __FILE__,
987                 __LINE__,
988                 mysql_stmt_error (statement));
989     GNUNET_MYSQL_statements_invalidate (plugin->mc);
990     proc (proc_cls, NULL, 0);
991     return;
992   }
993   memset (&last, 0, sizeof (last)); /* make static analysis happy */
994   ret = GNUNET_YES;
995   cnt = 0;
996   while (ret == GNUNET_YES)
997   {
998     ret = GNUNET_MY_extract_result (plugin->get_all_keys,
999                                     results_select);
1000     if (0 != memcmp (&last,
1001                      &key,
1002                      sizeof (key)))
1003     {
1004       if (0 != cnt)
1005         proc (proc_cls,
1006               &last,
1007               cnt);
1008       cnt = 1;
1009       last = key;
1010     }
1011     else
1012     {
1013       cnt++;
1014     }
1015   }
1016   if (0 != cnt)
1017     proc (proc_cls,
1018           &last,
1019           cnt);
1020   /* finally, let app know we are done */
1021   proc (proc_cls,
1022         NULL,
1023         0);
1024   if (GNUNET_SYSERR == ret)
1025   {
1026     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1027                 _("`%s' failed at %s:%d with error: %s\n"),
1028                 "mysql_stmt_fetch",
1029                 __FILE__,
1030                 __LINE__,
1031                 mysql_stmt_error (statement));
1032     GNUNET_MYSQL_statements_invalidate (plugin->mc);
1033     return;
1034   }
1035 }
1036
1037
1038 /**
1039  * Context for #expi_proc() function.
1040  */
1041 struct ExpiCtx
1042 {
1043
1044   /**
1045    * Plugin handle.
1046    */
1047   struct Plugin *plugin;
1048
1049   /**
1050    * Function to call for the result (or the NULL).
1051    */
1052   PluginDatumProcessor proc;
1053
1054   /**
1055    * Closure for @e proc.
1056    */
1057   void *proc_cls;
1058 };
1059
1060
1061
1062 /**
1063  * Wrapper for the processor for #mysql_plugin_get_expiration().
1064  * If no expired value was found, we do a second query for
1065  * low-priority content.
1066  *
1067  * @param cls closure
1068  * @param key key for the content
1069  * @param size number of bytes in data
1070  * @param data content stored
1071  * @param type type of the content
1072  * @param priority priority of the content
1073  * @param anonymity anonymity-level for the content
1074  * @param expiration expiration time for the content
1075  * @param uid unique identifier for the datum;
1076  *        maybe 0 if no unique identifier is available
1077  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
1078  *         (continue on call to "next", of course),
1079  *         #GNUNET_NO to delete the item and continue (if supported)
1080  */
1081 static int
1082 expi_proc (void *cls,
1083            const struct GNUNET_HashCode *key,
1084            uint32_t size,
1085            const void *data,
1086            enum GNUNET_BLOCK_Type type,
1087            uint32_t priority,
1088            uint32_t anonymity,
1089            struct GNUNET_TIME_Absolute expiration,
1090            uint64_t uid)
1091 {
1092   struct ExpiCtx *rc = cls;
1093   struct Plugin *plugin = rc->plugin;
1094   struct GNUNET_MY_QueryParam params_select[] = {
1095     GNUNET_MY_query_param_end
1096   };
1097
1098   if (NULL == key)
1099   {
1100     execute_select (plugin,
1101                     plugin->select_priority,
1102                     rc->proc,
1103                     rc->proc_cls,
1104                     params_select);
1105     return GNUNET_SYSERR;
1106   }
1107   return rc->proc (rc->proc_cls,
1108                    key,
1109                    size,
1110                    data,
1111                    type,
1112                    priority,
1113                    anonymity,
1114                    expiration,
1115                    uid);
1116 }
1117
1118
1119 /**
1120  * Get a random item for expiration.
1121  * Call @a proc with all values ZERO or NULL if the datastore is empty.
1122  *
1123  * @param cls closure
1124  * @param proc function to call the value (once only).
1125  * @param proc_cls closure for @a proc
1126  */
1127 static void
1128 mysql_plugin_get_expiration (void *cls,
1129                              PluginDatumProcessor proc,
1130                              void *proc_cls)
1131 {
1132   struct Plugin *plugin = cls;
1133   struct GNUNET_TIME_Absolute now;
1134   struct GNUNET_MY_QueryParam params_select[] = {
1135     GNUNET_MY_query_param_absolute_time (&now),
1136     GNUNET_MY_query_param_end
1137   };
1138   struct ExpiCtx rc;
1139
1140   rc.plugin = plugin;
1141   rc.proc = proc;
1142   rc.proc_cls = proc_cls;
1143   now = GNUNET_TIME_absolute_get ();
1144   execute_select (plugin,
1145                   plugin->select_expiration,
1146                   expi_proc,
1147                   &rc,
1148                   params_select);
1149 }
1150
1151
1152 /**
1153  * Drop database.
1154  *
1155  * @param cls the `struct Plugin *`
1156  */
1157 static void
1158 mysql_plugin_drop (void *cls)
1159 {
1160   struct Plugin *plugin = cls;
1161
1162   if (GNUNET_OK !=
1163       GNUNET_MYSQL_statement_run (plugin->mc,
1164                                   "DROP TABLE gn090"))
1165     return;                     /* error */
1166   plugin->env->duc (plugin->env->cls, 0);
1167 }
1168
1169
1170 /**
1171  * Entry point for the plugin.
1172  *
1173  * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment *`
1174  * @return our `struct Plugin *`
1175  */
1176 void *
1177 libgnunet_plugin_datastore_mysql_init (void *cls)
1178 {
1179   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1180   struct GNUNET_DATASTORE_PluginFunctions *api;
1181   struct Plugin *plugin;
1182
1183   plugin = GNUNET_new (struct Plugin);
1184   plugin->env = env;
1185   plugin->mc = GNUNET_MYSQL_context_create (env->cfg,
1186                                             "datastore-mysql");
1187   if (NULL == plugin->mc)
1188   {
1189     GNUNET_free (plugin);
1190     return NULL;
1191   }
1192 #define MRUNS(a) (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, a) )
1193 #define PINIT(a,b) (NULL == (a = GNUNET_MYSQL_statement_prepare (plugin->mc, b)))
1194   if (MRUNS
1195       ("CREATE TABLE IF NOT EXISTS gn090 ("
1196        " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1197        " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1198        " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1199        " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1200        " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1201        " rvalue BIGINT UNSIGNED NOT NULL,"
1202        " hash BINARY(64) NOT NULL DEFAULT '',"
1203        " vhash BINARY(64) NOT NULL DEFAULT '',"
1204        " value BLOB NOT NULL DEFAULT ''," " uid BIGINT NOT NULL AUTO_INCREMENT,"
1205        " PRIMARY KEY (uid)," " INDEX idx_hash (hash(64)),"
1206        " INDEX idx_hash_uid (hash(64),uid),"
1207        " INDEX idx_hash_vhash (hash(64),vhash(64)),"
1208        " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
1209        " INDEX idx_prio (prio)," " INDEX idx_repl_rvalue (repl,rvalue),"
1210        " INDEX idx_expire (expire),"
1211        " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
1212        ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
1213       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1214       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1215       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1216       PINIT (plugin->select_entry_by_hash_and_vhash,
1217              SELECT_ENTRY_BY_HASH_AND_VHASH) ||
1218       PINIT (plugin->select_entry_by_hash_and_type,
1219              SELECT_ENTRY_BY_HASH_AND_TYPE) ||
1220       PINIT (plugin->select_entry_by_hash_vhash_and_type,
1221              SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
1222       PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
1223       PINIT (plugin->get_size, SELECT_SIZE) ||
1224       PINIT (plugin->count_entry_by_hash_and_vhash,
1225              COUNT_ENTRY_BY_HASH_AND_VHASH) ||
1226       PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1227       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1228                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
1229       PINIT (plugin->update_entry, UPDATE_ENTRY) ||
1230       PINIT (plugin->dec_repl, DEC_REPL) ||
1231       PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
1232       PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) ||
1233       PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
1234       PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
1235       PINIT (plugin->get_all_keys, GET_ALL_KEYS) ||
1236       PINIT (plugin->select_replication, SELECT_IT_REPLICATION))
1237   {
1238     GNUNET_MYSQL_context_destroy (plugin->mc);
1239     GNUNET_free (plugin);
1240     return NULL;
1241   }
1242 #undef PINIT
1243 #undef MRUNS
1244
1245   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
1246   api->cls = plugin;
1247   api->estimate_size = &mysql_plugin_estimate_size;
1248   api->put = &mysql_plugin_put;
1249   api->update = &mysql_plugin_update;
1250   api->get_key = &mysql_plugin_get_key;
1251   api->get_replication = &mysql_plugin_get_replication;
1252   api->get_expiration = &mysql_plugin_get_expiration;
1253   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1254   api->get_keys = &mysql_plugin_get_keys;
1255   api->drop = &mysql_plugin_drop;
1256   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql",
1257                    _("Mysql database running\n"));
1258   return api;
1259 }
1260
1261
1262 /**
1263  * Exit point from the plugin.
1264  *
1265  * @param cls our `struct Plugin *`
1266  * @return always NULL
1267  */
1268 void *
1269 libgnunet_plugin_datastore_mysql_done (void *cls)
1270 {
1271   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1272   struct Plugin *plugin = api->cls;
1273
1274   GNUNET_MYSQL_context_destroy (plugin->mc);
1275   GNUNET_free (plugin);
1276   GNUNET_free (api);
1277   return NULL;
1278 }
1279
1280 /* end of plugin_datastore_mysql.c */