fix use-after-free on exit
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109    @endverbatim
110  *
111  * PROBLEMS?
112  *
113  * If you have problems related to the mysql module, your best
114  * friend is probably the mysql manual. The first thing to check
115  * is that mysql is basically operational, that you can connect
116  * to it, create tables, issue queries etc.
117  */
118
119 #include "platform.h"
120 #include "gnunet_datastore_plugin.h"
121 #include "gnunet_util_lib.h"
122 #include "gnunet_mysql_lib.h"
123
124
125 #define MAX_DATUM_SIZE 65536
126
127
128 /**
129  * Context for all functions in this plugin.
130  */
131 struct Plugin
132 {
133   /**
134    * Our execution environment.
135    */
136   struct GNUNET_DATASTORE_PluginEnvironment *env;
137
138   /**
139    * Handle to talk to MySQL.
140    */
141   struct GNUNET_MYSQL_Context *mc;
142
143   /**
144    * Prepared statements.
145    */
146 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
147   struct GNUNET_MYSQL_StatementHandle *insert_entry;
148
149 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
150   struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
151
152 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
153   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash;
154
155 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
156   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
157
158 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
159   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_vhash;
160
161 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
162   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash;
163
164 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
165   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_type;
166
167 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
168   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
169
170 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
171   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_vhash_and_type;
172
173 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
174   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type;
175
176 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
177   struct GNUNET_MYSQL_StatementHandle *update_entry;
178
179 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
180   struct GNUNET_MYSQL_StatementHandle *dec_repl;
181
182 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
183   struct GNUNET_MYSQL_StatementHandle *get_size;
184
185 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
186    "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
187    "WHERE anonLevel=0 AND type=? AND "\
188    "(rvalue >= ? OR"\
189    "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
190    "ORDER BY rvalue ASC LIMIT 1"
191   struct GNUNET_MYSQL_StatementHandle *zero_iter;
192
193 #define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
194   struct GNUNET_MYSQL_StatementHandle *select_expiration;
195
196 #define SELECT_IT_PRIORITY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_prio) ORDER BY prio ASC LIMIT 1"
197   struct GNUNET_MYSQL_StatementHandle *select_priority;
198
199 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid "\
200   "FROM gn090 FORCE INDEX (idx_repl_rvalue) "\
201   "WHERE repl=? AND "\
202   " (rvalue>=? OR"\
203   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
204   "ORDER BY rvalue ASC "\
205   "LIMIT 1"
206   struct GNUNET_MYSQL_StatementHandle *select_replication;
207
208 #define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
209   struct GNUNET_MYSQL_StatementHandle *max_repl;
210
211 #define GET_ALL_KEYS "SELECT hash from gn090"
212   struct GNUNET_MYSQL_StatementHandle *get_all_keys;
213
214 };
215
216
217 /**
218  * Delete an entry from the gn090 table.
219  *
220  * @param plugin plugin context
221  * @param uid unique ID of the entry to delete
222  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
223  */
224 static int
225 do_delete_entry (struct Plugin *plugin, unsigned long long uid)
226 {
227   int ret;
228
229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting value %llu from gn090 table\n",
230               uid);
231   ret = GNUNET_MYSQL_statement_run_prepared (plugin->mc,
232                                              plugin->delete_entry_by_uid, NULL,
233                                              MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES, -1);
234   if (ret >= 0)
235     return GNUNET_OK;
236   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
237               "Deleting value %llu from gn090 table failed\n", uid);
238   return ret;
239 }
240
241
242 /**
243  * Get an estimate of how much space the database is
244  * currently using.
245  *
246  * @param cls our "struct Plugin *"
247  * @return number of bytes used on disk
248  */
249 static void
250 mysql_plugin_estimate_size (void *cls, unsigned long long *estimate)
251 {
252   struct Plugin *plugin = cls;
253   MYSQL_BIND cbind[1];
254   long long total;
255
256   if (NULL == estimate)
257     return;
258   memset (cbind, 0, sizeof (cbind));
259   total = 0;
260   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
261   cbind[0].buffer = &total;
262   cbind[0].is_unsigned = GNUNET_NO;
263   if (GNUNET_OK ==
264       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->get_size, 1, cbind, NULL, NULL, -1))
265     *estimate = total;
266   else
267     *estimate = 0;
268 }
269
270
271 /**
272  * Store an item in the datastore.
273  *
274  * @param cls closure
275  * @param key key for the item
276  * @param size number of bytes in data
277  * @param data content stored
278  * @param type type of the content
279  * @param priority priority of the content
280  * @param anonymity anonymity-level for the content
281  * @param replication replication-level for the content
282  * @param expiration expiration time for the content
283  * @param msg set to error message
284  * @return GNUNET_OK on success
285  */
286 static int
287 mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
288                   const void *data, enum GNUNET_BLOCK_Type type,
289                   uint32_t priority, uint32_t anonymity, uint32_t replication,
290                   struct GNUNET_TIME_Absolute expiration, char **msg)
291 {
292   struct Plugin *plugin = cls;
293   unsigned int irepl = replication;
294   unsigned int ipriority = priority;
295   unsigned int ianonymity = anonymity;
296   unsigned long long lexpiration = expiration.abs_value_us;
297   unsigned long long lrvalue =
298       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
299                                                      UINT64_MAX);
300   unsigned long hashSize;
301   unsigned long hashSize2;
302   unsigned long lsize;
303   struct GNUNET_HashCode vhash;
304
305   if (size > MAX_DATUM_SIZE)
306   {
307     GNUNET_break (0);
308     return GNUNET_SYSERR;
309   }
310   hashSize = sizeof (struct GNUNET_HashCode);
311   hashSize2 = sizeof (struct GNUNET_HashCode);
312   lsize = size;
313   GNUNET_CRYPTO_hash (data, size, &vhash);
314   if (GNUNET_OK !=
315       GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->insert_entry, NULL,
316                               MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
317                               MYSQL_TYPE_LONG, &type, GNUNET_YES,
318                               MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
319                               MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
320                               MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
321                               MYSQL_TYPE_LONGLONG, &lrvalue, GNUNET_YES,
322                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
323                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
324                               MYSQL_TYPE_BLOB, data, lsize, &lsize, -1))
325     return GNUNET_SYSERR;
326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327               "Inserted value `%s' with size %u into gn090 table\n",
328               GNUNET_h2s (key), (unsigned int) size);
329   if (size > 0)
330     plugin->env->duc (plugin->env->cls, size);
331   return GNUNET_OK;
332 }
333
334
335 /**
336  * Update the priority for a particular key in the datastore.  If
337  * the expiration time in value is different than the time found in
338  * the datastore, the higher value should be kept.  For the
339  * anonymity level, the lower value is to be used.  The specified
340  * priority should be added to the existing priority, ignoring the
341  * priority in value.
342  *
343  * Note that it is possible for multiple values to match this put.
344  * In that case, all of the respective values are updated.
345  *
346  * @param cls our "struct Plugin*"
347  * @param uid unique identifier of the datum
348  * @param delta by how much should the priority
349  *     change?  If priority + delta < 0 the
350  *     priority should be set to 0 (never go
351  *     negative).
352  * @param expire new expiration time should be the
353  *     MAX of any existing expiration time and
354  *     this value
355  * @param msg set to error message
356  * @return GNUNET_OK on success
357  */
358 static int
359 mysql_plugin_update (void *cls, uint64_t uid, int delta,
360                      struct GNUNET_TIME_Absolute expire, char **msg)
361 {
362   struct Plugin *plugin = cls;
363   unsigned long long vkey = uid;
364   unsigned long long lexpire = expire.abs_value_us;
365   int ret;
366
367   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368               "Updating value %llu adding %d to priority and maxing exp at %s\n",
369               vkey, delta,
370               GNUNET_STRINGS_absolute_time_to_string (expire));
371   ret =
372     GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->update_entry, NULL,
373                                          MYSQL_TYPE_LONG, &delta, GNUNET_NO,
374                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
375                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
376                               MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1);
377   if (ret != GNUNET_OK)
378   {
379     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n",
380                 vkey);
381   }
382   return ret;
383 }
384
385
386 /**
387  * Run the given select statement and call 'proc' on the resulting
388  * values (which must be in particular positions).
389  *
390  * @param plugin the plugin handle
391  * @param stmt select statement to run
392  * @param proc function to call on result
393  * @param proc_cls closure for proc
394  * @param ... arguments to initialize stmt
395  */
396 static void
397 execute_select (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
398                 PluginDatumProcessor proc, void *proc_cls, ...)
399 {
400   va_list ap;
401   int ret;
402   unsigned int type;
403   unsigned int priority;
404   unsigned int anonymity;
405   unsigned long long exp;
406   unsigned long hashSize;
407   unsigned long size;
408   unsigned long long uid;
409   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
410   struct GNUNET_HashCode key;
411   struct GNUNET_TIME_Absolute expiration;
412   MYSQL_BIND rbind[7];
413
414   hashSize = sizeof (struct GNUNET_HashCode);
415   memset (rbind, 0, sizeof (rbind));
416   rbind[0].buffer_type = MYSQL_TYPE_LONG;
417   rbind[0].buffer = &type;
418   rbind[0].is_unsigned = 1;
419   rbind[1].buffer_type = MYSQL_TYPE_LONG;
420   rbind[1].buffer = &priority;
421   rbind[1].is_unsigned = 1;
422   rbind[2].buffer_type = MYSQL_TYPE_LONG;
423   rbind[2].buffer = &anonymity;
424   rbind[2].is_unsigned = 1;
425   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
426   rbind[3].buffer = &exp;
427   rbind[3].is_unsigned = 1;
428   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
429   rbind[4].buffer = &key;
430   rbind[4].buffer_length = hashSize;
431   rbind[4].length = &hashSize;
432   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
433   rbind[5].buffer = value;
434   rbind[5].buffer_length = size = sizeof (value);
435   rbind[5].length = &size;
436   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
437   rbind[6].buffer = &uid;
438   rbind[6].is_unsigned = 1;
439
440   va_start (ap, proc_cls);
441   ret = GNUNET_MYSQL_statement_run_prepared_select_va (plugin->mc, stmt, 7, rbind, NULL, NULL, ap);
442   va_end (ap);
443   if (ret <= 0)
444   {
445     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
446     return;
447   }
448   GNUNET_assert (size <= sizeof (value));
449   if ((rbind[4].buffer_length != sizeof (struct GNUNET_HashCode)) ||
450       (hashSize != sizeof (struct GNUNET_HashCode)))
451   {
452     GNUNET_break (0);
453     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
454     return;
455   }
456   expiration.abs_value_us = exp;
457   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
458               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %s selecting from gn090 table\n",
459               (unsigned int) size, GNUNET_h2s (&key),
460               priority, anonymity,
461               GNUNET_STRINGS_absolute_time_to_string (expiration));
462   GNUNET_assert (size < MAX_DATUM_SIZE);
463   ret =
464       proc (proc_cls, &key, size, value, type, priority, anonymity, expiration,
465             uid);
466   if (ret == GNUNET_NO)
467   {
468     do_delete_entry (plugin, uid);
469     if (size != 0)
470       plugin->env->duc (plugin->env->cls, -size);
471   }
472 }
473
474
475
476 /**
477  * Get one of the results for a particular key in the datastore.
478  *
479  * @param cls closure
480  * @param offset offset of the result (modulo num-results);
481  *               specific ordering does not matter for the offset
482  * @param key key to match, never NULL
483  * @param vhash hash of the value, maybe NULL (to
484  *        match all values that have the right key).
485  *        Note that for DBlocks there is no difference
486  *        betwen key and vhash, but for other blocks
487  *        there may be!
488  * @param type entries of which type are relevant?
489  *     Use 0 for any type.
490  * @param proc function to call on the matching value,
491  *        with NULL for if no value matches
492  * @param proc_cls closure for proc
493  */
494 static void
495 mysql_plugin_get_key (void *cls, uint64_t offset, const struct GNUNET_HashCode * key,
496                       const struct GNUNET_HashCode * vhash,
497                       enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
498                       void *proc_cls)
499 {
500   struct Plugin *plugin = cls;
501   int ret;
502   MYSQL_BIND cbind[1];
503   long long total;
504   unsigned long hashSize;
505   unsigned long hashSize2;
506   unsigned long long off;
507
508   GNUNET_assert (key != NULL);
509   GNUNET_assert (NULL != proc);
510   hashSize = sizeof (struct GNUNET_HashCode);
511   hashSize2 = sizeof (struct GNUNET_HashCode);
512   memset (cbind, 0, sizeof (cbind));
513   total = -1;
514   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
515   cbind[0].buffer = &total;
516   cbind[0].is_unsigned = GNUNET_NO;
517   if (type != 0)
518   {
519     if (vhash != NULL)
520     {
521       ret =
522         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
523                                          plugin->
524                                          count_entry_by_hash_vhash_and_type, 1,
525                                                     cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
526                                          &hashSize, MYSQL_TYPE_BLOB, vhash,
527                                          hashSize2, &hashSize2, MYSQL_TYPE_LONG,
528                                          &type, GNUNET_YES, -1);
529     }
530     else
531     {
532       ret =
533         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
534                                          plugin->count_entry_by_hash_and_type,
535                                                     1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
536                                          hashSize, &hashSize, MYSQL_TYPE_LONG,
537                                          &type, GNUNET_YES, -1);
538     }
539   }
540   else
541   {
542     if (vhash != NULL)
543     {
544       ret =
545         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
546                                          plugin->count_entry_by_hash_and_vhash,
547                                                     1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
548                                          hashSize, &hashSize, MYSQL_TYPE_BLOB,
549                                          vhash, hashSize2, &hashSize2, -1);
550
551     }
552     else
553     {
554       ret =
555         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->count_entry_by_hash, 1,
556                                                     cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
557                                          &hashSize, -1);
558     }
559   }
560   if ((ret != GNUNET_OK) || (0 >= total))
561   {
562     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
563     return;
564   }
565   offset = offset % total;
566   off = (unsigned long long) offset;
567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
568               "Obtaining %llu/%lld result for GET `%s'\n", off, total,
569               GNUNET_h2s (key));
570   if (type != GNUNET_BLOCK_TYPE_ANY)
571   {
572     if (NULL != vhash)
573     {
574       execute_select (plugin, plugin->select_entry_by_hash_vhash_and_type, proc,
575                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
576                       MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
577                       MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
578                       &off, GNUNET_YES, -1);
579     }
580     else
581     {
582       execute_select (plugin, plugin->select_entry_by_hash_and_type, proc,
583                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
584                       MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
585                       &off, GNUNET_YES, -1);
586     }
587   }
588   else
589   {
590     if (NULL != vhash)
591     {
592       execute_select (plugin, plugin->select_entry_by_hash_and_vhash, proc,
593                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
594                       MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
595                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
596     }
597     else
598     {
599       execute_select (plugin, plugin->select_entry_by_hash, proc, proc_cls,
600                       MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
601                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
602     }
603   }
604 }
605
606
607 /**
608  * Get a zero-anonymity datum from the datastore.
609  *
610  * @param cls our "struct Plugin*"
611  * @param offset offset of the result
612  * @param type entries of which type should be considered?
613  *        Use 0 for any type.
614  * @param proc function to call on a matching value or NULL
615  * @param proc_cls closure for iter
616  */
617 static void
618 mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset,
619                                  enum GNUNET_BLOCK_Type type,
620                                  PluginDatumProcessor proc, void *proc_cls)
621 {
622   struct Plugin *plugin = cls;
623   unsigned long long rvalue =
624       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
625                                                      UINT64_MAX);
626
627   execute_select (plugin, plugin->zero_iter, proc, proc_cls, MYSQL_TYPE_LONG,
628                   &type, GNUNET_YES, MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES,
629                   MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
630                   &rvalue, GNUNET_YES, -1);
631 }
632
633
634 /**
635  * Context for 'repl_proc' function.
636  */
637 struct ReplCtx
638 {
639
640   /**
641    * Plugin handle.
642    */
643   struct Plugin *plugin;
644
645   /**
646    * Function to call for the result (or the NULL).
647    */
648   PluginDatumProcessor proc;
649
650   /**
651    * Closure for proc.
652    */
653   void *proc_cls;
654 };
655
656
657 /**
658  * Wrapper for the processor for 'mysql_plugin_get_replication'.
659  * Decrements the replication counter and calls the original
660  * iterator.
661  *
662  * @param cls closure
663  * @param key key for the content
664  * @param size number of bytes in data
665  * @param data content stored
666  * @param type type of the content
667  * @param priority priority of the content
668  * @param anonymity anonymity-level for the content
669  * @param expiration expiration time for the content
670  * @param uid unique identifier for the datum;
671  *        maybe 0 if no unique identifier is available
672  *
673  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
674  *         (continue on call to "next", of course),
675  *         GNUNET_NO to delete the item and continue (if supported)
676  */
677 static int
678 repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
679            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
680            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
681            uint64_t uid)
682 {
683   struct ReplCtx *rc = cls;
684   struct Plugin *plugin = rc->plugin;
685   unsigned long long oid;
686   int ret;
687   int iret;
688
689   ret =
690       rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
691                 expiration, uid);
692   if (NULL != key)
693   {
694     oid = (unsigned long long) uid;
695     iret =
696       GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->dec_repl, NULL,
697                                            MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, -1);
698     if (iret == GNUNET_SYSERR)
699     {
700       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
701                   "Failed to reduce replication counter\n");
702       return GNUNET_SYSERR;
703     }
704   }
705   return ret;
706 }
707
708
709 /**
710  * Get a random item for replication.  Returns a single, not expired,
711  * random item from those with the highest replication counters.  The
712  * item's replication counter is decremented by one IF it was positive
713  * before.  Call 'proc' with all values ZERO or NULL if the datastore
714  * is empty.
715  *
716  * @param cls closure
717  * @param proc function to call the value (once only).
718  * @param proc_cls closure for proc
719  */
720 static void
721 mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
722                               void *proc_cls)
723 {
724   struct Plugin *plugin = cls;
725   struct ReplCtx rc;
726   unsigned long long rvalue;
727   unsigned long repl;
728   MYSQL_BIND results;
729
730   rc.plugin = plugin;
731   rc.proc = proc;
732   rc.proc_cls = proc_cls;
733   memset (&results, 0, sizeof (results));
734   results.buffer_type = MYSQL_TYPE_LONG;
735   results.buffer = &repl;
736   results.is_unsigned = GNUNET_YES;
737
738   if (1 !=
739       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->max_repl, 1, &results, NULL, NULL, -1))
740   {
741     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
742     return;
743   }
744
745   rvalue =
746       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
747                                                      UINT64_MAX);
748   execute_select (plugin, plugin->select_replication, &repl_proc, &rc,
749                   MYSQL_TYPE_LONG, &repl, GNUNET_YES, MYSQL_TYPE_LONGLONG,
750                   &rvalue, GNUNET_YES, MYSQL_TYPE_LONG, &repl, GNUNET_YES,
751                   MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES, -1);
752
753 }
754
755
756 /**
757  * Get all of the keys in the datastore.
758  *
759  * @param cls closure
760  * @param proc function to call on each key
761  * @param proc_cls closure for proc
762  */
763 static void
764 mysql_plugin_get_keys (void *cls,
765                         PluginKeyProcessor proc,
766                         void *proc_cls)
767 {
768   struct Plugin *plugin = cls;
769   const char *query = "SELECT hash FROM gn090";
770   int ret;
771   MYSQL_STMT *statement;
772   struct GNUNET_HashCode key;
773   MYSQL_BIND cbind[1];
774   unsigned long length;
775
776   statement = GNUNET_MYSQL_statement_get_stmt (plugin->mc,
777                                                plugin->get_all_keys);
778   if (statement == NULL)
779   {
780     GNUNET_MYSQL_statements_invalidate (plugin->mc);
781     return;
782   }
783   if (mysql_stmt_prepare (statement, query, strlen (query)))
784   {
785     GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql",
786                      _("Failed to prepare statement `%s'\n"), query);
787     GNUNET_MYSQL_statements_invalidate (plugin->mc);
788     return;
789   }
790   GNUNET_assert (proc != NULL);
791   if (mysql_stmt_execute (statement))
792   {
793     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
794                 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
795                 "mysql_stmt_execute", query, __FILE__, __LINE__,
796                 mysql_stmt_error (statement));
797     GNUNET_MYSQL_statements_invalidate (plugin->mc);
798     return;
799   }
800   memset (cbind, 0, sizeof (cbind));
801   cbind[0].buffer_type = MYSQL_TYPE_BLOB;
802   cbind[0].buffer = &key;
803   cbind[0].buffer_length = sizeof (key);
804   cbind[0].length = &length;
805   cbind[0].is_unsigned = GNUNET_NO;
806   if (mysql_stmt_bind_result (statement, cbind))
807   {
808     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
809                 _("`%s' failed at %s:%d with error: %s\n"),
810                 "mysql_stmt_bind_result", __FILE__, __LINE__,
811                 mysql_stmt_error (statement));
812     GNUNET_MYSQL_statements_invalidate (plugin->mc);
813     return;
814   }
815   while (0 == (ret = mysql_stmt_fetch (statement)))
816   {
817     if (sizeof (struct GNUNET_HashCode) == length)
818       proc (proc_cls, &key, 1);
819   }
820   if (ret != MYSQL_NO_DATA)
821   {
822     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
823                 _("`%s' failed at %s:%d with error: %s\n"),
824                      "mysql_stmt_fetch", __FILE__, __LINE__,
825                      mysql_stmt_error (statement));
826     GNUNET_MYSQL_statements_invalidate (plugin->mc);
827     return;
828   }
829   mysql_stmt_reset (statement);
830 }
831
832
833 /**
834  * Context for 'expi_proc' function.
835  */
836 struct ExpiCtx
837 {
838
839   /**
840    * Plugin handle.
841    */
842   struct Plugin *plugin;
843
844   /**
845    * Function to call for the result (or the NULL).
846    */
847   PluginDatumProcessor proc;
848
849   /**
850    * Closure for proc.
851    */
852   void *proc_cls;
853 };
854
855
856
857 /**
858  * Wrapper for the processor for 'mysql_plugin_get_expiration'.
859  * If no expired value was found, we do a second query for
860  * low-priority content.
861  *
862  * @param cls closure
863  * @param key key for the content
864  * @param size number of bytes in data
865  * @param data content stored
866  * @param type type of the content
867  * @param priority priority of the content
868  * @param anonymity anonymity-level for the content
869  * @param expiration expiration time for the content
870  * @param uid unique identifier for the datum;
871  *        maybe 0 if no unique identifier is available
872  *
873  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
874  *         (continue on call to "next", of course),
875  *         GNUNET_NO to delete the item and continue (if supported)
876  */
877 static int
878 expi_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
879            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
880            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
881            uint64_t uid)
882 {
883   struct ExpiCtx *rc = cls;
884   struct Plugin *plugin = rc->plugin;
885
886   if (NULL == key)
887   {
888     execute_select (plugin, plugin->select_priority, rc->proc, rc->proc_cls,
889                     -1);
890     return GNUNET_SYSERR;
891   }
892   return rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
893                    expiration, uid);
894 }
895
896
897 /**
898  * Get a random item for expiration.
899  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
900  *
901  * @param cls closure
902  * @param proc function to call the value (once only).
903  * @param proc_cls closure for proc
904  */
905 static void
906 mysql_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
907                              void *proc_cls)
908 {
909   struct Plugin *plugin = cls;
910   long long nt;
911   struct ExpiCtx rc;
912
913   rc.plugin = plugin;
914   rc.proc = proc;
915   rc.proc_cls = proc_cls;
916   nt = (long long) GNUNET_TIME_absolute_get ().abs_value_us;
917   execute_select (plugin, plugin->select_expiration, expi_proc, &rc,
918                   MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, -1);
919
920 }
921
922
923 /**
924  * Drop database.
925  *
926  * @param cls the "struct Plugin*"
927  */
928 static void
929 mysql_plugin_drop (void *cls)
930 {
931   struct Plugin *plugin = cls;
932
933   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "DROP TABLE gn090"))
934     return;                     /* error */
935   plugin->env->duc (plugin->env->cls, 0);
936 }
937
938
939 /**
940  * Entry point for the plugin.
941  *
942  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
943  * @return our "struct Plugin*"
944  */
945 void *
946 libgnunet_plugin_datastore_mysql_init (void *cls)
947 {
948   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
949   struct GNUNET_DATASTORE_PluginFunctions *api;
950   struct Plugin *plugin;
951
952   plugin = GNUNET_new (struct Plugin);
953   plugin->env = env;
954   plugin->mc = GNUNET_MYSQL_context_create (env->cfg, "datastore-mysql");
955   if (NULL == plugin->mc)
956   {
957     GNUNET_free (plugin);
958     return NULL;
959   }
960 #define MRUNS(a) (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, a) )
961 #define PINIT(a,b) (NULL == (a = GNUNET_MYSQL_statement_prepare (plugin->mc, b)))
962   if (MRUNS
963       ("CREATE TABLE IF NOT EXISTS gn090 ("
964        " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
965        " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
966        " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
967        " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
968        " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
969        " rvalue BIGINT UNSIGNED NOT NULL,"
970        " hash BINARY(64) NOT NULL DEFAULT '',"
971        " vhash BINARY(64) NOT NULL DEFAULT '',"
972        " value BLOB NOT NULL DEFAULT ''," " uid BIGINT NOT NULL AUTO_INCREMENT,"
973        " PRIMARY KEY (uid)," " INDEX idx_hash (hash(64)),"
974        " INDEX idx_hash_uid (hash(64),uid),"
975        " INDEX idx_hash_vhash (hash(64),vhash(64)),"
976        " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
977        " INDEX idx_prio (prio)," " INDEX idx_repl_rvalue (repl,rvalue),"
978        " INDEX idx_expire (expire),"
979        " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
980        ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
981       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
982       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
983       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
984       PINIT (plugin->select_entry_by_hash_and_vhash,
985              SELECT_ENTRY_BY_HASH_AND_VHASH) ||
986       PINIT (plugin->select_entry_by_hash_and_type,
987              SELECT_ENTRY_BY_HASH_AND_TYPE) ||
988       PINIT (plugin->select_entry_by_hash_vhash_and_type,
989              SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
990       PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
991       PINIT (plugin->get_size, SELECT_SIZE) ||
992       PINIT (plugin->count_entry_by_hash_and_vhash,
993              COUNT_ENTRY_BY_HASH_AND_VHASH) ||
994       PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
995       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
996                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
997       PINIT (plugin->update_entry, UPDATE_ENTRY) ||
998       PINIT (plugin->dec_repl, DEC_REPL) ||
999       PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
1000       PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) ||
1001       PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
1002       PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
1003       PINIT (plugin->get_all_keys, GET_ALL_KEYS) ||
1004       PINIT (plugin->select_replication, SELECT_IT_REPLICATION))
1005   {
1006     GNUNET_MYSQL_context_destroy (plugin->mc);
1007     GNUNET_free (plugin);
1008     return NULL;
1009   }
1010 #undef PINIT
1011 #undef MRUNS
1012
1013   api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
1014   api->cls = plugin;
1015   api->estimate_size = &mysql_plugin_estimate_size;
1016   api->put = &mysql_plugin_put;
1017   api->update = &mysql_plugin_update;
1018   api->get_key = &mysql_plugin_get_key;
1019   api->get_replication = &mysql_plugin_get_replication;
1020   api->get_expiration = &mysql_plugin_get_expiration;
1021   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1022   api->get_keys = &mysql_plugin_get_keys;
1023   api->drop = &mysql_plugin_drop;
1024   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql",
1025                    _("Mysql database running\n"));
1026   return api;
1027 }
1028
1029
1030 /**
1031  * Exit point from the plugin.
1032  * @param cls our "struct Plugin*"
1033  * @return always NULL
1034  */
1035 void *
1036 libgnunet_plugin_datastore_mysql_done (void *cls)
1037 {
1038   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1039   struct Plugin *plugin = api->cls;
1040
1041   GNUNET_MYSQL_context_destroy (plugin->mc);
1042   GNUNET_free (plugin);
1043   GNUNET_free (api);
1044   return NULL;
1045 }
1046
1047 /* end of plugin_datastore_mysql.c */