-regex
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in /etc/gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109    @endverbatim
110  *
111  * PROBLEMS?
112  *
113  * If you have problems related to the mysql module, your best
114  * friend is probably the mysql manual. The first thing to check
115  * is that mysql is basically operational, that you can connect
116  * to it, create tables, issue queries etc.
117  */
118
119 #include "platform.h"
120 #include "gnunet_datastore_plugin.h"
121 #include "gnunet_util_lib.h"
122 #include "gnunet_mysql_lib.h"
123
124 #define DEBUG_MYSQL GNUNET_EXTRA_LOGGING
125
126 #define MAX_DATUM_SIZE 65536
127
128
129 /**
130  * Context for all functions in this plugin.
131  */
132 struct Plugin
133 {
134   /**
135    * Our execution environment.
136    */
137   struct GNUNET_DATASTORE_PluginEnvironment *env;
138
139   /**
140    * Handle to talk to MySQL.
141    */
142   struct GNUNET_MYSQL_Context *mc;
143
144   /**
145    * Prepared statements.
146    */
147 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
148   struct GNUNET_MYSQL_StatementHandle *insert_entry;
149
150 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
151   struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
152
153 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
154   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash;
155
156 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
157   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
158
159 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
160   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_vhash;
161
162 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
163   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash;
164
165 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
166   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_type;
167
168 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
169   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
170
171 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
172   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_vhash_and_type;
173
174 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
175   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type;
176
177 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
178   struct GNUNET_MYSQL_StatementHandle *update_entry;
179
180 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
181   struct GNUNET_MYSQL_StatementHandle *dec_repl;
182
183 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
184   struct GNUNET_MYSQL_StatementHandle *get_size;
185
186 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
187    "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
188    "WHERE anonLevel=0 AND type=? AND "\
189    "(rvalue >= ? OR"\
190    "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
191    "ORDER BY rvalue ASC LIMIT 1"
192   struct GNUNET_MYSQL_StatementHandle *zero_iter;
193
194 #define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
195   struct GNUNET_MYSQL_StatementHandle *select_expiration;
196
197 #define SELECT_IT_PRIORITY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_prio) ORDER BY prio ASC LIMIT 1"
198   struct GNUNET_MYSQL_StatementHandle *select_priority;
199
200 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid "\
201   "FROM gn090 FORCE INDEX (idx_repl_rvalue) "\
202   "WHERE repl=? AND "\
203   " (rvalue>=? OR"\
204   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
205   "ORDER BY rvalue ASC "\
206   "LIMIT 1"
207   struct GNUNET_MYSQL_StatementHandle *select_replication;
208
209 #define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
210   struct GNUNET_MYSQL_StatementHandle *max_repl;
211
212 #define GET_ALL_KEYS "SELECT hash from gn090"
213   struct GNUNET_MYSQL_StatementHandle *get_all_keys;
214
215 };
216
217
218 /**
219  * Delete an entry from the gn090 table.
220  *
221  * @param plugin plugin context
222  * @param uid unique ID of the entry to delete
223  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
224  */
225 static int
226 do_delete_entry (struct Plugin *plugin, unsigned long long uid)
227 {
228   int ret;
229
230 #if DEBUG_MYSQL
231   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting value %llu from gn090 table\n",
232               uid);
233 #endif
234   ret = GNUNET_MYSQL_statement_run_prepared (plugin->mc,
235                                              plugin->delete_entry_by_uid, NULL,
236                                              MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES, -1);
237   if (ret >= 0)
238     return GNUNET_OK;
239   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
240               "Deleting value %llu from gn090 table failed\n", uid);
241   return ret;
242 }
243
244
245 /**
246  * Get an estimate of how much space the database is
247  * currently using.
248  *
249  * @param cls our "struct Plugin *"
250  * @return number of bytes used on disk
251  */
252 static unsigned long long
253 mysql_plugin_estimate_size (void *cls)
254 {
255   struct Plugin *plugin = cls;
256   MYSQL_BIND cbind[1];
257   long long total;
258
259   memset (cbind, 0, sizeof (cbind));
260   total = 0;
261   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
262   cbind[0].buffer = &total;
263   cbind[0].is_unsigned = GNUNET_NO;
264   if (GNUNET_OK !=
265       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->get_size, 1, cbind, NULL, NULL, -1))
266     return 0;
267   return total;
268 }
269
270
271 /**
272  * Store an item in the datastore.
273  *
274  * @param cls closure
275  * @param key key for the item
276  * @param size number of bytes in data
277  * @param data content stored
278  * @param type type of the content
279  * @param priority priority of the content
280  * @param anonymity anonymity-level for the content
281  * @param replication replication-level for the content
282  * @param expiration expiration time for the content
283  * @param msg set to error message
284  * @return GNUNET_OK on success
285  */
286 static int
287 mysql_plugin_put (void *cls, const GNUNET_HashCode * key, uint32_t size,
288                   const void *data, enum GNUNET_BLOCK_Type type,
289                   uint32_t priority, uint32_t anonymity, uint32_t replication,
290                   struct GNUNET_TIME_Absolute expiration, char **msg)
291 {
292   struct Plugin *plugin = cls;
293   unsigned int irepl = replication;
294   unsigned int ipriority = priority;
295   unsigned int ianonymity = anonymity;
296   unsigned long long lexpiration = expiration.abs_value;
297   unsigned long long lrvalue =
298       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
299                                                      UINT64_MAX);
300   unsigned long hashSize;
301   unsigned long hashSize2;
302   unsigned long lsize;
303   GNUNET_HashCode vhash;
304
305   if (size > MAX_DATUM_SIZE)
306   {
307     GNUNET_break (0);
308     return GNUNET_SYSERR;
309   }
310   hashSize = sizeof (GNUNET_HashCode);
311   hashSize2 = sizeof (GNUNET_HashCode);
312   lsize = size;
313   GNUNET_CRYPTO_hash (data, size, &vhash);
314   if (GNUNET_OK !=
315       GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->insert_entry, NULL,
316                               MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
317                               MYSQL_TYPE_LONG, &type, GNUNET_YES,
318                               MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
319                               MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
320                               MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
321                               MYSQL_TYPE_LONGLONG, &lrvalue, GNUNET_YES,
322                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
323                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
324                               MYSQL_TYPE_BLOB, data, lsize, &lsize, -1))
325     return GNUNET_SYSERR;
326 #if DEBUG_MYSQL
327   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
328               "Inserted value `%s' with size %u into gn090 table\n",
329               GNUNET_h2s (key), (unsigned int) size);
330 #endif
331   if (size > 0)
332     plugin->env->duc (plugin->env->cls, size);
333   return GNUNET_OK;
334 }
335
336
337 /**
338  * Update the priority for a particular key in the datastore.  If
339  * the expiration time in value is different than the time found in
340  * the datastore, the higher value should be kept.  For the
341  * anonymity level, the lower value is to be used.  The specified
342  * priority should be added to the existing priority, ignoring the
343  * priority in value.
344  *
345  * Note that it is possible for multiple values to match this put.
346  * In that case, all of the respective values are updated.
347  *
348  * @param cls our "struct Plugin*"
349  * @param uid unique identifier of the datum
350  * @param delta by how much should the priority
351  *     change?  If priority + delta < 0 the
352  *     priority should be set to 0 (never go
353  *     negative).
354  * @param expire new expiration time should be the
355  *     MAX of any existing expiration time and
356  *     this value
357  * @param msg set to error message
358  * @return GNUNET_OK on success
359  */
360 static int
361 mysql_plugin_update (void *cls, uint64_t uid, int delta,
362                      struct GNUNET_TIME_Absolute expire, char **msg)
363 {
364   struct Plugin *plugin = cls;
365   unsigned long long vkey = uid;
366   unsigned long long lexpire = expire.abs_value;
367   int ret;
368
369 #if DEBUG_MYSQL
370   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
371               "Updating value %llu adding %d to priority and maxing exp at %llu\n",
372               vkey, delta, lexpire);
373 #endif
374   ret =
375     GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->update_entry, NULL,
376                                          MYSQL_TYPE_LONG, &delta, GNUNET_NO,
377                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
378                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
379                               MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1);
380   if (ret != GNUNET_OK)
381   {
382     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n",
383                 vkey);
384   }
385   return ret;
386 }
387
388
389 /**
390  * Run the given select statement and call 'proc' on the resulting
391  * values (which must be in particular positions).
392  *
393  * @param plugin the plugin handle
394  * @param stmt select statement to run
395  * @param proc function to call on result
396  * @param proc_cls closure for proc
397  * @param ... arguments to initialize stmt
398  */
399 static void
400 execute_select (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
401                 PluginDatumProcessor proc, void *proc_cls, ...)
402 {
403   va_list ap;
404   int ret;
405   unsigned int type;
406   unsigned int priority;
407   unsigned int anonymity;
408   unsigned long long exp;
409   unsigned long hashSize;
410   unsigned long size;
411   unsigned long long uid;
412   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
413   GNUNET_HashCode key;
414   struct GNUNET_TIME_Absolute expiration;
415   MYSQL_BIND rbind[7];
416
417   hashSize = sizeof (GNUNET_HashCode);
418   memset (rbind, 0, sizeof (rbind));
419   rbind[0].buffer_type = MYSQL_TYPE_LONG;
420   rbind[0].buffer = &type;
421   rbind[0].is_unsigned = 1;
422   rbind[1].buffer_type = MYSQL_TYPE_LONG;
423   rbind[1].buffer = &priority;
424   rbind[1].is_unsigned = 1;
425   rbind[2].buffer_type = MYSQL_TYPE_LONG;
426   rbind[2].buffer = &anonymity;
427   rbind[2].is_unsigned = 1;
428   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
429   rbind[3].buffer = &exp;
430   rbind[3].is_unsigned = 1;
431   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
432   rbind[4].buffer = &key;
433   rbind[4].buffer_length = hashSize;
434   rbind[4].length = &hashSize;
435   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
436   rbind[5].buffer = value;
437   rbind[5].buffer_length = size = sizeof (value);
438   rbind[5].length = &size;
439   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
440   rbind[6].buffer = &uid;
441   rbind[6].is_unsigned = 1;
442
443   va_start (ap, proc_cls);
444   ret = GNUNET_MYSQL_statement_run_prepared_select_va (plugin->mc, stmt, 7, rbind, NULL, NULL, ap);
445   va_end (ap);
446   if (ret <= 0)
447   {
448     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
449     return;
450   }
451   GNUNET_assert (size <= sizeof (value));
452   if ((rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
453       (hashSize != sizeof (GNUNET_HashCode)))
454   {
455     GNUNET_break (0);
456     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
457     return;
458   }
459 #if DEBUG_MYSQL
460   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
461               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
462               (unsigned int) size, GNUNET_h2s (&key), priority, anonymity, exp);
463 #endif
464   GNUNET_assert (size < MAX_DATUM_SIZE);
465   expiration.abs_value = exp;
466   ret =
467       proc (proc_cls, &key, size, value, type, priority, anonymity, expiration,
468             uid);
469   if (ret == GNUNET_NO)
470   {
471     do_delete_entry (plugin, uid);
472     if (size != 0)
473       plugin->env->duc (plugin->env->cls, -size);
474   }
475 }
476
477
478
479 /**
480  * Get one of the results for a particular key in the datastore.
481  *
482  * @param cls closure
483  * @param offset offset of the result (modulo num-results);
484  *               specific ordering does not matter for the offset
485  * @param key key to match, never NULL
486  * @param vhash hash of the value, maybe NULL (to
487  *        match all values that have the right key).
488  *        Note that for DBlocks there is no difference
489  *        betwen key and vhash, but for other blocks
490  *        there may be!
491  * @param type entries of which type are relevant?
492  *     Use 0 for any type.
493  * @param proc function to call on the matching value,
494  *        with NULL for if no value matches
495  * @param proc_cls closure for proc
496  */
497 static void
498 mysql_plugin_get_key (void *cls, uint64_t offset, const GNUNET_HashCode * key,
499                       const GNUNET_HashCode * vhash,
500                       enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
501                       void *proc_cls)
502 {
503   struct Plugin *plugin = cls;
504   int ret;
505   MYSQL_BIND cbind[1];
506   long long total;
507   unsigned long hashSize;
508   unsigned long hashSize2;
509   unsigned long long off;
510
511   GNUNET_assert (key != NULL);
512   GNUNET_assert (NULL != proc);
513   hashSize = sizeof (GNUNET_HashCode);
514   hashSize2 = sizeof (GNUNET_HashCode);
515   memset (cbind, 0, sizeof (cbind));
516   total = -1;
517   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
518   cbind[0].buffer = &total;
519   cbind[0].is_unsigned = GNUNET_NO;
520   if (type != 0)
521   {
522     if (vhash != NULL)
523     {
524       ret =
525         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
526                                          plugin->
527                                          count_entry_by_hash_vhash_and_type, 1,
528                                                     cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
529                                          &hashSize, MYSQL_TYPE_BLOB, vhash,
530                                          hashSize2, &hashSize2, MYSQL_TYPE_LONG,
531                                          &type, GNUNET_YES, -1);
532     }
533     else
534     {
535       ret =
536         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
537                                          plugin->count_entry_by_hash_and_type,
538                                                     1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
539                                          hashSize, &hashSize, MYSQL_TYPE_LONG,
540                                          &type, GNUNET_YES, -1);
541     }
542   }
543   else
544   {
545     if (vhash != NULL)
546     {
547       ret =
548         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
549                                          plugin->count_entry_by_hash_and_vhash,
550                                                     1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
551                                          hashSize, &hashSize, MYSQL_TYPE_BLOB,
552                                          vhash, hashSize2, &hashSize2, -1);
553
554     }
555     else
556     {
557       ret =
558         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->count_entry_by_hash, 1,
559                                                     cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
560                                          &hashSize, -1);
561     }
562   }
563   if ((ret != GNUNET_OK) || (0 >= total))
564   {
565     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
566     return;
567   }
568   offset = offset % total;
569   off = (unsigned long long) offset;
570 #if DEBUG_MYSQL
571   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
572               "Obtaining %llu/%lld result for GET `%s'\n", off, total,
573               GNUNET_h2s (key));
574 #endif
575
576   if (type != GNUNET_BLOCK_TYPE_ANY)
577   {
578     if (NULL != vhash)
579     {
580       execute_select (plugin, plugin->select_entry_by_hash_vhash_and_type, proc,
581                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
582                       MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
583                       MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
584                       &off, GNUNET_YES, -1);
585     }
586     else
587     {
588       execute_select (plugin, plugin->select_entry_by_hash_and_type, proc,
589                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
590                       MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
591                       &off, GNUNET_YES, -1);
592     }
593   }
594   else
595   {
596     if (NULL != vhash)
597     {
598       execute_select (plugin, plugin->select_entry_by_hash_and_vhash, proc,
599                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
600                       MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
601                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
602     }
603     else
604     {
605       execute_select (plugin, plugin->select_entry_by_hash, proc, proc_cls,
606                       MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
607                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
608     }
609   }
610 }
611
612
613 /**
614  * Get a zero-anonymity datum from the datastore.
615  *
616  * @param cls our "struct Plugin*"
617  * @param offset offset of the result
618  * @param type entries of which type should be considered?
619  *        Use 0 for any type.
620  * @param proc function to call on a matching value or NULL
621  * @param proc_cls closure for iter
622  */
623 static void
624 mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset,
625                                  enum GNUNET_BLOCK_Type type,
626                                  PluginDatumProcessor proc, void *proc_cls)
627 {
628   struct Plugin *plugin = cls;
629   unsigned long long rvalue =
630       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
631                                                      UINT64_MAX);
632
633   execute_select (plugin, plugin->zero_iter, proc, proc_cls, MYSQL_TYPE_LONG,
634                   &type, GNUNET_YES, MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES,
635                   MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
636                   &rvalue, GNUNET_YES, -1);
637 }
638
639
640 /**
641  * Context for 'repl_proc' function.
642  */
643 struct ReplCtx
644 {
645
646   /**
647    * Plugin handle.
648    */
649   struct Plugin *plugin;
650
651   /**
652    * Function to call for the result (or the NULL).
653    */
654   PluginDatumProcessor proc;
655
656   /**
657    * Closure for proc.
658    */
659   void *proc_cls;
660 };
661
662
663 /**
664  * Wrapper for the processor for 'mysql_plugin_get_replication'.
665  * Decrements the replication counter and calls the original
666  * iterator.
667  *
668  * @param cls closure
669  * @param key key for the content
670  * @param size number of bytes in data
671  * @param data content stored
672  * @param type type of the content
673  * @param priority priority of the content
674  * @param anonymity anonymity-level for the content
675  * @param expiration expiration time for the content
676  * @param uid unique identifier for the datum;
677  *        maybe 0 if no unique identifier is available
678  *
679  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
680  *         (continue on call to "next", of course),
681  *         GNUNET_NO to delete the item and continue (if supported)
682  */
683 static int
684 repl_proc (void *cls, const GNUNET_HashCode * key, uint32_t size,
685            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
686            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
687            uint64_t uid)
688 {
689   struct ReplCtx *rc = cls;
690   struct Plugin *plugin = rc->plugin;
691   unsigned long long oid;
692   int ret;
693   int iret;
694
695   ret =
696       rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
697                 expiration, uid);
698   if (NULL != key)
699   {
700     oid = (unsigned long long) uid;
701     iret =
702       GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->dec_repl, NULL,
703                                            MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, -1);
704     if (iret == GNUNET_SYSERR)
705     {
706       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
707                   "Failed to reduce replication counter\n");
708       return GNUNET_SYSERR;
709     }
710   }
711   return ret;
712 }
713
714
715 /**
716  * Get a random item for replication.  Returns a single, not expired,
717  * random item from those with the highest replication counters.  The
718  * item's replication counter is decremented by one IF it was positive
719  * before.  Call 'proc' with all values ZERO or NULL if the datastore
720  * is empty.
721  *
722  * @param cls closure
723  * @param proc function to call the value (once only).
724  * @param proc_cls closure for proc
725  */
726 static void
727 mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
728                               void *proc_cls)
729 {
730   struct Plugin *plugin = cls;
731   struct ReplCtx rc;
732   unsigned long long rvalue;
733   unsigned long repl;
734   MYSQL_BIND results;
735
736   rc.plugin = plugin;
737   rc.proc = proc;
738   rc.proc_cls = proc_cls;
739   memset (&results, 0, sizeof (results));
740   results.buffer_type = MYSQL_TYPE_LONG;
741   results.buffer = &repl;
742   results.is_unsigned = GNUNET_YES;
743
744   if (1 !=
745       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->max_repl, 1, &results, NULL, NULL, -1))
746   {
747     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
748     return;
749   }
750
751   rvalue =
752       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
753                                                      UINT64_MAX);
754   execute_select (plugin, plugin->select_replication, &repl_proc, &rc,
755                   MYSQL_TYPE_LONG, &repl, GNUNET_YES, MYSQL_TYPE_LONGLONG,
756                   &rvalue, GNUNET_YES, MYSQL_TYPE_LONG, &repl, GNUNET_YES,
757                   MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES, -1);
758
759 }
760
761
762 /**
763  * Get all of the keys in the datastore.
764  *
765  * @param cls closure
766  * @param proc function to call on each key
767  * @param proc_cls closure for proc
768  */
769 static void
770 mysql_plugin_get_keys (void *cls,
771                         PluginKeyProcessor proc,
772                         void *proc_cls)
773 {
774   struct Plugin *plugin = cls;
775   const char *query = "SELECT hash FROM gn090";
776   int ret;
777   MYSQL_STMT *statement;
778   GNUNET_HashCode key;
779   MYSQL_BIND cbind[1];
780   unsigned long length;
781  
782   statement = GNUNET_MYSQL_statement_get_stmt (plugin->mc,
783                                                plugin->get_all_keys);
784   if (statement == NULL)
785   {
786     GNUNET_MYSQL_statements_invalidate (plugin->mc);
787     return;
788   }
789   if (mysql_stmt_prepare (statement, query, strlen (query)))
790   {
791     GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql",
792                      _("Failed to prepare statement `%s'\n"), query);
793     GNUNET_MYSQL_statements_invalidate (plugin->mc);
794     return;
795   }
796   GNUNET_assert (proc != NULL);
797   if (mysql_stmt_execute (statement))
798   {
799     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
800                 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
801                 "mysql_stmt_execute", query, __FILE__, __LINE__,
802                 mysql_stmt_error (statement));
803     GNUNET_MYSQL_statements_invalidate (plugin->mc);
804     return;
805   }
806   memset (cbind, 0, sizeof (cbind));
807   cbind[0].buffer_type = MYSQL_TYPE_BLOB;
808   cbind[0].buffer = &key;
809   cbind[0].buffer_length = sizeof (key);
810   cbind[0].length = &length;
811   cbind[0].is_unsigned = GNUNET_NO;
812   if (mysql_stmt_bind_result (statement, cbind))
813   {
814     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
815                 _("`%s' failed at %s:%d with error: %s\n"),
816                 "mysql_stmt_bind_result", __FILE__, __LINE__,
817                 mysql_stmt_error (statement));
818     GNUNET_MYSQL_statements_invalidate (plugin->mc);
819     return;
820   }
821   while (0 == (ret = mysql_stmt_fetch (statement)))
822   {
823     if (sizeof (GNUNET_HashCode) == length)
824       proc (proc_cls, &key, 1);    
825   }
826   if (ret != MYSQL_NO_DATA)
827   {
828     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
829                 _("`%s' failed at %s:%d with error: %s\n"),
830                      "mysql_stmt_fetch", __FILE__, __LINE__,
831                      mysql_stmt_error (statement));    
832     GNUNET_MYSQL_statements_invalidate (plugin->mc);
833     return;
834   }
835   mysql_stmt_reset (statement);
836 }
837
838
839 /**
840  * Context for 'expi_proc' function.
841  */
842 struct ExpiCtx
843 {
844
845   /**
846    * Plugin handle.
847    */
848   struct Plugin *plugin;
849
850   /**
851    * Function to call for the result (or the NULL).
852    */
853   PluginDatumProcessor proc;
854
855   /**
856    * Closure for proc.
857    */
858   void *proc_cls;
859 };
860
861
862
863 /**
864  * Wrapper for the processor for 'mysql_plugin_get_expiration'.
865  * If no expired value was found, we do a second query for
866  * low-priority content.
867  *
868  * @param cls closure
869  * @param key key for the content
870  * @param size number of bytes in data
871  * @param data content stored
872  * @param type type of the content
873  * @param priority priority of the content
874  * @param anonymity anonymity-level for the content
875  * @param expiration expiration time for the content
876  * @param uid unique identifier for the datum;
877  *        maybe 0 if no unique identifier is available
878  *
879  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
880  *         (continue on call to "next", of course),
881  *         GNUNET_NO to delete the item and continue (if supported)
882  */
883 static int
884 expi_proc (void *cls, const GNUNET_HashCode * key, uint32_t size,
885            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
886            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
887            uint64_t uid)
888 {
889   struct ExpiCtx *rc = cls;
890   struct Plugin *plugin = rc->plugin;
891
892   if (NULL == key)
893   {
894     execute_select (plugin, plugin->select_priority, rc->proc, rc->proc_cls,
895                     -1);
896     return GNUNET_SYSERR;
897   }
898   return rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
899                    expiration, uid);
900 }
901
902
903 /**
904  * Get a random item for expiration.
905  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
906  *
907  * @param cls closure
908  * @param proc function to call the value (once only).
909  * @param proc_cls closure for proc
910  */
911 static void
912 mysql_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
913                              void *proc_cls)
914 {
915   struct Plugin *plugin = cls;
916   long long nt;
917   struct ExpiCtx rc;
918
919   rc.plugin = plugin;
920   rc.proc = proc;
921   rc.proc_cls = proc_cls;
922   nt = (long long) GNUNET_TIME_absolute_get ().abs_value;
923   execute_select (plugin, plugin->select_expiration, expi_proc, &rc,
924                   MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, -1);
925
926 }
927
928
929 /**
930  * Drop database.
931  *
932  * @param cls the "struct Plugin*"
933  */
934 static void
935 mysql_plugin_drop (void *cls)
936 {
937   struct Plugin *plugin = cls;
938
939   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "DROP TABLE gn090"))
940     return;                     /* error */
941   plugin->env->duc (plugin->env->cls, 0);
942 }
943
944
945 /**
946  * Entry point for the plugin.
947  *
948  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
949  * @return our "struct Plugin*"
950  */
951 void *
952 libgnunet_plugin_datastore_mysql_init (void *cls)
953 {
954   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
955   struct GNUNET_DATASTORE_PluginFunctions *api;
956   struct Plugin *plugin;
957
958   plugin = GNUNET_malloc (sizeof (struct Plugin));
959   plugin->env = env;
960   plugin->mc = GNUNET_MYSQL_context_create (env->cfg, "datastore-mysql");
961   if (NULL == plugin->mc)
962   {
963     GNUNET_free (plugin);
964     return NULL;
965   }
966 #define MRUNS(a) (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, a) )
967 #define PINIT(a,b) (NULL == (a = GNUNET_MYSQL_statement_prepare (plugin->mc, b)))
968   if (MRUNS
969       ("CREATE TABLE IF NOT EXISTS gn090 ("
970        " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
971        " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
972        " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
973        " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
974        " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
975        " rvalue BIGINT UNSIGNED NOT NULL,"
976        " hash BINARY(64) NOT NULL DEFAULT '',"
977        " vhash BINARY(64) NOT NULL DEFAULT '',"
978        " value BLOB NOT NULL DEFAULT ''," " uid BIGINT NOT NULL AUTO_INCREMENT,"
979        " PRIMARY KEY (uid)," " INDEX idx_hash (hash(64)),"
980        " INDEX idx_hash_uid (hash(64),uid),"
981        " INDEX idx_hash_vhash (hash(64),vhash(64)),"
982        " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
983        " INDEX idx_prio (prio)," " INDEX idx_repl_rvalue (repl,rvalue),"
984        " INDEX idx_expire (expire),"
985        " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
986        ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
987       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
988       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
989       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
990       PINIT (plugin->select_entry_by_hash_and_vhash,
991              SELECT_ENTRY_BY_HASH_AND_VHASH) ||
992       PINIT (plugin->select_entry_by_hash_and_type,
993              SELECT_ENTRY_BY_HASH_AND_TYPE) ||
994       PINIT (plugin->select_entry_by_hash_vhash_and_type,
995              SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
996       PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
997       PINIT (plugin->get_size, SELECT_SIZE) ||
998       PINIT (plugin->count_entry_by_hash_and_vhash,
999              COUNT_ENTRY_BY_HASH_AND_VHASH) ||
1000       PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1001       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1002                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
1003       PINIT (plugin->update_entry, UPDATE_ENTRY) ||
1004       PINIT (plugin->dec_repl, DEC_REPL) ||
1005       PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
1006       PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) ||
1007       PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
1008       PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
1009       PINIT (plugin->get_all_keys, GET_ALL_KEYS) ||
1010       PINIT (plugin->select_replication, SELECT_IT_REPLICATION))
1011   {
1012     GNUNET_MYSQL_context_destroy (plugin->mc);
1013     GNUNET_free (plugin);
1014     return NULL;
1015   }
1016 #undef PINIT
1017 #undef MRUNS
1018
1019   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1020   api->cls = plugin;
1021   api->estimate_size = &mysql_plugin_estimate_size;
1022   api->put = &mysql_plugin_put;
1023   api->update = &mysql_plugin_update;
1024   api->get_key = &mysql_plugin_get_key;
1025   api->get_replication = &mysql_plugin_get_replication;
1026   api->get_expiration = &mysql_plugin_get_expiration;
1027   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1028   api->get_keys = &mysql_plugin_get_keys;
1029   api->drop = &mysql_plugin_drop;
1030   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql",
1031                    _("Mysql database running\n"));
1032   return api;
1033 }
1034
1035
1036 /**
1037  * Exit point from the plugin.
1038  * @param cls our "struct Plugin*"
1039  * @return always NULL
1040  */
1041 void *
1042 libgnunet_plugin_datastore_mysql_done (void *cls)
1043 {
1044   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1045   struct Plugin *plugin = api->cls;
1046
1047   GNUNET_MYSQL_context_destroy (plugin->mc);
1048   GNUNET_free (plugin);
1049   GNUNET_free (api);
1050   return NULL;
1051 }
1052
1053 /* end of plugin_datastore_mysql.c */