insanity
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in /etc/gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109      mysql> REPAIR TABLE gn072;
110    @endverbatim
111  *
112  * PROBLEMS?
113  *
114  * If you have problems related to the mysql module, your best
115  * friend is probably the mysql manual. The first thing to check
116  * is that mysql is basically operational, that you can connect
117  * to it, create tables, issue queries etc.
118  *
119  * TODO:
120  * - use FOREIGN KEY for 'uid/vkey'
121  * - consistent naming of uid/vkey
122  */
123
124 #include "platform.h"
125 #include "gnunet_datastore_plugin.h"
126 #include "gnunet_util_lib.h"
127 #include <mysql/mysql.h>
128
129 #define DEBUG_MYSQL GNUNET_NO
130
131 #define MAX_DATUM_SIZE 65536
132
133 /**
134  * Maximum number of supported parameters for a prepared
135  * statement.  Increase if needed.
136  */
137 #define MAX_PARAM 16
138
139 /**
140  * Die with an error message that indicates
141  * a failure of the command 'cmd' with the message given
142  * by strerror(errno).
143  */
144 #define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0);
145
146 /**
147  * Log an error message at log-level 'level' that indicates
148  * a failure of the command 'cmd' on file 'filename'
149  * with the message given by strerror(errno).
150  */
151 #define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
152
153
154 struct GNUNET_MysqlStatementHandle
155 {
156   struct GNUNET_MysqlStatementHandle *next;
157
158   struct GNUNET_MysqlStatementHandle *prev;
159
160   char *query;
161
162   MYSQL_STMT *statement;
163
164   int valid;
165
166 };
167
168 /**
169  * Context for the universal iterator.
170  */
171 struct NextRequestClosure;
172
173 /**
174  * Type of a function that will prepare
175  * the next iteration.
176  *
177  * @param cls closure
178  * @param nc the next context; NULL for the last
179  *         call which gives the callback a chance to
180  *         clean up the closure
181  * @return GNUNET_OK on success, GNUNET_NO if there are
182  *         no more values, GNUNET_SYSERR on error
183  */
184 typedef int (*PrepareFunction)(void *cls,
185                                struct NextRequestClosure *nc);
186
187
188 struct NextRequestClosure
189 {
190   struct Plugin *plugin;
191
192   struct GNUNET_TIME_Absolute now;
193
194   /**
195    * Function to call to prepare the next
196    * iteration.
197    */
198   PrepareFunction prep;
199
200   /**
201    * Closure for prep.
202    */
203   void *prep_cls;
204
205   MYSQL_BIND rbind[6];
206
207   enum GNUNET_BLOCK_Type type;
208
209   PluginIterator dviter;
210
211   void *dviter_cls;
212
213   unsigned long long last_vkey;
214
215   unsigned int last_prio;
216
217   int end_it;
218 };
219
220
221 /**
222  * Context for all functions in this plugin.
223  */
224 struct Plugin 
225 {
226   /**
227    * Our execution environment.
228    */
229   struct GNUNET_DATASTORE_PluginEnvironment *env;
230
231   MYSQL *dbf;
232   
233   struct GNUNET_MysqlStatementHandle *shead;
234
235   struct GNUNET_MysqlStatementHandle *stail;
236
237   /**
238    * Filename of "my.cnf" (msyql configuration).
239    */
240   char *cnffile;
241
242   /**
243    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
244    */
245   struct NextRequestClosure *next_task_nc;
246
247   /**
248    * Pending task with scheduler for running the next request.
249    */
250   GNUNET_SCHEDULER_TaskIdentifier next_task;
251
252   /**
253    * Statements dealing with gn072 table 
254    */
255 #define SELECT_VALUE "SELECT value FROM gn072 WHERE vkey=?"
256   struct GNUNET_MysqlStatementHandle *select_value;
257
258 #define DELETE_VALUE "DELETE FROM gn072 WHERE vkey=?"
259   struct GNUNET_MysqlStatementHandle *delete_value;
260
261 #define INSERT_VALUE "INSERT INTO gn072 (value) VALUES (?)"
262   struct GNUNET_MysqlStatementHandle *insert_value;
263
264   /**
265    * Statements dealing with gn090 table 
266    */
267 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,vkey) VALUES (?,?,?,?,?,?,?,?)"
268   struct GNUNET_MysqlStatementHandle *insert_entry;
269   
270 #define DELETE_ENTRY_BY_VKEY "DELETE FROM gn090 WHERE vkey=?"
271   struct GNUNET_MysqlStatementHandle *delete_entry_by_vkey;
272   
273 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
274   struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
275   
276 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
277   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
278
279 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
280   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
281   
282 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
283   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
284   
285 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (hash) WHERE hash=?"
286   struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
287
288 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=?"
289   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
290
291 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (hash) WHERE hash=? AND type=?"
292   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
293
294 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (hash_vhash) WHERE hash=? AND vhash=? AND type=?"
295   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
296
297 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE vkey=?"
298   struct GNUNET_MysqlStatementHandle *update_entry;
299
300 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn072"
301   struct GNUNET_MysqlStatementHandle *get_size;
302
303 /* warning, slighly crazy mysql statements ahead.  Essentially, MySQL does not handle
304    "OR" very well, so we need to use UNION instead.  And UNION does not
305    automatically apply a LIMIT on the outermost clause, so we need to
306    repeat ourselves quite a bit.  All hail the performance gods (and thanks
307    to #mysql on freenode) */
308 #define SELECT_IT_NON_ANONYMOUS "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio = ? AND vkey < ?)"\
309   " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
310   "UNION "\
311   "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio < ? AND vkey != ?)"\
312   " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
313   "ORDER BY prio DESC,vkey DESC LIMIT 1"
314   struct GNUNET_MysqlStatementHandle *zero_iter;
315
316 #define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire < ?) "\
317   "ORDER BY prio ASC LIMIT 1) "\
318   "UNION "\
319   "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) "\
320   "ORDER BY prio ASC LIMIT 1) ORDER BY expire ASC LIMIT 1"
321   struct GNUNET_MysqlStatementHandle *select_expiration;
322
323 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) "\
324   "WHERE expire > ?"\
325   " ORDER BY repl DESC,RAND() LIMIT 1"
326   struct GNUNET_MysqlStatementHandle *select_replication;
327
328 };
329
330
331 /**
332  * Obtain the location of ".my.cnf".
333  *
334  * @param cfg our configuration
335  * @return NULL on error
336  */
337 static char *
338 get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
339 {
340   char *cnffile;
341   char *home_dir;
342   struct stat st;
343 #ifndef WINDOWS
344   struct passwd *pw;
345 #endif
346   int configured;
347
348 #ifndef WINDOWS
349   pw = getpwuid (getuid ());
350   if (!pw)
351     {
352       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, 
353                            "getpwuid");
354       return NULL;
355     }
356   if (GNUNET_YES ==
357       GNUNET_CONFIGURATION_have_value (cfg,
358                                        "datastore-mysql", "CONFIG"))
359     {
360       GNUNET_assert (GNUNET_OK == 
361                      GNUNET_CONFIGURATION_get_value_filename (cfg,
362                                                               "datastore-mysql", "CONFIG", &cnffile));
363       configured = GNUNET_YES;
364     }
365   else
366     {
367       home_dir = GNUNET_strdup (pw->pw_dir);
368 #else
369       home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
370       plibc_conv_to_win_path ("~/", home_dir);
371 #endif
372       GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
373       GNUNET_free (home_dir);
374       configured = GNUNET_NO;
375     }
376   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
377               _("Trying to use file `%s' for MySQL configuration.\n"),
378               cnffile);
379   if ((0 != STAT (cnffile, &st)) ||
380       (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode)))
381     {
382       if (configured == GNUNET_YES)
383         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
384                     _("Could not access file `%s': %s\n"), cnffile,
385                     STRERROR (errno));
386       GNUNET_free (cnffile);
387       return NULL;
388     }
389   return cnffile;
390 }
391
392
393
394 /**
395  * Free a prepared statement.
396  *
397  * @param plugin plugin context
398  * @param s prepared statement
399  */
400 static void
401 prepared_statement_destroy (struct Plugin *plugin, 
402                             struct GNUNET_MysqlStatementHandle
403                             *s)
404 {
405   GNUNET_CONTAINER_DLL_remove (plugin->shead,
406                                plugin->stail,
407                                s);
408   if (s->valid)
409     mysql_stmt_close (s->statement);
410   GNUNET_free (s->query);
411   GNUNET_free (s);
412 }
413
414
415 /**
416  * Close database connection and all prepared statements (we got a DB
417  * disconnect error).
418  */
419 static int
420 iclose (struct Plugin *plugin)
421 {
422   struct GNUNET_MysqlStatementHandle *spos;
423
424   spos = plugin->shead;
425   while (NULL != plugin->shead)
426     prepared_statement_destroy (plugin,
427                                 plugin->shead);
428   if (plugin->dbf != NULL)
429     {
430       mysql_close (plugin->dbf);
431       plugin->dbf = NULL;
432     }
433   return GNUNET_OK;
434 }
435
436
437 /**
438  * Open the connection with the database (and initialize
439  * our default options).
440  *
441  * @return GNUNET_OK on success
442  */
443 static int
444 iopen (struct Plugin *ret)
445 {
446   char *mysql_dbname;
447   char *mysql_server;
448   char *mysql_user;
449   char *mysql_password;
450   unsigned long long mysql_port;
451   my_bool reconnect;
452   unsigned int timeout;
453
454   ret->dbf = mysql_init (NULL);
455   if (ret->dbf == NULL)
456     return GNUNET_SYSERR;
457   if (ret->cnffile != NULL)
458     mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile);
459   mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
460   reconnect = 0;
461   mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect);
462   mysql_options (ret->dbf,
463                  MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
464   mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
465   timeout = 60; /* in seconds */
466   mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
467   mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
468   mysql_dbname = NULL;
469   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
470                                                      "datastore-mysql", "DATABASE"))
471     GNUNET_assert (GNUNET_OK == 
472                    GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
473                                                           "datastore-mysql", "DATABASE", 
474                                                           &mysql_dbname));
475   else
476     mysql_dbname = GNUNET_strdup ("gnunet");
477   mysql_user = NULL;
478   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
479                                                      "datastore-mysql", "USER"))
480     {
481       GNUNET_assert (GNUNET_OK == 
482                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
483                                                            "datastore-mysql", "USER", 
484                                                            &mysql_user));
485     }
486   mysql_password = NULL;
487   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
488                                                      "datastore-mysql", "PASSWORD"))
489     {
490       GNUNET_assert (GNUNET_OK ==
491                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
492                                                            "datastore-mysql", "PASSWORD",
493                                                            &mysql_password));
494     }
495   mysql_server = NULL;
496   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
497                                                      "datastore-mysql", "HOST"))
498     {
499       GNUNET_assert (GNUNET_OK == 
500                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
501                                                            "datastore-mysql", "HOST", 
502                                                            &mysql_server));
503     }
504   mysql_port = 0;
505   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
506                                                      "datastore-mysql", "PORT"))
507     {
508       GNUNET_assert (GNUNET_OK ==
509                     GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql",
510                                                            "PORT", &mysql_port));
511     }
512
513   GNUNET_assert (mysql_dbname != NULL);
514   mysql_real_connect (ret->dbf, mysql_server, mysql_user, mysql_password,
515                       mysql_dbname, (unsigned int) mysql_port, NULL,
516                       CLIENT_IGNORE_SIGPIPE);
517   GNUNET_free_non_null (mysql_server);
518   GNUNET_free_non_null (mysql_user);
519   GNUNET_free_non_null (mysql_password);
520   GNUNET_free (mysql_dbname);
521   if (mysql_error (ret->dbf)[0])
522     {
523       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
524                  "mysql_real_connect", ret);
525       return GNUNET_SYSERR;
526     }
527   return GNUNET_OK;
528 }
529
530
531 /**
532  * Run the given MySQL statement.
533  *
534  * @param plugin plugin context
535  * @param statement SQL statement to run
536  * @return GNUNET_OK on success, GNUNET_SYSERR on error
537  */
538 static int
539 run_statement (struct Plugin *plugin,
540                const char *statement)
541 {
542   if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
543     return GNUNET_SYSERR;
544   mysql_query (plugin->dbf, statement);
545   if (mysql_error (plugin->dbf)[0])
546     {
547       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
548                  "mysql_query", plugin);
549       iclose (plugin);
550       return GNUNET_SYSERR;
551     }
552   return GNUNET_OK;
553 }
554
555
556 /**
557  * Create a prepared statement.
558  *
559  * @param plugin plugin context
560  * @param statement SQL statement text to prepare
561  * @return NULL on error
562  */
563 static struct GNUNET_MysqlStatementHandle *
564 prepared_statement_create (struct Plugin *plugin, 
565                            const char *statement)
566 {
567   struct GNUNET_MysqlStatementHandle *ret;
568
569   ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle));
570   ret->query = GNUNET_strdup (statement);
571   GNUNET_CONTAINER_DLL_insert (plugin->shead,
572                                plugin->stail,
573                                ret);
574   return ret;
575 }
576
577
578 /**
579  * Prepare a statement for running.
580  *
581  * @param plugin plugin context
582  * @param ret handle to prepared statement
583  * @return GNUNET_OK on success
584  */
585 static int
586 prepare_statement (struct Plugin *plugin, 
587                    struct GNUNET_MysqlStatementHandle *ret)
588 {
589   if (GNUNET_YES == ret->valid)
590     return GNUNET_OK;
591   if ((NULL == plugin->dbf) && 
592       (GNUNET_OK != iopen (plugin)))
593     return GNUNET_SYSERR;
594   ret->statement = mysql_stmt_init (plugin->dbf);
595   if (ret->statement == NULL)
596     {
597       iclose (plugin);
598       return GNUNET_SYSERR;
599     }
600   if (mysql_stmt_prepare (ret->statement, 
601                           ret->query,
602                           strlen (ret->query)))
603     {
604       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
605                  "mysql_stmt_prepare", 
606                  plugin);
607       mysql_stmt_close (ret->statement);
608       ret->statement = NULL;
609       iclose (plugin);
610       return GNUNET_SYSERR;
611     }
612   ret->valid = GNUNET_YES;
613   return GNUNET_OK;
614
615 }
616
617
618 /**
619  * Bind the parameters for the given MySQL statement
620  * and run it.
621  *
622  * @param plugin plugin context
623  * @param s statement to bind and run
624  * @param ap arguments for the binding
625  * @return GNUNET_SYSERR on error, GNUNET_OK on success
626  */
627 static int
628 init_params (struct Plugin *plugin,
629              struct GNUNET_MysqlStatementHandle *s,
630              va_list ap)
631 {
632   MYSQL_BIND qbind[MAX_PARAM];
633   unsigned int pc;
634   unsigned int off;
635   enum enum_field_types ft;
636
637   pc = mysql_stmt_param_count (s->statement);
638   if (pc > MAX_PARAM)
639     {
640       /* increase internal constant! */
641       GNUNET_break (0);
642       return GNUNET_SYSERR;
643     }
644   memset (qbind, 0, sizeof (qbind));
645   off = 0;
646   ft = 0;
647   while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types))))
648     {
649       qbind[off].buffer_type = ft;
650       switch (ft)
651         {
652         case MYSQL_TYPE_FLOAT:
653           qbind[off].buffer = va_arg (ap, float *);
654           break;
655         case MYSQL_TYPE_LONGLONG:
656           qbind[off].buffer = va_arg (ap, unsigned long long *);
657           qbind[off].is_unsigned = va_arg (ap, int);
658           break;
659         case MYSQL_TYPE_LONG:
660           qbind[off].buffer = va_arg (ap, unsigned int *);
661           qbind[off].is_unsigned = va_arg (ap, int);
662           break;
663         case MYSQL_TYPE_VAR_STRING:
664         case MYSQL_TYPE_STRING:
665         case MYSQL_TYPE_BLOB:
666           qbind[off].buffer = va_arg (ap, void *);
667           qbind[off].buffer_length = va_arg (ap, unsigned long);
668           qbind[off].length = va_arg (ap, unsigned long *);
669           break;
670         default:
671           /* unsupported type */
672           GNUNET_break (0);
673           return GNUNET_SYSERR;
674         }
675       pc--;
676       off++;
677     }
678   if (! ( (pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1) ) )
679     {
680       GNUNET_break (0);
681       return GNUNET_SYSERR;
682     }
683   if (mysql_stmt_bind_param (s->statement, qbind))
684     {
685       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
686                   _("`%s' failed at %s:%d with error: %s\n"),
687                   "mysql_stmt_bind_param",
688                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
689       iclose (plugin);
690       return GNUNET_SYSERR;
691     }
692   if (mysql_stmt_execute (s->statement))
693     {
694       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
695                   _("`%s' failed at %s:%d with error: %s\n"),
696                   "mysql_stmt_execute",
697                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
698       iclose (plugin);
699       return GNUNET_SYSERR;
700     }
701   return GNUNET_OK;
702 }
703
704 /**
705  * Type of a callback that will be called for each
706  * data set returned from MySQL.
707  *
708  * @param cls user-defined argument
709  * @param num_values number of elements in values
710  * @param values values returned by MySQL
711  * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort
712  */
713 typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
714                                           unsigned int num_values,
715                                           MYSQL_BIND *values);
716
717
718 /**
719  * Run a prepared SELECT statement.
720  *
721  * @param plugin plugin context
722  * @param s statement to run
723  * @param result_size number of elements in results array
724  * @param results pointer to already initialized MYSQL_BIND
725  *        array (of sufficient size) for passing results
726  * @param processor function to call on each result
727  * @param processor_cls extra argument to processor
728  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
729  *        values (size + buffer-reference for pointers); terminated
730  *        with "-1"
731  * @return GNUNET_SYSERR on error, otherwise
732  *         the number of successfully affected (or queried) rows
733  */
734 static int
735 prepared_statement_run_select (struct Plugin *plugin,
736                                struct GNUNET_MysqlStatementHandle *s,
737                                unsigned int result_size,
738                                MYSQL_BIND *results,
739                                GNUNET_MysqlDataProcessor processor, void *processor_cls,
740                                ...)
741 {
742   va_list ap;
743   int ret;
744   unsigned int rsize;
745   int total;
746
747   if (GNUNET_OK != prepare_statement (plugin, s))
748     {
749       GNUNET_break (0);
750       return GNUNET_SYSERR;
751     }
752   va_start (ap, processor_cls);
753   if (GNUNET_OK != init_params (plugin, s, ap))
754     {
755       GNUNET_break (0);
756       va_end (ap);
757       return GNUNET_SYSERR;
758     }
759   va_end (ap);
760   rsize = mysql_stmt_field_count (s->statement);
761   if (rsize > result_size)
762     {
763       GNUNET_break (0);
764       return GNUNET_SYSERR;
765     }
766   if (mysql_stmt_bind_result (s->statement, results))
767     {
768       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
769                   _("`%s' failed at %s:%d with error: %s\n"),
770                   "mysql_stmt_bind_result",
771                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
772       iclose (plugin);
773       return GNUNET_SYSERR;
774     }
775
776   total = 0;
777   while (1)
778     {
779       ret = mysql_stmt_fetch (s->statement);
780       if (ret == MYSQL_NO_DATA)
781         break;
782       if (ret != 0)
783         {
784           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
785                       _("`%s' failed at %s:%d with error: %s\n"),
786                       "mysql_stmt_fetch",
787                       __FILE__, __LINE__, mysql_stmt_error (s->statement));
788           iclose (plugin);
789           return GNUNET_SYSERR;
790         }
791       if (processor != NULL)
792         if (GNUNET_OK != processor (processor_cls, rsize, results))
793           break;
794       total++;
795     }
796   mysql_stmt_reset (s->statement);
797   return total;
798 }
799
800
801 /**
802  * Run a prepared statement that does NOT produce results.
803  *
804  * @param plugin plugin context
805  * @param s statement to run
806  * @param insert_id NULL or address where to store the row ID of whatever
807  *        was inserted (only for INSERT statements!)
808  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
809  *        values (size + buffer-reference for pointers); terminated
810  *        with "-1"
811  * @return GNUNET_SYSERR on error, otherwise
812  *         the number of successfully affected rows
813  */
814 static int
815 prepared_statement_run (struct Plugin *plugin,
816                         struct GNUNET_MysqlStatementHandle *s,
817                         unsigned long long *insert_id, ...)
818 {
819   va_list ap;
820   int affected;
821
822   if (GNUNET_OK != prepare_statement (plugin, s))
823     return GNUNET_SYSERR;
824   va_start (ap, insert_id);
825   if (GNUNET_OK != init_params (plugin, s, ap))
826     {
827       va_end (ap);
828       return GNUNET_SYSERR;
829     }
830   va_end (ap);
831   affected = mysql_stmt_affected_rows (s->statement);
832   if (NULL != insert_id)
833     *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement);
834   mysql_stmt_reset (s->statement);
835   return affected;
836 }
837
838
839 /**
840  * Delete an value from the gn072 table.
841  *
842  * @param plugin plugin context
843  * @param vkey vkey identifying the value to delete
844  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
845  */
846 static int
847 do_delete_value (struct Plugin *plugin,
848                  unsigned long long vkey)
849 {
850   int ret;
851
852 #if DEBUG_MYSQL
853   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
854               "Deleting value %llu from gn072 table\n",
855               vkey);
856 #endif
857   ret = prepared_statement_run (plugin,
858                                 plugin->delete_value,
859                                 NULL,
860                                 MYSQL_TYPE_LONGLONG,
861                                 &vkey, GNUNET_YES, -1);
862   if (ret > 0)
863     {
864       ret = GNUNET_OK;
865     }
866   else
867     {
868       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
869                   "Deleting value %llu from gn072 table failed\n",
870                   vkey);
871     }
872   return ret;
873 }
874
875 /**
876  * Insert a value into the gn072 table.
877  *
878  * @param plugin plugin context
879  * @param value the value to insert
880  * @param size size of the value
881  * @param vkey vkey identifying the value henceforth (set)
882  * @return GNUNET_OK on success, GNUNET_SYSERR on error
883  */
884 static int
885 do_insert_value (struct Plugin *plugin,
886                  const void *value, unsigned int size,
887                  unsigned long long *vkey)
888 {
889   unsigned long length = size;
890   int ret;
891
892   ret = prepared_statement_run (plugin,
893                                 plugin->insert_value,
894                                 vkey,
895                                 MYSQL_TYPE_BLOB,
896                                 value, length, &length, -1);
897   if (ret == GNUNET_OK)
898     {
899 #if DEBUG_MYSQL
900       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
901                   "Inserted value number %llu with length %u into gn072 table\n",
902                   *vkey,
903                   size);
904 #endif
905     }
906   else
907     {
908       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
909                   "Failed to insert %u byte value into gn072 table\n",
910                   size);
911     }
912   return ret;
913 }
914
915 /**
916  * Delete an entry from the gn090 table.
917  *
918  * @param plugin plugin context
919  * @param vkey vkey identifying the entry to delete
920  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
921  */
922 static int
923 do_delete_entry_by_vkey (struct Plugin *plugin,
924                          unsigned long long vkey)
925 {
926   int ret;
927
928 #if DEBUG_MYSQL
929   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930               "Deleting value %llu from gn090 table\n",
931               vkey);
932 #endif
933   ret = prepared_statement_run (plugin,
934                                 plugin->delete_entry_by_vkey,
935                                 NULL,
936                                 MYSQL_TYPE_LONGLONG,
937                                 &vkey, GNUNET_YES, -1);
938   if (ret > 0)
939     {
940       ret = GNUNET_OK;
941     }
942   else
943     {
944       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
945                   "Deleting value %llu from gn090 table failed\n",
946                   vkey);
947     }
948   return ret;
949 }
950
951
952 /**
953  * Function that simply returns GNUNET_OK
954  *
955  * @param cls closure, not used
956  * @param num_values not used
957  * @param values not used
958  * @return GNUNET_OK
959  */
960 static int
961 return_ok (void *cls, 
962            unsigned int num_values, 
963            MYSQL_BIND * values)
964 {
965   return GNUNET_OK;
966 }
967
968
969 /**
970  * Continuation of "mysql_next_request".
971  *
972  * @param next_cls the next context
973  * @param tc the task context (unused)
974  */
975 static void 
976 mysql_next_request_cont (void *next_cls,
977                          const struct GNUNET_SCHEDULER_TaskContext *tc)
978 {
979   struct NextRequestClosure *nrc = next_cls;
980   struct Plugin *plugin;
981   int ret;
982   unsigned int type;
983   unsigned int priority;
984   unsigned int anonymity;
985   unsigned long long exp;
986   unsigned long long vkey;
987   unsigned long hashSize;
988   GNUNET_HashCode key;
989   struct GNUNET_TIME_Absolute expiration;
990   unsigned long length;
991   MYSQL_BIND *rbind; /* size 7 */
992   MYSQL_BIND dbind[1];
993   char datum[GNUNET_SERVER_MAX_MESSAGE_SIZE];
994
995   plugin = nrc->plugin;
996   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
997   plugin->next_task_nc = NULL;
998
999  AGAIN: 
1000   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1001   nrc->now = GNUNET_TIME_absolute_get ();
1002   hashSize = sizeof (GNUNET_HashCode);
1003   memset (nrc->rbind, 0, sizeof (nrc->rbind));
1004   rbind = nrc->rbind;
1005   rbind[0].buffer_type = MYSQL_TYPE_LONG;
1006   rbind[0].buffer = &type;
1007   rbind[0].is_unsigned = 1;
1008   rbind[1].buffer_type = MYSQL_TYPE_LONG;
1009   rbind[1].buffer = &priority;
1010   rbind[1].is_unsigned = 1;
1011   rbind[2].buffer_type = MYSQL_TYPE_LONG;
1012   rbind[2].buffer = &anonymity;
1013   rbind[2].is_unsigned = 1;
1014   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
1015   rbind[3].buffer = &exp;
1016   rbind[3].is_unsigned = 1;
1017   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
1018   rbind[4].buffer = &key;
1019   rbind[4].buffer_length = hashSize;
1020   rbind[4].length = &hashSize;
1021   rbind[5].buffer_type = MYSQL_TYPE_LONGLONG;
1022   rbind[5].buffer = &vkey;
1023   rbind[5].is_unsigned = GNUNET_YES;
1024
1025   if ( (GNUNET_YES == nrc->end_it) ||
1026        (GNUNET_OK != nrc->prep (nrc->prep_cls,
1027                                 nrc)))
1028     goto END_SET;
1029   nrc->last_vkey = vkey;
1030   nrc->last_prio = priority;
1031   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1032   if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
1033        (hashSize != sizeof (GNUNET_HashCode)) )
1034     {
1035       GNUNET_break (0);
1036       goto END_SET;
1037     }     
1038 #if DEBUG_MYSQL
1039   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1040               "Found value %llu with prio %u, anon %u, expire %llu selecting from gn090 table\n",
1041               vkey,           
1042               priority,
1043               anonymity,
1044               exp);
1045 #endif
1046   /* now do query on gn072 */
1047   length = sizeof (datum);
1048   memset (dbind, 0, sizeof (dbind));
1049   dbind[0].buffer_type = MYSQL_TYPE_BLOB;
1050   dbind[0].buffer_length = length;
1051   dbind[0].length = &length;
1052   dbind[0].buffer = datum;
1053   ret = prepared_statement_run_select (plugin,
1054                                        plugin->select_value,
1055                                        1,
1056                                        dbind,
1057                                        &return_ok,
1058                                        NULL,
1059                                        MYSQL_TYPE_LONGLONG,
1060                                        &vkey, GNUNET_YES, -1);
1061   GNUNET_break (ret <= 1);     /* should only have one rbind! */
1062   if (ret > 0)
1063     ret = GNUNET_OK;
1064   if (ret != GNUNET_OK) 
1065     {
1066       GNUNET_break (0);
1067       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
1068                   _("Failed to obtain value %llu from table `%s'\n"),
1069                   vkey,
1070                   "gn072");
1071       goto AGAIN;
1072     }
1073   GNUNET_break (length <= sizeof(datum));
1074 #if DEBUG_MYSQL
1075   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1076               "Calling iterator with value `%s' number %llu of size %u with type %u, priority %u, anonymity %u and expiration %llu\n",
1077               GNUNET_h2s (&key),
1078               vkey,           
1079               length,
1080               type,
1081               priority,
1082               anonymity,
1083               exp);
1084 #endif
1085   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1086   expiration.abs_value = exp;
1087   ret = nrc->dviter (nrc->dviter_cls,
1088                      nrc,
1089                      &key,
1090                      length,
1091                      datum,
1092                      type,
1093                      priority,
1094                      anonymity,
1095                      expiration,
1096                      vkey);
1097   if (ret == GNUNET_SYSERR)
1098     {
1099       nrc->end_it = GNUNET_YES;
1100       return;
1101     }
1102   if (ret == GNUNET_NO)
1103     {
1104       do_delete_value (plugin, vkey);
1105       do_delete_entry_by_vkey (plugin, vkey);
1106       if (length != 0)
1107         plugin->env->duc (plugin->env->cls,
1108                           - length);
1109     }
1110   return;
1111  END_SET:
1112   /* call dviter with "end of set" */
1113   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1114   nrc->dviter (nrc->dviter_cls, 
1115                NULL, NULL, 0, NULL, 0, 0, 0, 
1116                GNUNET_TIME_UNIT_ZERO_ABS, 0);
1117   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1118   nrc->prep (nrc->prep_cls, NULL);
1119   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1120   GNUNET_free (nrc);
1121 }
1122
1123
1124 /**
1125  * Function invoked on behalf of a "PluginIterator"
1126  * asking the database plugin to call the iterator
1127  * with the next item.
1128  *
1129  * @param next_cls whatever argument was given
1130  *        to the PluginIterator as "next_cls".
1131  * @param end_it set to GNUNET_YES if we
1132  *        should terminate the iteration early
1133  *        (iterator should be still called once more
1134  *         to signal the end of the iteration).
1135  */
1136 static void 
1137 mysql_plugin_next_request (void *next_cls,
1138                            int end_it)
1139 {
1140   struct NextRequestClosure *nrc = next_cls;
1141
1142   if (GNUNET_YES == end_it)
1143     nrc->end_it = GNUNET_YES;
1144   nrc->plugin->next_task_nc = nrc;
1145   nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
1146                                                      nrc);
1147 }  
1148
1149
1150 /**
1151  * Get an estimate of how much space the database is
1152  * currently using.
1153  *
1154  * @param cls our "struct Plugin *"
1155  * @return number of bytes used on disk
1156  */
1157 static unsigned long long
1158 mysql_plugin_get_size (void *cls)
1159 {
1160   struct Plugin *plugin = cls;
1161   MYSQL_BIND cbind[1];
1162   long long total;
1163
1164   memset (cbind, 0, sizeof (cbind));
1165   total = 0;
1166   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
1167   cbind[0].buffer = &total;
1168   cbind[0].is_unsigned = GNUNET_NO;
1169   if (GNUNET_OK != 
1170       prepared_statement_run_select (plugin,
1171                                      plugin->get_size,
1172                                      1, cbind, 
1173                                      &return_ok, NULL,
1174                                      -1))
1175     return 0;
1176   return total;
1177 }
1178
1179
1180 /**
1181  * Store an item in the datastore.
1182  *
1183  * @param cls closure
1184  * @param key key for the item
1185  * @param size number of bytes in data
1186  * @param data content stored
1187  * @param type type of the content
1188  * @param priority priority of the content
1189  * @param anonymity anonymity-level for the content
1190  * @param replication replication-level for the content
1191  * @param expiration expiration time for the content
1192  * @param msg set to error message
1193  * @return GNUNET_OK on success
1194  */
1195 static int
1196 mysql_plugin_put (void *cls,
1197                   const GNUNET_HashCode * key,
1198                   uint32_t size,
1199                   const void *data,
1200                   enum GNUNET_BLOCK_Type type,
1201                   uint32_t priority,
1202                   uint32_t anonymity,
1203                   uint32_t replication,
1204                   struct GNUNET_TIME_Absolute expiration,
1205                   char **msg)
1206 {
1207   struct Plugin *plugin = cls;
1208   unsigned int irepl = replication;
1209   unsigned int itype = type;
1210   unsigned int ipriority = priority;
1211   unsigned int ianonymity = anonymity;
1212   unsigned long long lexpiration = expiration.abs_value;
1213   unsigned long hashSize;
1214   unsigned long hashSize2;
1215   unsigned long long vkey;
1216   GNUNET_HashCode vhash;
1217
1218   if (size > MAX_DATUM_SIZE)
1219     {
1220       GNUNET_break (0);
1221       return GNUNET_SYSERR;
1222     }
1223   hashSize = sizeof (GNUNET_HashCode);
1224   hashSize2 = sizeof (GNUNET_HashCode);
1225   GNUNET_CRYPTO_hash (data, size, &vhash);
1226   if (GNUNET_OK != do_insert_value (plugin,
1227                                     data, size, &vkey))
1228     return GNUNET_SYSERR;
1229   if (GNUNET_OK !=
1230       prepared_statement_run (plugin,
1231                               plugin->insert_entry,
1232                               NULL,
1233                               MYSQL_TYPE_LONG,
1234                               &irepl,
1235                               GNUNET_YES,
1236                               MYSQL_TYPE_LONG,
1237                               &itype,
1238                               GNUNET_YES,
1239                               MYSQL_TYPE_LONG,
1240                               &ipriority,
1241                               GNUNET_YES,
1242                               MYSQL_TYPE_LONG,
1243                               &ianonymity,
1244                               GNUNET_YES,
1245                               MYSQL_TYPE_LONGLONG,
1246                               &lexpiration,
1247                               GNUNET_YES,
1248                               MYSQL_TYPE_BLOB,
1249                               key,
1250                               hashSize,
1251                               &hashSize,
1252                               MYSQL_TYPE_BLOB,
1253                               &vhash,
1254                               hashSize2,
1255                               &hashSize2,
1256                               MYSQL_TYPE_LONGLONG,
1257                               &vkey, GNUNET_YES, -1))
1258     {
1259       do_delete_value (plugin, vkey);
1260       return GNUNET_SYSERR;
1261     }
1262 #if DEBUG_MYSQL
1263   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1264               "Inserted value `%s' number %llu with size %u into gn090 table\n",
1265               GNUNET_h2s (key),
1266               vkey,
1267               (unsigned int) size);
1268 #endif
1269   if (size > 0)
1270     plugin->env->duc (plugin->env->cls,
1271                       size);
1272   return GNUNET_OK;
1273 }
1274
1275
1276 /**
1277  * Update the priority for a particular key in the datastore.  If
1278  * the expiration time in value is different than the time found in
1279  * the datastore, the higher value should be kept.  For the
1280  * anonymity level, the lower value is to be used.  The specified
1281  * priority should be added to the existing priority, ignoring the
1282  * priority in value.
1283  *
1284  * Note that it is possible for multiple values to match this put.
1285  * In that case, all of the respective values are updated.
1286  *
1287  * @param cls our "struct Plugin*"
1288  * @param uid unique identifier of the datum
1289  * @param delta by how much should the priority
1290  *     change?  If priority + delta < 0 the
1291  *     priority should be set to 0 (never go
1292  *     negative).
1293  * @param expire new expiration time should be the
1294  *     MAX of any existing expiration time and
1295  *     this value
1296  * @param msg set to error message
1297  * @return GNUNET_OK on success
1298  */
1299 static int
1300 mysql_plugin_update (void *cls,
1301                      uint64_t uid,
1302                      int delta, 
1303                      struct GNUNET_TIME_Absolute expire,
1304                      char **msg)
1305 {
1306   struct Plugin *plugin = cls;
1307   unsigned long long vkey = uid;
1308   unsigned long long lexpire = expire.abs_value;
1309   int ret;
1310
1311 #if DEBUG_MYSQL
1312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1313               "Updating value %llu adding %d to priority and maxing exp at %llu\n",
1314               vkey,
1315               delta,
1316               lexpire);
1317 #endif
1318   ret = prepared_statement_run (plugin,
1319                                 plugin->update_entry,
1320                                 NULL,
1321                                 MYSQL_TYPE_LONG,
1322                                 &delta,
1323                                 GNUNET_NO,
1324                                 MYSQL_TYPE_LONGLONG,
1325                                 &lexpire,
1326                                 GNUNET_YES,
1327                                 MYSQL_TYPE_LONGLONG,
1328                                 &lexpire,
1329                                 GNUNET_YES,
1330                                 MYSQL_TYPE_LONGLONG,
1331                                 &vkey,
1332                                 GNUNET_YES, -1);
1333   if (ret != GNUNET_OK)
1334     {
1335       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1336                   "Failed to update value %llu\n",
1337                   vkey);
1338     }
1339   return ret;
1340 }
1341
1342
1343 struct GetContext
1344 {
1345   GNUNET_HashCode key;
1346   GNUNET_HashCode vhash;
1347
1348   unsigned int prio;
1349   unsigned int anonymity;
1350   unsigned long long expiration;
1351   unsigned long long vkey;
1352   unsigned long long total;
1353   int off;
1354   int count;
1355   int have_vhash;
1356 };
1357
1358
1359 static int
1360 get_statement_prepare (void *cls,
1361                        struct NextRequestClosure *nrc)
1362 {
1363   struct GetContext *gc = cls;
1364   struct Plugin *plugin;
1365   int ret;
1366   unsigned int limit_off;
1367   unsigned long hashSize;
1368
1369   if (NULL == nrc)
1370     {
1371       GNUNET_free (gc);
1372       return GNUNET_NO;
1373     }
1374   if (gc->count == gc->total)
1375     return GNUNET_NO;
1376   plugin = nrc->plugin;
1377   hashSize = sizeof (GNUNET_HashCode);
1378   if (gc->count + gc->off == gc->total)
1379     nrc->last_vkey = 0;          /* back to start */
1380   if (gc->count == 0)
1381     limit_off = gc->off;
1382   else
1383     limit_off = 0;
1384 #if DEBUG_MYSQL
1385   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386               "Obtaining result number %d/%lld at offset %d with lvc %llu for GET `%s'\n",
1387               gc->count+1,
1388               gc->total,
1389               limit_off,
1390               nrc->last_vkey,
1391               GNUNET_h2s (&gc->key));  
1392 #endif
1393   if (nrc->type != 0)
1394     {
1395       if (gc->have_vhash)
1396         {
1397           ret =
1398             prepared_statement_run_select
1399             (plugin,
1400              plugin->select_entry_by_hash_vhash_and_type, 6, nrc->rbind, &return_ok,
1401              NULL, MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1402              MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
1403              MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
1404              &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
1405              -1);
1406         }
1407       else
1408         {
1409           ret =
1410             prepared_statement_run_select
1411             (plugin,
1412              plugin->select_entry_by_hash_and_type, 6, nrc->rbind, &return_ok, NULL,
1413              MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1414              MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
1415              &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
1416              -1);
1417         }
1418     }
1419   else
1420     {
1421       if (gc->have_vhash)
1422         {
1423           ret =
1424             prepared_statement_run_select
1425             (plugin,
1426              plugin->select_entry_by_hash_and_vhash, 6, nrc->rbind, &return_ok, NULL,
1427              MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
1428              &gc->vhash, hashSize, &hashSize, MYSQL_TYPE_LONGLONG,
1429              &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off,
1430              GNUNET_YES, -1);
1431         }
1432       else
1433         {
1434           ret =
1435             prepared_statement_run_select
1436             (plugin,
1437              plugin->select_entry_by_hash, 6, nrc->rbind, &return_ok, NULL,
1438              MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1439              MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
1440              &limit_off, GNUNET_YES, -1);
1441         }
1442     }
1443   gc->count++;
1444   return ret;
1445 }
1446
1447
1448 /**
1449  * Iterate over the results for a particular key
1450  * in the datastore.
1451  *
1452  * @param cls closure
1453  * @param key maybe NULL (to match all entries)
1454  * @param vhash hash of the value, maybe NULL (to
1455  *        match all values that have the right key).
1456  *        Note that for DBlocks there is no difference
1457  *        betwen key and vhash, but for other blocks
1458  *        there may be!
1459  * @param type entries of which type are relevant?
1460  *     Use 0 for any type.
1461  * @param iter function to call on each matching value;
1462  *        will be called once with a NULL value at the end
1463  * @param iter_cls closure for iter
1464  */
1465 static void
1466 mysql_plugin_get (void *cls,
1467                   const GNUNET_HashCode *key,
1468                   const GNUNET_HashCode *vhash,
1469                   enum GNUNET_BLOCK_Type type,
1470                   PluginIterator iter, void *iter_cls)
1471 {
1472   struct Plugin *plugin = cls;
1473   unsigned int itype = type;
1474   int ret;
1475   MYSQL_BIND cbind[1];
1476   struct GetContext *gc;
1477   struct NextRequestClosure *nrc;
1478   long long total;
1479   unsigned long hashSize;
1480
1481   GNUNET_assert (key != NULL);
1482   if (iter == NULL) 
1483     return;
1484   hashSize = sizeof (GNUNET_HashCode);
1485   memset (cbind, 0, sizeof (cbind));
1486   total = -1;
1487   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
1488   cbind[0].buffer = &total;
1489   cbind[0].is_unsigned = GNUNET_NO;
1490   if (type != 0)
1491     {
1492       if (vhash != NULL)
1493         {
1494           ret =
1495             prepared_statement_run_select
1496             (plugin,
1497              plugin->count_entry_by_hash_vhash_and_type, 1, cbind, &return_ok, NULL,
1498              MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
1499              vhash, hashSize, &hashSize, MYSQL_TYPE_LONG, &itype, GNUNET_YES,
1500              -1);
1501         }
1502       else
1503         {
1504           ret =
1505             prepared_statement_run_select
1506             (plugin,
1507              plugin->count_entry_by_hash_and_type, 1, cbind, &return_ok, NULL,
1508              MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_LONG,
1509              &itype, GNUNET_YES, -1);
1510
1511         }
1512     }
1513   else
1514     {
1515       if (vhash != NULL)
1516         {
1517           ret =
1518             prepared_statement_run_select
1519             (plugin,
1520              plugin->count_entry_by_hash_and_vhash, 1, cbind, &return_ok, NULL,
1521              MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
1522              vhash, hashSize, &hashSize, -1);
1523
1524         }
1525       else
1526         {
1527           ret =
1528             prepared_statement_run_select (plugin,
1529                                            plugin->count_entry_by_hash,
1530                                            1, cbind, &return_ok,
1531                                            NULL, MYSQL_TYPE_BLOB,
1532                                            key, hashSize,
1533                                            &hashSize, -1);
1534         }
1535     }
1536   if ((ret != GNUNET_OK) || (0 >= total))
1537     {
1538       iter (iter_cls, 
1539             NULL, NULL, 0, NULL, 0, 0, 0, 
1540             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1541       return;
1542     }
1543 #if DEBUG_MYSQL
1544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1545               "Iterating over %lld results for GET `%s'\n",
1546               total,
1547               GNUNET_h2s (key));
1548 #endif
1549   gc = GNUNET_malloc (sizeof (struct GetContext));
1550   gc->key = *key;
1551   if (vhash != NULL)
1552     {
1553       gc->have_vhash = GNUNET_YES;
1554       gc->vhash = *vhash;
1555     }
1556   gc->total = total;
1557   gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1558   
1559
1560   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1561   nrc->plugin = plugin;
1562   nrc->type = type;  
1563   nrc->dviter = iter;
1564   nrc->dviter_cls = iter_cls;
1565   nrc->prep = &get_statement_prepare;
1566   nrc->prep_cls = gc;
1567   nrc->last_vkey = 0;
1568   mysql_plugin_next_request (nrc, GNUNET_NO);
1569 }
1570
1571
1572 /**
1573  * Run the prepared statement to get the next data item ready.
1574  * 
1575  * @param cls not used
1576  * @param nrc closure for the next request iterator
1577  * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
1578  */
1579 static int
1580 iterator_zero_prepare (void *cls,
1581                        struct NextRequestClosure *nrc)
1582 {
1583   struct Plugin *plugin;
1584
1585   if (nrc == NULL)
1586     return GNUNET_NO;
1587   plugin = nrc->plugin;
1588   return prepared_statement_run_select (plugin,
1589                                         plugin->zero_iter,
1590                                         6,
1591                                         nrc->rbind,
1592                                         &return_ok,
1593                                         NULL,
1594                                         MYSQL_TYPE_LONGLONG,
1595                                         &nrc->now.abs_value,
1596                                         GNUNET_YES,
1597                                         MYSQL_TYPE_LONGLONG,
1598                                         &nrc->last_vkey,
1599                                         GNUNET_YES,
1600                                         MYSQL_TYPE_LONGLONG,
1601                                         &nrc->last_prio,
1602                                         GNUNET_YES,
1603                                         MYSQL_TYPE_LONGLONG,
1604                                         &nrc->last_vkey,
1605                                         GNUNET_YES,
1606                                         -1);
1607 }
1608
1609
1610 /**
1611  * Select a subset of the items in the datastore and call
1612  * the given iterator for each of them.
1613  *
1614  * @param cls our "struct Plugin*"
1615  * @param type entries of which type should be considered?
1616  *        Use 0 for any type.
1617  * @param iter function to call on each matching value;
1618  *        will be called once with a NULL value at the end
1619  * @param iter_cls closure for iter
1620  */
1621 static void
1622 mysql_plugin_iter_zero_anonymity (void *cls,
1623                                   enum GNUNET_BLOCK_Type type,
1624                                   PluginIterator iter,
1625                                   void *iter_cls)
1626 {
1627   struct Plugin *plugin = cls;
1628   struct NextRequestClosure *nrc;
1629
1630   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1631   nrc->plugin = plugin;
1632   nrc->type = type;  
1633   nrc->dviter = iter;
1634   nrc->dviter_cls = iter_cls;
1635   nrc->prep = &iterator_zero_prepare;
1636   nrc->last_vkey = INT64_MAX; /* MySQL only supports 63 bits, hence signed */
1637   nrc->last_prio = INT32_MAX; /* similar issue... */
1638   mysql_plugin_next_request (nrc, GNUNET_NO);
1639 }
1640
1641
1642 /**
1643  * Run the SELECT statement for the replication function.
1644  * 
1645  * @param cls the 'struct Plugin'
1646  * @param nrc the context (not used)
1647  */
1648 static int
1649 replication_prepare (void *cls,
1650                      struct NextRequestClosure *nrc)
1651 {
1652   struct Plugin *plugin = cls;
1653   long long nt;
1654
1655   nt = (long long) nrc->now.abs_value;
1656   return prepared_statement_run_select
1657     (plugin,
1658      plugin->select_replication, 
1659      6, nrc->rbind, 
1660      &return_ok, NULL,
1661      MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
1662      -1);
1663 }
1664
1665
1666 /**
1667  * Get a random item for replication.  Returns a single, not expired, random item
1668  * from those with the highest replication counters.  The item's 
1669  * replication counter is decremented by one IF it was positive before.
1670  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1671  *
1672  * @param cls closure
1673  * @param iter function to call the value (once only).
1674  * @param iter_cls closure for iter
1675  */
1676 static void
1677 mysql_plugin_replication_get (void *cls,
1678                               PluginIterator iter, void *iter_cls)
1679 {
1680   struct Plugin *plugin = cls;
1681   struct NextRequestClosure nrc;
1682
1683   memset (&nrc, 0, sizeof (nrc));
1684   nrc.plugin = plugin;
1685   nrc.now = GNUNET_TIME_absolute_get ();
1686   nrc.prep = &replication_prepare;
1687   nrc.prep_cls = plugin;
1688   nrc.type = 0;
1689   nrc.dviter = iter;
1690   nrc.dviter_cls = iter_cls;
1691   nrc.end_it = GNUNET_NO;
1692   mysql_next_request_cont (&nrc, NULL);
1693 }
1694
1695
1696 /**
1697  * Run the SELECT statement for the expiration function.
1698  * 
1699  * @param cls the 'struct Plugin'
1700  * @param nrc the context (not used)
1701  */
1702 static int
1703 expiration_prepare (void *cls,
1704                     struct NextRequestClosure *nrc)
1705 {
1706   struct Plugin *plugin = cls;
1707   long long nt;
1708
1709   nt = (long long) nrc->now.abs_value;
1710   return prepared_statement_run_select
1711     (plugin,
1712      plugin->select_expiration, 
1713      6, nrc->rbind, 
1714      &return_ok, NULL,
1715      MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
1716      -1);
1717 }
1718
1719
1720 /**
1721  * Get a random item for expiration.
1722  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1723  *
1724  * @param cls closure
1725  * @param iter function to call the value (once only).
1726  * @param iter_cls closure for iter
1727  */
1728 static void
1729 mysql_plugin_expiration_get (void *cls,
1730                              PluginIterator iter, void *iter_cls)
1731 {
1732   struct Plugin *plugin = cls;
1733   struct NextRequestClosure nrc;
1734
1735   memset (&nrc, 0, sizeof (nrc));
1736   nrc.plugin = plugin;
1737   nrc.now = GNUNET_TIME_absolute_get ();
1738   nrc.prep = &expiration_prepare;
1739   nrc.prep_cls = plugin;
1740   nrc.type = 0;
1741   nrc.dviter = iter;
1742   nrc.dviter_cls = iter_cls;
1743   nrc.end_it = GNUNET_NO;
1744   mysql_next_request_cont (&nrc, NULL);
1745 }
1746
1747
1748 /**
1749  * Drop database.
1750  *
1751  * @param cls the "struct Plugin*"
1752  */
1753 static void 
1754 mysql_plugin_drop (void *cls)
1755 {
1756   struct Plugin *plugin = cls;
1757
1758   if ((GNUNET_OK != run_statement (plugin,
1759                                    "DROP TABLE gn090")) ||
1760       (GNUNET_OK != run_statement (plugin,
1761                                    "DROP TABLE gn072")))
1762     return;           /* error */
1763   plugin->env->duc (plugin->env->cls, 0);
1764 }
1765
1766
1767 /**
1768  * Entry point for the plugin.
1769  *
1770  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1771  * @return our "struct Plugin*"
1772  */
1773 void *
1774 libgnunet_plugin_datastore_mysql_init (void *cls)
1775 {
1776   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1777   struct GNUNET_DATASTORE_PluginFunctions *api;
1778   struct Plugin *plugin;
1779
1780   plugin = GNUNET_malloc (sizeof (struct Plugin));
1781   plugin->env = env;
1782   plugin->cnffile = get_my_cnf_path (env->cfg);
1783   if (GNUNET_OK != iopen (plugin))
1784     {
1785       iclose (plugin);
1786       GNUNET_free_non_null (plugin->cnffile);
1787       GNUNET_free (plugin);
1788       return NULL;
1789     }
1790 #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
1791 #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
1792   if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
1793              " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1794              " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1795              " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1796              " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1797              " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1798              " hash BINARY(64) NOT NULL DEFAULT '',"
1799              " vhash BINARY(64) NOT NULL DEFAULT '',"
1800              " vkey BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1801              " INDEX hash (hash(64)),"
1802              " INDEX hash_vhash_vkey (hash(64),vhash(64),vkey),"
1803              " INDEX hash_vkey (hash(64),vkey),"
1804              " INDEX vkey (vkey),"
1805              " INDEX prio (prio,vkey),"
1806              " INDEX expire (expire,vkey,type),"
1807              " INDEX anonLevel (anonLevel,prio,vkey,type)"
1808              ") ENGINE=InnoDB") ||
1809       MRUNS ("CREATE TABLE IF NOT EXISTS gn072 ("
1810              " vkey BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,"
1811              " value BLOB NOT NULL DEFAULT '') ENGINE=MyISAM") ||
1812       MRUNS ("SET AUTOCOMMIT = 1") ||
1813       PINIT (plugin->select_value, SELECT_VALUE) ||
1814       PINIT (plugin->delete_value, DELETE_VALUE) ||
1815       PINIT (plugin->insert_value, INSERT_VALUE) ||
1816       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1817       PINIT (plugin->delete_entry_by_vkey, DELETE_ENTRY_BY_VKEY) ||
1818       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1819       PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH)
1820       || PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE)
1821       || PINIT (plugin->select_entry_by_hash_vhash_and_type,
1822                 SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1823       || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
1824       || PINIT (plugin->get_size, SELECT_SIZE)
1825       || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
1826       || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1827       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1828                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1829       || PINIT (plugin->update_entry, UPDATE_ENTRY)
1830       || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) 
1831       || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) 
1832       || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
1833     {
1834       iclose (plugin);
1835       GNUNET_free_non_null (plugin->cnffile);
1836       GNUNET_free (plugin);
1837       return NULL;
1838     }
1839 #undef PINIT
1840 #undef MRUNS
1841
1842   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1843   api->cls = plugin;
1844   api->get_size = &mysql_plugin_get_size;
1845   api->put = &mysql_plugin_put;
1846   api->next_request = &mysql_plugin_next_request;
1847   api->get = &mysql_plugin_get;
1848   api->replication_get = &mysql_plugin_replication_get;
1849   api->expiration_get = &mysql_plugin_expiration_get;
1850   api->update = &mysql_plugin_update;
1851   api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity;
1852   api->drop = &mysql_plugin_drop;
1853   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1854                    "mysql", _("Mysql database running\n"));
1855   return api;
1856 }
1857
1858
1859 /**
1860  * Exit point from the plugin.
1861  * @param cls our "struct Plugin*"
1862  * @return always NULL
1863  */
1864 void *
1865 libgnunet_plugin_datastore_mysql_done (void *cls)
1866 {
1867   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1868   struct Plugin *plugin = api->cls;
1869
1870   iclose (plugin);
1871   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1872     {
1873       GNUNET_SCHEDULER_cancel (plugin->next_task);
1874       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1875       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1876       GNUNET_free (plugin->next_task_nc);
1877       plugin->next_task_nc = NULL;
1878     }
1879   GNUNET_free_non_null (plugin->cnffile);
1880   GNUNET_free (plugin);
1881   GNUNET_free (api);
1882   mysql_library_end ();
1883   return NULL;
1884 }
1885
1886 /* end of plugin_datastore_mysql.c */