use enum GNUNET_ATS_Network_Type instead of uint32_t where appropriate
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109    @endverbatim
110  *
111  * PROBLEMS?
112  *
113  * If you have problems related to the mysql module, your best
114  * friend is probably the mysql manual. The first thing to check
115  * is that mysql is basically operational, that you can connect
116  * to it, create tables, issue queries etc.
117  */
118
119 #include "platform.h"
120 #include "gnunet_datastore_plugin.h"
121 #include "gnunet_util_lib.h"
122 #include "gnunet_mysql_lib.h"
123
124
125 #define MAX_DATUM_SIZE 65536
126
127
128 /**
129  * Context for all functions in this plugin.
130  */
131 struct Plugin
132 {
133   /**
134    * Our execution environment.
135    */
136   struct GNUNET_DATASTORE_PluginEnvironment *env;
137
138   /**
139    * Handle to talk to MySQL.
140    */
141   struct GNUNET_MYSQL_Context *mc;
142
143   /**
144    * Prepared statements.
145    */
146 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
147   struct GNUNET_MYSQL_StatementHandle *insert_entry;
148
149 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
150   struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
151
152 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
153   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash;
154
155 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
156   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
157
158 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
159   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_vhash;
160
161 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
162   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash;
163
164 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
165   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_type;
166
167 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
168   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
169
170 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
171   struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_vhash_and_type;
172
173 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
174   struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type;
175
176 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
177   struct GNUNET_MYSQL_StatementHandle *update_entry;
178
179 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
180   struct GNUNET_MYSQL_StatementHandle *dec_repl;
181
182 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
183   struct GNUNET_MYSQL_StatementHandle *get_size;
184
185 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
186    "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
187    "WHERE anonLevel=0 AND type=? AND "\
188    "(rvalue >= ? OR"\
189    "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
190    "ORDER BY rvalue ASC LIMIT 1"
191   struct GNUNET_MYSQL_StatementHandle *zero_iter;
192
193 #define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
194   struct GNUNET_MYSQL_StatementHandle *select_expiration;
195
196 #define SELECT_IT_PRIORITY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_prio) ORDER BY prio ASC LIMIT 1"
197   struct GNUNET_MYSQL_StatementHandle *select_priority;
198
199 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid "\
200   "FROM gn090 FORCE INDEX (idx_repl_rvalue) "\
201   "WHERE repl=? AND "\
202   " (rvalue>=? OR"\
203   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
204   "ORDER BY rvalue ASC "\
205   "LIMIT 1"
206   struct GNUNET_MYSQL_StatementHandle *select_replication;
207
208 #define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
209   struct GNUNET_MYSQL_StatementHandle *max_repl;
210
211 #define GET_ALL_KEYS "SELECT hash from gn090"
212   struct GNUNET_MYSQL_StatementHandle *get_all_keys;
213
214 };
215
216
217 /**
218  * Delete an entry from the gn090 table.
219  *
220  * @param plugin plugin context
221  * @param uid unique ID of the entry to delete
222  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
223  */
224 static int
225 do_delete_entry (struct Plugin *plugin, unsigned long long uid)
226 {
227   int ret;
228
229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting value %llu from gn090 table\n",
230               uid);
231   ret = GNUNET_MYSQL_statement_run_prepared (plugin->mc,
232                                              plugin->delete_entry_by_uid, NULL,
233                                              MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES, -1);
234   if (ret >= 0)
235     return GNUNET_OK;
236   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
237               "Deleting value %llu from gn090 table failed\n", uid);
238   return ret;
239 }
240
241
242 /**
243  * Get an estimate of how much space the database is
244  * currently using.
245  *
246  * @param cls our "struct Plugin *"
247  * @return number of bytes used on disk
248  */
249 static unsigned long long
250 mysql_plugin_estimate_size (void *cls)
251 {
252   struct Plugin *plugin = cls;
253   MYSQL_BIND cbind[1];
254   long long total;
255
256   memset (cbind, 0, sizeof (cbind));
257   total = 0;
258   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
259   cbind[0].buffer = &total;
260   cbind[0].is_unsigned = GNUNET_NO;
261   if (GNUNET_OK !=
262       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->get_size, 1, cbind, NULL, NULL, -1))
263     return 0;
264   return total;
265 }
266
267
268 /**
269  * Store an item in the datastore.
270  *
271  * @param cls closure
272  * @param key key for the item
273  * @param size number of bytes in data
274  * @param data content stored
275  * @param type type of the content
276  * @param priority priority of the content
277  * @param anonymity anonymity-level for the content
278  * @param replication replication-level for the content
279  * @param expiration expiration time for the content
280  * @param msg set to error message
281  * @return GNUNET_OK on success
282  */
283 static int
284 mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
285                   const void *data, enum GNUNET_BLOCK_Type type,
286                   uint32_t priority, uint32_t anonymity, uint32_t replication,
287                   struct GNUNET_TIME_Absolute expiration, char **msg)
288 {
289   struct Plugin *plugin = cls;
290   unsigned int irepl = replication;
291   unsigned int ipriority = priority;
292   unsigned int ianonymity = anonymity;
293   unsigned long long lexpiration = expiration.abs_value_us;
294   unsigned long long lrvalue =
295       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
296                                                      UINT64_MAX);
297   unsigned long hashSize;
298   unsigned long hashSize2;
299   unsigned long lsize;
300   struct GNUNET_HashCode vhash;
301
302   if (size > MAX_DATUM_SIZE)
303   {
304     GNUNET_break (0);
305     return GNUNET_SYSERR;
306   }
307   hashSize = sizeof (struct GNUNET_HashCode);
308   hashSize2 = sizeof (struct GNUNET_HashCode);
309   lsize = size;
310   GNUNET_CRYPTO_hash (data, size, &vhash);
311   if (GNUNET_OK !=
312       GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->insert_entry, NULL,
313                               MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
314                               MYSQL_TYPE_LONG, &type, GNUNET_YES,
315                               MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
316                               MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
317                               MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
318                               MYSQL_TYPE_LONGLONG, &lrvalue, GNUNET_YES,
319                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
320                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
321                               MYSQL_TYPE_BLOB, data, lsize, &lsize, -1))
322     return GNUNET_SYSERR;
323   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
324               "Inserted value `%s' with size %u into gn090 table\n",
325               GNUNET_h2s (key), (unsigned int) size);
326   if (size > 0)
327     plugin->env->duc (plugin->env->cls, size);
328   return GNUNET_OK;
329 }
330
331
332 /**
333  * Update the priority for a particular key in the datastore.  If
334  * the expiration time in value is different than the time found in
335  * the datastore, the higher value should be kept.  For the
336  * anonymity level, the lower value is to be used.  The specified
337  * priority should be added to the existing priority, ignoring the
338  * priority in value.
339  *
340  * Note that it is possible for multiple values to match this put.
341  * In that case, all of the respective values are updated.
342  *
343  * @param cls our "struct Plugin*"
344  * @param uid unique identifier of the datum
345  * @param delta by how much should the priority
346  *     change?  If priority + delta < 0 the
347  *     priority should be set to 0 (never go
348  *     negative).
349  * @param expire new expiration time should be the
350  *     MAX of any existing expiration time and
351  *     this value
352  * @param msg set to error message
353  * @return GNUNET_OK on success
354  */
355 static int
356 mysql_plugin_update (void *cls, uint64_t uid, int delta,
357                      struct GNUNET_TIME_Absolute expire, char **msg)
358 {
359   struct Plugin *plugin = cls;
360   unsigned long long vkey = uid;
361   unsigned long long lexpire = expire.abs_value_us;
362   int ret;
363
364   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365               "Updating value %llu adding %d to priority and maxing exp at %s\n",
366               vkey, delta,
367               GNUNET_STRINGS_absolute_time_to_string (expire));
368   ret =
369     GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->update_entry, NULL,
370                                          MYSQL_TYPE_LONG, &delta, GNUNET_NO,
371                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
372                               MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
373                               MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1);
374   if (ret != GNUNET_OK)
375   {
376     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n",
377                 vkey);
378   }
379   return ret;
380 }
381
382
383 /**
384  * Run the given select statement and call 'proc' on the resulting
385  * values (which must be in particular positions).
386  *
387  * @param plugin the plugin handle
388  * @param stmt select statement to run
389  * @param proc function to call on result
390  * @param proc_cls closure for proc
391  * @param ... arguments to initialize stmt
392  */
393 static void
394 execute_select (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
395                 PluginDatumProcessor proc, void *proc_cls, ...)
396 {
397   va_list ap;
398   int ret;
399   unsigned int type;
400   unsigned int priority;
401   unsigned int anonymity;
402   unsigned long long exp;
403   unsigned long hashSize;
404   unsigned long size;
405   unsigned long long uid;
406   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
407   struct GNUNET_HashCode key;
408   struct GNUNET_TIME_Absolute expiration;
409   MYSQL_BIND rbind[7];
410
411   hashSize = sizeof (struct GNUNET_HashCode);
412   memset (rbind, 0, sizeof (rbind));
413   rbind[0].buffer_type = MYSQL_TYPE_LONG;
414   rbind[0].buffer = &type;
415   rbind[0].is_unsigned = 1;
416   rbind[1].buffer_type = MYSQL_TYPE_LONG;
417   rbind[1].buffer = &priority;
418   rbind[1].is_unsigned = 1;
419   rbind[2].buffer_type = MYSQL_TYPE_LONG;
420   rbind[2].buffer = &anonymity;
421   rbind[2].is_unsigned = 1;
422   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
423   rbind[3].buffer = &exp;
424   rbind[3].is_unsigned = 1;
425   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
426   rbind[4].buffer = &key;
427   rbind[4].buffer_length = hashSize;
428   rbind[4].length = &hashSize;
429   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
430   rbind[5].buffer = value;
431   rbind[5].buffer_length = size = sizeof (value);
432   rbind[5].length = &size;
433   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
434   rbind[6].buffer = &uid;
435   rbind[6].is_unsigned = 1;
436
437   va_start (ap, proc_cls);
438   ret = GNUNET_MYSQL_statement_run_prepared_select_va (plugin->mc, stmt, 7, rbind, NULL, NULL, ap);
439   va_end (ap);
440   if (ret <= 0)
441   {
442     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
443     return;
444   }
445   GNUNET_assert (size <= sizeof (value));
446   if ((rbind[4].buffer_length != sizeof (struct GNUNET_HashCode)) ||
447       (hashSize != sizeof (struct GNUNET_HashCode)))
448   {
449     GNUNET_break (0);
450     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
451     return;
452   }
453   expiration.abs_value_us = exp;
454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
455               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %s selecting from gn090 table\n",
456               (unsigned int) size, GNUNET_h2s (&key),
457               priority, anonymity,
458               GNUNET_STRINGS_absolute_time_to_string (expiration));
459   GNUNET_assert (size < MAX_DATUM_SIZE);
460   ret =
461       proc (proc_cls, &key, size, value, type, priority, anonymity, expiration,
462             uid);
463   if (ret == GNUNET_NO)
464   {
465     do_delete_entry (plugin, uid);
466     if (size != 0)
467       plugin->env->duc (plugin->env->cls, -size);
468   }
469 }
470
471
472
473 /**
474  * Get one of the results for a particular key in the datastore.
475  *
476  * @param cls closure
477  * @param offset offset of the result (modulo num-results);
478  *               specific ordering does not matter for the offset
479  * @param key key to match, never NULL
480  * @param vhash hash of the value, maybe NULL (to
481  *        match all values that have the right key).
482  *        Note that for DBlocks there is no difference
483  *        betwen key and vhash, but for other blocks
484  *        there may be!
485  * @param type entries of which type are relevant?
486  *     Use 0 for any type.
487  * @param proc function to call on the matching value,
488  *        with NULL for if no value matches
489  * @param proc_cls closure for proc
490  */
491 static void
492 mysql_plugin_get_key (void *cls, uint64_t offset, const struct GNUNET_HashCode * key,
493                       const struct GNUNET_HashCode * vhash,
494                       enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
495                       void *proc_cls)
496 {
497   struct Plugin *plugin = cls;
498   int ret;
499   MYSQL_BIND cbind[1];
500   long long total;
501   unsigned long hashSize;
502   unsigned long hashSize2;
503   unsigned long long off;
504
505   GNUNET_assert (key != NULL);
506   GNUNET_assert (NULL != proc);
507   hashSize = sizeof (struct GNUNET_HashCode);
508   hashSize2 = sizeof (struct GNUNET_HashCode);
509   memset (cbind, 0, sizeof (cbind));
510   total = -1;
511   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
512   cbind[0].buffer = &total;
513   cbind[0].is_unsigned = GNUNET_NO;
514   if (type != 0)
515   {
516     if (vhash != NULL)
517     {
518       ret =
519         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
520                                          plugin->
521                                          count_entry_by_hash_vhash_and_type, 1,
522                                                     cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
523                                          &hashSize, MYSQL_TYPE_BLOB, vhash,
524                                          hashSize2, &hashSize2, MYSQL_TYPE_LONG,
525                                          &type, GNUNET_YES, -1);
526     }
527     else
528     {
529       ret =
530         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
531                                          plugin->count_entry_by_hash_and_type,
532                                                     1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
533                                          hashSize, &hashSize, MYSQL_TYPE_LONG,
534                                          &type, GNUNET_YES, -1);
535     }
536   }
537   else
538   {
539     if (vhash != NULL)
540     {
541       ret =
542         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc,
543                                          plugin->count_entry_by_hash_and_vhash,
544                                                     1, cbind, NULL, NULL, MYSQL_TYPE_BLOB, key,
545                                          hashSize, &hashSize, MYSQL_TYPE_BLOB,
546                                          vhash, hashSize2, &hashSize2, -1);
547
548     }
549     else
550     {
551       ret =
552         GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->count_entry_by_hash, 1,
553                                                     cbind, NULL, NULL, MYSQL_TYPE_BLOB, key, hashSize,
554                                          &hashSize, -1);
555     }
556   }
557   if ((ret != GNUNET_OK) || (0 >= total))
558   {
559     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
560     return;
561   }
562   offset = offset % total;
563   off = (unsigned long long) offset;
564   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565               "Obtaining %llu/%lld result for GET `%s'\n", off, total,
566               GNUNET_h2s (key));
567   if (type != GNUNET_BLOCK_TYPE_ANY)
568   {
569     if (NULL != vhash)
570     {
571       execute_select (plugin, plugin->select_entry_by_hash_vhash_and_type, proc,
572                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
573                       MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
574                       MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
575                       &off, GNUNET_YES, -1);
576     }
577     else
578     {
579       execute_select (plugin, plugin->select_entry_by_hash_and_type, proc,
580                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
581                       MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
582                       &off, GNUNET_YES, -1);
583     }
584   }
585   else
586   {
587     if (NULL != vhash)
588     {
589       execute_select (plugin, plugin->select_entry_by_hash_and_vhash, proc,
590                       proc_cls, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
591                       MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
592                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
593     }
594     else
595     {
596       execute_select (plugin, plugin->select_entry_by_hash, proc, proc_cls,
597                       MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
598                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
599     }
600   }
601 }
602
603
604 /**
605  * Get a zero-anonymity datum from the datastore.
606  *
607  * @param cls our "struct Plugin*"
608  * @param offset offset of the result
609  * @param type entries of which type should be considered?
610  *        Use 0 for any type.
611  * @param proc function to call on a matching value or NULL
612  * @param proc_cls closure for iter
613  */
614 static void
615 mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset,
616                                  enum GNUNET_BLOCK_Type type,
617                                  PluginDatumProcessor proc, void *proc_cls)
618 {
619   struct Plugin *plugin = cls;
620   unsigned long long rvalue =
621       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
622                                                      UINT64_MAX);
623
624   execute_select (plugin, plugin->zero_iter, proc, proc_cls, MYSQL_TYPE_LONG,
625                   &type, GNUNET_YES, MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES,
626                   MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONGLONG,
627                   &rvalue, GNUNET_YES, -1);
628 }
629
630
631 /**
632  * Context for 'repl_proc' function.
633  */
634 struct ReplCtx
635 {
636
637   /**
638    * Plugin handle.
639    */
640   struct Plugin *plugin;
641
642   /**
643    * Function to call for the result (or the NULL).
644    */
645   PluginDatumProcessor proc;
646
647   /**
648    * Closure for proc.
649    */
650   void *proc_cls;
651 };
652
653
654 /**
655  * Wrapper for the processor for 'mysql_plugin_get_replication'.
656  * Decrements the replication counter and calls the original
657  * iterator.
658  *
659  * @param cls closure
660  * @param key key for the content
661  * @param size number of bytes in data
662  * @param data content stored
663  * @param type type of the content
664  * @param priority priority of the content
665  * @param anonymity anonymity-level for the content
666  * @param expiration expiration time for the content
667  * @param uid unique identifier for the datum;
668  *        maybe 0 if no unique identifier is available
669  *
670  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
671  *         (continue on call to "next", of course),
672  *         GNUNET_NO to delete the item and continue (if supported)
673  */
674 static int
675 repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
676            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
677            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
678            uint64_t uid)
679 {
680   struct ReplCtx *rc = cls;
681   struct Plugin *plugin = rc->plugin;
682   unsigned long long oid;
683   int ret;
684   int iret;
685
686   ret =
687       rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
688                 expiration, uid);
689   if (NULL != key)
690   {
691     oid = (unsigned long long) uid;
692     iret =
693       GNUNET_MYSQL_statement_run_prepared (plugin->mc, plugin->dec_repl, NULL,
694                                            MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, -1);
695     if (iret == GNUNET_SYSERR)
696     {
697       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
698                   "Failed to reduce replication counter\n");
699       return GNUNET_SYSERR;
700     }
701   }
702   return ret;
703 }
704
705
706 /**
707  * Get a random item for replication.  Returns a single, not expired,
708  * random item from those with the highest replication counters.  The
709  * item's replication counter is decremented by one IF it was positive
710  * before.  Call 'proc' with all values ZERO or NULL if the datastore
711  * is empty.
712  *
713  * @param cls closure
714  * @param proc function to call the value (once only).
715  * @param proc_cls closure for proc
716  */
717 static void
718 mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
719                               void *proc_cls)
720 {
721   struct Plugin *plugin = cls;
722   struct ReplCtx rc;
723   unsigned long long rvalue;
724   unsigned long repl;
725   MYSQL_BIND results;
726
727   rc.plugin = plugin;
728   rc.proc = proc;
729   rc.proc_cls = proc_cls;
730   memset (&results, 0, sizeof (results));
731   results.buffer_type = MYSQL_TYPE_LONG;
732   results.buffer = &repl;
733   results.is_unsigned = GNUNET_YES;
734
735   if (1 !=
736       GNUNET_MYSQL_statement_run_prepared_select (plugin->mc, plugin->max_repl, 1, &results, NULL, NULL, -1))
737   {
738     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
739     return;
740   }
741
742   rvalue =
743       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
744                                                      UINT64_MAX);
745   execute_select (plugin, plugin->select_replication, &repl_proc, &rc,
746                   MYSQL_TYPE_LONG, &repl, GNUNET_YES, MYSQL_TYPE_LONGLONG,
747                   &rvalue, GNUNET_YES, MYSQL_TYPE_LONG, &repl, GNUNET_YES,
748                   MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES, -1);
749
750 }
751
752
753 /**
754  * Get all of the keys in the datastore.
755  *
756  * @param cls closure
757  * @param proc function to call on each key
758  * @param proc_cls closure for proc
759  */
760 static void
761 mysql_plugin_get_keys (void *cls,
762                         PluginKeyProcessor proc,
763                         void *proc_cls)
764 {
765   struct Plugin *plugin = cls;
766   const char *query = "SELECT hash FROM gn090";
767   int ret;
768   MYSQL_STMT *statement;
769   struct GNUNET_HashCode key;
770   MYSQL_BIND cbind[1];
771   unsigned long length;
772
773   statement = GNUNET_MYSQL_statement_get_stmt (plugin->mc,
774                                                plugin->get_all_keys);
775   if (statement == NULL)
776   {
777     GNUNET_MYSQL_statements_invalidate (plugin->mc);
778     return;
779   }
780   if (mysql_stmt_prepare (statement, query, strlen (query)))
781   {
782     GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql",
783                      _("Failed to prepare statement `%s'\n"), query);
784     GNUNET_MYSQL_statements_invalidate (plugin->mc);
785     return;
786   }
787   GNUNET_assert (proc != NULL);
788   if (mysql_stmt_execute (statement))
789   {
790     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
791                 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
792                 "mysql_stmt_execute", query, __FILE__, __LINE__,
793                 mysql_stmt_error (statement));
794     GNUNET_MYSQL_statements_invalidate (plugin->mc);
795     return;
796   }
797   memset (cbind, 0, sizeof (cbind));
798   cbind[0].buffer_type = MYSQL_TYPE_BLOB;
799   cbind[0].buffer = &key;
800   cbind[0].buffer_length = sizeof (key);
801   cbind[0].length = &length;
802   cbind[0].is_unsigned = GNUNET_NO;
803   if (mysql_stmt_bind_result (statement, cbind))
804   {
805     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
806                 _("`%s' failed at %s:%d with error: %s\n"),
807                 "mysql_stmt_bind_result", __FILE__, __LINE__,
808                 mysql_stmt_error (statement));
809     GNUNET_MYSQL_statements_invalidate (plugin->mc);
810     return;
811   }
812   while (0 == (ret = mysql_stmt_fetch (statement)))
813   {
814     if (sizeof (struct GNUNET_HashCode) == length)
815       proc (proc_cls, &key, 1);
816   }
817   if (ret != MYSQL_NO_DATA)
818   {
819     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
820                 _("`%s' failed at %s:%d with error: %s\n"),
821                      "mysql_stmt_fetch", __FILE__, __LINE__,
822                      mysql_stmt_error (statement));
823     GNUNET_MYSQL_statements_invalidate (plugin->mc);
824     return;
825   }
826   mysql_stmt_reset (statement);
827 }
828
829
830 /**
831  * Context for 'expi_proc' function.
832  */
833 struct ExpiCtx
834 {
835
836   /**
837    * Plugin handle.
838    */
839   struct Plugin *plugin;
840
841   /**
842    * Function to call for the result (or the NULL).
843    */
844   PluginDatumProcessor proc;
845
846   /**
847    * Closure for proc.
848    */
849   void *proc_cls;
850 };
851
852
853
854 /**
855  * Wrapper for the processor for 'mysql_plugin_get_expiration'.
856  * If no expired value was found, we do a second query for
857  * low-priority content.
858  *
859  * @param cls closure
860  * @param key key for the content
861  * @param size number of bytes in data
862  * @param data content stored
863  * @param type type of the content
864  * @param priority priority of the content
865  * @param anonymity anonymity-level for the content
866  * @param expiration expiration time for the content
867  * @param uid unique identifier for the datum;
868  *        maybe 0 if no unique identifier is available
869  *
870  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
871  *         (continue on call to "next", of course),
872  *         GNUNET_NO to delete the item and continue (if supported)
873  */
874 static int
875 expi_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
876            const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
877            uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
878            uint64_t uid)
879 {
880   struct ExpiCtx *rc = cls;
881   struct Plugin *plugin = rc->plugin;
882
883   if (NULL == key)
884   {
885     execute_select (plugin, plugin->select_priority, rc->proc, rc->proc_cls,
886                     -1);
887     return GNUNET_SYSERR;
888   }
889   return rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
890                    expiration, uid);
891 }
892
893
894 /**
895  * Get a random item for expiration.
896  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
897  *
898  * @param cls closure
899  * @param proc function to call the value (once only).
900  * @param proc_cls closure for proc
901  */
902 static void
903 mysql_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
904                              void *proc_cls)
905 {
906   struct Plugin *plugin = cls;
907   long long nt;
908   struct ExpiCtx rc;
909
910   rc.plugin = plugin;
911   rc.proc = proc;
912   rc.proc_cls = proc_cls;
913   nt = (long long) GNUNET_TIME_absolute_get ().abs_value_us;
914   execute_select (plugin, plugin->select_expiration, expi_proc, &rc,
915                   MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, -1);
916
917 }
918
919
920 /**
921  * Drop database.
922  *
923  * @param cls the "struct Plugin*"
924  */
925 static void
926 mysql_plugin_drop (void *cls)
927 {
928   struct Plugin *plugin = cls;
929
930   if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "DROP TABLE gn090"))
931     return;                     /* error */
932   plugin->env->duc (plugin->env->cls, 0);
933 }
934
935
936 /**
937  * Entry point for the plugin.
938  *
939  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
940  * @return our "struct Plugin*"
941  */
942 void *
943 libgnunet_plugin_datastore_mysql_init (void *cls)
944 {
945   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
946   struct GNUNET_DATASTORE_PluginFunctions *api;
947   struct Plugin *plugin;
948
949   plugin = GNUNET_malloc (sizeof (struct Plugin));
950   plugin->env = env;
951   plugin->mc = GNUNET_MYSQL_context_create (env->cfg, "datastore-mysql");
952   if (NULL == plugin->mc)
953   {
954     GNUNET_free (plugin);
955     return NULL;
956   }
957 #define MRUNS(a) (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, a) )
958 #define PINIT(a,b) (NULL == (a = GNUNET_MYSQL_statement_prepare (plugin->mc, b)))
959   if (MRUNS
960       ("CREATE TABLE IF NOT EXISTS gn090 ("
961        " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
962        " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
963        " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
964        " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
965        " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
966        " rvalue BIGINT UNSIGNED NOT NULL,"
967        " hash BINARY(64) NOT NULL DEFAULT '',"
968        " vhash BINARY(64) NOT NULL DEFAULT '',"
969        " value BLOB NOT NULL DEFAULT ''," " uid BIGINT NOT NULL AUTO_INCREMENT,"
970        " PRIMARY KEY (uid)," " INDEX idx_hash (hash(64)),"
971        " INDEX idx_hash_uid (hash(64),uid),"
972        " INDEX idx_hash_vhash (hash(64),vhash(64)),"
973        " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
974        " INDEX idx_prio (prio)," " INDEX idx_repl_rvalue (repl,rvalue),"
975        " INDEX idx_expire (expire),"
976        " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
977        ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
978       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
979       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
980       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
981       PINIT (plugin->select_entry_by_hash_and_vhash,
982              SELECT_ENTRY_BY_HASH_AND_VHASH) ||
983       PINIT (plugin->select_entry_by_hash_and_type,
984              SELECT_ENTRY_BY_HASH_AND_TYPE) ||
985       PINIT (plugin->select_entry_by_hash_vhash_and_type,
986              SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
987       PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
988       PINIT (plugin->get_size, SELECT_SIZE) ||
989       PINIT (plugin->count_entry_by_hash_and_vhash,
990              COUNT_ENTRY_BY_HASH_AND_VHASH) ||
991       PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
992       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
993                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
994       PINIT (plugin->update_entry, UPDATE_ENTRY) ||
995       PINIT (plugin->dec_repl, DEC_REPL) ||
996       PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
997       PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) ||
998       PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
999       PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
1000       PINIT (plugin->get_all_keys, GET_ALL_KEYS) ||
1001       PINIT (plugin->select_replication, SELECT_IT_REPLICATION))
1002   {
1003     GNUNET_MYSQL_context_destroy (plugin->mc);
1004     GNUNET_free (plugin);
1005     return NULL;
1006   }
1007 #undef PINIT
1008 #undef MRUNS
1009
1010   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1011   api->cls = plugin;
1012   api->estimate_size = &mysql_plugin_estimate_size;
1013   api->put = &mysql_plugin_put;
1014   api->update = &mysql_plugin_update;
1015   api->get_key = &mysql_plugin_get_key;
1016   api->get_replication = &mysql_plugin_get_replication;
1017   api->get_expiration = &mysql_plugin_get_expiration;
1018   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1019   api->get_keys = &mysql_plugin_get_keys;
1020   api->drop = &mysql_plugin_drop;
1021   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql",
1022                    _("Mysql database running\n"));
1023   return api;
1024 }
1025
1026
1027 /**
1028  * Exit point from the plugin.
1029  * @param cls our "struct Plugin*"
1030  * @return always NULL
1031  */
1032 void *
1033 libgnunet_plugin_datastore_mysql_done (void *cls)
1034 {
1035   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1036   struct Plugin *plugin = api->cls;
1037
1038   GNUNET_MYSQL_context_destroy (plugin->mc);
1039   GNUNET_free (plugin);
1040   GNUNET_free (api);
1041   return NULL;
1042 }
1043
1044 /* end of plugin_datastore_mysql.c */