-fix
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in /etc/gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109    @endverbatim
110  *
111  * PROBLEMS?
112  *
113  * If you have problems related to the mysql module, your best
114  * friend is probably the mysql manual. The first thing to check
115  * is that mysql is basically operational, that you can connect
116  * to it, create tables, issue queries etc.
117  */
118
119 #include "platform.h"
120 #include "gnunet_datastore_plugin.h"
121 #include "gnunet_util_lib.h"
122 #include "gnunet_mysql_lib.h"
123
124
125 #define MAX_DATUM_SIZE 65536
126
127
128 /**
129  * Context for all functions in this plugin.
130  */
131 struct Plugin
132 {
133   /**
134    * Our execution environment.
135    */
136   struct GNUNET_DATASTORE_PluginEnvironment *env;
137
138   /**
139    * Handle to talk to MySQL.
140    */
141   struct GNUNET_MYSQL_Context *mc;
142
143   /**
144    * Prepared statements.
145    */
146 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
147   struct GNUNET_MYSQL_StatementHandle *insert_entry;
148
149 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
150   struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
151
152 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
153   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash;
154
155 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
156   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
157
158 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
159   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_vhash;
160
161 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
162   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash;
163
164 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
165   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_type;
166
167 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
168   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
169
170 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
171   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_vhash_and_type;
172
173 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
174   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type;
175
176 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
177   struct GNUNET_MYSQL_StatementHandle *update_entry;
178
179 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
180   struct GNUNET_MYSQL_StatementHandle *dec_repl;
181
182 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
183   struct GNUNET_MYSQL_StatementHandle *get_size;
184
185 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
186    "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
187    "WHERE anonLevel=0 AND type=? AND "\
188    "(rvalue >= ? OR"\
189    "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
190    "ORDER BY rvalue ASC LIMIT 1"
191   struct GNUNET_MYSQL_StatementHandle *zero_iter;
192
193 #define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
194   struct GNUNET_MYSQL_StatementHandle *select_expiration;
195
196 #define SELECT_IT_PRIORITY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_prio) ORDER BY prio ASC LIMIT 1"
197   struct GNUNET_MYSQL_StatementHandle *select_priority;
198
199 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid "\
200   "FROM gn090 FORCE INDEX (idx_repl_rvalue) "\
201   "WHERE repl=? AND "\
202   " (rvalue>=? OR"\
203   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
204   "ORDER BY rvalue ASC "\
205   "LIMIT 1"
206   struct GNUNET_MYSQL_StatementHandle *select_replication;
207
208 #define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
209   struct GNUNET_MYSQL_StatementHandle *max_repl;
210
211 #define GET_ALL_KEYS "SELECT hash from gn090"
212   struct GNUNET_MYSQL_StatementHandle *get_all_keys;
213
214 };
215
216
217 /**
218  * Delete an entry from the gn090 table.
219  *
220  * @param plugin plugin context
221  * @param uid unique ID of the entry to delete
222  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
223  */
224 static int
225 do_delete_entry (struct Plugin *plugin, unsigned long long uid)
226 {
227   int ret;
228
229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting value %llu from gn090 table\n",
230               uid);
231   ret = GNUNET_MYSQL_statement_run_prepared (plugin->mc,
232                                              plugin->delete_entry_by_uid, NULL,
233                                              MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES, -1);
234   if (ret >= 0)
235     return GNUNET_OK;
236   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
237               "Deleting value %llu from gn090 table failed\n", uid);
238   return ret;
239 }
240
241
242 /**
243  * Get an estimate of how much space the database is
244  * currently using.
245  *
246  * @param cls our "struct Plugin *"
247  * @return number of bytes used on disk
248  */
249 static unsigned long long
250 mysql_plugin_estimate_size (void *cls)
251 {
252   struct Plugin *plugin = cls;
253   MYSQL_BIND cbind[1];
254   long long total;
255
256   memset (cbind, 0, sizeof (cbind));
257   total = 0;
258   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
259   cbind[0].buffer = &total;
260   cbind[0].is_unsigned = GNUNET_NO;
261   if (GNUNET_OK !=
262       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->get_size, 1, cbind, NULL, NULL, -1))
263     return 0;
264   return total;
265 }
266
267
268 /**
269  * Store an item in the datastore.
270  *
271  * @param cls closure
272  * @param key key for the item
273  * @param size number of bytes in data
274  * @param data content stored
275  * @param type type of the content
276  * @param priority priority of the content
277  * @param anonymity anonymity-level for the content
278  * @param replication replication-level for the content
279  * @param expiration expiration time for the content
280  * @param msg set to error message
281  * @return GNUNET_OK on success
282  */
283 static int
284 mysql_plugin_put (void *cls, const GNUNET_HashCode * key, uint32_t size,
285                   const void *data, enum GNUNET_BLOCK_Type type,
286                   uint32_t priority, uint32_t anonymity, uint32_t replication,
287                   struct GNUNET_TIME_Absolute expiration, char **msg)
288 {
289   struct Plugin *plugin = cls;
290   unsigned int irepl = replication;
291   unsigned int ipriority = priority;
292   unsigned int ianonymity = anonymity;
293   unsigned long long lexpiration = expiration.abs_value;
294   unsigned long long lrvalue =
295       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
296                                                      UINT64_MAX);
297   unsigned long hashSize;
298   unsigned long hashSize2;
299   unsigned long lsize;
300   GNUNET_HashCode vhash;
301
302   if (size > MAX_DATUM_SIZE)
303   {
304     GNUNET_break (0);
305     return GNUNET_SYSERR;
306   }
307   hashSize = sizeof (GNUNET_HashCode);
308   hashSize2 = sizeof (GNUNET_HashCode);
309   lsize = size;
310   GNUNET_CRYPTO_hash (data, size, &vhash);
311   if (GNUNET_OK !=
312       GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->insert_entry, NULL,
313                               MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
314                               MYSQL_TYPE_LONG, &type, GNUNET_YES,
315                               MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
316                               MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
317                               MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
318                               MYSQL_TYPE_LONGLONG, &lrvalue, GNUNET_YES,
319                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
320                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
321                               MYSQL_TYPE_BLOB, data, lsize, &lsize, -1))
322     return GNUNET_SYSERR;
323   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
324               "Inserted value `%s' with size %u into gn090 table\n",
325               GNUNET_h2s (key), (unsigned int) size);
326   if (size > 0)
327     plugin->env->duc (plugin->env->cls, size);
328   return GNUNET_OK;
329 }
330
331
332 /**
333  * Update the priority for a particular key in the datastore.  If
334  * the expiration time in value is different than the time found in
335  * the datastore, the higher value should be kept.  For the
336  * anonymity level, the lower value is to be used.  The specified
337  * priority should be added to the existing priority, ignoring the
338  * priority in value.
339  *
340  * Note that it is possible for multiple values to match this put.
341  * In that case, all of the respective values are updated.
342  *
343  * @param cls our "struct Plugin*"
344  * @param uid unique identifier of the datum
345  * @param delta by how much should the priority
346  *     change?  If priority + delta < 0 the
347  *     priority should be set to 0 (never go
348  *     negative).
349  * @param expire new expiration time should be the
350  *     MAX of any existing expiration time and
351  *     this value
352  * @param msg set to error message
353  * @return GNUNET_OK on success
354  */
355 static int
356 mysql_plugin_update (void *cls, uint64_t uid, int delta,
357                      struct GNUNET_TIME_Absolute expire, char **msg)
358 {
359   struct Plugin *plugin = cls;
360   unsigned long long vkey = uid;
361   unsigned long long lexpire = expire.abs_value;
362   int ret;
363
364   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365               "Updating value %llu adding %d to priority and maxing exp at %llu\n",
366               vkey, delta, lexpire);
367   ret =
368     GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->update_entry, NULL,
369                                          MYSQL_TYPE_LONG, &delta, GNUNET_NO,
370                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
371                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
372                               MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1);
373   if (ret != GNUNET_OK)
374   {
375     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n",
376                 vkey);
377   }
378   return ret;
379 }
380
381
382 /**
383  * Run the given select statement and call 'proc' on the resulting
384  * values (which must be in particular positions).
385  *
386  * @param plugin the plugin handle
387  * @param stmt select statement to run
388  * @param proc function to call on result
389  * @param proc_cls closure for proc
390  * @param ... arguments to initialize stmt
391  */
392 static void
393 execute_select (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
394                 PluginDatumProcessor proc, void *proc_cls, ...)
395 {
396   va_list ap;
397   int ret;
398   unsigned int type;
399   unsigned int priority;
400   unsigned int anonymity;
401   unsigned long long exp;
402   unsigned long hashSize;
403   unsigned long size;
404   unsigned long long uid;
405   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
406   GNUNET_HashCode key;
407   struct GNUNET_TIME_Absolute expiration;
408   MYSQL_BIND rbind[7];
409
410   hashSize = sizeof (GNUNET_HashCode);
411   memset (rbind, 0, sizeof (rbind));
412   rbind[0].buffer_type = MYSQL_TYPE_LONG;
413   rbind[0].buffer = &type;
414   rbind[0].is_unsigned = 1;
415   rbind[1].buffer_type = MYSQL_TYPE_LONG;
416   rbind[1].buffer = &priority;
417   rbind[1].is_unsigned = 1;
418   rbind[2].buffer_type = MYSQL_TYPE_LONG;
419   rbind[2].buffer = &anonymity;
420   rbind[2].is_unsigned = 1;
421   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
422   rbind[3].buffer = &exp;
423   rbind[3].is_unsigned = 1;
424   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
425   rbind[4].buffer = &key;
426   rbind[4].buffer_length = hashSize;
427   rbind[4].length = &hashSize;
428   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
429   rbind[5].buffer = value;
430   rbind[5].buffer_length = size = sizeof (value);
431   rbind[5].length = &size;
432   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
433   rbind[6].buffer = &uid;
434   rbind[6].is_unsigned = 1;
435
436   va_start (ap, proc_cls);
437   ret = GNUNET_MYSQL_statement_run_prepared_select_va (plugin->mc, stmt, 7, rbind, NULL, NULL, ap);
438   va_end (ap);
439   if (ret <= 0)
440   {
441     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
442     return;
443   }
444   GNUNET_assert (size <= sizeof (value));
445   if ((rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
446       (hashSize != sizeof (GNUNET_HashCode)))
447   {
448     GNUNET_break (0);
449     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
450     return;
451   }
452   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
454               (unsigned int) size, GNUNET_h2s (&key), priority, anonymity, exp);
455   GNUNET_assert (size < MAX_DATUM_SIZE);
456   expiration.abs_value = exp;
457   ret =
458       proc (proc_cls, &key, size, value, type, priority, anonymity, expiration,
459             uid);
460   if (ret == GNUNET_NO)
461   {
462     do_delete_entry (plugin, uid);
463     if (size != 0)
464       plugin->env->duc (plugin->env->cls, -size);
465   }
466 }
467
468
469
470 /**
471  * Get one of the results for a particular key in the datastore.
472  *
473  * @param cls closure
474  * @param offset offset of the result (modulo num-results);
475  *               specific ordering does not matter for the offset
476  * @param key key to match, never NULL
477  * @param vhash hash of the value, maybe NULL (to
478  *        match all values that have the right key).
479  *        Note that for DBlocks there is no difference
480  *        betwen key and vhash, but for other blocks
481  *        there may be!
482  * @param type entries of which type are relevant?
483  *     Use 0 for any type.
484  * @param proc function to call on the matching value,
485  *        with NULL for if no value matches
486  * @param proc_cls closure for proc
487  */
488 static void
489 mysql_plugin_get_key (void *cls, uint64_t offset, const GNUNET_HashCode * key,
490                       const GNUNET_HashCode * vhash,
491                       enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
492                       void *proc_cls)
493 {
494   struct Plugin *plugin = cls;
495   int ret;
496   MYSQL_BIND cbind[1];
497   long long total;
498   unsigned long hashSize;
499   unsigned long hashSize2;
500   unsigned long long off;
501
502   GNUNET_assert (key != NULL);
503   GNUNET_assert (NULL != proc);
504   hashSize = sizeof (GNUNET_HashCode);
505   hashSize2 = sizeof (GNUNET_HashCode);
506   memset (cbind, 0, sizeof (cbind));
507   total = -1;
508   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
509   cbind[0].buffer = &total;
510   cbind[0].is_unsigned = GNUNET_NO;
511   if (type != 0)
512   {
513     if (vhash != NULL)
514     {
515       ret =
516         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
517                                          plugin->
518                                          count_entry_by_hash_vhash_and_type, 1,
519                                                     cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
520                                          &hashSize, MYSQL_TYPE_BLOB, vhash,
521                                          hashSize2, &hashSize2, MYSQL_TYPE_LONG,
522                                          &type, GNUNET_YES, -1);
523     }
524     else
525     {
526       ret =
527         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
528                                          plugin->count_entry_by_hash_and_type,
529                                                     1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
530                                          hashSize, &hashSize, MYSQL_TYPE_LONG,
531                                          &type, GNUNET_YES, -1);
532     }
533   }
534   else
535   {
536     if (vhash != NULL)
537     {
538       ret =
539         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
540                                          plugin->count_entry_by_hash_and_vhash,
541                                                     1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
542                                          hashSize, &hashSize, MYSQL_TYPE_BLOB,
543                                          vhash, hashSize2, &hashSize2, -1);
544
545     }
546     else
547     {
548       ret =
549         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->count_entry_by_hash, 1,
550                                                     cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
551                                          &hashSize, -1);
552     }
553   }
554   if ((ret != GNUNET_OK) || (0 >= total))
555   {
556     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
557     return;
558   }
559   offset = offset % total;
560   off = (unsigned long long) offset;
561   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
562               "Obtaining %llu/%lld result for GET `%s'\n", off, total,
563               GNUNET_h2s (key));
564   if (type != GNUNET_BLOCK_TYPE_ANY)
565   {
566     if (NULL != vhash)
567     {
568       execute_select (plugin, plugin->select_entry_by_hash_vhash_and_type, proc,
569                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
570                       MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
571                       MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
572                       &off, GNUNET_YES, -1);
573     }
574     else
575     {
576       execute_select (plugin, plugin->select_entry_by_hash_and_type, proc,
577                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
578                       MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
579                       &off, GNUNET_YES, -1);
580     }
581   }
582   else
583   {
584     if (NULL != vhash)
585     {
586       execute_select (plugin, plugin->select_entry_by_hash_and_vhash, proc,
587                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
588                       MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
589                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
590     }
591     else
592     {
593       execute_select (plugin, plugin->select_entry_by_hash, proc, proc_cls,
594                       MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
595                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
596     }
597   }
598 }
599
600
601 /**
602  * Get a zero-anonymity datum from the datastore.
603  *
604  * @param cls our "struct Plugin*"
605  * @param offset offset of the result
606  * @param type entries of which type should be considered?
607  *        Use 0 for any type.
608  * @param proc function to call on a matching value or NULL
609  * @param proc_cls closure for iter
610  */
611 static void
612 mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset,
613                                  enum GNUNET_BLOCK_Type type,
614                                  PluginDatumProcessor proc, void *proc_cls)
615 {
616   struct Plugin *plugin = cls;
617   unsigned long long rvalue =
618       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
619                                                      UINT64_MAX);
620
621   execute_select (plugin, plugin->zero_iter, proc, proc_cls, MYSQL_TYPE_LONG,
622                   &type, GNUNET_YES, MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES,
623                   MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
624                   &rvalue, GNUNET_YES, -1);
625 }
626
627
628 /**
629  * Context for 'repl_proc' function.
630  */
631 struct ReplCtx
632 {
633
634   /**
635    * Plugin handle.
636    */
637   struct Plugin *plugin;
638
639   /**
640    * Function to call for the result (or the NULL).
641    */
642   PluginDatumProcessor proc;
643
644   /**
645    * Closure for proc.
646    */
647   void *proc_cls;
648 };
649
650
651 /**
652  * Wrapper for the processor for 'mysql_plugin_get_replication'.
653  * Decrements the replication counter and calls the original
654  * iterator.
655  *
656  * @param cls closure
657  * @param key key for the content
658  * @param size number of bytes in data
659  * @param data content stored
660  * @param type type of the content
661  * @param priority priority of the content
662  * @param anonymity anonymity-level for the content
663  * @param expiration expiration time for the content
664  * @param uid unique identifier for the datum;
665  *        maybe 0 if no unique identifier is available
666  *
667  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
668  *         (continue on call to "next", of course),
669  *         GNUNET_NO to delete the item and continue (if supported)
670  */
671 static int
672 repl_proc (void *cls, const GNUNET_HashCode * key, uint32_t size,
673            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
674            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
675            uint64_t uid)
676 {
677   struct ReplCtx *rc = cls;
678   struct Plugin *plugin = rc->plugin;
679   unsigned long long oid;
680   int ret;
681   int iret;
682
683   ret =
684       rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
685                 expiration, uid);
686   if (NULL != key)
687   {
688     oid = (unsigned long long) uid;
689     iret =
690       GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->dec_repl, NULL,
691                                            MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, -1);
692     if (iret == GNUNET_SYSERR)
693     {
694       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
695                   "Failed to reduce replication counter\n");
696       return GNUNET_SYSERR;
697     }
698   }
699   return ret;
700 }
701
702
703 /**
704  * Get a random item for replication.  Returns a single, not expired,
705  * random item from those with the highest replication counters.  The
706  * item's replication counter is decremented by one IF it was positive
707  * before.  Call 'proc' with all values ZERO or NULL if the datastore
708  * is empty.
709  *
710  * @param cls closure
711  * @param proc function to call the value (once only).
712  * @param proc_cls closure for proc
713  */
714 static void
715 mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
716                               void *proc_cls)
717 {
718   struct Plugin *plugin = cls;
719   struct ReplCtx rc;
720   unsigned long long rvalue;
721   unsigned long repl;
722   MYSQL_BIND results;
723
724   rc.plugin = plugin;
725   rc.proc = proc;
726   rc.proc_cls = proc_cls;
727   memset (&results, 0, sizeof (results));
728   results.buffer_type = MYSQL_TYPE_LONG;
729   results.buffer = &repl;
730   results.is_unsigned = GNUNET_YES;
731
732   if (1 !=
733       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->max_repl, 1, &results, NULL, NULL, -1))
734   {
735     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
736     return;
737   }
738
739   rvalue =
740       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
741                                                      UINT64_MAX);
742   execute_select (plugin, plugin->select_replication, &repl_proc, &rc,
743                   MYSQL_TYPE_LONG, &repl, GNUNET_YES, MYSQL_TYPE_LONGLONG,
744                   &rvalue, GNUNET_YES, MYSQL_TYPE_LONG, &repl, GNUNET_YES,
745                   MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES, -1);
746
747 }
748
749
750 /**
751  * Get all of the keys in the datastore.
752  *
753  * @param cls closure
754  * @param proc function to call on each key
755  * @param proc_cls closure for proc
756  */
757 static void
758 mysql_plugin_get_keys (void *cls,
759                         PluginKeyProcessor proc,
760                         void *proc_cls)
761 {
762   struct Plugin *plugin = cls;
763   const char *query = "SELECT hash FROM gn090";
764   int ret;
765   MYSQL_STMT *statement;
766   GNUNET_HashCode key;
767   MYSQL_BIND cbind[1];
768   unsigned long length;
769  
770   statement = GNUNET_MYSQL_statement_get_stmt (plugin->mc,
771                                                plugin->get_all_keys);
772   if (statement == NULL)
773   {
774     GNUNET_MYSQL_statements_invalidate (plugin->mc);
775     return;
776   }
777   if (mysql_stmt_prepare (statement, query, strlen (query)))
778   {
779     GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql",
780                      _("Failed to prepare statement `%s'\n"), query);
781     GNUNET_MYSQL_statements_invalidate (plugin->mc);
782     return;
783   }
784   GNUNET_assert (proc != NULL);
785   if (mysql_stmt_execute (statement))
786   {
787     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
788                 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
789                 "mysql_stmt_execute", query, __FILE__, __LINE__,
790                 mysql_stmt_error (statement));
791     GNUNET_MYSQL_statements_invalidate (plugin->mc);
792     return;
793   }
794   memset (cbind, 0, sizeof (cbind));
795   cbind[0].buffer_type = MYSQL_TYPE_BLOB;
796   cbind[0].buffer = &key;
797   cbind[0].buffer_length = sizeof (key);
798   cbind[0].length = &length;
799   cbind[0].is_unsigned = GNUNET_NO;
800   if (mysql_stmt_bind_result (statement, cbind))
801   {
802     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
803                 _("`%s' failed at %s:%d with error: %s\n"),
804                 "mysql_stmt_bind_result", __FILE__, __LINE__,
805                 mysql_stmt_error (statement));
806     GNUNET_MYSQL_statements_invalidate (plugin->mc);
807     return;
808   }
809   while (0 == (ret = mysql_stmt_fetch (statement)))
810   {
811     if (sizeof (GNUNET_HashCode) == length)
812       proc (proc_cls, &key, 1);    
813   }
814   if (ret != MYSQL_NO_DATA)
815   {
816     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
817                 _("`%s' failed at %s:%d with error: %s\n"),
818                      "mysql_stmt_fetch", __FILE__, __LINE__,
819                      mysql_stmt_error (statement));    
820     GNUNET_MYSQL_statements_invalidate (plugin->mc);
821     return;
822   }
823   mysql_stmt_reset (statement);
824 }
825
826
827 /**
828  * Context for 'expi_proc' function.
829  */
830 struct ExpiCtx
831 {
832
833   /**
834    * Plugin handle.
835    */
836   struct Plugin *plugin;
837
838   /**
839    * Function to call for the result (or the NULL).
840    */
841   PluginDatumProcessor proc;
842
843   /**
844    * Closure for proc.
845    */
846   void *proc_cls;
847 };
848
849
850
851 /**
852  * Wrapper for the processor for 'mysql_plugin_get_expiration'.
853  * If no expired value was found, we do a second query for
854  * low-priority content.
855  *
856  * @param cls closure
857  * @param key key for the content
858  * @param size number of bytes in data
859  * @param data content stored
860  * @param type type of the content
861  * @param priority priority of the content
862  * @param anonymity anonymity-level for the content
863  * @param expiration expiration time for the content
864  * @param uid unique identifier for the datum;
865  *        maybe 0 if no unique identifier is available
866  *
867  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
868  *         (continue on call to "next", of course),
869  *         GNUNET_NO to delete the item and continue (if supported)
870  */
871 static int
872 expi_proc (void *cls, const GNUNET_HashCode * key, uint32_t size,
873            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
874            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
875            uint64_t uid)
876 {
877   struct ExpiCtx *rc = cls;
878   struct Plugin *plugin = rc->plugin;
879
880   if (NULL == key)
881   {
882     execute_select (plugin, plugin->select_priority, rc->proc, rc->proc_cls,
883                     -1);
884     return GNUNET_SYSERR;
885   }
886   return rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
887                    expiration, uid);
888 }
889
890
891 /**
892  * Get a random item for expiration.
893  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
894  *
895  * @param cls closure
896  * @param proc function to call the value (once only).
897  * @param proc_cls closure for proc
898  */
899 static void
900 mysql_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
901                              void *proc_cls)
902 {
903   struct Plugin *plugin = cls;
904   long long nt;
905   struct ExpiCtx rc;
906
907   rc.plugin = plugin;
908   rc.proc = proc;
909   rc.proc_cls = proc_cls;
910   nt = (long long) GNUNET_TIME_absolute_get ().abs_value;
911   execute_select (plugin, plugin->select_expiration, expi_proc, &rc,
912                   MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, -1);
913
914 }
915
916
917 /**
918  * Drop database.
919  *
920  * @param cls the "struct Plugin*"
921  */
922 static void
923 mysql_plugin_drop (void *cls)
924 {
925   struct Plugin *plugin = cls;
926
927   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "DROP TABLE gn090"))
928     return;                     /* error */
929   plugin->env->duc (plugin->env->cls, 0);
930 }
931
932
933 /**
934  * Entry point for the plugin.
935  *
936  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
937  * @return our "struct Plugin*"
938  */
939 void *
940 libgnunet_plugin_datastore_mysql_init (void *cls)
941 {
942   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
943   struct GNUNET_DATASTORE_PluginFunctions *api;
944   struct Plugin *plugin;
945
946   plugin = GNUNET_malloc (sizeof (struct Plugin));
947   plugin->env = env;
948   plugin->mc = GNUNET_MYSQL_context_create (env->cfg, "datastore-mysql");
949   if (NULL == plugin->mc)
950   {
951     GNUNET_free (plugin);
952     return NULL;
953   }
954 #define MRUNS(a) (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, a) )
955 #define PINIT(a,b) (NULL == (a = GNUNET_MYSQL_statement_prepare (plugin->mc, b)))
956   if (MRUNS
957       ("CREATE TABLE IF NOT EXISTS gn090 ("
958        " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
959        " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
960        " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
961        " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
962        " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
963        " rvalue BIGINT UNSIGNED NOT NULL,"
964        " hash BINARY(64) NOT NULL DEFAULT '',"
965        " vhash BINARY(64) NOT NULL DEFAULT '',"
966        " value BLOB NOT NULL DEFAULT ''," " uid BIGINT NOT NULL AUTO_INCREMENT,"
967        " PRIMARY KEY (uid)," " INDEX idx_hash (hash(64)),"
968        " INDEX idx_hash_uid (hash(64),uid),"
969        " INDEX idx_hash_vhash (hash(64),vhash(64)),"
970        " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
971        " INDEX idx_prio (prio)," " INDEX idx_repl_rvalue (repl,rvalue),"
972        " INDEX idx_expire (expire),"
973        " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
974        ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
975       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
976       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
977       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
978       PINIT (plugin->select_entry_by_hash_and_vhash,
979              SELECT_ENTRY_BY_HASH_AND_VHASH) ||
980       PINIT (plugin->select_entry_by_hash_and_type,
981              SELECT_ENTRY_BY_HASH_AND_TYPE) ||
982       PINIT (plugin->select_entry_by_hash_vhash_and_type,
983              SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
984       PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
985       PINIT (plugin->get_size, SELECT_SIZE) ||
986       PINIT (plugin->count_entry_by_hash_and_vhash,
987              COUNT_ENTRY_BY_HASH_AND_VHASH) ||
988       PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
989       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
990                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
991       PINIT (plugin->update_entry, UPDATE_ENTRY) ||
992       PINIT (plugin->dec_repl, DEC_REPL) ||
993       PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
994       PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) ||
995       PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
996       PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
997       PINIT (plugin->get_all_keys, GET_ALL_KEYS) ||
998       PINIT (plugin->select_replication, SELECT_IT_REPLICATION))
999   {
1000     GNUNET_MYSQL_context_destroy (plugin->mc);
1001     GNUNET_free (plugin);
1002     return NULL;
1003   }
1004 #undef PINIT
1005 #undef MRUNS
1006
1007   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1008   api->cls = plugin;
1009   api->estimate_size = &mysql_plugin_estimate_size;
1010   api->put = &mysql_plugin_put;
1011   api->update = &mysql_plugin_update;
1012   api->get_key = &mysql_plugin_get_key;
1013   api->get_replication = &mysql_plugin_get_replication;
1014   api->get_expiration = &mysql_plugin_get_expiration;
1015   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1016   api->get_keys = &mysql_plugin_get_keys;
1017   api->drop = &mysql_plugin_drop;
1018   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql",
1019                    _("Mysql database running\n"));
1020   return api;
1021 }
1022
1023
1024 /**
1025  * Exit point from the plugin.
1026  * @param cls our "struct Plugin*"
1027  * @return always NULL
1028  */
1029 void *
1030 libgnunet_plugin_datastore_mysql_done (void *cls)
1031 {
1032   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1033   struct Plugin *plugin = api->cls;
1034
1035   GNUNET_MYSQL_context_destroy (plugin->mc);
1036   GNUNET_free (plugin);
1037   GNUNET_free (api);
1038   return NULL;
1039 }
1040
1041 /* end of plugin_datastore_mysql.c */