indentation
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in /etc/gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109    @endverbatim
110  *
111  * PROBLEMS?
112  *
113  * If you have problems related to the mysql module, your best
114  * friend is probably the mysql manual. The first thing to check
115  * is that mysql is basically operational, that you can connect
116  * to it, create tables, issue queries etc.
117  */
118
119 #include "platform.h"
120 #include "gnunet_datastore_plugin.h"
121 #include "gnunet_util_lib.h"
122 #include <mysql/mysql.h>
123
124 #define DEBUG_MYSQL GNUNET_NO
125
126 #define MAX_DATUM_SIZE 65536
127
128 /**
129  * Maximum number of supported parameters for a prepared
130  * statement.  Increase if needed.
131  */
132 #define MAX_PARAM 16
133
134 /**
135  * Die with an error message that indicates
136  * a failure of the command 'cmd' with the message given
137  * by strerror(errno).
138  */
139 #define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0);
140
141 /**
142  * Log an error message at log-level 'level' that indicates
143  * a failure of the command 'cmd' on file 'filename'
144  * with the message given by strerror(errno).
145  */
146 #define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
147
148
149 struct GNUNET_MysqlStatementHandle
150 {
151   struct GNUNET_MysqlStatementHandle *next;
152
153   struct GNUNET_MysqlStatementHandle *prev;
154
155   char *query;
156
157   MYSQL_STMT *statement;
158
159   int valid;
160
161 };
162
163
164 /**
165  * Context for all functions in this plugin.
166  */
167 struct Plugin
168 {
169   /**
170    * Our execution environment.
171    */
172   struct GNUNET_DATASTORE_PluginEnvironment *env;
173
174   /**
175    * Handle to talk to MySQL.
176    */
177   MYSQL *dbf;
178
179   /**
180    * We keep all prepared statements in a DLL.  This is the head.
181    */
182   struct GNUNET_MysqlStatementHandle *shead;
183
184   /**
185    * We keep all prepared statements in a DLL.  This is the tail.
186    */
187   struct GNUNET_MysqlStatementHandle *stail;
188
189   /**
190    * Filename of "my.cnf" (msyql configuration).
191    */
192   char *cnffile;
193
194   /**
195    * Prepared statements.
196    */
197 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
198   struct GNUNET_MysqlStatementHandle *insert_entry;
199
200 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
201   struct GNUNET_MysqlStatementHandle *delete_entry_by_uid;
202
203 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
204   struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
205
206 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
207   struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
208
209 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
210   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
211
212 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
213   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
214
215 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
216   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
217
218 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
219   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
220
221 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
222   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
223
224 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
225   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
226
227 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
228   struct GNUNET_MysqlStatementHandle *update_entry;
229
230 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
231   struct GNUNET_MysqlStatementHandle *dec_repl;
232
233 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
234   struct GNUNET_MysqlStatementHandle *get_size;
235
236 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
237    "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
238    "WHERE anonLevel=0 AND type=? AND "\
239    "(rvalue >= ? OR"\
240    "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
241    "ORDER BY rvalue ASC LIMIT 1"
242   struct GNUNET_MysqlStatementHandle *zero_iter;
243
244 #define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
245   struct GNUNET_MysqlStatementHandle *select_expiration;
246
247 #define SELECT_IT_PRIORITY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_prio) ORDER BY prio ASC LIMIT 1"
248   struct GNUNET_MysqlStatementHandle *select_priority;
249
250 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid "\
251   "FROM gn090 FORCE INDEX (idx_repl_rvalue) "\
252   "WHERE repl=? AND "\
253   " (rvalue>=? OR"\
254   "  NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_repl_rvalue) WHERE repl=? AND rvalue>=?)) "\
255   "ORDER BY rvalue ASC "\
256   "LIMIT 1"
257   struct GNUNET_MysqlStatementHandle *select_replication;
258
259 #define SELECT_MAX_REPL "SELECT MAX(repl) FROM gn090"
260   struct GNUNET_MysqlStatementHandle *max_repl;
261
262 };
263
264
265 /**
266  * Obtain the location of ".my.cnf".
267  *
268  * @param cfg our configuration
269  * @return NULL on error
270  */
271 static char *
272 get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
273 {
274   char *cnffile;
275   char *home_dir;
276   struct stat st;
277
278 #ifndef WINDOWS
279   struct passwd *pw;
280 #endif
281   int configured;
282
283 #ifndef WINDOWS
284   pw = getpwuid (getuid ());
285   if (!pw)
286   {
287     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "getpwuid");
288     return NULL;
289   }
290   if (GNUNET_YES ==
291       GNUNET_CONFIGURATION_have_value (cfg, "datastore-mysql", "CONFIG"))
292   {
293     GNUNET_assert (GNUNET_OK ==
294                    GNUNET_CONFIGURATION_get_value_filename (cfg,
295                                                             "datastore-mysql",
296                                                             "CONFIG",
297                                                             &cnffile));
298     configured = GNUNET_YES;
299   }
300   else
301   {
302     home_dir = GNUNET_strdup (pw->pw_dir);
303     GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
304     GNUNET_free (home_dir);
305     configured = GNUNET_NO;
306   }
307 #else
308   home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
309   plibc_conv_to_win_path ("~/", home_dir);
310   GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
311   GNUNET_free (home_dir);
312   configured = GNUNET_NO;
313 #endif
314
315   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
316               _("Trying to use file `%s' for MySQL configuration.\n"), cnffile);
317   if ((0 != STAT (cnffile, &st)) ||
318       (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode)))
319   {
320     if (configured == GNUNET_YES)
321       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
322                   _("Could not access file `%s': %s\n"), cnffile,
323                   STRERROR (errno));
324     GNUNET_free (cnffile);
325     return NULL;
326   }
327   return cnffile;
328 }
329
330
331 /**
332  * Close database connection and all prepared statements (we got a DB
333  * disconnect error).
334  * 
335  * @param plugin plugin context
336  */
337 static int
338 iclose (struct Plugin *plugin)
339 {
340   struct GNUNET_MysqlStatementHandle *s;
341
342   for (s = plugin->shead; s != NULL; s = s->next)
343   {
344     if (s->valid)
345     {
346       mysql_stmt_close (s->statement);
347       s->valid = GNUNET_NO;
348     }
349   }
350   if (plugin->dbf != NULL)
351   {
352     mysql_close (plugin->dbf);
353     plugin->dbf = NULL;
354   }
355   return GNUNET_OK;
356 }
357
358
359 /**
360  * Open the connection with the database (and initialize
361  * our default options).
362  *
363  * @param plugin plugin context
364  * @return GNUNET_OK on success
365  */
366 static int
367 iopen (struct Plugin *plugin)
368 {
369   char *mysql_dbname;
370   char *mysql_server;
371   char *mysql_user;
372   char *mysql_password;
373   unsigned long long mysql_port;
374   my_bool reconnect;
375   unsigned int timeout;
376
377   plugin->dbf = mysql_init (NULL);
378   if (plugin->dbf == NULL)
379     return GNUNET_SYSERR;
380   if (plugin->cnffile != NULL)
381     mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_FILE, plugin->cnffile);
382   mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
383   reconnect = 0;
384   mysql_options (plugin->dbf, MYSQL_OPT_RECONNECT, &reconnect);
385   timeout = 120;                /* in seconds */
386   mysql_options (plugin->dbf,
387                  MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
388   mysql_options (plugin->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
389   timeout = 60;                 /* in seconds */
390   mysql_options (plugin->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
391   mysql_options (plugin->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
392   mysql_dbname = NULL;
393   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
394                                                      "datastore-mysql",
395                                                      "DATABASE"))
396     GNUNET_assert (GNUNET_OK ==
397                    GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
398                                                           "datastore-mysql",
399                                                           "DATABASE",
400                                                           &mysql_dbname));
401   else
402     mysql_dbname = GNUNET_strdup ("gnunet");
403   mysql_user = NULL;
404   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
405                                                      "datastore-mysql", "USER"))
406   {
407     GNUNET_assert (GNUNET_OK ==
408                    GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
409                                                           "datastore-mysql",
410                                                           "USER", &mysql_user));
411   }
412   mysql_password = NULL;
413   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
414                                                      "datastore-mysql",
415                                                      "PASSWORD"))
416   {
417     GNUNET_assert (GNUNET_OK ==
418                    GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
419                                                           "datastore-mysql",
420                                                           "PASSWORD",
421                                                           &mysql_password));
422   }
423   mysql_server = NULL;
424   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
425                                                      "datastore-mysql", "HOST"))
426   {
427     GNUNET_assert (GNUNET_OK ==
428                    GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
429                                                           "datastore-mysql",
430                                                           "HOST",
431                                                           &mysql_server));
432   }
433   mysql_port = 0;
434   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
435                                                      "datastore-mysql", "PORT"))
436   {
437     GNUNET_assert (GNUNET_OK ==
438                    GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg,
439                                                           "datastore-mysql",
440                                                           "PORT", &mysql_port));
441   }
442
443   GNUNET_assert (mysql_dbname != NULL);
444   mysql_real_connect (plugin->dbf,
445                       mysql_server,
446                       mysql_user, mysql_password,
447                       mysql_dbname,
448                       (unsigned int) mysql_port, NULL, CLIENT_IGNORE_SIGPIPE);
449   GNUNET_free_non_null (mysql_server);
450   GNUNET_free_non_null (mysql_user);
451   GNUNET_free_non_null (mysql_password);
452   GNUNET_free (mysql_dbname);
453   if (mysql_error (plugin->dbf)[0])
454   {
455     LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, "mysql_real_connect", plugin);
456     return GNUNET_SYSERR;
457   }
458   return GNUNET_OK;
459 }
460
461
462 /**
463  * Run the given MySQL statement.
464  *
465  * @param plugin plugin context
466  * @param statement SQL statement to run
467  * @return GNUNET_OK on success, GNUNET_SYSERR on error
468  */
469 static int
470 run_statement (struct Plugin *plugin, const char *statement)
471 {
472   if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
473     return GNUNET_SYSERR;
474   mysql_query (plugin->dbf, statement);
475   if (mysql_error (plugin->dbf)[0])
476   {
477     LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, "mysql_query", plugin);
478     iclose (plugin);
479     return GNUNET_SYSERR;
480   }
481   return GNUNET_OK;
482 }
483
484
485 /**
486  * Create a prepared statement.
487  *
488  * @param plugin plugin context
489  * @param statement SQL statement text to prepare
490  * @return NULL on error
491  */
492 static struct GNUNET_MysqlStatementHandle *
493 prepared_statement_create (struct Plugin *plugin, const char *statement)
494 {
495   struct GNUNET_MysqlStatementHandle *ret;
496
497   ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle));
498   ret->query = GNUNET_strdup (statement);
499   GNUNET_CONTAINER_DLL_insert (plugin->shead, plugin->stail, ret);
500   return ret;
501 }
502
503
504 /**
505  * Prepare a statement for running.
506  *
507  * @param plugin plugin context
508  * @param ret handle to prepared statement
509  * @return GNUNET_OK on success
510  */
511 static int
512 prepare_statement (struct Plugin *plugin,
513                    struct GNUNET_MysqlStatementHandle *ret)
514 {
515   if (GNUNET_YES == ret->valid)
516     return GNUNET_OK;
517   if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
518     return GNUNET_SYSERR;
519   ret->statement = mysql_stmt_init (plugin->dbf);
520   if (ret->statement == NULL)
521   {
522     iclose (plugin);
523     return GNUNET_SYSERR;
524   }
525   if (mysql_stmt_prepare (ret->statement, ret->query, strlen (ret->query)))
526   {
527     GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
528                      "mysql",
529                      _("Failed to prepare statement `%s'\n"), ret->query);
530     LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, "mysql_stmt_prepare", plugin);
531     mysql_stmt_close (ret->statement);
532     ret->statement = NULL;
533     iclose (plugin);
534     return GNUNET_SYSERR;
535   }
536   ret->valid = GNUNET_YES;
537   return GNUNET_OK;
538
539 }
540
541
542 /**
543  * Bind the parameters for the given MySQL statement
544  * and run it.
545  *
546  * @param plugin plugin context
547  * @param s statement to bind and run
548  * @param ap arguments for the binding
549  * @return GNUNET_SYSERR on error, GNUNET_OK on success
550  */
551 static int
552 init_params (struct Plugin *plugin,
553              struct GNUNET_MysqlStatementHandle *s, va_list ap)
554 {
555   MYSQL_BIND qbind[MAX_PARAM];
556   unsigned int pc;
557   unsigned int off;
558   enum enum_field_types ft;
559
560   pc = mysql_stmt_param_count (s->statement);
561   if (pc > MAX_PARAM)
562   {
563     /* increase internal constant! */
564     GNUNET_break (0);
565     return GNUNET_SYSERR;
566   }
567   memset (qbind, 0, sizeof (qbind));
568   off = 0;
569   ft = 0;
570   while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types))))
571   {
572     qbind[off].buffer_type = ft;
573     switch (ft)
574     {
575     case MYSQL_TYPE_FLOAT:
576       qbind[off].buffer = va_arg (ap, float *);
577
578       break;
579     case MYSQL_TYPE_LONGLONG:
580       qbind[off].buffer = va_arg (ap, unsigned long long *);
581       qbind[off].is_unsigned = va_arg (ap, int);
582
583       break;
584     case MYSQL_TYPE_LONG:
585       qbind[off].buffer = va_arg (ap, unsigned int *);
586       qbind[off].is_unsigned = va_arg (ap, int);
587
588       break;
589     case MYSQL_TYPE_VAR_STRING:
590     case MYSQL_TYPE_STRING:
591     case MYSQL_TYPE_BLOB:
592       qbind[off].buffer = va_arg (ap, void *);
593       qbind[off].buffer_length = va_arg (ap, unsigned long);
594       qbind[off].length = va_arg (ap, unsigned long *);
595
596       break;
597     default:
598       /* unsupported type */
599       GNUNET_break (0);
600       return GNUNET_SYSERR;
601     }
602     pc--;
603     off++;
604   }
605   if (!((pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1)))
606   {
607     GNUNET_assert (0);
608     return GNUNET_SYSERR;
609   }
610   if (mysql_stmt_bind_param (s->statement, qbind))
611   {
612     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
613                 _("`%s' failed at %s:%d with error: %s\n"),
614                 "mysql_stmt_bind_param",
615                 __FILE__, __LINE__, mysql_stmt_error (s->statement));
616     iclose (plugin);
617     return GNUNET_SYSERR;
618   }
619   if (mysql_stmt_execute (s->statement))
620   {
621     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
622                 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
623                 "mysql_stmt_execute",
624                 s->query, __FILE__, __LINE__, mysql_stmt_error (s->statement));
625     iclose (plugin);
626     return GNUNET_SYSERR;
627   }
628   return GNUNET_OK;
629 }
630
631
632 /**
633  * Run a prepared SELECT statement.
634  *
635  * @param plugin plugin context
636  * @param s statement to run
637  * @param result_size number of elements in results array
638  * @param results pointer to already initialized MYSQL_BIND
639  *        array (of sufficient size) for passing results
640  * @param ap pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
641  *        values (size + buffer-reference for pointers); terminated
642  *        with "-1"
643  * @return GNUNET_SYSERR on error, otherwise GNUNET_OK or GNUNET_NO (no result)
644  */
645 static int
646 prepared_statement_run_select_va (struct Plugin *plugin,
647                                   struct GNUNET_MysqlStatementHandle *s,
648                                   unsigned int result_size,
649                                   MYSQL_BIND * results, va_list ap)
650 {
651   int ret;
652   unsigned int rsize;
653
654   if (GNUNET_OK != prepare_statement (plugin, s))
655   {
656     GNUNET_break (0);
657     return GNUNET_SYSERR;
658   }
659   if (GNUNET_OK != init_params (plugin, s, ap))
660   {
661     GNUNET_break (0);
662     return GNUNET_SYSERR;
663   }
664   rsize = mysql_stmt_field_count (s->statement);
665   if (rsize > result_size)
666   {
667     GNUNET_break (0);
668     return GNUNET_SYSERR;
669   }
670   if (mysql_stmt_bind_result (s->statement, results))
671   {
672     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
673                 _("`%s' failed at %s:%d with error: %s\n"),
674                 "mysql_stmt_bind_result",
675                 __FILE__, __LINE__, mysql_stmt_error (s->statement));
676     iclose (plugin);
677     return GNUNET_SYSERR;
678   }
679   ret = mysql_stmt_fetch (s->statement);
680   if (ret == MYSQL_NO_DATA)
681     return GNUNET_NO;
682   if (ret != 0)
683   {
684     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
685                 _("`%s' failed at %s:%d with error: %s\n"),
686                 "mysql_stmt_fetch",
687                 __FILE__, __LINE__, mysql_stmt_error (s->statement));
688     iclose (plugin);
689     return GNUNET_SYSERR;
690   }
691   mysql_stmt_reset (s->statement);
692   return GNUNET_OK;
693 }
694
695
696 /**
697  * Run a prepared SELECT statement.
698  *
699  * @param plugin plugin context
700  * @param s statement to run
701  * @param result_size number of elements in results array
702  * @param results pointer to already initialized MYSQL_BIND
703  *        array (of sufficient size) for passing results
704  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
705  *        values (size + buffer-reference for pointers); terminated
706  *        with "-1"
707  * @return GNUNET_SYSERR on error, otherwise
708  *         the number of successfully affected (or queried) rows
709  */
710 static int
711 prepared_statement_run_select (struct Plugin *plugin,
712                                struct GNUNET_MysqlStatementHandle *s,
713                                unsigned int result_size,
714                                MYSQL_BIND * results, ...)
715 {
716   va_list ap;
717   int ret;
718
719   va_start (ap, results);
720   ret = prepared_statement_run_select_va (plugin, s, result_size, results, ap);
721   va_end (ap);
722   return ret;
723 }
724
725
726 /**
727  * Run a prepared statement that does NOT produce results.
728  *
729  * @param plugin plugin context
730  * @param s statement to run
731  * @param insert_id NULL or address where to store the row ID of whatever
732  *        was inserted (only for INSERT statements!)
733  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
734  *        values (size + buffer-reference for pointers); terminated
735  *        with "-1"
736  * @return GNUNET_SYSERR on error, otherwise
737  *         the number of successfully affected rows
738  */
739 static int
740 prepared_statement_run (struct Plugin *plugin,
741                         struct GNUNET_MysqlStatementHandle *s,
742                         unsigned long long *insert_id, ...)
743 {
744   va_list ap;
745   int affected;
746
747   if (GNUNET_OK != prepare_statement (plugin, s))
748     return GNUNET_SYSERR;
749   va_start (ap, insert_id);
750   if (GNUNET_OK != init_params (plugin, s, ap))
751   {
752     va_end (ap);
753     return GNUNET_SYSERR;
754   }
755   va_end (ap);
756   affected = mysql_stmt_affected_rows (s->statement);
757   if (NULL != insert_id)
758     *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement);
759   mysql_stmt_reset (s->statement);
760   return affected;
761 }
762
763
764 /**
765  * Delete an entry from the gn090 table.
766  *
767  * @param plugin plugin context
768  * @param uid unique ID of the entry to delete
769  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
770  */
771 static int
772 do_delete_entry (struct Plugin *plugin, unsigned long long uid)
773 {
774   int ret;
775
776 #if DEBUG_MYSQL
777   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
778               "Deleting value %llu from gn090 table\n", uid);
779 #endif
780   ret = prepared_statement_run (plugin,
781                                 plugin->delete_entry_by_uid,
782                                 NULL,
783                                 MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES, -1);
784   if (ret >= 0)
785     return GNUNET_OK;
786   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
787               "Deleting value %llu from gn090 table failed\n", uid);
788   return ret;
789 }
790
791
792 /**
793  * Get an estimate of how much space the database is
794  * currently using.
795  *
796  * @param cls our "struct Plugin *"
797  * @return number of bytes used on disk
798  */
799 static unsigned long long
800 mysql_plugin_estimate_size (void *cls)
801 {
802   struct Plugin *plugin = cls;
803   MYSQL_BIND cbind[1];
804   long long total;
805
806   memset (cbind, 0, sizeof (cbind));
807   total = 0;
808   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
809   cbind[0].buffer = &total;
810   cbind[0].is_unsigned = GNUNET_NO;
811   if (GNUNET_OK !=
812       prepared_statement_run_select (plugin, plugin->get_size, 1, cbind, -1))
813     return 0;
814   return total;
815 }
816
817
818 /**
819  * Store an item in the datastore.
820  *
821  * @param cls closure
822  * @param key key for the item
823  * @param size number of bytes in data
824  * @param data content stored
825  * @param type type of the content
826  * @param priority priority of the content
827  * @param anonymity anonymity-level for the content
828  * @param replication replication-level for the content
829  * @param expiration expiration time for the content
830  * @param msg set to error message
831  * @return GNUNET_OK on success
832  */
833 static int
834 mysql_plugin_put (void *cls,
835                   const GNUNET_HashCode * key,
836                   uint32_t size,
837                   const void *data,
838                   enum GNUNET_BLOCK_Type type,
839                   uint32_t priority,
840                   uint32_t anonymity,
841                   uint32_t replication,
842                   struct GNUNET_TIME_Absolute expiration, char **msg)
843 {
844   struct Plugin *plugin = cls;
845   unsigned int irepl = replication;
846   unsigned int ipriority = priority;
847   unsigned int ianonymity = anonymity;
848   unsigned long long lexpiration = expiration.abs_value;
849   unsigned long long lrvalue =
850       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
851                                                      UINT64_MAX);
852   unsigned long hashSize;
853   unsigned long hashSize2;
854   unsigned long lsize;
855   GNUNET_HashCode vhash;
856
857   if (size > MAX_DATUM_SIZE)
858   {
859     GNUNET_break (0);
860     return GNUNET_SYSERR;
861   }
862   hashSize = sizeof (GNUNET_HashCode);
863   hashSize2 = sizeof (GNUNET_HashCode);
864   lsize = size;
865   GNUNET_CRYPTO_hash (data, size, &vhash);
866   if (GNUNET_OK !=
867       prepared_statement_run (plugin,
868                               plugin->insert_entry,
869                               NULL,
870                               MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
871                               MYSQL_TYPE_LONG, &type, GNUNET_YES,
872                               MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
873                               MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
874                               MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
875                               MYSQL_TYPE_LONGLONG, &lrvalue, GNUNET_YES,
876                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
877                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
878                               MYSQL_TYPE_BLOB, data, lsize, &lsize, -1))
879     return GNUNET_SYSERR;
880 #if DEBUG_MYSQL
881   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
882               "Inserted value `%s' with size %u into gn090 table\n",
883               GNUNET_h2s (key), (unsigned int) size);
884 #endif
885   if (size > 0)
886     plugin->env->duc (plugin->env->cls, size);
887   return GNUNET_OK;
888 }
889
890
891 /**
892  * Update the priority for a particular key in the datastore.  If
893  * the expiration time in value is different than the time found in
894  * the datastore, the higher value should be kept.  For the
895  * anonymity level, the lower value is to be used.  The specified
896  * priority should be added to the existing priority, ignoring the
897  * priority in value.
898  *
899  * Note that it is possible for multiple values to match this put.
900  * In that case, all of the respective values are updated.
901  *
902  * @param cls our "struct Plugin*"
903  * @param uid unique identifier of the datum
904  * @param delta by how much should the priority
905  *     change?  If priority + delta < 0 the
906  *     priority should be set to 0 (never go
907  *     negative).
908  * @param expire new expiration time should be the
909  *     MAX of any existing expiration time and
910  *     this value
911  * @param msg set to error message
912  * @return GNUNET_OK on success
913  */
914 static int
915 mysql_plugin_update (void *cls,
916                      uint64_t uid,
917                      int delta, struct GNUNET_TIME_Absolute expire, char **msg)
918 {
919   struct Plugin *plugin = cls;
920   unsigned long long vkey = uid;
921   unsigned long long lexpire = expire.abs_value;
922   int ret;
923
924 #if DEBUG_MYSQL
925   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926               "Updating value %llu adding %d to priority and maxing exp at %llu\n",
927               vkey, delta, lexpire);
928 #endif
929   ret = prepared_statement_run (plugin,
930                                 plugin->update_entry,
931                                 NULL,
932                                 MYSQL_TYPE_LONG, &delta, GNUNET_NO,
933                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
934                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
935                                 MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1);
936   if (ret != GNUNET_OK)
937   {
938     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
939                 "Failed to update value %llu\n", vkey);
940   }
941   return ret;
942 }
943
944
945 /**
946  * Run the given select statement and call 'proc' on the resulting
947  * values (which must be in particular positions).
948  *
949  * @param plugin the plugin handle
950  * @param stmt select statement to run
951  * @param proc function to call on result
952  * @param proc_cls closure for proc
953  * @param ... arguments to initialize stmt
954  */
955 static void
956 execute_select (struct Plugin *plugin,
957                 struct GNUNET_MysqlStatementHandle *stmt,
958                 PluginDatumProcessor proc, void *proc_cls, ...)
959 {
960   va_list ap;
961   int ret;
962   unsigned int type;
963   unsigned int priority;
964   unsigned int anonymity;
965   unsigned long long exp;
966   unsigned long hashSize;
967   unsigned long size;
968   unsigned long long uid;
969   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
970   GNUNET_HashCode key;
971   struct GNUNET_TIME_Absolute expiration;
972   MYSQL_BIND rbind[7];
973
974   hashSize = sizeof (GNUNET_HashCode);
975   memset (rbind, 0, sizeof (rbind));
976   rbind[0].buffer_type = MYSQL_TYPE_LONG;
977   rbind[0].buffer = &type;
978   rbind[0].is_unsigned = 1;
979   rbind[1].buffer_type = MYSQL_TYPE_LONG;
980   rbind[1].buffer = &priority;
981   rbind[1].is_unsigned = 1;
982   rbind[2].buffer_type = MYSQL_TYPE_LONG;
983   rbind[2].buffer = &anonymity;
984   rbind[2].is_unsigned = 1;
985   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
986   rbind[3].buffer = &exp;
987   rbind[3].is_unsigned = 1;
988   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
989   rbind[4].buffer = &key;
990   rbind[4].buffer_length = hashSize;
991   rbind[4].length = &hashSize;
992   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
993   rbind[5].buffer = value;
994   rbind[5].buffer_length = size = sizeof (value);
995   rbind[5].length = &size;
996   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
997   rbind[6].buffer = &uid;
998   rbind[6].is_unsigned = 1;
999
1000   va_start (ap, proc_cls);
1001   ret = prepared_statement_run_select_va (plugin, stmt, 7, rbind, ap);
1002   va_end (ap);
1003   if (ret <= 0)
1004   {
1005     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1006     return;
1007   }
1008   GNUNET_assert (size <= sizeof (value));
1009   if ((rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
1010       (hashSize != sizeof (GNUNET_HashCode)))
1011   {
1012     GNUNET_break (0);
1013     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1014     return;
1015   }
1016 #if DEBUG_MYSQL
1017   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1018               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
1019               (unsigned int) size, GNUNET_h2s (&key), priority, anonymity, exp);
1020 #endif
1021   GNUNET_assert (size < MAX_DATUM_SIZE);
1022   expiration.abs_value = exp;
1023   ret = proc (proc_cls,
1024               &key, size, value, type, priority, anonymity, expiration, uid);
1025   if (ret == GNUNET_NO)
1026   {
1027     do_delete_entry (plugin, uid);
1028     if (size != 0)
1029       plugin->env->duc (plugin->env->cls, -size);
1030   }
1031 }
1032
1033
1034
1035 /**
1036  * Get one of the results for a particular key in the datastore.
1037  *
1038  * @param cls closure
1039  * @param offset offset of the result (modulo num-results); 
1040  *               specific ordering does not matter for the offset
1041  * @param key key to match, never NULL
1042  * @param vhash hash of the value, maybe NULL (to
1043  *        match all values that have the right key).
1044  *        Note that for DBlocks there is no difference
1045  *        betwen key and vhash, but for other blocks
1046  *        there may be!
1047  * @param type entries of which type are relevant?
1048  *     Use 0 for any type.
1049  * @param proc function to call on the matching value, 
1050  *        with NULL for if no value matches
1051  * @param proc_cls closure for proc
1052  */
1053 static void
1054 mysql_plugin_get_key (void *cls,
1055                       uint64_t offset,
1056                       const GNUNET_HashCode * key,
1057                       const GNUNET_HashCode * vhash,
1058                       enum GNUNET_BLOCK_Type type,
1059                       PluginDatumProcessor proc, void *proc_cls)
1060 {
1061   struct Plugin *plugin = cls;
1062   int ret;
1063   MYSQL_BIND cbind[1];
1064   long long total;
1065   unsigned long hashSize;
1066   unsigned long hashSize2;
1067   unsigned long long off;
1068
1069   GNUNET_assert (key != NULL);
1070   GNUNET_assert (NULL != proc);
1071   hashSize = sizeof (GNUNET_HashCode);
1072   hashSize2 = sizeof (GNUNET_HashCode);
1073   memset (cbind, 0, sizeof (cbind));
1074   total = -1;
1075   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
1076   cbind[0].buffer = &total;
1077   cbind[0].is_unsigned = GNUNET_NO;
1078   if (type != 0)
1079   {
1080     if (vhash != NULL)
1081     {
1082       ret =
1083           prepared_statement_run_select (plugin,
1084                                          plugin->
1085                                          count_entry_by_hash_vhash_and_type, 1,
1086                                          cbind, MYSQL_TYPE_BLOB, key, hashSize,
1087                                          &hashSize, MYSQL_TYPE_BLOB, vhash,
1088                                          hashSize2, &hashSize2, MYSQL_TYPE_LONG,
1089                                          &type, GNUNET_YES, -1);
1090     }
1091     else
1092     {
1093       ret =
1094           prepared_statement_run_select (plugin,
1095                                          plugin->count_entry_by_hash_and_type,
1096                                          1, cbind,
1097                                          MYSQL_TYPE_BLOB, key, hashSize,
1098                                          &hashSize, MYSQL_TYPE_LONG, &type,
1099                                          GNUNET_YES, -1);
1100     }
1101   }
1102   else
1103   {
1104     if (vhash != NULL)
1105     {
1106       ret =
1107           prepared_statement_run_select (plugin,
1108                                          plugin->count_entry_by_hash_and_vhash,
1109                                          1, cbind,
1110                                          MYSQL_TYPE_BLOB, key, hashSize,
1111                                          &hashSize, MYSQL_TYPE_BLOB, vhash,
1112                                          hashSize2, &hashSize2, -1);
1113
1114     }
1115     else
1116     {
1117       ret =
1118           prepared_statement_run_select (plugin,
1119                                          plugin->count_entry_by_hash,
1120                                          1, cbind,
1121                                          MYSQL_TYPE_BLOB, key, hashSize,
1122                                          &hashSize, -1);
1123     }
1124   }
1125   if ((ret != GNUNET_OK) || (0 >= total))
1126   {
1127     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1128     return;
1129   }
1130   offset = offset % total;
1131   off = (unsigned long long) offset;
1132 #if DEBUG_MYSQL
1133   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1134               "Obtaining %llu/%lld result for GET `%s'\n",
1135               off, total, GNUNET_h2s (key));
1136 #endif
1137
1138   if (type != GNUNET_BLOCK_TYPE_ANY)
1139   {
1140     if (NULL != vhash)
1141     {
1142       execute_select (plugin,
1143                       plugin->select_entry_by_hash_vhash_and_type,
1144                       proc, proc_cls,
1145                       MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1146                       MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
1147                       MYSQL_TYPE_LONG, &type, GNUNET_YES,
1148                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
1149     }
1150     else
1151     {
1152       execute_select (plugin,
1153                       plugin->select_entry_by_hash_and_type,
1154                       proc, proc_cls,
1155                       MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1156                       MYSQL_TYPE_LONG, &type, GNUNET_YES,
1157                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
1158     }
1159   }
1160   else
1161   {
1162     if (NULL != vhash)
1163     {
1164       execute_select (plugin,
1165                       plugin->select_entry_by_hash_and_vhash,
1166                       proc, proc_cls,
1167                       MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1168                       MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
1169                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
1170     }
1171     else
1172     {
1173       execute_select (plugin,
1174                       plugin->select_entry_by_hash,
1175                       proc, proc_cls,
1176                       MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1177                       MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, -1);
1178     }
1179   }
1180 }
1181
1182
1183 /**
1184  * Get a zero-anonymity datum from the datastore.
1185  *
1186  * @param cls our "struct Plugin*"
1187  * @param offset offset of the result
1188  * @param type entries of which type should be considered?
1189  *        Use 0 for any type.
1190  * @param proc function to call on a matching value or NULL
1191  * @param proc_cls closure for iter
1192  */
1193 static void
1194 mysql_plugin_get_zero_anonymity (void *cls,
1195                                  uint64_t offset,
1196                                  enum GNUNET_BLOCK_Type type,
1197                                  PluginDatumProcessor proc, void *proc_cls)
1198 {
1199   struct Plugin *plugin = cls;
1200   unsigned long long rvalue =
1201       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1202                                                      UINT64_MAX);
1203
1204   execute_select (plugin,
1205                   plugin->zero_iter,
1206                   proc, proc_cls,
1207                   MYSQL_TYPE_LONG, &type, GNUNET_YES,
1208                   MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES,
1209                   MYSQL_TYPE_LONG, &type, GNUNET_YES,
1210                   MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES, -1);
1211 }
1212
1213
1214 /**
1215  * Context for 'repl_proc' function.
1216  */
1217 struct ReplCtx
1218 {
1219
1220   /**
1221    * Plugin handle.
1222    */
1223   struct Plugin *plugin;
1224
1225   /**
1226    * Function to call for the result (or the NULL).
1227    */
1228   PluginDatumProcessor proc;
1229
1230   /**
1231    * Closure for proc.
1232    */
1233   void *proc_cls;
1234 };
1235
1236
1237 /**
1238  * Wrapper for the processor for 'mysql_plugin_get_replication'.
1239  * Decrements the replication counter and calls the original
1240  * iterator.
1241  *
1242  * @param cls closure
1243  * @param key key for the content
1244  * @param size number of bytes in data
1245  * @param data content stored
1246  * @param type type of the content
1247  * @param priority priority of the content
1248  * @param anonymity anonymity-level for the content
1249  * @param expiration expiration time for the content
1250  * @param uid unique identifier for the datum;
1251  *        maybe 0 if no unique identifier is available
1252  *
1253  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1254  *         (continue on call to "next", of course),
1255  *         GNUNET_NO to delete the item and continue (if supported)
1256  */
1257 static int
1258 repl_proc (void *cls,
1259            const GNUNET_HashCode * key,
1260            uint32_t size,
1261            const void *data,
1262            enum GNUNET_BLOCK_Type type,
1263            uint32_t priority,
1264            uint32_t anonymity,
1265            struct GNUNET_TIME_Absolute expiration, uint64_t uid)
1266 {
1267   struct ReplCtx *rc = cls;
1268   struct Plugin *plugin = rc->plugin;
1269   unsigned long long oid;
1270   int ret;
1271   int iret;
1272
1273   ret = rc->proc (rc->proc_cls,
1274                   key, size, data, type, priority, anonymity, expiration, uid);
1275   if (NULL != key)
1276   {
1277     oid = (unsigned long long) uid;
1278     iret = prepared_statement_run (plugin,
1279                                    plugin->dec_repl,
1280                                    NULL,
1281                                    MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, -1);
1282     if (iret == GNUNET_SYSERR)
1283     {
1284       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1285                   "Failed to reduce replication counter\n");
1286       return GNUNET_SYSERR;
1287     }
1288   }
1289   return ret;
1290 }
1291
1292
1293 /**
1294  * Get a random item for replication.  Returns a single, not expired,
1295  * random item from those with the highest replication counters.  The
1296  * item's replication counter is decremented by one IF it was positive
1297  * before.  Call 'proc' with all values ZERO or NULL if the datastore
1298  * is empty.
1299  *
1300  * @param cls closure
1301  * @param proc function to call the value (once only).
1302  * @param proc_cls closure for proc
1303  */
1304 static void
1305 mysql_plugin_get_replication (void *cls,
1306                               PluginDatumProcessor proc, void *proc_cls)
1307 {
1308   struct Plugin *plugin = cls;
1309   struct ReplCtx rc;
1310   unsigned long long rvalue;
1311   unsigned long repl;
1312   MYSQL_BIND results;
1313
1314   rc.plugin = plugin;
1315   rc.proc = proc;
1316   rc.proc_cls = proc_cls;
1317   memset (&results, 0, sizeof (results));
1318   results.buffer_type = MYSQL_TYPE_LONG;
1319   results.buffer = &repl;
1320   results.is_unsigned = GNUNET_YES;
1321
1322   if (1 !=
1323       prepared_statement_run_select (plugin, plugin->max_repl, 1, &results, -1))
1324   {
1325     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1326     return;
1327   }
1328
1329   rvalue =
1330       (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1331                                                      UINT64_MAX);
1332   execute_select (plugin, plugin->select_replication, &repl_proc, &rc,
1333                   MYSQL_TYPE_LONG, &repl, GNUNET_YES, MYSQL_TYPE_LONGLONG,
1334                   &rvalue, GNUNET_YES, MYSQL_TYPE_LONG, &repl, GNUNET_YES,
1335                   MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES, -1);
1336
1337 }
1338
1339
1340 /**
1341  * Context for 'expi_proc' function.
1342  */
1343 struct ExpiCtx
1344 {
1345
1346   /**
1347    * Plugin handle.
1348    */
1349   struct Plugin *plugin;
1350
1351   /**
1352    * Function to call for the result (or the NULL).
1353    */
1354   PluginDatumProcessor proc;
1355
1356   /**
1357    * Closure for proc.
1358    */
1359   void *proc_cls;
1360 };
1361
1362
1363
1364 /**
1365  * Wrapper for the processor for 'mysql_plugin_get_expiration'.
1366  * If no expired value was found, we do a second query for
1367  * low-priority content.
1368  *
1369  * @param cls closure
1370  * @param key key for the content
1371  * @param size number of bytes in data
1372  * @param data content stored
1373  * @param type type of the content
1374  * @param priority priority of the content
1375  * @param anonymity anonymity-level for the content
1376  * @param expiration expiration time for the content
1377  * @param uid unique identifier for the datum;
1378  *        maybe 0 if no unique identifier is available
1379  *
1380  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1381  *         (continue on call to "next", of course),
1382  *         GNUNET_NO to delete the item and continue (if supported)
1383  */
1384 static int
1385 expi_proc (void *cls,
1386            const GNUNET_HashCode * key,
1387            uint32_t size,
1388            const void *data,
1389            enum GNUNET_BLOCK_Type type,
1390            uint32_t priority,
1391            uint32_t anonymity,
1392            struct GNUNET_TIME_Absolute expiration, uint64_t uid)
1393 {
1394   struct ExpiCtx *rc = cls;
1395   struct Plugin *plugin = rc->plugin;
1396
1397   if (NULL == key)
1398   {
1399     execute_select (plugin,
1400                     plugin->select_priority, rc->proc, rc->proc_cls, -1);
1401     return GNUNET_SYSERR;
1402   }
1403   return rc->proc (rc->proc_cls,
1404                    key, size, data, type, priority, anonymity, expiration, uid);
1405 }
1406
1407
1408 /**
1409  * Get a random item for expiration.
1410  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
1411  *
1412  * @param cls closure
1413  * @param proc function to call the value (once only).
1414  * @param proc_cls closure for proc
1415  */
1416 static void
1417 mysql_plugin_get_expiration (void *cls,
1418                              PluginDatumProcessor proc, void *proc_cls)
1419 {
1420   struct Plugin *plugin = cls;
1421   long long nt;
1422   struct ExpiCtx rc;
1423
1424   rc.plugin = plugin;
1425   rc.proc = proc;
1426   rc.proc_cls = proc_cls;
1427   nt = (long long) GNUNET_TIME_absolute_get ().abs_value;
1428   execute_select (plugin,
1429                   plugin->select_expiration,
1430                   expi_proc, &rc, MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, -1);
1431
1432 }
1433
1434
1435 /**
1436  * Drop database.
1437  *
1438  * @param cls the "struct Plugin*"
1439  */
1440 static void
1441 mysql_plugin_drop (void *cls)
1442 {
1443   struct Plugin *plugin = cls;
1444
1445   if (GNUNET_OK != run_statement (plugin, "DROP TABLE gn090"))
1446     return;                     /* error */
1447   plugin->env->duc (plugin->env->cls, 0);
1448 }
1449
1450
1451 /**
1452  * Entry point for the plugin.
1453  *
1454  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1455  * @return our "struct Plugin*"
1456  */
1457 void *
1458 libgnunet_plugin_datastore_mysql_init (void *cls)
1459 {
1460   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1461   struct GNUNET_DATASTORE_PluginFunctions *api;
1462   struct Plugin *plugin;
1463
1464   plugin = GNUNET_malloc (sizeof (struct Plugin));
1465   plugin->env = env;
1466   plugin->cnffile = get_my_cnf_path (env->cfg);
1467   if (GNUNET_OK != iopen (plugin))
1468   {
1469     iclose (plugin);
1470     GNUNET_free_non_null (plugin->cnffile);
1471     GNUNET_free (plugin);
1472     return NULL;
1473   }
1474 #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
1475 #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
1476   if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
1477              " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1478              " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1479              " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1480              " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1481              " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1482              " rvalue BIGINT UNSIGNED NOT NULL,"
1483              " hash BINARY(64) NOT NULL DEFAULT '',"
1484              " vhash BINARY(64) NOT NULL DEFAULT '',"
1485              " value BLOB NOT NULL DEFAULT '',"
1486              " uid BIGINT NOT NULL AUTO_INCREMENT,"
1487              " PRIMARY KEY (uid),"
1488              " INDEX idx_hash (hash(64)),"
1489              " INDEX idx_hash_uid (hash(64),uid),"
1490              " INDEX idx_hash_vhash (hash(64),vhash(64)),"
1491              " INDEX idx_hash_type_uid (hash(64),type,rvalue),"
1492              " INDEX idx_prio (prio),"
1493              " INDEX idx_repl_rvalue (repl,rvalue),"
1494              " INDEX idx_expire (expire),"
1495              " INDEX idx_anonLevel_type_rvalue (anonLevel,type,rvalue)"
1496              ") ENGINE=InnoDB") ||
1497       MRUNS ("SET AUTOCOMMIT = 1") ||
1498       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1499       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1500       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1501       PINIT (plugin->select_entry_by_hash_and_vhash,
1502              SELECT_ENTRY_BY_HASH_AND_VHASH) ||
1503       PINIT (plugin->select_entry_by_hash_and_type,
1504              SELECT_ENTRY_BY_HASH_AND_TYPE) ||
1505       PINIT (plugin->select_entry_by_hash_vhash_and_type,
1506              SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
1507       PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
1508       PINIT (plugin->get_size, SELECT_SIZE) ||
1509       PINIT (plugin->count_entry_by_hash_and_vhash,
1510              COUNT_ENTRY_BY_HASH_AND_VHASH) ||
1511       PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1512       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1513                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
1514       PINIT (plugin->update_entry, UPDATE_ENTRY) ||
1515       PINIT (plugin->dec_repl, DEC_REPL) ||
1516       PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
1517       PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) ||
1518       PINIT (plugin->select_priority, SELECT_IT_PRIORITY) ||
1519       PINIT (plugin->max_repl, SELECT_MAX_REPL) ||
1520       PINIT (plugin->select_replication, SELECT_IT_REPLICATION))
1521   {
1522     iclose (plugin);
1523     GNUNET_free_non_null (plugin->cnffile);
1524     GNUNET_free (plugin);
1525     return NULL;
1526   }
1527 #undef PINIT
1528 #undef MRUNS
1529
1530   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1531   api->cls = plugin;
1532   api->estimate_size = &mysql_plugin_estimate_size;
1533   api->put = &mysql_plugin_put;
1534   api->update = &mysql_plugin_update;
1535   api->get_key = &mysql_plugin_get_key;
1536   api->get_replication = &mysql_plugin_get_replication;
1537   api->get_expiration = &mysql_plugin_get_expiration;
1538   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1539   api->drop = &mysql_plugin_drop;
1540   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1541                    "mysql", _("Mysql database running\n"));
1542   return api;
1543 }
1544
1545
1546 /**
1547  * Exit point from the plugin.
1548  * @param cls our "struct Plugin*"
1549  * @return always NULL
1550  */
1551 void *
1552 libgnunet_plugin_datastore_mysql_done (void *cls)
1553 {
1554   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1555   struct Plugin *plugin = api->cls;
1556   struct GNUNET_MysqlStatementHandle *s;
1557
1558   iclose (plugin);
1559   while (NULL != (s = plugin->shead))
1560   {
1561     GNUNET_CONTAINER_DLL_remove (plugin->shead, plugin->stail, s);
1562     GNUNET_free (s->query);
1563     GNUNET_free (s);
1564   }
1565   GNUNET_free_non_null (plugin->cnffile);
1566   GNUNET_free (plugin);
1567   GNUNET_free (api);
1568   mysql_library_end ();
1569   return NULL;
1570 }
1571
1572 /* end of plugin_datastore_mysql.c */