09dbdc5b6206a2f77052fbc18c73a2ae82611d82
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009, 2010, 2011 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  * @author Christophe Genevey
27  *
28  * NOTE: This db module does NOT work with mysql prior to 4.1 since
29  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
30  * in MyISAM that is causing us grief.  At the time of this writing,
31  * that version is yet to be released.  In anticipation, the code
32  * will use MyISAM with 5.0.46 (and higher).  If you run such a
33  * version, please run "make check" to verify that the MySQL bug
34  * was actually fixed in your version (and if not, change the
35  * code below to use MyISAM for gn071).
36  *
37  * HIGHLIGHTS
38  *
39  * Pros
40  * + On up-to-date hardware where mysql can be used comfortably, this
41  *   module will have better performance than the other db choices
42  *   (according to our tests).
43  * + Its often possible to recover the mysql database from internal
44  *   inconsistencies. The other db choices do not support repair!
45  * Cons
46  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
47  * - Manual setup
48  *
49  * MANUAL SETUP INSTRUCTIONS
50  *
51  * 1) in gnunet.conf, set
52  * @verbatim
53        [datastore]
54        DATABASE = "mysql"
55    @endverbatim
56  * 2) Then access mysql as root,
57  * @verbatim
58      $ mysql -u root -p
59    @endverbatim
60  *    and do the following. [You should replace $USER with the username
61  *    that will be running the gnunetd process].
62  * @verbatim
63       CREATE DATABASE gnunet;
64       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
65          ON gnunet.* TO $USER@localhost;
66       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
67       FLUSH PRIVILEGES;
68    @endverbatim
69  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
70  *    with the following lines
71  * @verbatim
72       [client]
73       user=$USER
74       password=$the_password_you_like
75    @endverbatim
76  *
77  * Thats it. Note that .my.cnf file is a security risk unless its on
78  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
79  * link. Even greater security risk can be achieved by setting no
80  * password for $USER.  Luckily $USER has only priviledges to mess
81  * up GNUnet's tables, nothing else (unless you give him more,
82  * of course).<p>
83  *
84  * 4) Still, perhaps you should briefly try if the DB connection
85  *    works. First, login as $USER. Then use,
86  *
87  * @verbatim
88      $ mysql -u $USER -p $the_password_you_like
89      mysql> use gnunet;
90    @endverbatim
91  *
92  *    If you get the message &quot;Database changed&quot; it probably works.
93  *
94  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
95  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
96  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
97  *     so there may be some additional trouble depending on your mysql setup.]
98  *
99  * REPAIRING TABLES
100  *
101  * - Its probably healthy to check your tables for inconsistencies
102  *   every now and then.
103  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
104  *   databases have been corrupted.
105  * - The tables can be verified/fixed in two ways;
106  *   1) by running mysqlcheck -A, or
107  *   2) by executing (inside of mysql using the GNUnet database):
108  * @verbatim
109      mysql> REPAIR TABLE gn090;
110    @endverbatim
111  *
112  * PROBLEMS?
113  *
114  * If you have problems related to the mysql module, your best
115  * friend is probably the mysql manual. The first thing to check
116  * is that mysql is basically operational, that you can connect
117  * to it, create tables, issue queries etc.
118  */
119
120 #include "platform.h"
121 #include "gnunet_datastore_plugin.h"
122 #include "gnunet_util_lib.h"
123 #include "gnunet_mysql_lib.h"
124 #include "gnunet_my_lib.h"
125
126 #define MAX_DATUM_SIZE 65536
127
128
129 /**
130  * Context for all functions in this plugin.
131  */
132 struct Plugin
133 {
134   /**
135    * Our execution environment.
136    */
137   struct GNUNET_DATASTORE_PluginEnvironment *env;
138
139   /**
140    * Handle to talk to MySQL.
141    */
142   struct GNUNET_MYSQL_Context *mc;
143
144   /**
145    * Prepared statements.
146    */
147 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
148   struct GNUNET_MYSQL_StatementHandle *insert_entry;
149
150 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
151   struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
152
153 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
154   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash;
155
156 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
157   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
158
159 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
160   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_vhash;
161
162 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
163   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash;
164
165 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
166   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_type;
167
168 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
169   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
170
171 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
172   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_vhash_and_type;
173
174 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
175   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type;
176
177 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
178   struct GNUNET_MYSQL_StatementHandle *update_entry;
179
180 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
181   struct GNUNET_MYSQL_StatementHandle *dec_repl;
182
183 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
184   struct GNUNET_MYSQL_StatementHandle *get_size;
185
186 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
187    "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
188    "WHERE anonLevel=0 AND type=? AND "\
189    "(rvalue >= ? OR"\
190    "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
191    "ORDER BY rvalue ASC LIMIT 1"
192   struct GNUNET_MYSQL_StatementHandle *zero_iter;
193
194 #define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
195   struct GNUNET_MYSQL_StatementHandle *select_expiration;
196
197 #define SELECT_IT_PRIORITY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_prio) ORDER BY prio ASC LIMIT 1"
198   struct GNUNET_MYSQL_StatementHandle *select_priority;
199
200 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid "\
201   "FROM gn090 FORCE INDEX (idx_repl_rvalue) "\
202   "WHERE repl=? AND "\
203   " (rvalue>=? OR"\
204   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
205   "ORDER BY rvalue ASC "\
206   "LIMIT 1"
207   struct GNUNET_MYSQL_StatementHandle *select_replication;
208
209 #define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
210   struct GNUNET_MYSQL_StatementHandle *max_repl;
211
212 #define GET_ALL_KEYS "SELECT hash from gn090"
213   struct GNUNET_MYSQL_StatementHandle *get_all_keys;
214
215 };
216
217 #define MAX_PARAM 16
218
219 /**
220  * Delete an entry from the gn090 table.
221  *
222  * @param plugin plugin context
223  * @param uid unique ID of the entry to delete
224  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
225  */
226 static int
227 do_delete_entry (struct Plugin *plugin, unsigned long long uid)
228 {
229   int ret;
230   uint64_t uid64 = (uint64_t) uid;
231
232   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
233               "Deleting value %llu from gn090 table\n",
234               uid);
235
236   struct GNUNET_MY_QueryParam params_delete[] = {
237     GNUNET_MY_query_param_uint64 (&uid64),
238     GNUNET_MY_query_param_end
239   };
240
241   ret = GNUNET_MY_exec_prepared (plugin->mc,
242                                  plugin->delete_entry_by_uid,
243                                  params_delete);
244   if (ret >= 0)
245   {
246     return GNUNET_OK;
247   }
248   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
249               "Deleting value %llu from gn090 table failed\n",
250               uid);
251   return ret;
252 }
253
254
255 /**
256  * Get an estimate of how much space the database is
257  * currently using.
258  *
259  * @param cls our "struct Plugin *"
260  * @return number of bytes used on disk
261  */
262 static void
263 mysql_plugin_estimate_size (void *cls,
264                             unsigned long long *estimate)
265 {
266   struct Plugin *plugin = cls;
267   uint64_t total;
268   struct GNUNET_MY_QueryParam params_get[] = {
269     GNUNET_MY_query_param_end
270   };
271
272   struct GNUNET_MY_ResultSpec results_get[] = {
273     GNUNET_MY_result_spec_uint64 (&total),
274     GNUNET_MY_result_spec_end
275   };
276   int ret;
277
278   ret = GNUNET_MY_exec_prepared (plugin->mc, plugin->get_size, params_get);
279   if (GNUNET_OK == ret)
280     {
281       if (GNUNET_OK == GNUNET_MY_extract_result (plugin->get_size, results_get))
282       {
283         *estimate = (unsigned long long)total;
284       }
285     }
286   else
287     *estimate = 0;
288 }
289
290
291 /**
292  * Store an item in the datastore.
293  *
294  * @param cls closure
295  * @param key key for the item
296  * @param size number of bytes in data
297  * @param data content stored
298  * @param type type of the content
299  * @param priority priority of the content
300  * @param anonymity anonymity-level for the content
301  * @param replication replication-level for the content
302  * @param expiration expiration time for the content
303  * @param cont continuation called with success or failure status
304  * @param cont_cls continuation closure
305  */
306 static void
307 mysql_plugin_put (void *cls,
308                   const struct GNUNET_HashCode *key,
309                   uint32_t size,
310                   const void *data,
311                   enum GNUNET_BLOCK_Type type,
312                   uint32_t priority,
313                   uint32_t anonymity,
314                   uint32_t replication,
315                   struct GNUNET_TIME_Absolute expiration,
316                   PluginPutCont cont,
317                   void *cont_cls)
318 {
319   struct Plugin *plugin = cls;
320   unsigned int irepl = replication;
321   unsigned int ipriority = priority;
322   unsigned int ianonymity = anonymity;
323   uint64_t lexpiration = expiration.abs_value_us;
324   uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
325                                                UINT64_MAX);
326   unsigned long lsize;
327   struct GNUNET_HashCode vhash;
328   struct GNUNET_MY_QueryParam params_insert[] = {
329     GNUNET_MY_query_param_uint32 (&irepl),
330     GNUNET_MY_query_param_uint32 (&type),
331     GNUNET_MY_query_param_uint32 (&ipriority),
332     GNUNET_MY_query_param_uint32 (&ianonymity),
333     GNUNET_MY_query_param_uint64 (&lexpiration),
334     GNUNET_MY_query_param_uint64 (&lrvalue),
335     GNUNET_MY_query_param_auto_from_type (key),
336     GNUNET_MY_query_param_auto_from_type (&vhash),
337     GNUNET_MY_query_param_fixed_size (data, lsize),
338     GNUNET_MY_query_param_end
339   };
340
341   if (size > MAX_DATUM_SIZE)
342   {
343     GNUNET_break (0);
344     cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
345     return;
346   }
347   lsize = size;
348   GNUNET_CRYPTO_hash (data,
349                       size,
350                       &vhash);
351
352   if (GNUNET_OK !=
353       GNUNET_MY_exec_prepared (plugin->mc,
354                                plugin->insert_entry,
355                                params_insert))
356   {
357     cont (cont_cls, key, size, GNUNET_SYSERR, _("MySQL statement run failure"));
358     return;
359   }
360   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
361               "Inserted value `%s' with size %u into gn090 table\n",
362               GNUNET_h2s (key), (unsigned int) size);
363   if (size > 0)
364     plugin->env->duc (plugin->env->cls, size);
365   cont (cont_cls, key, size, GNUNET_OK, NULL);
366 }
367
368
369 /**
370  * Update the priority for a particular key in the datastore.  If
371  * the expiration time in value is different than the time found in
372  * the datastore, the higher value should be kept.  For the
373  * anonymity level, the lower value is to be used.  The specified
374  * priority should be added to the existing priority, ignoring the
375  * priority in value.
376  *
377  * Note that it is possible for multiple values to match this put.
378  * In that case, all of the respective values are updated.
379  *
380  * @param cls our "struct Plugin*"
381  * @param uid unique identifier of the datum
382  * @param delta by how much should the priority
383  *     change?  If priority + delta < 0 the
384  *     priority should be set to 0 (never go
385  *     negative).
386  * @param expire new expiration time should be the
387  *     MAX of any existing expiration time and
388  *     this value
389  * @param cont continuation called with success or failure status
390  * @param cons_cls continuation closure
391  */
392 static void
393 mysql_plugin_update (void *cls, uint64_t uid, int delta,
394                      struct GNUNET_TIME_Absolute expire,
395                      PluginUpdateCont cont, void *cont_cls)
396 {
397   struct Plugin *plugin = cls;
398   uint32_t delta1 = (uint32_t)delta;
399   unsigned long long vkey = uid;
400 //  unsigned long long lexpire = expire.abs_value_us;
401   uint64_t lexpire = expire.abs_value_us;
402   int ret;
403
404   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
405               "Updating value %llu adding %d to priority and maxing exp at %s\n",
406               vkey, delta,
407               GNUNET_STRINGS_absolute_time_to_string (expire));
408 /*  ret =
409     GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->update_entry, NULL,
410                                          MYSQL_TYPE_LONG, &delta, GNUNET_NO,
411                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
412                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
413                               MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1);
414 */
415   struct GNUNET_MY_QueryParam params_update[] = {
416     GNUNET_MY_query_param_uint32 (&delta1),
417     GNUNET_MY_query_param_uint64 (&lexpire),
418     GNUNET_MY_query_param_uint64 (&lexpire),
419     GNUNET_MY_query_param_uint64 (&uid),
420     GNUNET_MY_query_param_end
421   };
422
423   ret = GNUNET_MY_exec_prepared (plugin->mc,
424                                  plugin->update_entry,
425                                  params_update);
426
427   if (ret != GNUNET_OK)
428   {
429     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n",
430                 vkey);
431   }
432   cont (cont_cls, ret, NULL);
433 }
434
435
436 /**
437  * Run the given select statement and call 'proc' on the resulting
438  * values (which must be in particular positions).
439  *
440  * @param plugin the plugin handle
441  * @param stmt select statement to run
442  * @param proc function to call on result
443  * @param proc_cls closure for proc
444  * @param params_select arguments to initialize stmt
445  */
446 static void
447 execute_select (struct Plugin *plugin,
448                 struct GNUNET_MYSQL_StatementHandle *stmt,
449                 PluginDatumProcessor proc,
450                 void *proc_cls,
451                 struct GNUNET_MY_QueryParam *params_select)
452 {
453   int ret;
454   uint32_t type;
455   uint32_t priority;
456   uint32_t anonymity;
457   uint64_t uid;
458   size_t value_size;
459   void *value;
460   struct GNUNET_HashCode key;
461   struct GNUNET_TIME_Absolute expiration;
462   struct GNUNET_MY_ResultSpec results_select[] = {
463     GNUNET_MY_result_spec_uint32 (&type),
464     GNUNET_MY_result_spec_uint32 (&priority),
465     GNUNET_MY_result_spec_uint32 (&anonymity),
466     GNUNET_MY_result_spec_absolute_time (&expiration),
467     GNUNET_MY_result_spec_auto_from_type (&key),
468     GNUNET_MY_result_spec_variable_size (&value, &value_size),
469     GNUNET_MY_result_spec_uint64 (&uid),
470     GNUNET_MY_result_spec_end
471   };
472
473   ret = GNUNET_MY_exec_prepared (plugin->mc,
474                                  stmt,
475                                  params_select);
476   if (GNUNET_OK != ret)
477   {
478     proc (proc_cls,
479           NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
480     return;
481   }
482
483   ret = GNUNET_MY_extract_result (stmt,
484                                   results_select);
485   if (ret <= 0)
486   {
487     proc (proc_cls,
488           NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
489     return;
490   }
491
492   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
493               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %s selecting from gn090 table\n",
494               (unsigned int) value_size,
495               GNUNET_h2s (&key),
496               (unsigned int) priority,
497               (unsigned int) anonymity,
498               GNUNET_STRINGS_absolute_time_to_string (expiration));
499   GNUNET_assert (value_size < MAX_DATUM_SIZE);
500   ret = proc (proc_cls,
501               &key,
502               value_size,
503               value,
504               type,
505               priority,
506               anonymity,
507               expiration,
508               uid);
509   if (ret == GNUNET_NO)
510   {
511     do_delete_entry (plugin, uid);
512     if (0 != value_size)
513       plugin->env->duc (plugin->env->cls,
514                         - value_size);
515   }
516 }
517
518
519 /**
520  * Get one of the results for a particular key in the datastore.
521  *
522  * @param cls closure
523  * @param offset offset of the result (modulo num-results);
524  *               specific ordering does not matter for the offset
525  * @param key key to match, never NULL
526  * @param vhash hash of the value, maybe NULL (to
527  *        match all values that have the right key).
528  *        Note that for DBlocks there is no difference
529  *        betwen key and vhash, but for other blocks
530  *        there may be!
531  * @param type entries of which type are relevant?
532  *     Use 0 for any type.
533  * @param proc function to call on the matching value,
534  *        with NULL for if no value matches
535  * @param proc_cls closure for @a proc
536  */
537 static void
538 mysql_plugin_get_key (void *cls,
539                       uint64_t offset,
540                       const struct GNUNET_HashCode *key,
541                       const struct GNUNET_HashCode *vhash,
542                       enum GNUNET_BLOCK_Type type,
543                       PluginDatumProcessor proc,
544                       void *proc_cls)
545 {
546   struct Plugin *plugin = cls;
547   int ret;
548   uint64_t total;
549   unsigned long hashSize;
550   unsigned long hashSize2;
551
552   total = -1;
553   struct GNUNET_MY_ResultSpec results_get[] = {
554     GNUNET_MY_result_spec_uint64 (&total),
555     GNUNET_MY_result_spec_end
556   };
557
558   if (type != 0)
559   {
560     if (vhash != NULL)
561     {
562       struct GNUNET_MY_QueryParam params_get[] = {
563         GNUNET_MY_query_param_fixed_size (key, hashSize),
564         GNUNET_MY_query_param_fixed_size (vhash, hashSize2),
565         GNUNET_MY_query_param_uint32 (&type),
566         GNUNET_MY_query_param_end
567       };
568
569       ret =
570         GNUNET_MY_exec_prepared (plugin->mc,
571                                  plugin->count_entry_by_hash_vhash_and_type,
572                                  params_get);
573       ret =
574         GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
575                                   results_get);
576     }
577     else
578     {
579       struct GNUNET_MY_QueryParam params_get[] = {
580         GNUNET_MY_query_param_fixed_size (key, hashSize),
581         GNUNET_MY_query_param_uint32 (&type),
582         GNUNET_MY_query_param_end
583       };
584
585       ret =
586         GNUNET_MY_exec_prepared (plugin->mc,
587                                  plugin->count_entry_by_hash_and_type,
588                                  params_get);
589       ret =
590         GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
591                                   results_get);
592     }
593   }
594   else
595   {
596     if (vhash != NULL)
597     {
598       struct GNUNET_MY_QueryParam params_get[] = {
599         GNUNET_MY_query_param_fixed_size (key, hashSize),
600         GNUNET_MY_query_param_fixed_size (vhash, hashSize2),
601         GNUNET_MY_query_param_end
602       };
603
604       ret =
605         GNUNET_MY_exec_prepared (plugin->mc,
606                                  plugin->count_entry_by_hash_and_vhash,
607                                  params_get);
608       ret =
609         GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
610                                   results_get);
611     }
612     else
613     {
614       struct GNUNET_MY_QueryParam params_get[] = {
615         GNUNET_MY_query_param_fixed_size (key, hashSize),
616         GNUNET_MY_query_param_end
617       };
618
619       ret =
620         GNUNET_MY_exec_prepared (plugin->mc,
621                                  plugin->count_entry_by_hash,
622                                  params_get);
623       ret =
624         GNUNET_MY_extract_result (plugin->count_entry_by_hash,
625                                   results_get);
626     }
627   }
628   if ((ret != GNUNET_OK) || (0 >= total))
629   {
630     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
631     return;
632   }
633   offset = offset % total;
634   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
635               "Obtaining %llu/%lld result for GET `%s'\n",
636               (unsigned long long) offset,
637               (unsigned long long) total,
638               GNUNET_h2s (key));
639   if (type != GNUNET_BLOCK_TYPE_ANY)
640   {
641     if (NULL != vhash)
642     {
643       struct GNUNET_MY_QueryParam params_select[] = {
644         GNUNET_MY_query_param_auto_from_type (key),
645         GNUNET_MY_query_param_auto_from_type (vhash),
646         GNUNET_MY_query_param_uint32 (&type),
647         GNUNET_MY_query_param_uint64 (&offset),
648         GNUNET_MY_query_param_end
649       };
650
651       execute_select (plugin,
652                       plugin->select_entry_by_hash_vhash_and_type,
653                       proc, proc_cls,
654                       params_select);
655     }
656     else
657     {
658       struct GNUNET_MY_QueryParam params_select[] = {
659         GNUNET_MY_query_param_auto_from_type (key),
660         GNUNET_MY_query_param_uint32 (&type),
661         GNUNET_MY_query_param_uint64 (&offset),
662         GNUNET_MY_query_param_end
663       };
664
665       execute_select (plugin,
666                       plugin->select_entry_by_hash_and_type,
667                       proc,  proc_cls,
668                       params_select);
669     }
670   }
671   else
672   {
673     if (NULL != vhash)
674     {
675       struct GNUNET_MY_QueryParam params_select[] = {
676         GNUNET_MY_query_param_auto_from_type (key),
677         GNUNET_MY_query_param_auto_from_type (vhash),
678         GNUNET_MY_query_param_uint64 (&offset),
679         GNUNET_MY_query_param_end
680       };
681
682       execute_select (plugin,
683                       plugin->select_entry_by_hash_and_vhash,
684                       proc, proc_cls,
685                       params_select);
686     }
687     else
688     {
689       struct GNUNET_MY_QueryParam params_select[] = {
690         GNUNET_MY_query_param_auto_from_type (key),
691         GNUNET_MY_query_param_uint64 (&offset),
692         GNUNET_MY_query_param_end
693       };
694
695       execute_select (plugin,
696                       plugin->select_entry_by_hash,
697                       proc, proc_cls,
698                       params_select);
699     }
700   }
701 }
702
703
704 /**
705  * Get a zero-anonymity datum from the datastore.
706  *
707  * @param cls our "struct Plugin*"
708  * @param offset offset of the result
709  * @param type entries of which type should be considered?
710  *        Use 0 for any type.
711  * @param proc function to call on a matching value or NULL
712  * @param proc_cls closure for iter
713  */
714 static void
715 mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset,
716                                  enum GNUNET_BLOCK_Type type,
717                                  PluginDatumProcessor proc, void *proc_cls)
718 {
719   struct Plugin *plugin = cls;
720   uint32_t typei = (uint32_t) type;
721   uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
722                                               UINT64_MAX);
723   struct GNUNET_MY_QueryParam params_zero_iter[] = {
724     GNUNET_MY_query_param_uint32 (&typei),
725     GNUNET_MY_query_param_uint64 (&rvalue),
726     GNUNET_MY_query_param_uint32 (&typei),
727     GNUNET_MY_query_param_uint64 (&rvalue),
728     GNUNET_MY_query_param_end
729   };
730
731   execute_select (plugin, plugin->zero_iter,
732                   proc, proc_cls,
733                   params_zero_iter);
734 }
735
736
737 /**
738  * Context for #repl_proc() function.
739  */
740 struct ReplCtx
741 {
742
743   /**
744    * Plugin handle.
745    */
746   struct Plugin *plugin;
747
748   /**
749    * Function to call for the result (or the NULL).
750    */
751   PluginDatumProcessor proc;
752
753   /**
754    * Closure for @e proc.
755    */
756   void *proc_cls;
757 };
758
759
760 /**
761  * Wrapper for the processor for 'mysql_plugin_get_replication'.
762  * Decrements the replication counter and calls the original
763  * iterator.
764  *
765  * @param cls closure
766  * @param key key for the content
767  * @param size number of bytes in data
768  * @param data content stored
769  * @param type type of the content
770  * @param priority priority of the content
771  * @param anonymity anonymity-level for the content
772  * @param expiration expiration time for the content
773  * @param uid unique identifier for the datum;
774  *        maybe 0 if no unique identifier is available
775  *
776  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
777  *         (continue on call to "next", of course),
778  *         GNUNET_NO to delete the item and continue (if supported)
779  */
780 static int
781 repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
782            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
783            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
784            uint64_t uid)
785 {
786   struct ReplCtx *rc = cls;
787   struct Plugin *plugin = rc->plugin;
788   int ret;
789   int iret;
790
791   ret =
792       rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
793                 expiration, uid);
794   if (NULL != key)
795   {
796       struct GNUNET_MY_QueryParam params_proc[] = {
797         GNUNET_MY_query_param_uint64 (&uid),
798         GNUNET_MY_query_param_end
799       };
800
801       iret =
802       GNUNET_MY_exec_prepared (plugin->mc,
803                                plugin->dec_repl,
804                                params_proc);
805     if (iret == GNUNET_SYSERR)
806     {
807       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
808                   "Failed to reduce replication counter\n");
809       return GNUNET_SYSERR;
810     }
811   }
812   return ret;
813 }
814
815
816 /**
817  * Get a random item for replication.  Returns a single, not expired,
818  * random item from those with the highest replication counters.  The
819  * item's replication counter is decremented by one IF it was positive
820  * before.  Call 'proc' with all values ZERO or NULL if the datastore
821  * is empty.
822  *
823  * @param cls closure
824  * @param proc function to call the value (once only).
825  * @param proc_cls closure for proc
826  */
827 static void
828 mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
829                               void *proc_cls)
830 {
831   struct Plugin *plugin = cls;
832   struct ReplCtx rc;
833   unsigned long long rvalue;
834   //unsigned long repl;
835   uint32_t repl;
836   MYSQL_BIND results;
837
838   rc.plugin = plugin;
839   rc.proc = proc;
840   rc.proc_cls = proc_cls;
841   memset (&results, 0, sizeof (results));
842   results.buffer_type = MYSQL_TYPE_LONG;
843   results.buffer = &repl;
844   results.is_unsigned = GNUNET_YES;
845
846   struct GNUNET_MY_QueryParam params_get[] = {
847     GNUNET_MY_query_param_end
848   };
849
850   struct GNUNET_MY_ResultSpec results_get[] = {
851     GNUNET_MY_result_spec_uint32 (&repl),
852     GNUNET_MY_result_spec_end
853   };
854 /*
855   if (1 !=
856       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->max_repl, 1, &results, NULL, NULL, -1))
857   {
858     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
859     return;
860   }
861 */
862   if (1 !=
863       GNUNET_MY_exec_prepared (plugin->mc, plugin->max_repl, params_get))
864   {
865     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
866     return;
867   }
868
869   if (1 !=
870       GNUNET_MY_extract_result (plugin->max_repl, results_get))
871   {
872       proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
873       return;
874   }
875
876   rvalue =
877       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
878                                                      UINT64_MAX);
879   execute_select (plugin, plugin->select_replication, &repl_proc, &rc,
880                   MYSQL_TYPE_LONG, &repl, GNUNET_YES, MYSQL_TYPE_LONGLONG,
881                   &rvalue, GNUNET_YES, MYSQL_TYPE_LONG, &repl, GNUNET_YES,
882                   MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES, -1);
883
884 }
885
886
887 /**
888  * Get all of the keys in the datastore.
889  *
890  * @param cls closure
891  * @param proc function to call on each key
892  * @param proc_cls closure for proc
893  */
894 static void
895 mysql_plugin_get_keys (void *cls,
896                         PluginKeyProcessor proc,
897                         void *proc_cls)
898 {
899   struct Plugin *plugin = cls;
900 //  const char *query = "SELECT hash FROM gn090";
901   char *query = "SELECT hash FROM gn090";
902   int ret;
903   MYSQL_STMT *statement;
904   struct GNUNET_MYSQL_StatementHandle *statements_handle_select = NULL;
905
906
907   struct GNUNET_HashCode key;
908 //  MYSQL_BIND cbind[1];
909 //  unsigned long length;
910
911   statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys);
912
913
914   statements_handle_select = GNUNET_MYSQL_statement_prepare (plugin->mc,
915                                                              query);
916 /*
917   if (statement == NULL)
918   {
919     GNUNET_MYSQL_statements_invalidate (plugin->mc);
920     proc (proc_cls, NULL, 0);
921     return;
922   }
923
924   if (mysql_stmt_prepare (statement, query, strlen (query)))
925   {
926     GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql",
927                      _("Failed to prepare statement `%s'\n"), query);
928     GNUNET_MYSQL_statements_invalidate (plugin->mc);
929     proc (proc_cls, NULL, 0);
930     return;
931   }
932 */
933   GNUNET_assert (proc != NULL);
934
935   struct GNUNET_MY_QueryParam params_select[] = {
936     GNUNET_MY_query_param_end
937   };
938
939   struct GNUNET_MY_ResultSpec results_select[] = {
940     GNUNET_MY_result_spec_auto_from_type (&key),
941     GNUNET_MY_result_spec_end
942   };
943
944   if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
945                               statements_handle_select,
946                               params_select))
947   {
948     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
949                 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
950                 "mysql_stmt_execute", query, __FILE__, __LINE__,
951                 mysql_stmt_error (statement));
952     GNUNET_MYSQL_statements_invalidate (plugin->mc);
953     proc (proc_cls, NULL, 0);
954     return;
955   }
956
957   ret = GNUNET_MY_extract_result (statements_handle_select,
958                                   results_select);
959 /*  if (mysql_stmt_execute (statement))
960   {
961     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
962                 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
963                 "mysql_stmt_execute", query, __FILE__, __LINE__,
964                 mysql_stmt_error (statement));
965     GNUNET_MYSQL_statements_invalidate (plugin->mc);
966     proc (proc_cls, NULL, 0);
967     return;
968   }
969   memset (cbind, 0, sizeof (cbind));
970   cbind[0].buffer_type = MYSQL_TYPE_BLOB;
971   cbind[0].buffer = &key;
972   cbind[0].buffer_length = sizeof (key);
973   cbind[0].length = &length;
974   cbind[0].is_unsigned = GNUNET_NO;
975   if (mysql_stmt_bind_result (statement, cbind))
976   {
977     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
978                 _("`%s' failed at %s:%d with error: %s\n"),
979                 "mysql_stmt_bind_result", __FILE__, __LINE__,
980                 mysql_stmt_error (statement));
981     GNUNET_MYSQL_statements_invalidate (plugin->mc);
982     proc (proc_cls, NULL, 0);
983     return;
984   }
985   while (0 == (ret = mysql_stmt_fetch (statement)))
986   {
987     if (sizeof (struct GNUNET_HashCode) == length)
988       proc (proc_cls, &key, 1);
989   }
990   proc (proc_cls, NULL, 0);
991 */
992   if (ret != MYSQL_NO_DATA)
993   {
994     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
995                 _("`%s' failed at %s:%d with error: %s\n"),
996                      "mysql_stmt_fetch", __FILE__, __LINE__,
997                      mysql_stmt_error (statement));
998     GNUNET_MYSQL_statements_invalidate (plugin->mc);
999     return;
1000   }
1001
1002   mysql_stmt_reset (statement);
1003 }
1004
1005
1006 /**
1007  * Context for 'expi_proc' function.
1008  */
1009 struct ExpiCtx
1010 {
1011
1012   /**
1013    * Plugin handle.
1014    */
1015   struct Plugin *plugin;
1016
1017   /**
1018    * Function to call for the result (or the NULL).
1019    */
1020   PluginDatumProcessor proc;
1021
1022   /**
1023    * Closure for proc.
1024    */
1025   void *proc_cls;
1026 };
1027
1028
1029
1030 /**
1031  * Wrapper for the processor for 'mysql_plugin_get_expiration'.
1032  * If no expired value was found, we do a second query for
1033  * low-priority content.
1034  *
1035  * @param cls closure
1036  * @param key key for the content
1037  * @param size number of bytes in data
1038  * @param data content stored
1039  * @param type type of the content
1040  * @param priority priority of the content
1041  * @param anonymity anonymity-level for the content
1042  * @param expiration expiration time for the content
1043  * @param uid unique identifier for the datum;
1044  *        maybe 0 if no unique identifier is available
1045  *
1046  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1047  *         (continue on call to "next", of course),
1048  *         GNUNET_NO to delete the item and continue (if supported)
1049  */
1050 static int
1051 expi_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
1052            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
1053            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
1054            uint64_t uid)
1055 {
1056   struct ExpiCtx *rc = cls;
1057   struct Plugin *plugin = rc->plugin;
1058
1059   if (NULL == key)
1060   {
1061     execute_select (plugin, plugin->select_priority, rc->proc, rc->proc_cls,
1062                     -1);
1063     return GNUNET_SYSERR;
1064   }
1065   return rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
1066                    expiration, uid);
1067 }
1068
1069
1070 /**
1071  * Get a random item for expiration.
1072  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
1073  *
1074  * @param cls closure
1075  * @param proc function to call the value (once only).
1076  * @param proc_cls closure for proc
1077  */
1078 static void
1079 mysql_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
1080                              void *proc_cls)
1081 {
1082   struct Plugin *plugin = cls;
1083   long long nt;
1084   struct ExpiCtx rc;
1085
1086   rc.plugin = plugin;
1087   rc.proc = proc;
1088   rc.proc_cls = proc_cls;
1089   nt = (long long) GNUNET_TIME_absolute_get ().abs_value_us;
1090   execute_select (plugin, plugin->select_expiration, expi_proc, &rc,
1091                   MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, -1);
1092
1093 }
1094
1095
1096 /**
1097  * Drop database.
1098  *
1099  * @param cls the "struct Plugin*"
1100  */
1101 static void
1102 mysql_plugin_drop (void *cls)
1103 {
1104   struct Plugin *plugin = cls;
1105
1106   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "DROP TABLE gn090"))
1107     return;                     /* error */
1108   plugin->env->duc (plugin->env->cls, 0);
1109 }
1110
1111
1112 /**
1113  * Entry point for the plugin.
1114  *
1115  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1116  * @return our "struct Plugin*"
1117  */
1118 void *
1119 libgnunet_plugin_datastore_mysql_init (void *cls)
1120 {
1121   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1122   struct GNUNET_DATASTORE_PluginFunctions *api;
1123   struct Plugin *plugin;
1124
1125   plugin = GNUNET_new (struct Plugin);
1126   plugin->env = env;
1127   plugin->mc = GNUNET_MYSQL_context_create (env->cfg, "datastore-mysql");
1128   if (NULL == plugin->mc)
1129   {
1130     GNUNET_free (plugin);
1131     return NULL;
1132   }
1133 #define MRUNS(a) (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, a) )
1134 #define PINIT(a,b) (NULL == (a = GNUNET_MYSQL_statement_prepare (plugin->mc, b)))
1135   if (MRUNS
1136       ("CREATE TABLE IF NOT EXISTS gn090 ("
1137        " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1138        " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1139        " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1140        " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1141        " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1142        " rvalue BIGINT UNSIGNED NOT NULL,"
1143        " hash BINARY(64) NOT NULL DEFAULT '',"
1144        " vhash BINARY(64) NOT NULL DEFAULT '',"
1145        " value BLOB NOT NULL DEFAULT ''," " uid BIGINT NOT NULL AUTO_INCREMENT,"
1146        " PRIMARY KEY (uid)," " INDEX idx_hash (hash(64)),"
1147        " INDEX idx_hash_uid (hash(64),uid),"
1148        " INDEX idx_hash_vhash (hash(64),vhash(64)),"
1149        " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
1150        " INDEX idx_prio (prio)," " INDEX idx_repl_rvalue (repl,rvalue),"
1151        " INDEX idx_expire (expire),"
1152        " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
1153        ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
1154       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1155       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1156       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1157       PINIT (plugin->select_entry_by_hash_and_vhash,
1158              SELECT_ENTRY_BY_HASH_AND_VHASH) ||
1159       PINIT (plugin->select_entry_by_hash_and_type,
1160              SELECT_ENTRY_BY_HASH_AND_TYPE) ||
1161       PINIT (plugin->select_entry_by_hash_vhash_and_type,
1162              SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
1163       PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
1164       PINIT (plugin->get_size, SELECT_SIZE) ||
1165       PINIT (plugin->count_entry_by_hash_and_vhash,
1166              COUNT_ENTRY_BY_HASH_AND_VHASH) ||
1167       PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1168       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1169                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
1170       PINIT (plugin->update_entry, UPDATE_ENTRY) ||
1171       PINIT (plugin->dec_repl, DEC_REPL) ||
1172       PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
1173       PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) ||
1174       PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
1175       PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
1176       PINIT (plugin->get_all_keys, GET_ALL_KEYS) ||
1177       PINIT (plugin->select_replication, SELECT_IT_REPLICATION))
1178   {
1179     GNUNET_MYSQL_context_destroy (plugin->mc);
1180     GNUNET_free (plugin);
1181     return NULL;
1182   }
1183 #undef PINIT
1184 #undef MRUNS
1185
1186   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
1187   api->cls = plugin;
1188   api->estimate_size = &mysql_plugin_estimate_size;
1189   api->put = &mysql_plugin_put;
1190   api->update = &mysql_plugin_update;
1191   api->get_key = &mysql_plugin_get_key;
1192   api->get_replication = &mysql_plugin_get_replication;
1193   api->get_expiration = &mysql_plugin_get_expiration;
1194   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1195   api->get_keys = &mysql_plugin_get_keys;
1196   api->drop = &mysql_plugin_drop;
1197   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql",
1198                    _("Mysql database running\n"));
1199   return api;
1200 }
1201
1202
1203 /**
1204  * Exit point from the plugin.
1205  * @param cls our "struct Plugin*"
1206  * @return always NULL
1207  */
1208 void *
1209 libgnunet_plugin_datastore_mysql_done (void *cls)
1210 {
1211   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1212   struct Plugin *plugin = api->cls;
1213
1214   GNUNET_MYSQL_context_destroy (plugin->mc);
1215   GNUNET_free (plugin);
1216   GNUNET_free (api);
1217   return NULL;
1218 }
1219
1220 /* end of plugin_datastore_mysql.c */