fixes
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in /etc/gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109    @endverbatim
110  *
111  * PROBLEMS?
112  *
113  * If you have problems related to the mysql module, your best
114  * friend is probably the mysql manual. The first thing to check
115  * is that mysql is basically operational, that you can connect
116  * to it, create tables, issue queries etc.
117  */
118
119 #include "platform.h"
120 #include "gnunet_datastore_plugin.h"
121 #include "gnunet_util_lib.h"
122 #include <mysql/mysql.h>
123
124 #define DEBUG_MYSQL GNUNET_NO
125
126 #define MAX_DATUM_SIZE 65536
127
128 /**
129  * Maximum number of supported parameters for a prepared
130  * statement.  Increase if needed.
131  */
132 #define MAX_PARAM 16
133
134 /**
135  * Die with an error message that indicates
136  * a failure of the command 'cmd' with the message given
137  * by strerror(errno).
138  */
139 #define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0);
140
141 /**
142  * Log an error message at log-level 'level' that indicates
143  * a failure of the command 'cmd' on file 'filename'
144  * with the message given by strerror(errno).
145  */
146 #define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
147
148
149 struct GNUNET_MysqlStatementHandle
150 {
151   struct GNUNET_MysqlStatementHandle *next;
152
153   struct GNUNET_MysqlStatementHandle *prev;
154
155   char *query;
156
157   MYSQL_STMT *statement;
158
159   int valid;
160
161 };
162
163 /**
164  * Context for the universal iterator.
165  */
166 struct NextRequestClosure;
167
168 /**
169  * Type of a function that will prepare
170  * the next iteration.
171  *
172  * @param cls closure
173  * @param nc the next context; NULL for the last
174  *         call which gives the callback a chance to
175  *         clean up the closure
176  * @return GNUNET_OK on success, GNUNET_NO if there are
177  *         no more values, GNUNET_SYSERR on error
178  */
179 typedef int (*PrepareFunction)(void *cls,
180                                struct NextRequestClosure *nc);
181
182
183 struct NextRequestClosure
184 {
185   struct Plugin *plugin;
186
187   struct GNUNET_TIME_Absolute now;
188
189   /**
190    * Function to call to prepare the next
191    * iteration.
192    */
193   PrepareFunction prep;
194
195   /**
196    * Closure for prep.
197    */
198   void *prep_cls;
199
200   MYSQL_BIND rbind[7];
201
202   enum GNUNET_BLOCK_Type type;
203
204   PluginIterator dviter;
205
206   void *dviter_cls;
207
208   unsigned int count;
209
210   int end_it;
211
212   int one_shot;
213 };
214
215
216 /**
217  * Context for all functions in this plugin.
218  */
219 struct Plugin 
220 {
221   /**
222    * Our execution environment.
223    */
224   struct GNUNET_DATASTORE_PluginEnvironment *env;
225
226   /**
227    * Handle to talk to MySQL.
228    */
229   MYSQL *dbf;
230   
231   /**
232    * We keep all prepared statements in a DLL.  This is the head.
233    */
234   struct GNUNET_MysqlStatementHandle *shead;
235
236   /**
237    * We keep all prepared statements in a DLL.  This is the tail.
238    */
239   struct GNUNET_MysqlStatementHandle *stail;
240
241   /**
242    * Filename of "my.cnf" (msyql configuration).
243    */
244   char *cnffile;
245
246   /**
247    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
248    */
249   struct NextRequestClosure *next_task_nc;
250
251   /**
252    * Pending task with scheduler for running the next request.
253    */
254   GNUNET_SCHEDULER_TaskIdentifier next_task;
255
256   /**
257    * Prepared statements.
258    */
259 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?)"
260   struct GNUNET_MysqlStatementHandle *insert_entry;
261   
262 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
263   struct GNUNET_MysqlStatementHandle *delete_entry_by_uid;
264
265 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
266   struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
267   
268 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_uid) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
269   struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
270
271 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
272   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
273
274 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
275   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
276  
277 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
278   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
279
280 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
281   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
282  
283 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
284   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
285   
286 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
287   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
288
289 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
290   struct GNUNET_MysqlStatementHandle *update_entry;
291
292 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
293   struct GNUNET_MysqlStatementHandle *dec_repl;
294
295 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
296   struct GNUNET_MysqlStatementHandle *get_size;
297
298 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_anonLevel_uid) WHERE anonLevel=0 ORDER BY uid DESC LIMIT 1 OFFSET ?"
299   struct GNUNET_MysqlStatementHandle *zero_iter;
300
301 #define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_expire_prio) WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
302   "UNION "\
303   "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_prio) ORDER BY prio ASC LIMIT 1) "\
304   "ORDER BY expire ASC LIMIT 1"
305   struct GNUNET_MysqlStatementHandle *select_expiration;
306
307 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_repl) ORDER BY repl DESC,RAND() LIMIT 1"
308   struct GNUNET_MysqlStatementHandle *select_replication;
309
310 };
311
312
313 /**
314  * Obtain the location of ".my.cnf".
315  *
316  * @param cfg our configuration
317  * @return NULL on error
318  */
319 static char *
320 get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
321 {
322   char *cnffile;
323   char *home_dir;
324   struct stat st;
325 #ifndef WINDOWS
326   struct passwd *pw;
327 #endif
328   int configured;
329
330 #ifndef WINDOWS
331   pw = getpwuid (getuid ());
332   if (!pw)
333     {
334       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, 
335                            "getpwuid");
336       return NULL;
337     }
338   if (GNUNET_YES ==
339       GNUNET_CONFIGURATION_have_value (cfg,
340                                        "datastore-mysql", "CONFIG"))
341     {
342       GNUNET_assert (GNUNET_OK == 
343                      GNUNET_CONFIGURATION_get_value_filename (cfg,
344                                                               "datastore-mysql", "CONFIG", &cnffile));
345       configured = GNUNET_YES;
346     }
347   else
348     {
349       home_dir = GNUNET_strdup (pw->pw_dir);
350 #else
351       home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
352       plibc_conv_to_win_path ("~/", home_dir);
353 #endif
354       GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
355       GNUNET_free (home_dir);
356       configured = GNUNET_NO;
357     }
358   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
359               _("Trying to use file `%s' for MySQL configuration.\n"),
360               cnffile);
361   if ((0 != STAT (cnffile, &st)) ||
362       (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode)))
363     {
364       if (configured == GNUNET_YES)
365         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
366                     _("Could not access file `%s': %s\n"), cnffile,
367                     STRERROR (errno));
368       GNUNET_free (cnffile);
369       return NULL;
370     }
371   return cnffile;
372 }
373
374
375
376 /**
377  * Free a prepared statement.
378  *
379  * @param plugin plugin context
380  * @param s prepared statement
381  */
382 static void
383 prepared_statement_destroy (struct Plugin *plugin, 
384                             struct GNUNET_MysqlStatementHandle
385                             *s)
386 {
387   GNUNET_CONTAINER_DLL_remove (plugin->shead,
388                                plugin->stail,
389                                s);
390   if (s->valid)
391     mysql_stmt_close (s->statement);
392   GNUNET_free (s->query);
393   GNUNET_free (s);
394 }
395
396
397 /**
398  * Close database connection and all prepared statements (we got a DB
399  * disconnect error).
400  */
401 static int
402 iclose (struct Plugin *plugin)
403 {
404   struct GNUNET_MysqlStatementHandle *spos;
405
406   spos = plugin->shead;
407   while (NULL != plugin->shead)
408     prepared_statement_destroy (plugin,
409                                 plugin->shead);
410   if (plugin->dbf != NULL)
411     {
412       mysql_close (plugin->dbf);
413       plugin->dbf = NULL;
414     }
415   return GNUNET_OK;
416 }
417
418
419 /**
420  * Open the connection with the database (and initialize
421  * our default options).
422  *
423  * @return GNUNET_OK on success
424  */
425 static int
426 iopen (struct Plugin *ret)
427 {
428   char *mysql_dbname;
429   char *mysql_server;
430   char *mysql_user;
431   char *mysql_password;
432   unsigned long long mysql_port;
433   my_bool reconnect;
434   unsigned int timeout;
435
436   ret->dbf = mysql_init (NULL);
437   if (ret->dbf == NULL)
438     return GNUNET_SYSERR;
439   if (ret->cnffile != NULL)
440     mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile);
441   mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
442   reconnect = 0;
443   mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect);
444   mysql_options (ret->dbf,
445                  MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
446   mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
447   timeout = 60; /* in seconds */
448   mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
449   mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
450   mysql_dbname = NULL;
451   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
452                                                      "datastore-mysql", "DATABASE"))
453     GNUNET_assert (GNUNET_OK == 
454                    GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
455                                                           "datastore-mysql", "DATABASE", 
456                                                           &mysql_dbname));
457   else
458     mysql_dbname = GNUNET_strdup ("gnunet");
459   mysql_user = NULL;
460   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
461                                                      "datastore-mysql", "USER"))
462     {
463       GNUNET_assert (GNUNET_OK == 
464                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
465                                                            "datastore-mysql", "USER", 
466                                                            &mysql_user));
467     }
468   mysql_password = NULL;
469   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
470                                                      "datastore-mysql", "PASSWORD"))
471     {
472       GNUNET_assert (GNUNET_OK ==
473                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
474                                                            "datastore-mysql", "PASSWORD",
475                                                            &mysql_password));
476     }
477   mysql_server = NULL;
478   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
479                                                      "datastore-mysql", "HOST"))
480     {
481       GNUNET_assert (GNUNET_OK == 
482                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
483                                                            "datastore-mysql", "HOST", 
484                                                            &mysql_server));
485     }
486   mysql_port = 0;
487   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
488                                                      "datastore-mysql", "PORT"))
489     {
490       GNUNET_assert (GNUNET_OK ==
491                     GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql",
492                                                            "PORT", &mysql_port));
493     }
494
495   GNUNET_assert (mysql_dbname != NULL);
496   mysql_real_connect (ret->dbf, 
497                       mysql_server, 
498                       mysql_user, mysql_password,
499                       mysql_dbname, 
500                       (unsigned int) mysql_port, NULL,
501                       CLIENT_IGNORE_SIGPIPE);
502   GNUNET_free_non_null (mysql_server);
503   GNUNET_free_non_null (mysql_user);
504   GNUNET_free_non_null (mysql_password);
505   GNUNET_free (mysql_dbname);
506   if (mysql_error (ret->dbf)[0])
507     {
508       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
509                  "mysql_real_connect", ret);
510       return GNUNET_SYSERR;
511     }
512   return GNUNET_OK;
513 }
514
515
516 /**
517  * Run the given MySQL statement.
518  *
519  * @param plugin plugin context
520  * @param statement SQL statement to run
521  * @return GNUNET_OK on success, GNUNET_SYSERR on error
522  */
523 static int
524 run_statement (struct Plugin *plugin,
525                const char *statement)
526 {
527   if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
528     return GNUNET_SYSERR;
529   mysql_query (plugin->dbf, statement);
530   if (mysql_error (plugin->dbf)[0])
531     {
532       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
533                  "mysql_query", plugin);
534       iclose (plugin);
535       return GNUNET_SYSERR;
536     }
537   return GNUNET_OK;
538 }
539
540
541 /**
542  * Create a prepared statement.
543  *
544  * @param plugin plugin context
545  * @param statement SQL statement text to prepare
546  * @return NULL on error
547  */
548 static struct GNUNET_MysqlStatementHandle *
549 prepared_statement_create (struct Plugin *plugin, 
550                            const char *statement)
551 {
552   struct GNUNET_MysqlStatementHandle *ret;
553
554   ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle));
555   ret->query = GNUNET_strdup (statement);
556   GNUNET_CONTAINER_DLL_insert (plugin->shead,
557                                plugin->stail,
558                                ret);
559   return ret;
560 }
561
562
563 /**
564  * Prepare a statement for running.
565  *
566  * @param plugin plugin context
567  * @param ret handle to prepared statement
568  * @return GNUNET_OK on success
569  */
570 static int
571 prepare_statement (struct Plugin *plugin, 
572                    struct GNUNET_MysqlStatementHandle *ret)
573 {
574   if (GNUNET_YES == ret->valid)
575     return GNUNET_OK;
576   if ((NULL == plugin->dbf) && 
577       (GNUNET_OK != iopen (plugin)))
578     return GNUNET_SYSERR;
579   ret->statement = mysql_stmt_init (plugin->dbf);
580   if (ret->statement == NULL)
581     {
582       iclose (plugin);
583       return GNUNET_SYSERR;
584     }
585   if (mysql_stmt_prepare (ret->statement, 
586                           ret->query,
587                           strlen (ret->query)))
588     {
589       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
590                  "mysql_stmt_prepare", 
591                  plugin);
592       mysql_stmt_close (ret->statement);
593       ret->statement = NULL;
594       iclose (plugin);
595       return GNUNET_SYSERR;
596     }
597   ret->valid = GNUNET_YES;
598   return GNUNET_OK;
599
600 }
601
602
603 /**
604  * Bind the parameters for the given MySQL statement
605  * and run it.
606  *
607  * @param plugin plugin context
608  * @param s statement to bind and run
609  * @param ap arguments for the binding
610  * @return GNUNET_SYSERR on error, GNUNET_OK on success
611  */
612 static int
613 init_params (struct Plugin *plugin,
614              struct GNUNET_MysqlStatementHandle *s,
615              va_list ap)
616 {
617   MYSQL_BIND qbind[MAX_PARAM];
618   unsigned int pc;
619   unsigned int off;
620   enum enum_field_types ft;
621
622   pc = mysql_stmt_param_count (s->statement);
623   if (pc > MAX_PARAM)
624     {
625       /* increase internal constant! */
626       GNUNET_break (0);
627       return GNUNET_SYSERR;
628     }
629   memset (qbind, 0, sizeof (qbind));
630   off = 0;
631   ft = 0;
632   while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types))))
633     {
634       qbind[off].buffer_type = ft;
635       switch (ft)
636         {
637         case MYSQL_TYPE_FLOAT:
638           qbind[off].buffer = va_arg (ap, float *);
639           break;
640         case MYSQL_TYPE_LONGLONG:
641           qbind[off].buffer = va_arg (ap, unsigned long long *);
642           qbind[off].is_unsigned = va_arg (ap, int);
643           break;
644         case MYSQL_TYPE_LONG:
645           qbind[off].buffer = va_arg (ap, unsigned int *);
646           qbind[off].is_unsigned = va_arg (ap, int);
647           break;
648         case MYSQL_TYPE_VAR_STRING:
649         case MYSQL_TYPE_STRING:
650         case MYSQL_TYPE_BLOB:
651           qbind[off].buffer = va_arg (ap, void *);
652           qbind[off].buffer_length = va_arg (ap, unsigned long);
653           qbind[off].length = va_arg (ap, unsigned long *);
654           break;
655         default:
656           /* unsupported type */
657           GNUNET_break (0);
658           return GNUNET_SYSERR;
659         }
660       pc--;
661       off++;
662     }
663   if (! ( (pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1) ) )
664     {
665       GNUNET_assert (0);
666       return GNUNET_SYSERR;
667     }
668   if (mysql_stmt_bind_param (s->statement, qbind))
669     {
670       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
671                   _("`%s' failed at %s:%d with error: %s\n"),
672                   "mysql_stmt_bind_param",
673                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
674       iclose (plugin);
675       return GNUNET_SYSERR;
676     }
677   if (mysql_stmt_execute (s->statement))
678     {
679       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
680                   _("`%s' failed at %s:%d with error: %s\n"),
681                   "mysql_stmt_execute",
682                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
683       iclose (plugin);
684       return GNUNET_SYSERR;
685     }
686   return GNUNET_OK;
687 }
688
689 /**
690  * Type of a callback that will be called for each
691  * data set returned from MySQL.
692  *
693  * @param cls user-defined argument
694  * @param num_values number of elements in values
695  * @param values values returned by MySQL
696  * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort
697  */
698 typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
699                                           unsigned int num_values,
700                                           MYSQL_BIND *values);
701
702
703 /**
704  * Run a prepared SELECT statement.
705  *
706  * @param plugin plugin context
707  * @param s statement to run
708  * @param result_size number of elements in results array
709  * @param results pointer to already initialized MYSQL_BIND
710  *        array (of sufficient size) for passing results
711  * @param processor function to call on each result
712  * @param processor_cls extra argument to processor
713  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
714  *        values (size + buffer-reference for pointers); terminated
715  *        with "-1"
716  * @return GNUNET_SYSERR on error, otherwise
717  *         the number of successfully affected (or queried) rows
718  */
719 static int
720 prepared_statement_run_select (struct Plugin *plugin,
721                                struct GNUNET_MysqlStatementHandle *s,
722                                unsigned int result_size,
723                                MYSQL_BIND *results,
724                                GNUNET_MysqlDataProcessor processor, void *processor_cls,
725                                ...)
726 {
727   va_list ap;
728   int ret;
729   unsigned int rsize;
730   int total;
731
732   if (GNUNET_OK != prepare_statement (plugin, s))
733     {
734       GNUNET_break (0);
735       return GNUNET_SYSERR;
736     }
737   va_start (ap, processor_cls);
738   if (GNUNET_OK != init_params (plugin, s, ap))
739     {
740       GNUNET_break (0);
741       va_end (ap);
742       return GNUNET_SYSERR;
743     }
744   va_end (ap);
745   rsize = mysql_stmt_field_count (s->statement);
746   if (rsize > result_size)
747     {
748       GNUNET_break (0);
749       return GNUNET_SYSERR;
750     }
751   if (mysql_stmt_bind_result (s->statement, results))
752     {
753       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
754                   _("`%s' failed at %s:%d with error: %s\n"),
755                   "mysql_stmt_bind_result",
756                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
757       iclose (plugin);
758       return GNUNET_SYSERR;
759     }
760
761   total = 0;
762   while (1)
763     {
764       ret = mysql_stmt_fetch (s->statement);
765       if (ret == MYSQL_NO_DATA)
766         break;
767       if (ret != 0)
768         {
769           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
770                       _("`%s' failed at %s:%d with error: %s\n"),
771                       "mysql_stmt_fetch",
772                       __FILE__, __LINE__, mysql_stmt_error (s->statement));
773           iclose (plugin);
774           return GNUNET_SYSERR;
775         }
776       if (processor != NULL)
777         if (GNUNET_OK != processor (processor_cls, rsize, results))
778           break;
779       total++;
780     }
781   mysql_stmt_reset (s->statement);
782   return total;
783 }
784
785
786 /**
787  * Run a prepared statement that does NOT produce results.
788  *
789  * @param plugin plugin context
790  * @param s statement to run
791  * @param insert_id NULL or address where to store the row ID of whatever
792  *        was inserted (only for INSERT statements!)
793  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
794  *        values (size + buffer-reference for pointers); terminated
795  *        with "-1"
796  * @return GNUNET_SYSERR on error, otherwise
797  *         the number of successfully affected rows
798  */
799 static int
800 prepared_statement_run (struct Plugin *plugin,
801                         struct GNUNET_MysqlStatementHandle *s,
802                         unsigned long long *insert_id, ...)
803 {
804   va_list ap;
805   int affected;
806
807   if (GNUNET_OK != prepare_statement (plugin, s))
808     return GNUNET_SYSERR;
809   va_start (ap, insert_id);
810   if (GNUNET_OK != init_params (plugin, s, ap))
811     {
812       va_end (ap);
813       return GNUNET_SYSERR;
814     }
815   va_end (ap);
816   affected = mysql_stmt_affected_rows (s->statement);
817   if (NULL != insert_id)
818     *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement);
819   mysql_stmt_reset (s->statement);
820   return affected;
821 }
822
823
824 /**
825  * Delete an entry from the gn090 table.
826  *
827  * @param plugin plugin context
828  * @param uid unique ID of the entry to delete
829  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
830  */
831 static int
832 do_delete_entry (struct Plugin *plugin,
833                  unsigned long long uid)
834 {
835   int ret;
836  
837 #if DEBUG_MYSQL
838   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
839               "Deleting value %llu from gn090 table\n",
840               uid);
841 #endif
842   ret = prepared_statement_run (plugin,
843                                 plugin->delete_entry_by_uid,
844                                 NULL,
845                                 MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES,
846                                 -1);
847   if (ret > 0)
848     return GNUNET_OK;
849   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
850               "Deleting value %llu from gn090 table failed\n",
851               uid);
852   return ret;
853 }
854
855
856 /**
857  * Function that simply returns GNUNET_OK
858  *
859  * @param cls closure, not used
860  * @param num_values not used
861  * @param values not used
862  * @return GNUNET_OK
863  */
864 static int
865 return_ok (void *cls, 
866            unsigned int num_values, 
867            MYSQL_BIND * values)
868 {
869   return GNUNET_OK;
870 }
871
872
873 /**
874  * Get an estimate of how much space the database is
875  * currently using.
876  *
877  * @param cls our "struct Plugin *"
878  * @return number of bytes used on disk
879  */
880 static unsigned long long
881 mysql_plugin_get_size (void *cls)
882 {
883   struct Plugin *plugin = cls;
884   MYSQL_BIND cbind[1];
885   long long total;
886
887   memset (cbind, 0, sizeof (cbind));
888   total = 0;
889   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
890   cbind[0].buffer = &total;
891   cbind[0].is_unsigned = GNUNET_NO;
892   if (GNUNET_OK != 
893       prepared_statement_run_select (plugin,
894                                      plugin->get_size,
895                                      1, cbind, 
896                                      &return_ok, NULL,
897                                      -1))
898     return 0;
899   return total;
900 }
901
902
903 /**
904  * Store an item in the datastore.
905  *
906  * @param cls closure
907  * @param key key for the item
908  * @param size number of bytes in data
909  * @param data content stored
910  * @param type type of the content
911  * @param priority priority of the content
912  * @param anonymity anonymity-level for the content
913  * @param replication replication-level for the content
914  * @param expiration expiration time for the content
915  * @param msg set to error message
916  * @return GNUNET_OK on success
917  */
918 static int
919 mysql_plugin_put (void *cls,
920                   const GNUNET_HashCode * key,
921                   uint32_t size,
922                   const void *data,
923                   enum GNUNET_BLOCK_Type type,
924                   uint32_t priority,
925                   uint32_t anonymity,
926                   uint32_t replication,
927                   struct GNUNET_TIME_Absolute expiration,
928                   char **msg)
929 {
930   struct Plugin *plugin = cls;
931   unsigned int irepl = replication;
932   unsigned int itype = type;
933   unsigned int ipriority = priority;
934   unsigned int ianonymity = anonymity;
935   unsigned long long lexpiration = expiration.abs_value;
936   unsigned long hashSize;
937   unsigned long hashSize2;
938   unsigned long lsize;
939   GNUNET_HashCode vhash;
940
941   if (size > MAX_DATUM_SIZE)
942     {
943       GNUNET_break (0);
944       return GNUNET_SYSERR;
945     }
946   hashSize = sizeof (GNUNET_HashCode);
947   hashSize2 = sizeof (GNUNET_HashCode);
948   lsize = size;
949   GNUNET_CRYPTO_hash (data, size, &vhash);
950   if (GNUNET_OK !=
951       prepared_statement_run (plugin,
952                               plugin->insert_entry,
953                               NULL,
954                               MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
955                               MYSQL_TYPE_LONG, &itype, GNUNET_YES,
956                               MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
957                               MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
958                               MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
959                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
960                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
961                               MYSQL_TYPE_BLOB, data, lsize, &lsize, 
962                               -1))
963     return GNUNET_SYSERR;    
964 #if DEBUG_MYSQL
965   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
966               "Inserted value `%s' with size %u into gn090 table\n",
967               GNUNET_h2s (key),
968               (unsigned int) size);
969 #endif
970   if (size > 0)
971     plugin->env->duc (plugin->env->cls,
972                       size);
973   return GNUNET_OK;
974 }
975
976
977 /**
978  * Update the priority for a particular key in the datastore.  If
979  * the expiration time in value is different than the time found in
980  * the datastore, the higher value should be kept.  For the
981  * anonymity level, the lower value is to be used.  The specified
982  * priority should be added to the existing priority, ignoring the
983  * priority in value.
984  *
985  * Note that it is possible for multiple values to match this put.
986  * In that case, all of the respective values are updated.
987  *
988  * @param cls our "struct Plugin*"
989  * @param uid unique identifier of the datum
990  * @param delta by how much should the priority
991  *     change?  If priority + delta < 0 the
992  *     priority should be set to 0 (never go
993  *     negative).
994  * @param expire new expiration time should be the
995  *     MAX of any existing expiration time and
996  *     this value
997  * @param msg set to error message
998  * @return GNUNET_OK on success
999  */
1000 static int
1001 mysql_plugin_update (void *cls,
1002                      uint64_t uid,
1003                      int delta, 
1004                      struct GNUNET_TIME_Absolute expire,
1005                      char **msg)
1006 {
1007   struct Plugin *plugin = cls;
1008   unsigned long long vkey = uid;
1009   unsigned long long lexpire = expire.abs_value;
1010   int ret;
1011
1012 #if DEBUG_MYSQL
1013   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1014               "Updating value %llu adding %d to priority and maxing exp at %llu\n",
1015               vkey,
1016               delta,
1017               lexpire);
1018 #endif
1019   ret = prepared_statement_run (plugin,
1020                                 plugin->update_entry,
1021                                 NULL,
1022                                 MYSQL_TYPE_LONG, &delta, GNUNET_NO,
1023                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
1024                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
1025                                 MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, 
1026                                 -1);
1027   if (ret != GNUNET_OK)
1028     {
1029       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1030                   "Failed to update value %llu\n",
1031                   vkey);
1032     }
1033   return ret;
1034 }
1035
1036
1037
1038
1039 /**
1040  * Continuation of "mysql_next_request".
1041  *
1042  * @param next_cls the next context
1043  * @param tc the task context (unused)
1044  */
1045 static void 
1046 mysql_next_request_cont (void *next_cls,
1047                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1048 {
1049   struct NextRequestClosure *nrc = next_cls;
1050   struct Plugin *plugin;
1051   int ret;
1052   unsigned int type;
1053   unsigned int priority;
1054   unsigned int anonymity;
1055   unsigned long long exp;
1056   unsigned long hashSize;
1057   unsigned long size;
1058   unsigned long long uid;
1059   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
1060   GNUNET_HashCode key;
1061   struct GNUNET_TIME_Absolute expiration;
1062   MYSQL_BIND *rbind = nrc->rbind;
1063
1064   plugin = nrc->plugin;
1065   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1066   plugin->next_task_nc = NULL;
1067
1068   if (GNUNET_YES == nrc->end_it) 
1069     goto END_SET;
1070   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1071   nrc->now = GNUNET_TIME_absolute_get ();
1072   hashSize = sizeof (GNUNET_HashCode);
1073   memset (nrc->rbind, 0, sizeof (nrc->rbind));
1074   rbind = nrc->rbind;
1075   rbind[0].buffer_type = MYSQL_TYPE_LONG;
1076   rbind[0].buffer = &type;
1077   rbind[0].is_unsigned = 1;
1078   rbind[1].buffer_type = MYSQL_TYPE_LONG;
1079   rbind[1].buffer = &priority;
1080   rbind[1].is_unsigned = 1;
1081   rbind[2].buffer_type = MYSQL_TYPE_LONG;
1082   rbind[2].buffer = &anonymity;
1083   rbind[2].is_unsigned = 1;
1084   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
1085   rbind[3].buffer = &exp;
1086   rbind[3].is_unsigned = 1;
1087   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
1088   rbind[4].buffer = &key;
1089   rbind[4].buffer_length = hashSize;
1090   rbind[4].length = &hashSize;
1091   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
1092   rbind[5].buffer = value;
1093   rbind[5].buffer_length = size = sizeof (value);
1094   rbind[5].length = &size;
1095   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
1096   rbind[6].buffer = &uid;
1097   rbind[6].is_unsigned = 1;
1098
1099   if (GNUNET_OK != nrc->prep (nrc->prep_cls,
1100                               nrc))
1101     goto END_SET;
1102   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1103   GNUNET_assert (size <= sizeof(value));
1104   if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
1105        (hashSize != sizeof (GNUNET_HashCode)) )
1106     {
1107       GNUNET_break (0);
1108       goto END_SET;
1109     }     
1110 #if DEBUG_MYSQL
1111   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1112               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
1113               (unsigned int) size,
1114               GNUNET_h2s (&key),
1115               priority,
1116               anonymity,
1117               exp);
1118 #endif
1119   expiration.abs_value = exp;
1120   ret = nrc->dviter (nrc->dviter_cls, 
1121                      (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
1122                      &key,
1123                      size, value,
1124                      type, priority, anonymity, expiration,
1125                      uid);
1126   if (ret == GNUNET_SYSERR)
1127     {
1128       nrc->end_it = GNUNET_YES;
1129       return;
1130     }
1131   if (ret == GNUNET_NO)
1132     {
1133       do_delete_entry (plugin, uid);
1134       if (size != 0)
1135         plugin->env->duc (plugin->env->cls,
1136                           - size);
1137     }
1138   if (nrc->one_shot == GNUNET_YES)
1139     GNUNET_free (nrc);
1140   return;
1141  END_SET:
1142   /* call dviter with "end of set" */
1143   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1144   nrc->dviter (nrc->dviter_cls, 
1145                NULL, NULL, 0, NULL, 0, 0, 0, 
1146                GNUNET_TIME_UNIT_ZERO_ABS, 0);
1147   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1148   nrc->prep (nrc->prep_cls, NULL);
1149   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1150   GNUNET_free (nrc);
1151 }
1152
1153
1154 /**
1155  * Function invoked on behalf of a "PluginIterator"
1156  * asking the database plugin to call the iterator
1157  * with the next item.
1158  *
1159  * @param next_cls whatever argument was given
1160  *        to the PluginIterator as "next_cls".
1161  * @param end_it set to GNUNET_YES if we
1162  *        should terminate the iteration early
1163  *        (iterator should be still called once more
1164  *         to signal the end of the iteration).
1165  */
1166 static void 
1167 mysql_plugin_next_request (void *next_cls,
1168                            int end_it)
1169 {
1170   struct NextRequestClosure *nrc = next_cls;
1171
1172   if (GNUNET_YES == end_it)
1173     nrc->end_it = GNUNET_YES;
1174   nrc->plugin->next_task_nc = nrc;
1175   nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
1176                                                      nrc);
1177 }  
1178
1179
1180 /**
1181  * Context for 'get_statement_prepare'.
1182  */
1183 struct GetContext
1184 {
1185   GNUNET_HashCode key;
1186   GNUNET_HashCode vhash;
1187
1188   unsigned int prio;
1189   unsigned int anonymity;
1190   unsigned long long expiration;
1191   unsigned long long vkey;
1192   unsigned long long total;
1193   unsigned int off;
1194   unsigned int count;
1195   int have_vhash;
1196 };
1197
1198
1199 static int
1200 get_statement_prepare (void *cls,
1201                        struct NextRequestClosure *nrc)
1202 {
1203   struct GetContext *gc = cls;
1204   struct Plugin *plugin;
1205   int ret;
1206   unsigned long hashSize;
1207   
1208   if (NULL == nrc)
1209     {
1210       GNUNET_free (gc);
1211       return GNUNET_NO;
1212     }
1213   if (gc->count == gc->total)
1214     return GNUNET_NO;
1215   plugin = nrc->plugin;
1216   hashSize = sizeof (GNUNET_HashCode);
1217   if (++gc->off >= gc->total)
1218     gc->off = 0;
1219 #if DEBUG_MYSQL
1220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221               "Obtaining result number %d/%lld at offset %u for GET `%s'\n",
1222               gc->count+1,
1223               gc->total,
1224               gc->off,
1225               GNUNET_h2s (&gc->key));  
1226 #endif
1227   if (nrc->type != 0)
1228     {
1229       if (gc->have_vhash)
1230         {
1231           ret = prepared_statement_run_select (plugin,
1232                                                plugin->select_entry_by_hash_vhash_and_type, 
1233                                                7, nrc->rbind, 
1234                                                &return_ok, NULL, 
1235                                                MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1236                                                MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
1237                                                MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
1238                                                MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1239                                                -1);
1240         }
1241       else
1242         {
1243           ret =
1244             prepared_statement_run_select (plugin,
1245                                            plugin->select_entry_by_hash_and_type, 
1246                                            7, nrc->rbind, 
1247                                            &return_ok, NULL,
1248                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1249                                            MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
1250                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1251                                            -1);
1252         }
1253     }
1254   else
1255     {
1256       if (gc->have_vhash)
1257         {
1258           ret =
1259             prepared_statement_run_select (plugin,
1260                                            plugin->select_entry_by_hash_and_vhash, 
1261                                            7, nrc->rbind, 
1262                                            &return_ok, NULL,
1263                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, 
1264                                            MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize, 
1265                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
1266                                            -1);
1267         }
1268       else
1269         {
1270           ret =
1271             prepared_statement_run_select (plugin,
1272                                            plugin->select_entry_by_hash, 
1273                                            7, nrc->rbind, 
1274                                            &return_ok, NULL,
1275                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1276                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
1277                                            -1);
1278         }
1279     }
1280   gc->count++;
1281   return ret;
1282 }
1283
1284
1285 /**
1286  * Iterate over the results for a particular key in the datastore.
1287  *
1288  * @param cls closure
1289  * @param key maybe NULL (to match all entries)
1290  * @param vhash hash of the value, maybe NULL (to
1291  *        match all values that have the right key).
1292  *        Note that for DBlocks there is no difference
1293  *        betwen key and vhash, but for other blocks
1294  *        there may be!
1295  * @param type entries of which type are relevant?
1296  *        Use 0 for any type.
1297  * @param iter function to call on each matching value;
1298  *        will be called once with a NULL value at the end
1299  * @param iter_cls closure for iter
1300  */
1301 static void
1302 mysql_plugin_get (void *cls,
1303                   const GNUNET_HashCode *key,
1304                   const GNUNET_HashCode *vhash,
1305                   enum GNUNET_BLOCK_Type type,
1306                   PluginIterator iter, void *iter_cls)
1307 {
1308   struct Plugin *plugin = cls;
1309   unsigned int itype = type;
1310   int ret;
1311   MYSQL_BIND cbind[1];
1312   struct GetContext *gc;
1313   struct NextRequestClosure *nrc;
1314   long long total;
1315   unsigned long hashSize;
1316
1317   GNUNET_assert (key != NULL);
1318   if (iter == NULL) 
1319     return;
1320   hashSize = sizeof (GNUNET_HashCode);
1321   memset (cbind, 0, sizeof (cbind));
1322   total = -1;
1323   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
1324   cbind[0].buffer = &total;
1325   cbind[0].is_unsigned = GNUNET_NO;
1326   if (type != 0)
1327     {
1328       if (vhash != NULL)
1329         {
1330           ret =
1331             prepared_statement_run_select (plugin,
1332                                            plugin->count_entry_by_hash_vhash_and_type, 
1333                                            1, cbind, 
1334                                            &return_ok, NULL,
1335                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1336                                            MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, 
1337                                            MYSQL_TYPE_LONG, &itype, GNUNET_YES,
1338                                            -1);
1339         }
1340       else
1341         {
1342           ret =
1343             prepared_statement_run_select (plugin,
1344                                            plugin->count_entry_by_hash_and_type, 
1345                                            1, cbind, 
1346                                            &return_ok, NULL,
1347                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1348                                            MYSQL_TYPE_LONG, &itype, GNUNET_YES,
1349                                            -1);
1350         }
1351     }
1352   else
1353     {
1354       if (vhash != NULL)
1355         {
1356           ret =
1357             prepared_statement_run_select (plugin,
1358                                            plugin->count_entry_by_hash_and_vhash, 
1359                                            1, cbind,
1360                                            &return_ok, NULL,
1361                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1362                                            MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, 
1363                                            -1);
1364
1365         }
1366       else
1367         {
1368           ret =
1369             prepared_statement_run_select (plugin,
1370                                            plugin->count_entry_by_hash,
1371                                            1, cbind, 
1372                                            &return_ok, NULL, 
1373                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1374                                            -1);
1375         }
1376     }
1377   if ((ret != GNUNET_OK) || (0 >= total))
1378     {
1379       iter (iter_cls, 
1380             NULL, NULL, 0, NULL, 0, 0, 0, 
1381             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1382       return;
1383     }
1384 #if DEBUG_MYSQL
1385   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386               "Iterating over %lld results for GET `%s'\n",
1387               total,
1388               GNUNET_h2s (key));
1389 #endif
1390   gc = GNUNET_malloc (sizeof (struct GetContext));
1391   gc->key = *key;
1392   if (vhash != NULL)
1393     {
1394       gc->have_vhash = GNUNET_YES;
1395       gc->vhash = *vhash;
1396     }
1397   gc->total = total;
1398   gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1399   
1400
1401   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1402   nrc->plugin = plugin;
1403   nrc->type = type;  
1404   nrc->dviter = iter;
1405   nrc->dviter_cls = iter_cls;
1406   nrc->prep = &get_statement_prepare;
1407   nrc->prep_cls = gc;
1408   mysql_plugin_next_request (nrc, GNUNET_NO);
1409 }
1410
1411
1412 /**
1413  * Run the prepared statement to get the next data item ready.
1414  * 
1415  * @param cls not used
1416  * @param nrc closure for the next request iterator
1417  * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
1418  */
1419 static int
1420 iterator_zero_prepare (void *cls,
1421                        struct NextRequestClosure *nrc)
1422 {
1423   struct Plugin *plugin;
1424   int ret;
1425
1426   if (nrc == NULL)
1427     return GNUNET_NO;
1428   plugin = nrc->plugin;
1429   ret = prepared_statement_run_select (plugin,
1430                                        plugin->zero_iter,
1431                                        7, nrc->rbind,
1432                                        &return_ok, NULL,
1433                                        MYSQL_TYPE_LONG, &nrc->count, GNUNET_YES,
1434                                        -1);
1435   nrc->count++;
1436   return ret;
1437 }
1438
1439
1440 /**
1441  * Select a subset of the items in the datastore and call
1442  * the given iterator for each of them.
1443  *
1444  * @param cls our "struct Plugin*"
1445  * @param type entries of which type should be considered?
1446  *        Use 0 for any type.
1447  * @param iter function to call on each matching value;
1448  *        will be called once with a NULL value at the end
1449  * @param iter_cls closure for iter
1450  */
1451 static void
1452 mysql_plugin_iter_zero_anonymity (void *cls,
1453                                   enum GNUNET_BLOCK_Type type,
1454                                   PluginIterator iter,
1455                                   void *iter_cls)
1456 {
1457   struct Plugin *plugin = cls;
1458   struct NextRequestClosure *nrc;
1459
1460   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1461   nrc->plugin = plugin;
1462   nrc->type = type;  
1463   nrc->dviter = iter;
1464   nrc->dviter_cls = iter_cls;
1465   nrc->prep = &iterator_zero_prepare;
1466   mysql_plugin_next_request (nrc, GNUNET_NO);
1467 }
1468
1469
1470 /**
1471  * Run the SELECT statement for the replication function.
1472  * 
1473  * @param cls the 'struct Plugin'
1474  * @param nrc the context (not used)
1475  */
1476 static int
1477 replication_prepare (void *cls,
1478                      struct NextRequestClosure *nrc)
1479 {
1480   struct Plugin *plugin = cls;
1481
1482   return prepared_statement_run_select (plugin,
1483                                         plugin->select_replication, 
1484                                         7, nrc->rbind, 
1485                                         &return_ok, NULL,
1486                                         -1);
1487 }
1488
1489
1490
1491 /**
1492  * Context for 'repl_iter' function.
1493  */
1494 struct ReplCtx
1495 {
1496   
1497   /**
1498    * Plugin handle.
1499    */
1500   struct Plugin *plugin;
1501   
1502   /**
1503    * Function to call for the result (or the NULL).
1504    */
1505   PluginIterator iter;
1506   
1507   /**
1508    * Closure for iter.
1509    */
1510   void *iter_cls;
1511 };
1512
1513
1514 /**
1515  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
1516  * Decrements the replication counter and calls the original
1517  * iterator.
1518  *
1519  * @param cls closure
1520  * @param next_cls closure to pass to the "next" function.
1521  * @param key key for the content
1522  * @param size number of bytes in data
1523  * @param data content stored
1524  * @param type type of the content
1525  * @param priority priority of the content
1526  * @param anonymity anonymity-level for the content
1527  * @param expiration expiration time for the content
1528  * @param uid unique identifier for the datum;
1529  *        maybe 0 if no unique identifier is available
1530  *
1531  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1532  *         (continue on call to "next", of course),
1533  *         GNUNET_NO to delete the item and continue (if supported)
1534  */
1535 static int
1536 repl_iter (void *cls,
1537            void *next_cls,
1538            const GNUNET_HashCode *key,
1539            uint32_t size,
1540            const void *data,
1541            enum GNUNET_BLOCK_Type type,
1542            uint32_t priority,
1543            uint32_t anonymity,
1544            struct GNUNET_TIME_Absolute expiration, 
1545            uint64_t uid)
1546 {
1547   struct ReplCtx *rc = cls;
1548   struct Plugin *plugin = rc->plugin;
1549   unsigned long long oid;
1550   int ret;
1551
1552   ret = rc->iter (rc->iter_cls,
1553                   next_cls, key,
1554                   size, data, 
1555                   type, priority, anonymity, expiration,
1556                   uid);
1557   if (NULL != key)
1558     {
1559       oid = (unsigned long long) uid;
1560       ret = prepared_statement_run (plugin,
1561                                     plugin->dec_repl,
1562                                     NULL,
1563                                     MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, 
1564                                     -1);
1565       if (ret == GNUNET_SYSERR)
1566         {
1567           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1568                       "Failed to reduce replication counter\n");
1569           return GNUNET_SYSERR;
1570         }
1571     }
1572   return ret;
1573 }
1574
1575
1576 /**
1577  * Get a random item for replication.  Returns a single, not expired, random item
1578  * from those with the highest replication counters.  The item's 
1579  * replication counter is decremented by one IF it was positive before.
1580  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1581  *
1582  * @param cls closure
1583  * @param iter function to call the value (once only).
1584  * @param iter_cls closure for iter
1585  */
1586 static void
1587 mysql_plugin_replication_get (void *cls,
1588                               PluginIterator iter, void *iter_cls)
1589 {
1590   struct Plugin *plugin = cls;
1591   struct NextRequestClosure *nrc;
1592   struct ReplCtx rc;
1593   
1594   rc.plugin = plugin;
1595   rc.iter = iter;
1596   rc.iter_cls = iter_cls;
1597   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1598   nrc->plugin = plugin;
1599   nrc->now = GNUNET_TIME_absolute_get ();
1600   nrc->prep = &replication_prepare;
1601   nrc->prep_cls = plugin;
1602   nrc->type = 0;
1603   nrc->dviter = &repl_iter;
1604   nrc->dviter_cls = &rc;
1605   nrc->end_it = GNUNET_NO;
1606   nrc->one_shot = GNUNET_YES;
1607   mysql_next_request_cont (nrc, NULL);
1608 }
1609
1610
1611 /**
1612  * Run the SELECT statement for the expiration function.
1613  * 
1614  * @param cls the 'struct Plugin'
1615  * @param nrc the context (not used)
1616  * @return GNUNET_OK on success, GNUNET_NO if there are
1617  *         no more values, GNUNET_SYSERR on error
1618  */
1619 static int
1620 expiration_prepare (void *cls,
1621                     struct NextRequestClosure *nrc)
1622 {
1623   struct Plugin *plugin = cls;
1624   long long nt;
1625
1626   if (NULL == nrc)
1627     return GNUNET_NO;
1628   nt = (long long) nrc->now.abs_value;
1629   return prepared_statement_run_select
1630     (plugin,
1631      plugin->select_expiration, 
1632      7, nrc->rbind, 
1633      &return_ok, NULL,
1634      MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
1635      -1);
1636 }
1637
1638
1639 /**
1640  * Get a random item for expiration.
1641  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1642  *
1643  * @param cls closure
1644  * @param iter function to call the value (once only).
1645  * @param iter_cls closure for iter
1646  */
1647 static void
1648 mysql_plugin_expiration_get (void *cls,
1649                              PluginIterator iter, void *iter_cls)
1650 {
1651   struct Plugin *plugin = cls;
1652   struct NextRequestClosure *nrc;
1653
1654   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1655   nrc->plugin = plugin;
1656   nrc->now = GNUNET_TIME_absolute_get ();
1657   nrc->prep = &expiration_prepare;
1658   nrc->prep_cls = plugin;
1659   nrc->type = 0;
1660   nrc->dviter = iter;
1661   nrc->dviter_cls = iter_cls;
1662   nrc->end_it = GNUNET_NO;
1663   nrc->one_shot = GNUNET_YES;
1664   mysql_next_request_cont (nrc, NULL);
1665 }
1666
1667
1668 /**
1669  * Drop database.
1670  *
1671  * @param cls the "struct Plugin*"
1672  */
1673 static void 
1674 mysql_plugin_drop (void *cls)
1675 {
1676   struct Plugin *plugin = cls;
1677
1678   if (GNUNET_OK != run_statement (plugin,
1679                                   "DROP TABLE gn090"))
1680     return;           /* error */
1681   plugin->env->duc (plugin->env->cls, 0);
1682 }
1683
1684
1685 /**
1686  * Entry point for the plugin.
1687  *
1688  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1689  * @return our "struct Plugin*"
1690  */
1691 void *
1692 libgnunet_plugin_datastore_mysql_init (void *cls)
1693 {
1694   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1695   struct GNUNET_DATASTORE_PluginFunctions *api;
1696   struct Plugin *plugin;
1697
1698   plugin = GNUNET_malloc (sizeof (struct Plugin));
1699   plugin->env = env;
1700   plugin->cnffile = get_my_cnf_path (env->cfg);
1701   if (GNUNET_OK != iopen (plugin))
1702     {
1703       iclose (plugin);
1704       GNUNET_free_non_null (plugin->cnffile);
1705       GNUNET_free (plugin);
1706       return NULL;
1707     }
1708 #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
1709 #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
1710   if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
1711              " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1712              " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1713              " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1714              " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1715              " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1716              " hash BINARY(64) NOT NULL DEFAULT '',"
1717              " vhash BINARY(64) NOT NULL DEFAULT '',"
1718              " value BLOB NOT NULL DEFAULT '',"
1719              " uid BIGINT NOT NULL AUTO_INCREMENT,"
1720              " PRIMARY KEY (uid),"
1721              " INDEX idx_hash (hash(64)),"
1722              " INDEX idx_hash_uid (hash(64),uid),"
1723              " INDEX idx_hash_vhash (hash(64),vhash(64)),"
1724              " INDEX idx_hash_type_uid (hash(64),type,uid),"
1725              " INDEX idx_prio (prio),"
1726              " INDEX idx_repl (repl),"
1727              " INDEX idx_expire_prio (expire,prio),"
1728              " INDEX idx_anonLevel_uid (anonLevel,uid)"
1729              ") ENGINE=InnoDB") ||
1730       MRUNS ("SET AUTOCOMMIT = 1") ||
1731       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1732       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1733       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1734       PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH)
1735       || PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE)
1736       || PINIT (plugin->select_entry_by_hash_vhash_and_type,
1737                 SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1738       || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
1739       || PINIT (plugin->get_size, SELECT_SIZE)
1740       || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
1741       || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1742       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1743                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1744       || PINIT (plugin->update_entry, UPDATE_ENTRY)
1745       || PINIT (plugin->dec_repl, DEC_REPL)
1746       || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) 
1747       || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) 
1748       || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
1749     {
1750       iclose (plugin);
1751       GNUNET_free_non_null (plugin->cnffile);
1752       GNUNET_free (plugin);
1753       return NULL;
1754     }
1755 #undef PINIT
1756 #undef MRUNS
1757
1758   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1759   api->cls = plugin;
1760   api->get_size = &mysql_plugin_get_size;
1761   api->put = &mysql_plugin_put;
1762   api->next_request = &mysql_plugin_next_request;
1763   api->get = &mysql_plugin_get;
1764   api->replication_get = &mysql_plugin_replication_get;
1765   api->expiration_get = &mysql_plugin_expiration_get;
1766   api->update = &mysql_plugin_update;
1767   api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity;
1768   api->drop = &mysql_plugin_drop;
1769   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1770                    "mysql", _("Mysql database running\n"));
1771   return api;
1772 }
1773
1774
1775 /**
1776  * Exit point from the plugin.
1777  * @param cls our "struct Plugin*"
1778  * @return always NULL
1779  */
1780 void *
1781 libgnunet_plugin_datastore_mysql_done (void *cls)
1782 {
1783   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1784   struct Plugin *plugin = api->cls;
1785
1786   iclose (plugin);
1787   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1788     {
1789       GNUNET_SCHEDULER_cancel (plugin->next_task);
1790       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1791       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1792       GNUNET_free (plugin->next_task_nc);
1793       plugin->next_task_nc = NULL;
1794     }
1795   GNUNET_free_non_null (plugin->cnffile);
1796   GNUNET_free (plugin);
1797   GNUNET_free (api);
1798   mysql_library_end ();
1799   return NULL;
1800 }
1801
1802 /* end of plugin_datastore_mysql.c */