fix #4546
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009, 2010, 2011 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109    @endverbatim
110  *
111  * PROBLEMS?
112  *
113  * If you have problems related to the mysql module, your best
114  * friend is probably the mysql manual. The first thing to check
115  * is that mysql is basically operational, that you can connect
116  * to it, create tables, issue queries etc.
117  */
118
119 #include "platform.h"
120 #include "gnunet_datastore_plugin.h"
121 #include "gnunet_util_lib.h"
122 #include "gnunet_mysql_lib.h"
123
124
125 #define MAX_DATUM_SIZE 65536
126
127
128 /**
129  * Context for all functions in this plugin.
130  */
131 struct Plugin
132 {
133   /**
134    * Our execution environment.
135    */
136   struct GNUNET_DATASTORE_PluginEnvironment *env;
137
138   /**
139    * Handle to talk to MySQL.
140    */
141   struct GNUNET_MYSQL_Context *mc;
142
143   /**
144    * Prepared statements.
145    */
146 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
147   struct GNUNET_MYSQL_StatementHandle *insert_entry;
148
149 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
150   struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
151
152 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
153   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash;
154
155 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
156   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
157
158 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
159   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_vhash;
160
161 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
162   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash;
163
164 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
165   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_type;
166
167 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
168   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
169
170 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
171   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_vhash_and_type;
172
173 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
174   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type;
175
176 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
177   struct GNUNET_MYSQL_StatementHandle *update_entry;
178
179 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
180   struct GNUNET_MYSQL_StatementHandle *dec_repl;
181
182 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
183   struct GNUNET_MYSQL_StatementHandle *get_size;
184
185 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
186    "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
187    "WHERE anonLevel=0 AND type=? AND "\
188    "(rvalue >= ? OR"\
189    "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
190    "ORDER BY rvalue ASC LIMIT 1"
191   struct GNUNET_MYSQL_StatementHandle *zero_iter;
192
193 #define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
194   struct GNUNET_MYSQL_StatementHandle *select_expiration;
195
196 #define SELECT_IT_PRIORITY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_prio) ORDER BY prio ASC LIMIT 1"
197   struct GNUNET_MYSQL_StatementHandle *select_priority;
198
199 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid "\
200   "FROM gn090 FORCE INDEX (idx_repl_rvalue) "\
201   "WHERE repl=? AND "\
202   " (rvalue>=? OR"\
203   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
204   "ORDER BY rvalue ASC "\
205   "LIMIT 1"
206   struct GNUNET_MYSQL_StatementHandle *select_replication;
207
208 #define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
209   struct GNUNET_MYSQL_StatementHandle *max_repl;
210
211 #define GET_ALL_KEYS "SELECT hash from gn090"
212   struct GNUNET_MYSQL_StatementHandle *get_all_keys;
213
214 };
215
216
217 /**
218  * Delete an entry from the gn090 table.
219  *
220  * @param plugin plugin context
221  * @param uid unique ID of the entry to delete
222  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
223  */
224 static int
225 do_delete_entry (struct Plugin *plugin, unsigned long long uid)
226 {
227   int ret;
228
229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting value %llu from gn090 table\n",
230               uid);
231   ret = GNUNET_MYSQL_statement_run_prepared (plugin->mc,
232                                              plugin->delete_entry_by_uid, NULL,
233                                              MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES, -1);
234   if (ret >= 0)
235     return GNUNET_OK;
236   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
237               "Deleting value %llu from gn090 table failed\n", uid);
238   return ret;
239 }
240
241
242 /**
243  * Get an estimate of how much space the database is
244  * currently using.
245  *
246  * @param cls our "struct Plugin *"
247  * @return number of bytes used on disk
248  */
249 static void
250 mysql_plugin_estimate_size (void *cls, unsigned long long *estimate)
251 {
252   struct Plugin *plugin = cls;
253   MYSQL_BIND cbind[1];
254   long long total;
255
256   if (NULL == estimate)
257     return;
258   memset (cbind, 0, sizeof (cbind));
259   total = 0;
260   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
261   cbind[0].buffer = &total;
262   cbind[0].is_unsigned = GNUNET_NO;
263   if (GNUNET_OK ==
264       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->get_size, 1, cbind, NULL, NULL, -1))
265     *estimate = total;
266   else
267     *estimate = 0;
268 }
269
270
271 /**
272  * Store an item in the datastore.
273  *
274  * @param cls closure
275  * @param key key for the item
276  * @param size number of bytes in data
277  * @param data content stored
278  * @param type type of the content
279  * @param priority priority of the content
280  * @param anonymity anonymity-level for the content
281  * @param replication replication-level for the content
282  * @param expiration expiration time for the content
283  * @param cont continuation called with success or failure status
284  * @param cont_cls continuation closure
285  */
286 static void
287 mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
288                   const void *data, enum GNUNET_BLOCK_Type type,
289                   uint32_t priority, uint32_t anonymity, uint32_t replication,
290                   struct GNUNET_TIME_Absolute expiration, PluginPutCont cont,
291                   void *cont_cls)
292 {
293   struct Plugin *plugin = cls;
294   unsigned int irepl = replication;
295   unsigned int ipriority = priority;
296   unsigned int ianonymity = anonymity;
297   unsigned long long lexpiration = expiration.abs_value_us;
298   unsigned long long lrvalue =
299       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
300                                                      UINT64_MAX);
301   unsigned long hashSize;
302   unsigned long hashSize2;
303   unsigned long lsize;
304   struct GNUNET_HashCode vhash;
305
306   if (size > MAX_DATUM_SIZE)
307   {
308     GNUNET_break (0);
309     cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
310     return;
311   }
312   hashSize = sizeof (struct GNUNET_HashCode);
313   hashSize2 = sizeof (struct GNUNET_HashCode);
314   lsize = size;
315   GNUNET_CRYPTO_hash (data, size, &vhash);
316   if (GNUNET_OK !=
317       GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->insert_entry, NULL,
318                               MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
319                               MYSQL_TYPE_LONG, &type, GNUNET_YES,
320                               MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
321                               MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
322                               MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
323                               MYSQL_TYPE_LONGLONG, &lrvalue, GNUNET_YES,
324                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
325                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
326                               MYSQL_TYPE_BLOB, data, lsize, &lsize, -1))
327   {
328     cont (cont_cls, key, size, GNUNET_SYSERR, _("MySQL statement run failure"));
329     return;
330   }
331   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
332               "Inserted value `%s' with size %u into gn090 table\n",
333               GNUNET_h2s (key), (unsigned int) size);
334   if (size > 0)
335     plugin->env->duc (plugin->env->cls, size);
336   cont (cont_cls, key, size, GNUNET_OK, NULL);
337 }
338
339
340 /**
341  * Update the priority for a particular key in the datastore.  If
342  * the expiration time in value is different than the time found in
343  * the datastore, the higher value should be kept.  For the
344  * anonymity level, the lower value is to be used.  The specified
345  * priority should be added to the existing priority, ignoring the
346  * priority in value.
347  *
348  * Note that it is possible for multiple values to match this put.
349  * In that case, all of the respective values are updated.
350  *
351  * @param cls our "struct Plugin*"
352  * @param uid unique identifier of the datum
353  * @param delta by how much should the priority
354  *     change?  If priority + delta < 0 the
355  *     priority should be set to 0 (never go
356  *     negative).
357  * @param expire new expiration time should be the
358  *     MAX of any existing expiration time and
359  *     this value
360  * @param cont continuation called with success or failure status
361  * @param cons_cls continuation closure
362  */
363 static void
364 mysql_plugin_update (void *cls, uint64_t uid, int delta,
365                      struct GNUNET_TIME_Absolute expire,
366                      PluginUpdateCont cont, void *cont_cls)
367 {
368   struct Plugin *plugin = cls;
369   unsigned long long vkey = uid;
370   unsigned long long lexpire = expire.abs_value_us;
371   int ret;
372
373   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
374               "Updating value %llu adding %d to priority and maxing exp at %s\n",
375               vkey, delta,
376               GNUNET_STRINGS_absolute_time_to_string (expire));
377   ret =
378     GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->update_entry, NULL,
379                                          MYSQL_TYPE_LONG, &delta, GNUNET_NO,
380                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
381                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
382                               MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1);
383   if (ret != GNUNET_OK)
384   {
385     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n",
386                 vkey);
387   }
388   cont (cont_cls, ret, NULL);
389 }
390
391
392 /**
393  * Run the given select statement and call 'proc' on the resulting
394  * values (which must be in particular positions).
395  *
396  * @param plugin the plugin handle
397  * @param stmt select statement to run
398  * @param proc function to call on result
399  * @param proc_cls closure for proc
400  * @param ... arguments to initialize stmt
401  */
402 static void
403 execute_select (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
404                 PluginDatumProcessor proc, void *proc_cls, ...)
405 {
406   va_list ap;
407   int ret;
408   unsigned int type;
409   unsigned int priority;
410   unsigned int anonymity;
411   unsigned long long exp;
412   unsigned long hashSize;
413   unsigned long size;
414   unsigned long long uid;
415   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
416   struct GNUNET_HashCode key;
417   struct GNUNET_TIME_Absolute expiration;
418   MYSQL_BIND rbind[7];
419
420   hashSize = sizeof (struct GNUNET_HashCode);
421   memset (rbind, 0, sizeof (rbind));
422   rbind[0].buffer_type = MYSQL_TYPE_LONG;
423   rbind[0].buffer = &type;
424   rbind[0].is_unsigned = 1;
425   rbind[1].buffer_type = MYSQL_TYPE_LONG;
426   rbind[1].buffer = &priority;
427   rbind[1].is_unsigned = 1;
428   rbind[2].buffer_type = MYSQL_TYPE_LONG;
429   rbind[2].buffer = &anonymity;
430   rbind[2].is_unsigned = 1;
431   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
432   rbind[3].buffer = &exp;
433   rbind[3].is_unsigned = 1;
434   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
435   rbind[4].buffer = &key;
436   rbind[4].buffer_length = hashSize;
437   rbind[4].length = &hashSize;
438   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
439   rbind[5].buffer = value;
440   rbind[5].buffer_length = size = sizeof (value);
441   rbind[5].length = &size;
442   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
443   rbind[6].buffer = &uid;
444   rbind[6].is_unsigned = 1;
445
446   va_start (ap, proc_cls);
447   ret = GNUNET_MYSQL_statement_run_prepared_select_va (plugin->mc, stmt, 7, rbind, NULL, NULL, ap);
448   va_end (ap);
449   if (ret <= 0)
450   {
451     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
452     return;
453   }
454   GNUNET_assert (size <= sizeof (value));
455   if ((rbind[4].buffer_length != sizeof (struct GNUNET_HashCode)) ||
456       (hashSize != sizeof (struct GNUNET_HashCode)))
457   {
458     GNUNET_break (0);
459     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
460     return;
461   }
462   expiration.abs_value_us = exp;
463   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
464               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %s selecting from gn090 table\n",
465               (unsigned int) size, GNUNET_h2s (&key),
466               priority, anonymity,
467               GNUNET_STRINGS_absolute_time_to_string (expiration));
468   GNUNET_assert (size < MAX_DATUM_SIZE);
469   ret =
470       proc (proc_cls, &key, size, value, type, priority, anonymity, expiration,
471             uid);
472   if (ret == GNUNET_NO)
473   {
474     do_delete_entry (plugin, uid);
475     if (size != 0)
476       plugin->env->duc (plugin->env->cls, -size);
477   }
478 }
479
480
481
482 /**
483  * Get one of the results for a particular key in the datastore.
484  *
485  * @param cls closure
486  * @param offset offset of the result (modulo num-results);
487  *               specific ordering does not matter for the offset
488  * @param key key to match, never NULL
489  * @param vhash hash of the value, maybe NULL (to
490  *        match all values that have the right key).
491  *        Note that for DBlocks there is no difference
492  *        betwen key and vhash, but for other blocks
493  *        there may be!
494  * @param type entries of which type are relevant?
495  *     Use 0 for any type.
496  * @param proc function to call on the matching value,
497  *        with NULL for if no value matches
498  * @param proc_cls closure for proc
499  */
500 static void
501 mysql_plugin_get_key (void *cls, uint64_t offset, const struct GNUNET_HashCode * key,
502                       const struct GNUNET_HashCode * vhash,
503                       enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
504                       void *proc_cls)
505 {
506   struct Plugin *plugin = cls;
507   int ret;
508   MYSQL_BIND cbind[1];
509   long long total;
510   unsigned long hashSize;
511   unsigned long hashSize2;
512   unsigned long long off;
513
514   GNUNET_assert (key != NULL);
515   GNUNET_assert (NULL != proc);
516   hashSize = sizeof (struct GNUNET_HashCode);
517   hashSize2 = sizeof (struct GNUNET_HashCode);
518   memset (cbind, 0, sizeof (cbind));
519   total = -1;
520   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
521   cbind[0].buffer = &total;
522   cbind[0].is_unsigned = GNUNET_NO;
523   if (type != 0)
524   {
525     if (vhash != NULL)
526     {
527       ret =
528         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
529                                          plugin->
530                                          count_entry_by_hash_vhash_and_type, 1,
531                                                     cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
532                                          &hashSize, MYSQL_TYPE_BLOB, vhash,
533                                          hashSize2, &hashSize2, MYSQL_TYPE_LONG,
534                                          &type, GNUNET_YES, -1);
535     }
536     else
537     {
538       ret =
539         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
540                                          plugin->count_entry_by_hash_and_type,
541                                                     1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
542                                          hashSize, &hashSize, MYSQL_TYPE_LONG,
543                                          &type, GNUNET_YES, -1);
544     }
545   }
546   else
547   {
548     if (vhash != NULL)
549     {
550       ret =
551         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
552                                          plugin->count_entry_by_hash_and_vhash,
553                                                     1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
554                                          hashSize, &hashSize, MYSQL_TYPE_BLOB,
555                                          vhash, hashSize2, &hashSize2, -1);
556
557     }
558     else
559     {
560       ret =
561         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->count_entry_by_hash, 1,
562                                                     cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
563                                          &hashSize, -1);
564     }
565   }
566   if ((ret != GNUNET_OK) || (0 >= total))
567   {
568     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
569     return;
570   }
571   offset = offset % total;
572   off = (unsigned long long) offset;
573   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
574               "Obtaining %llu/%lld result for GET `%s'\n", off, total,
575               GNUNET_h2s (key));
576   if (type != GNUNET_BLOCK_TYPE_ANY)
577   {
578     if (NULL != vhash)
579     {
580       execute_select (plugin, plugin->select_entry_by_hash_vhash_and_type, proc,
581                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
582                       MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
583                       MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
584                       &off, GNUNET_YES, -1);
585     }
586     else
587     {
588       execute_select (plugin, plugin->select_entry_by_hash_and_type, proc,
589                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
590                       MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
591                       &off, GNUNET_YES, -1);
592     }
593   }
594   else
595   {
596     if (NULL != vhash)
597     {
598       execute_select (plugin, plugin->select_entry_by_hash_and_vhash, proc,
599                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
600                       MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
601                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
602     }
603     else
604     {
605       execute_select (plugin, plugin->select_entry_by_hash, proc, proc_cls,
606                       MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
607                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
608     }
609   }
610 }
611
612
613 /**
614  * Get a zero-anonymity datum from the datastore.
615  *
616  * @param cls our "struct Plugin*"
617  * @param offset offset of the result
618  * @param type entries of which type should be considered?
619  *        Use 0 for any type.
620  * @param proc function to call on a matching value or NULL
621  * @param proc_cls closure for iter
622  */
623 static void
624 mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset,
625                                  enum GNUNET_BLOCK_Type type,
626                                  PluginDatumProcessor proc, void *proc_cls)
627 {
628   struct Plugin *plugin = cls;
629   unsigned long long rvalue =
630       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
631                                                      UINT64_MAX);
632
633   execute_select (plugin, plugin->zero_iter, proc, proc_cls, MYSQL_TYPE_LONG,
634                   &type, GNUNET_YES, MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES,
635                   MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
636                   &rvalue, GNUNET_YES, -1);
637 }
638
639
640 /**
641  * Context for 'repl_proc' function.
642  */
643 struct ReplCtx
644 {
645
646   /**
647    * Plugin handle.
648    */
649   struct Plugin *plugin;
650
651   /**
652    * Function to call for the result (or the NULL).
653    */
654   PluginDatumProcessor proc;
655
656   /**
657    * Closure for proc.
658    */
659   void *proc_cls;
660 };
661
662
663 /**
664  * Wrapper for the processor for 'mysql_plugin_get_replication'.
665  * Decrements the replication counter and calls the original
666  * iterator.
667  *
668  * @param cls closure
669  * @param key key for the content
670  * @param size number of bytes in data
671  * @param data content stored
672  * @param type type of the content
673  * @param priority priority of the content
674  * @param anonymity anonymity-level for the content
675  * @param expiration expiration time for the content
676  * @param uid unique identifier for the datum;
677  *        maybe 0 if no unique identifier is available
678  *
679  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
680  *         (continue on call to "next", of course),
681  *         GNUNET_NO to delete the item and continue (if supported)
682  */
683 static int
684 repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
685            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
686            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
687            uint64_t uid)
688 {
689   struct ReplCtx *rc = cls;
690   struct Plugin *plugin = rc->plugin;
691   unsigned long long oid;
692   int ret;
693   int iret;
694
695   ret =
696       rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
697                 expiration, uid);
698   if (NULL != key)
699   {
700     oid = (unsigned long long) uid;
701     iret =
702       GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->dec_repl, NULL,
703                                            MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, -1);
704     if (iret == GNUNET_SYSERR)
705     {
706       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
707                   "Failed to reduce replication counter\n");
708       return GNUNET_SYSERR;
709     }
710   }
711   return ret;
712 }
713
714
715 /**
716  * Get a random item for replication.  Returns a single, not expired,
717  * random item from those with the highest replication counters.  The
718  * item's replication counter is decremented by one IF it was positive
719  * before.  Call 'proc' with all values ZERO or NULL if the datastore
720  * is empty.
721  *
722  * @param cls closure
723  * @param proc function to call the value (once only).
724  * @param proc_cls closure for proc
725  */
726 static void
727 mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
728                               void *proc_cls)
729 {
730   struct Plugin *plugin = cls;
731   struct ReplCtx rc;
732   unsigned long long rvalue;
733   unsigned long repl;
734   MYSQL_BIND results;
735
736   rc.plugin = plugin;
737   rc.proc = proc;
738   rc.proc_cls = proc_cls;
739   memset (&results, 0, sizeof (results));
740   results.buffer_type = MYSQL_TYPE_LONG;
741   results.buffer = &repl;
742   results.is_unsigned = GNUNET_YES;
743
744   if (1 !=
745       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->max_repl, 1, &results, NULL, NULL, -1))
746   {
747     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
748     return;
749   }
750
751   rvalue =
752       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
753                                                      UINT64_MAX);
754   execute_select (plugin, plugin->select_replication, &repl_proc, &rc,
755                   MYSQL_TYPE_LONG, &repl, GNUNET_YES, MYSQL_TYPE_LONGLONG,
756                   &rvalue, GNUNET_YES, MYSQL_TYPE_LONG, &repl, GNUNET_YES,
757                   MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES, -1);
758
759 }
760
761
762 /**
763  * Get all of the keys in the datastore.
764  *
765  * @param cls closure
766  * @param proc function to call on each key
767  * @param proc_cls closure for proc
768  */
769 static void
770 mysql_plugin_get_keys (void *cls,
771                         PluginKeyProcessor proc,
772                         void *proc_cls)
773 {
774   struct Plugin *plugin = cls;
775   const char *query = "SELECT hash FROM gn090";
776   int ret;
777   MYSQL_STMT *statement;
778   struct GNUNET_HashCode key;
779   MYSQL_BIND cbind[1];
780   unsigned long length;
781
782   statement = GNUNET_MYSQL_statement_get_stmt (plugin->mc,
783                                                plugin->get_all_keys);
784   if (statement == NULL)
785   {
786     GNUNET_MYSQL_statements_invalidate (plugin->mc);
787     proc (proc_cls, NULL, 0);
788     return;
789   }
790   if (mysql_stmt_prepare (statement, query, strlen (query)))
791   {
792     GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql",
793                      _("Failed to prepare statement `%s'\n"), query);
794     GNUNET_MYSQL_statements_invalidate (plugin->mc);
795     proc (proc_cls, NULL, 0);
796     return;
797   }
798   GNUNET_assert (proc != NULL);
799   if (mysql_stmt_execute (statement))
800   {
801     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
802                 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
803                 "mysql_stmt_execute", query, __FILE__, __LINE__,
804                 mysql_stmt_error (statement));
805     GNUNET_MYSQL_statements_invalidate (plugin->mc);
806     proc (proc_cls, NULL, 0);
807     return;
808   }
809   memset (cbind, 0, sizeof (cbind));
810   cbind[0].buffer_type = MYSQL_TYPE_BLOB;
811   cbind[0].buffer = &key;
812   cbind[0].buffer_length = sizeof (key);
813   cbind[0].length = &length;
814   cbind[0].is_unsigned = GNUNET_NO;
815   if (mysql_stmt_bind_result (statement, cbind))
816   {
817     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
818                 _("`%s' failed at %s:%d with error: %s\n"),
819                 "mysql_stmt_bind_result", __FILE__, __LINE__,
820                 mysql_stmt_error (statement));
821     GNUNET_MYSQL_statements_invalidate (plugin->mc);
822     proc (proc_cls, NULL, 0);
823     return;
824   }
825   while (0 == (ret = mysql_stmt_fetch (statement)))
826   {
827     if (sizeof (struct GNUNET_HashCode) == length)
828       proc (proc_cls, &key, 1);
829   }
830   proc (proc_cls, NULL, 0);
831   if (ret != MYSQL_NO_DATA)
832   {
833     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
834                 _("`%s' failed at %s:%d with error: %s\n"),
835                      "mysql_stmt_fetch", __FILE__, __LINE__,
836                      mysql_stmt_error (statement));
837     GNUNET_MYSQL_statements_invalidate (plugin->mc);
838     return;
839   }
840   mysql_stmt_reset (statement);
841 }
842
843
844 /**
845  * Context for 'expi_proc' function.
846  */
847 struct ExpiCtx
848 {
849
850   /**
851    * Plugin handle.
852    */
853   struct Plugin *plugin;
854
855   /**
856    * Function to call for the result (or the NULL).
857    */
858   PluginDatumProcessor proc;
859
860   /**
861    * Closure for proc.
862    */
863   void *proc_cls;
864 };
865
866
867
868 /**
869  * Wrapper for the processor for 'mysql_plugin_get_expiration'.
870  * If no expired value was found, we do a second query for
871  * low-priority content.
872  *
873  * @param cls closure
874  * @param key key for the content
875  * @param size number of bytes in data
876  * @param data content stored
877  * @param type type of the content
878  * @param priority priority of the content
879  * @param anonymity anonymity-level for the content
880  * @param expiration expiration time for the content
881  * @param uid unique identifier for the datum;
882  *        maybe 0 if no unique identifier is available
883  *
884  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
885  *         (continue on call to "next", of course),
886  *         GNUNET_NO to delete the item and continue (if supported)
887  */
888 static int
889 expi_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
890            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
891            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
892            uint64_t uid)
893 {
894   struct ExpiCtx *rc = cls;
895   struct Plugin *plugin = rc->plugin;
896
897   if (NULL == key)
898   {
899     execute_select (plugin, plugin->select_priority, rc->proc, rc->proc_cls,
900                     -1);
901     return GNUNET_SYSERR;
902   }
903   return rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
904                    expiration, uid);
905 }
906
907
908 /**
909  * Get a random item for expiration.
910  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
911  *
912  * @param cls closure
913  * @param proc function to call the value (once only).
914  * @param proc_cls closure for proc
915  */
916 static void
917 mysql_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
918                              void *proc_cls)
919 {
920   struct Plugin *plugin = cls;
921   long long nt;
922   struct ExpiCtx rc;
923
924   rc.plugin = plugin;
925   rc.proc = proc;
926   rc.proc_cls = proc_cls;
927   nt = (long long) GNUNET_TIME_absolute_get ().abs_value_us;
928   execute_select (plugin, plugin->select_expiration, expi_proc, &rc,
929                   MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, -1);
930
931 }
932
933
934 /**
935  * Drop database.
936  *
937  * @param cls the "struct Plugin*"
938  */
939 static void
940 mysql_plugin_drop (void *cls)
941 {
942   struct Plugin *plugin = cls;
943
944   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "DROP TABLE gn090"))
945     return;                     /* error */
946   plugin->env->duc (plugin->env->cls, 0);
947 }
948
949
950 /**
951  * Entry point for the plugin.
952  *
953  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
954  * @return our "struct Plugin*"
955  */
956 void *
957 libgnunet_plugin_datastore_mysql_init (void *cls)
958 {
959   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
960   struct GNUNET_DATASTORE_PluginFunctions *api;
961   struct Plugin *plugin;
962
963   plugin = GNUNET_new (struct Plugin);
964   plugin->env = env;
965   plugin->mc = GNUNET_MYSQL_context_create (env->cfg, "datastore-mysql");
966   if (NULL == plugin->mc)
967   {
968     GNUNET_free (plugin);
969     return NULL;
970   }
971 #define MRUNS(a) (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, a) )
972 #define PINIT(a,b) (NULL == (a = GNUNET_MYSQL_statement_prepare (plugin->mc, b)))
973   if (MRUNS
974       ("CREATE TABLE IF NOT EXISTS gn090 ("
975        " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
976        " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
977        " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
978        " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
979        " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
980        " rvalue BIGINT UNSIGNED NOT NULL,"
981        " hash BINARY(64) NOT NULL DEFAULT '',"
982        " vhash BINARY(64) NOT NULL DEFAULT '',"
983        " value BLOB NOT NULL DEFAULT ''," " uid BIGINT NOT NULL AUTO_INCREMENT,"
984        " PRIMARY KEY (uid)," " INDEX idx_hash (hash(64)),"
985        " INDEX idx_hash_uid (hash(64),uid),"
986        " INDEX idx_hash_vhash (hash(64),vhash(64)),"
987        " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
988        " INDEX idx_prio (prio)," " INDEX idx_repl_rvalue (repl,rvalue),"
989        " INDEX idx_expire (expire),"
990        " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
991        ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
992       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
993       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
994       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
995       PINIT (plugin->select_entry_by_hash_and_vhash,
996              SELECT_ENTRY_BY_HASH_AND_VHASH) ||
997       PINIT (plugin->select_entry_by_hash_and_type,
998              SELECT_ENTRY_BY_HASH_AND_TYPE) ||
999       PINIT (plugin->select_entry_by_hash_vhash_and_type,
1000              SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
1001       PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
1002       PINIT (plugin->get_size, SELECT_SIZE) ||
1003       PINIT (plugin->count_entry_by_hash_and_vhash,
1004              COUNT_ENTRY_BY_HASH_AND_VHASH) ||
1005       PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1006       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1007                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
1008       PINIT (plugin->update_entry, UPDATE_ENTRY) ||
1009       PINIT (plugin->dec_repl, DEC_REPL) ||
1010       PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
1011       PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) ||
1012       PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
1013       PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
1014       PINIT (plugin->get_all_keys, GET_ALL_KEYS) ||
1015       PINIT (plugin->select_replication, SELECT_IT_REPLICATION))
1016   {
1017     GNUNET_MYSQL_context_destroy (plugin->mc);
1018     GNUNET_free (plugin);
1019     return NULL;
1020   }
1021 #undef PINIT
1022 #undef MRUNS
1023
1024   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
1025   api->cls = plugin;
1026   api->estimate_size = &mysql_plugin_estimate_size;
1027   api->put = &mysql_plugin_put;
1028   api->update = &mysql_plugin_update;
1029   api->get_key = &mysql_plugin_get_key;
1030   api->get_replication = &mysql_plugin_get_replication;
1031   api->get_expiration = &mysql_plugin_get_expiration;
1032   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1033   api->get_keys = &mysql_plugin_get_keys;
1034   api->drop = &mysql_plugin_drop;
1035   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql",
1036                    _("Mysql database running\n"));
1037   return api;
1038 }
1039
1040
1041 /**
1042  * Exit point from the plugin.
1043  * @param cls our "struct Plugin*"
1044  * @return always NULL
1045  */
1046 void *
1047 libgnunet_plugin_datastore_mysql_done (void *cls)
1048 {
1049   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1050   struct Plugin *plugin = api->cls;
1051
1052   GNUNET_MYSQL_context_destroy (plugin->mc);
1053   GNUNET_free (plugin);
1054   GNUNET_free (api);
1055   return NULL;
1056 }
1057
1058 /* end of plugin_datastore_mysql.c */