fix
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in /etc/gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109    @endverbatim
110  *
111  * PROBLEMS?
112  *
113  * If you have problems related to the mysql module, your best
114  * friend is probably the mysql manual. The first thing to check
115  * is that mysql is basically operational, that you can connect
116  * to it, create tables, issue queries etc.
117  */
118
119 #include "platform.h"
120 #include "gnunet_datastore_plugin.h"
121 #include "gnunet_util_lib.h"
122 #include <mysql/mysql.h>
123
124 #define DEBUG_MYSQL GNUNET_NO
125
126 #define MAX_DATUM_SIZE 65536
127
128 /**
129  * Maximum number of supported parameters for a prepared
130  * statement.  Increase if needed.
131  */
132 #define MAX_PARAM 16
133
134 /**
135  * Die with an error message that indicates
136  * a failure of the command 'cmd' with the message given
137  * by strerror(errno).
138  */
139 #define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0);
140
141 /**
142  * Log an error message at log-level 'level' that indicates
143  * a failure of the command 'cmd' on file 'filename'
144  * with the message given by strerror(errno).
145  */
146 #define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
147
148
149 struct GNUNET_MysqlStatementHandle
150 {
151   struct GNUNET_MysqlStatementHandle *next;
152
153   struct GNUNET_MysqlStatementHandle *prev;
154
155   char *query;
156
157   MYSQL_STMT *statement;
158
159   int valid;
160
161 };
162
163 /**
164  * Context for the universal iterator.
165  */
166 struct NextRequestClosure;
167
168 /**
169  * Type of a function that will prepare
170  * the next iteration.
171  *
172  * @param cls closure
173  * @param nc the next context; NULL for the last
174  *         call which gives the callback a chance to
175  *         clean up the closure
176  * @return GNUNET_OK on success, GNUNET_NO if there are
177  *         no more values, GNUNET_SYSERR on error
178  */
179 typedef int (*PrepareFunction)(void *cls,
180                                struct NextRequestClosure *nc);
181
182
183 struct NextRequestClosure
184 {
185   struct Plugin *plugin;
186
187   struct GNUNET_TIME_Absolute now;
188
189   /**
190    * Function to call to prepare the next
191    * iteration.
192    */
193   PrepareFunction prep;
194
195   /**
196    * Closure for prep.
197    */
198   void *prep_cls;
199
200   MYSQL_BIND rbind[7];
201
202   enum GNUNET_BLOCK_Type type;
203
204   PluginIterator dviter;
205
206   void *dviter_cls;
207
208   unsigned int count;
209
210   int end_it;
211 };
212
213
214 /**
215  * Context for all functions in this plugin.
216  */
217 struct Plugin 
218 {
219   /**
220    * Our execution environment.
221    */
222   struct GNUNET_DATASTORE_PluginEnvironment *env;
223
224   /**
225    * Handle to talk to MySQL.
226    */
227   MYSQL *dbf;
228   
229   /**
230    * We keep all prepared statements in a DLL.  This is the head.
231    */
232   struct GNUNET_MysqlStatementHandle *shead;
233
234   /**
235    * We keep all prepared statements in a DLL.  This is the tail.
236    */
237   struct GNUNET_MysqlStatementHandle *stail;
238
239   /**
240    * Filename of "my.cnf" (msyql configuration).
241    */
242   char *cnffile;
243
244   /**
245    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
246    */
247   struct NextRequestClosure *next_task_nc;
248
249   /**
250    * Pending task with scheduler for running the next request.
251    */
252   GNUNET_SCHEDULER_TaskIdentifier next_task;
253
254   /**
255    * Prepared statements.
256    */
257 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?)"
258   struct GNUNET_MysqlStatementHandle *insert_entry;
259   
260 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
261   struct GNUNET_MysqlStatementHandle *delete_entry_by_uid;
262
263 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
264   struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
265   
266 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_uid) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
267   struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
268
269 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
270   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
271
272 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
273   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
274  
275 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
276   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
277
278 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
279   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
280  
281 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
282   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
283   
284 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
285   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
286
287 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=? LIMIT 1"
288   struct GNUNET_MysqlStatementHandle *update_entry;
289
290 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
291   struct GNUNET_MysqlStatementHandle *get_size;
292
293 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_anonLevel_uid) WHERE anonLevel=0 ORDER BY uid DESC LIMIT 1 OFFSET ?"
294   struct GNUNET_MysqlStatementHandle *zero_iter;
295
296 #define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_expire_prio) WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
297   "UNION "\
298   "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_prio) ORDER BY prio ASC LIMIT 1) "\
299   "ORDER BY expire ASC LIMIT 1"
300   struct GNUNET_MysqlStatementHandle *select_expiration;
301
302 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_repl) ORDER BY repl DESC,RAND() LIMIT 1"
303   struct GNUNET_MysqlStatementHandle *select_replication;
304
305 };
306
307
308 /**
309  * Obtain the location of ".my.cnf".
310  *
311  * @param cfg our configuration
312  * @return NULL on error
313  */
314 static char *
315 get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
316 {
317   char *cnffile;
318   char *home_dir;
319   struct stat st;
320 #ifndef WINDOWS
321   struct passwd *pw;
322 #endif
323   int configured;
324
325 #ifndef WINDOWS
326   pw = getpwuid (getuid ());
327   if (!pw)
328     {
329       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, 
330                            "getpwuid");
331       return NULL;
332     }
333   if (GNUNET_YES ==
334       GNUNET_CONFIGURATION_have_value (cfg,
335                                        "datastore-mysql", "CONFIG"))
336     {
337       GNUNET_assert (GNUNET_OK == 
338                      GNUNET_CONFIGURATION_get_value_filename (cfg,
339                                                               "datastore-mysql", "CONFIG", &cnffile));
340       configured = GNUNET_YES;
341     }
342   else
343     {
344       home_dir = GNUNET_strdup (pw->pw_dir);
345 #else
346       home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
347       plibc_conv_to_win_path ("~/", home_dir);
348 #endif
349       GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
350       GNUNET_free (home_dir);
351       configured = GNUNET_NO;
352     }
353   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
354               _("Trying to use file `%s' for MySQL configuration.\n"),
355               cnffile);
356   if ((0 != STAT (cnffile, &st)) ||
357       (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode)))
358     {
359       if (configured == GNUNET_YES)
360         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
361                     _("Could not access file `%s': %s\n"), cnffile,
362                     STRERROR (errno));
363       GNUNET_free (cnffile);
364       return NULL;
365     }
366   return cnffile;
367 }
368
369
370
371 /**
372  * Free a prepared statement.
373  *
374  * @param plugin plugin context
375  * @param s prepared statement
376  */
377 static void
378 prepared_statement_destroy (struct Plugin *plugin, 
379                             struct GNUNET_MysqlStatementHandle
380                             *s)
381 {
382   GNUNET_CONTAINER_DLL_remove (plugin->shead,
383                                plugin->stail,
384                                s);
385   if (s->valid)
386     mysql_stmt_close (s->statement);
387   GNUNET_free (s->query);
388   GNUNET_free (s);
389 }
390
391
392 /**
393  * Close database connection and all prepared statements (we got a DB
394  * disconnect error).
395  */
396 static int
397 iclose (struct Plugin *plugin)
398 {
399   struct GNUNET_MysqlStatementHandle *spos;
400
401   spos = plugin->shead;
402   while (NULL != plugin->shead)
403     prepared_statement_destroy (plugin,
404                                 plugin->shead);
405   if (plugin->dbf != NULL)
406     {
407       mysql_close (plugin->dbf);
408       plugin->dbf = NULL;
409     }
410   return GNUNET_OK;
411 }
412
413
414 /**
415  * Open the connection with the database (and initialize
416  * our default options).
417  *
418  * @return GNUNET_OK on success
419  */
420 static int
421 iopen (struct Plugin *ret)
422 {
423   char *mysql_dbname;
424   char *mysql_server;
425   char *mysql_user;
426   char *mysql_password;
427   unsigned long long mysql_port;
428   my_bool reconnect;
429   unsigned int timeout;
430
431   ret->dbf = mysql_init (NULL);
432   if (ret->dbf == NULL)
433     return GNUNET_SYSERR;
434   if (ret->cnffile != NULL)
435     mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile);
436   mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
437   reconnect = 0;
438   mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect);
439   mysql_options (ret->dbf,
440                  MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
441   mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
442   timeout = 60; /* in seconds */
443   mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
444   mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
445   mysql_dbname = NULL;
446   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
447                                                      "datastore-mysql", "DATABASE"))
448     GNUNET_assert (GNUNET_OK == 
449                    GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
450                                                           "datastore-mysql", "DATABASE", 
451                                                           &mysql_dbname));
452   else
453     mysql_dbname = GNUNET_strdup ("gnunet");
454   mysql_user = NULL;
455   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
456                                                      "datastore-mysql", "USER"))
457     {
458       GNUNET_assert (GNUNET_OK == 
459                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
460                                                            "datastore-mysql", "USER", 
461                                                            &mysql_user));
462     }
463   mysql_password = NULL;
464   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
465                                                      "datastore-mysql", "PASSWORD"))
466     {
467       GNUNET_assert (GNUNET_OK ==
468                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
469                                                            "datastore-mysql", "PASSWORD",
470                                                            &mysql_password));
471     }
472   mysql_server = NULL;
473   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
474                                                      "datastore-mysql", "HOST"))
475     {
476       GNUNET_assert (GNUNET_OK == 
477                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
478                                                            "datastore-mysql", "HOST", 
479                                                            &mysql_server));
480     }
481   mysql_port = 0;
482   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
483                                                      "datastore-mysql", "PORT"))
484     {
485       GNUNET_assert (GNUNET_OK ==
486                     GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql",
487                                                            "PORT", &mysql_port));
488     }
489
490   GNUNET_assert (mysql_dbname != NULL);
491   mysql_real_connect (ret->dbf, 
492                       mysql_server, 
493                       mysql_user, mysql_password,
494                       mysql_dbname, 
495                       (unsigned int) mysql_port, NULL,
496                       CLIENT_IGNORE_SIGPIPE);
497   GNUNET_free_non_null (mysql_server);
498   GNUNET_free_non_null (mysql_user);
499   GNUNET_free_non_null (mysql_password);
500   GNUNET_free (mysql_dbname);
501   if (mysql_error (ret->dbf)[0])
502     {
503       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
504                  "mysql_real_connect", ret);
505       return GNUNET_SYSERR;
506     }
507   return GNUNET_OK;
508 }
509
510
511 /**
512  * Run the given MySQL statement.
513  *
514  * @param plugin plugin context
515  * @param statement SQL statement to run
516  * @return GNUNET_OK on success, GNUNET_SYSERR on error
517  */
518 static int
519 run_statement (struct Plugin *plugin,
520                const char *statement)
521 {
522   if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
523     return GNUNET_SYSERR;
524   mysql_query (plugin->dbf, statement);
525   if (mysql_error (plugin->dbf)[0])
526     {
527       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
528                  "mysql_query", plugin);
529       iclose (plugin);
530       return GNUNET_SYSERR;
531     }
532   return GNUNET_OK;
533 }
534
535
536 /**
537  * Create a prepared statement.
538  *
539  * @param plugin plugin context
540  * @param statement SQL statement text to prepare
541  * @return NULL on error
542  */
543 static struct GNUNET_MysqlStatementHandle *
544 prepared_statement_create (struct Plugin *plugin, 
545                            const char *statement)
546 {
547   struct GNUNET_MysqlStatementHandle *ret;
548
549   ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle));
550   ret->query = GNUNET_strdup (statement);
551   GNUNET_CONTAINER_DLL_insert (plugin->shead,
552                                plugin->stail,
553                                ret);
554   return ret;
555 }
556
557
558 /**
559  * Prepare a statement for running.
560  *
561  * @param plugin plugin context
562  * @param ret handle to prepared statement
563  * @return GNUNET_OK on success
564  */
565 static int
566 prepare_statement (struct Plugin *plugin, 
567                    struct GNUNET_MysqlStatementHandle *ret)
568 {
569   if (GNUNET_YES == ret->valid)
570     return GNUNET_OK;
571   if ((NULL == plugin->dbf) && 
572       (GNUNET_OK != iopen (plugin)))
573     return GNUNET_SYSERR;
574   ret->statement = mysql_stmt_init (plugin->dbf);
575   if (ret->statement == NULL)
576     {
577       iclose (plugin);
578       return GNUNET_SYSERR;
579     }
580   if (mysql_stmt_prepare (ret->statement, 
581                           ret->query,
582                           strlen (ret->query)))
583     {
584       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
585                  "mysql_stmt_prepare", 
586                  plugin);
587       mysql_stmt_close (ret->statement);
588       ret->statement = NULL;
589       iclose (plugin);
590       return GNUNET_SYSERR;
591     }
592   ret->valid = GNUNET_YES;
593   return GNUNET_OK;
594
595 }
596
597
598 /**
599  * Bind the parameters for the given MySQL statement
600  * and run it.
601  *
602  * @param plugin plugin context
603  * @param s statement to bind and run
604  * @param ap arguments for the binding
605  * @return GNUNET_SYSERR on error, GNUNET_OK on success
606  */
607 static int
608 init_params (struct Plugin *plugin,
609              struct GNUNET_MysqlStatementHandle *s,
610              va_list ap)
611 {
612   MYSQL_BIND qbind[MAX_PARAM];
613   unsigned int pc;
614   unsigned int off;
615   enum enum_field_types ft;
616
617   pc = mysql_stmt_param_count (s->statement);
618   if (pc > MAX_PARAM)
619     {
620       /* increase internal constant! */
621       GNUNET_break (0);
622       return GNUNET_SYSERR;
623     }
624   memset (qbind, 0, sizeof (qbind));
625   off = 0;
626   ft = 0;
627   while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types))))
628     {
629       qbind[off].buffer_type = ft;
630       switch (ft)
631         {
632         case MYSQL_TYPE_FLOAT:
633           qbind[off].buffer = va_arg (ap, float *);
634           break;
635         case MYSQL_TYPE_LONGLONG:
636           qbind[off].buffer = va_arg (ap, unsigned long long *);
637           qbind[off].is_unsigned = va_arg (ap, int);
638           break;
639         case MYSQL_TYPE_LONG:
640           qbind[off].buffer = va_arg (ap, unsigned int *);
641           qbind[off].is_unsigned = va_arg (ap, int);
642           break;
643         case MYSQL_TYPE_VAR_STRING:
644         case MYSQL_TYPE_STRING:
645         case MYSQL_TYPE_BLOB:
646           qbind[off].buffer = va_arg (ap, void *);
647           qbind[off].buffer_length = va_arg (ap, unsigned long);
648           qbind[off].length = va_arg (ap, unsigned long *);
649           break;
650         default:
651           /* unsupported type */
652           GNUNET_break (0);
653           return GNUNET_SYSERR;
654         }
655       pc--;
656       off++;
657     }
658   if (! ( (pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1) ) )
659     {
660       GNUNET_assert (0);
661       return GNUNET_SYSERR;
662     }
663   if (mysql_stmt_bind_param (s->statement, qbind))
664     {
665       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
666                   _("`%s' failed at %s:%d with error: %s\n"),
667                   "mysql_stmt_bind_param",
668                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
669       iclose (plugin);
670       return GNUNET_SYSERR;
671     }
672   if (mysql_stmt_execute (s->statement))
673     {
674       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
675                   _("`%s' failed at %s:%d with error: %s\n"),
676                   "mysql_stmt_execute",
677                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
678       iclose (plugin);
679       return GNUNET_SYSERR;
680     }
681   return GNUNET_OK;
682 }
683
684 /**
685  * Type of a callback that will be called for each
686  * data set returned from MySQL.
687  *
688  * @param cls user-defined argument
689  * @param num_values number of elements in values
690  * @param values values returned by MySQL
691  * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort
692  */
693 typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
694                                           unsigned int num_values,
695                                           MYSQL_BIND *values);
696
697
698 /**
699  * Run a prepared SELECT statement.
700  *
701  * @param plugin plugin context
702  * @param s statement to run
703  * @param result_size number of elements in results array
704  * @param results pointer to already initialized MYSQL_BIND
705  *        array (of sufficient size) for passing results
706  * @param processor function to call on each result
707  * @param processor_cls extra argument to processor
708  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
709  *        values (size + buffer-reference for pointers); terminated
710  *        with "-1"
711  * @return GNUNET_SYSERR on error, otherwise
712  *         the number of successfully affected (or queried) rows
713  */
714 static int
715 prepared_statement_run_select (struct Plugin *plugin,
716                                struct GNUNET_MysqlStatementHandle *s,
717                                unsigned int result_size,
718                                MYSQL_BIND *results,
719                                GNUNET_MysqlDataProcessor processor, void *processor_cls,
720                                ...)
721 {
722   va_list ap;
723   int ret;
724   unsigned int rsize;
725   int total;
726
727   if (GNUNET_OK != prepare_statement (plugin, s))
728     {
729       GNUNET_break (0);
730       return GNUNET_SYSERR;
731     }
732   va_start (ap, processor_cls);
733   if (GNUNET_OK != init_params (plugin, s, ap))
734     {
735       GNUNET_break (0);
736       va_end (ap);
737       return GNUNET_SYSERR;
738     }
739   va_end (ap);
740   rsize = mysql_stmt_field_count (s->statement);
741   if (rsize > result_size)
742     {
743       GNUNET_break (0);
744       return GNUNET_SYSERR;
745     }
746   if (mysql_stmt_bind_result (s->statement, results))
747     {
748       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
749                   _("`%s' failed at %s:%d with error: %s\n"),
750                   "mysql_stmt_bind_result",
751                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
752       iclose (plugin);
753       return GNUNET_SYSERR;
754     }
755
756   total = 0;
757   while (1)
758     {
759       ret = mysql_stmt_fetch (s->statement);
760       if (ret == MYSQL_NO_DATA)
761         break;
762       if (ret != 0)
763         {
764           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
765                       _("`%s' failed at %s:%d with error: %s\n"),
766                       "mysql_stmt_fetch",
767                       __FILE__, __LINE__, mysql_stmt_error (s->statement));
768           iclose (plugin);
769           return GNUNET_SYSERR;
770         }
771       if (processor != NULL)
772         if (GNUNET_OK != processor (processor_cls, rsize, results))
773           break;
774       total++;
775     }
776   mysql_stmt_reset (s->statement);
777   return total;
778 }
779
780
781 /**
782  * Run a prepared statement that does NOT produce results.
783  *
784  * @param plugin plugin context
785  * @param s statement to run
786  * @param insert_id NULL or address where to store the row ID of whatever
787  *        was inserted (only for INSERT statements!)
788  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
789  *        values (size + buffer-reference for pointers); terminated
790  *        with "-1"
791  * @return GNUNET_SYSERR on error, otherwise
792  *         the number of successfully affected rows
793  */
794 static int
795 prepared_statement_run (struct Plugin *plugin,
796                         struct GNUNET_MysqlStatementHandle *s,
797                         unsigned long long *insert_id, ...)
798 {
799   va_list ap;
800   int affected;
801
802   if (GNUNET_OK != prepare_statement (plugin, s))
803     return GNUNET_SYSERR;
804   va_start (ap, insert_id);
805   if (GNUNET_OK != init_params (plugin, s, ap))
806     {
807       va_end (ap);
808       return GNUNET_SYSERR;
809     }
810   va_end (ap);
811   affected = mysql_stmt_affected_rows (s->statement);
812   if (NULL != insert_id)
813     *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement);
814   mysql_stmt_reset (s->statement);
815   return affected;
816 }
817
818
819 /**
820  * Delete an entry from the gn090 table.
821  *
822  * @param plugin plugin context
823  * @param uid unique ID of the entry to delete
824  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
825  */
826 static int
827 do_delete_entry (struct Plugin *plugin,
828                  unsigned long long uid)
829 {
830   int ret;
831  
832 #if DEBUG_MYSQL
833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834               "Deleting value %llu from gn090 table\n",
835               uid);
836 #endif
837   ret = prepared_statement_run (plugin,
838                                 plugin->delete_entry_by_uid,
839                                 NULL,
840                                 MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES,
841                                 -1);
842   if (ret > 0)
843     return GNUNET_OK;
844   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
845               "Deleting value %llu from gn090 table failed\n",
846               uid);
847   return ret;
848 }
849
850
851 /**
852  * Function that simply returns GNUNET_OK
853  *
854  * @param cls closure, not used
855  * @param num_values not used
856  * @param values not used
857  * @return GNUNET_OK
858  */
859 static int
860 return_ok (void *cls, 
861            unsigned int num_values, 
862            MYSQL_BIND * values)
863 {
864   return GNUNET_OK;
865 }
866
867
868 /**
869  * Continuation of "mysql_next_request".
870  *
871  * @param next_cls the next context
872  * @param tc the task context (unused)
873  */
874 static void 
875 mysql_next_request_cont (void *next_cls,
876                          const struct GNUNET_SCHEDULER_TaskContext *tc)
877 {
878   struct NextRequestClosure *nrc = next_cls;
879   struct Plugin *plugin;
880   int ret;
881   unsigned int type;
882   unsigned int priority;
883   unsigned int anonymity;
884   unsigned long long exp;
885   unsigned long hashSize;
886   unsigned long size;
887   unsigned long long uid;
888   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
889   GNUNET_HashCode key;
890   struct GNUNET_TIME_Absolute expiration;
891   MYSQL_BIND *rbind = nrc->rbind;
892
893   plugin = nrc->plugin;
894   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
895   plugin->next_task_nc = NULL;
896
897   if (GNUNET_YES == nrc->end_it) 
898     goto END_SET;
899   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
900   nrc->now = GNUNET_TIME_absolute_get ();
901   hashSize = sizeof (GNUNET_HashCode);
902   memset (nrc->rbind, 0, sizeof (nrc->rbind));
903   rbind = nrc->rbind;
904   rbind[0].buffer_type = MYSQL_TYPE_LONG;
905   rbind[0].buffer = &type;
906   rbind[0].is_unsigned = 1;
907   rbind[1].buffer_type = MYSQL_TYPE_LONG;
908   rbind[1].buffer = &priority;
909   rbind[1].is_unsigned = 1;
910   rbind[2].buffer_type = MYSQL_TYPE_LONG;
911   rbind[2].buffer = &anonymity;
912   rbind[2].is_unsigned = 1;
913   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
914   rbind[3].buffer = &exp;
915   rbind[3].is_unsigned = 1;
916   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
917   rbind[4].buffer = &key;
918   rbind[4].buffer_length = hashSize;
919   rbind[4].length = &hashSize;
920   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
921   rbind[5].buffer = value;
922   rbind[5].buffer_length = size = sizeof (value);
923   rbind[5].length = &size;
924   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
925   rbind[6].buffer = &uid;
926   rbind[6].is_unsigned = 1;
927
928   if (GNUNET_OK != nrc->prep (nrc->prep_cls,
929                               nrc))
930     goto END_SET;
931   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
932   GNUNET_assert (size <= sizeof(value));
933   if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
934        (hashSize != sizeof (GNUNET_HashCode)) )
935     {
936       GNUNET_break (0);
937       goto END_SET;
938     }     
939 #if DEBUG_MYSQL
940   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
942               (unsigned int) size,
943               GNUNET_h2s (&key),
944               priority,
945               anonymity,
946               exp);
947 #endif
948   expiration.abs_value = exp;
949   ret = nrc->dviter (nrc->dviter_cls, (nrc->end_it == GNUNET_YES) ? NULL : nrc,
950                      &key,
951                      size, value,
952                      type, priority, anonymity, expiration,
953                      uid);
954   if (ret == GNUNET_SYSERR)
955     {
956       nrc->end_it = GNUNET_YES;
957       return;
958     }
959   if (ret == GNUNET_NO)
960     {
961       do_delete_entry (plugin, uid);
962       if (size != 0)
963         plugin->env->duc (plugin->env->cls,
964                           - size);
965     }
966   return;
967  END_SET:
968   /* call dviter with "end of set" */
969   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
970   nrc->dviter (nrc->dviter_cls, 
971                NULL, NULL, 0, NULL, 0, 0, 0, 
972                GNUNET_TIME_UNIT_ZERO_ABS, 0);
973   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
974   nrc->prep (nrc->prep_cls, NULL);
975   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
976   GNUNET_free (nrc);
977 }
978
979
980 /**
981  * Function invoked on behalf of a "PluginIterator"
982  * asking the database plugin to call the iterator
983  * with the next item.
984  *
985  * @param next_cls whatever argument was given
986  *        to the PluginIterator as "next_cls".
987  * @param end_it set to GNUNET_YES if we
988  *        should terminate the iteration early
989  *        (iterator should be still called once more
990  *         to signal the end of the iteration).
991  */
992 static void 
993 mysql_plugin_next_request (void *next_cls,
994                            int end_it)
995 {
996   struct NextRequestClosure *nrc = next_cls;
997
998   if (GNUNET_YES == end_it)
999     nrc->end_it = GNUNET_YES;
1000   nrc->plugin->next_task_nc = nrc;
1001   nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
1002                                                      nrc);
1003 }  
1004
1005
1006 /**
1007  * Get an estimate of how much space the database is
1008  * currently using.
1009  *
1010  * @param cls our "struct Plugin *"
1011  * @return number of bytes used on disk
1012  */
1013 static unsigned long long
1014 mysql_plugin_get_size (void *cls)
1015 {
1016   struct Plugin *plugin = cls;
1017   MYSQL_BIND cbind[1];
1018   long long total;
1019
1020   memset (cbind, 0, sizeof (cbind));
1021   total = 0;
1022   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
1023   cbind[0].buffer = &total;
1024   cbind[0].is_unsigned = GNUNET_NO;
1025   if (GNUNET_OK != 
1026       prepared_statement_run_select (plugin,
1027                                      plugin->get_size,
1028                                      1, cbind, 
1029                                      &return_ok, NULL,
1030                                      -1))
1031     return 0;
1032   return total;
1033 }
1034
1035
1036 /**
1037  * Store an item in the datastore.
1038  *
1039  * @param cls closure
1040  * @param key key for the item
1041  * @param size number of bytes in data
1042  * @param data content stored
1043  * @param type type of the content
1044  * @param priority priority of the content
1045  * @param anonymity anonymity-level for the content
1046  * @param replication replication-level for the content
1047  * @param expiration expiration time for the content
1048  * @param msg set to error message
1049  * @return GNUNET_OK on success
1050  */
1051 static int
1052 mysql_plugin_put (void *cls,
1053                   const GNUNET_HashCode * key,
1054                   uint32_t size,
1055                   const void *data,
1056                   enum GNUNET_BLOCK_Type type,
1057                   uint32_t priority,
1058                   uint32_t anonymity,
1059                   uint32_t replication,
1060                   struct GNUNET_TIME_Absolute expiration,
1061                   char **msg)
1062 {
1063   struct Plugin *plugin = cls;
1064   unsigned int irepl = replication;
1065   unsigned int itype = type;
1066   unsigned int ipriority = priority;
1067   unsigned int ianonymity = anonymity;
1068   unsigned long long lexpiration = expiration.abs_value;
1069   unsigned long hashSize;
1070   unsigned long hashSize2;
1071   unsigned long lsize;
1072   GNUNET_HashCode vhash;
1073
1074   if (size > MAX_DATUM_SIZE)
1075     {
1076       GNUNET_break (0);
1077       return GNUNET_SYSERR;
1078     }
1079   hashSize = sizeof (GNUNET_HashCode);
1080   hashSize2 = sizeof (GNUNET_HashCode);
1081   lsize = size;
1082   GNUNET_CRYPTO_hash (data, size, &vhash);
1083   if (GNUNET_OK !=
1084       prepared_statement_run (plugin,
1085                               plugin->insert_entry,
1086                               NULL,
1087                               MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
1088                               MYSQL_TYPE_LONG, &itype, GNUNET_YES,
1089                               MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
1090                               MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
1091                               MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
1092                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1093                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
1094                               MYSQL_TYPE_BLOB, data, lsize, &lsize, 
1095                               -1))
1096     return GNUNET_SYSERR;    
1097 #if DEBUG_MYSQL
1098   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1099               "Inserted value `%s' with size %u into gn090 table\n",
1100               GNUNET_h2s (key),
1101               (unsigned int) size);
1102 #endif
1103   if (size > 0)
1104     plugin->env->duc (plugin->env->cls,
1105                       size);
1106   return GNUNET_OK;
1107 }
1108
1109
1110 /**
1111  * Update the priority for a particular key in the datastore.  If
1112  * the expiration time in value is different than the time found in
1113  * the datastore, the higher value should be kept.  For the
1114  * anonymity level, the lower value is to be used.  The specified
1115  * priority should be added to the existing priority, ignoring the
1116  * priority in value.
1117  *
1118  * Note that it is possible for multiple values to match this put.
1119  * In that case, all of the respective values are updated.
1120  *
1121  * @param cls our "struct Plugin*"
1122  * @param uid unique identifier of the datum
1123  * @param delta by how much should the priority
1124  *     change?  If priority + delta < 0 the
1125  *     priority should be set to 0 (never go
1126  *     negative).
1127  * @param expire new expiration time should be the
1128  *     MAX of any existing expiration time and
1129  *     this value
1130  * @param msg set to error message
1131  * @return GNUNET_OK on success
1132  */
1133 static int
1134 mysql_plugin_update (void *cls,
1135                      uint64_t uid,
1136                      int delta, 
1137                      struct GNUNET_TIME_Absolute expire,
1138                      char **msg)
1139 {
1140   struct Plugin *plugin = cls;
1141   unsigned long long vkey = uid;
1142   unsigned long long lexpire = expire.abs_value;
1143   int ret;
1144
1145 #if DEBUG_MYSQL
1146   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147               "Updating value %llu adding %d to priority and maxing exp at %llu\n",
1148               vkey,
1149               delta,
1150               lexpire);
1151 #endif
1152   ret = prepared_statement_run (plugin,
1153                                 plugin->update_entry,
1154                                 NULL,
1155                                 MYSQL_TYPE_LONG, &delta, GNUNET_NO,
1156                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
1157                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
1158                                 MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, 
1159                                 -1);
1160   if (ret != GNUNET_OK)
1161     {
1162       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1163                   "Failed to update value %llu\n",
1164                   vkey);
1165     }
1166   return ret;
1167 }
1168
1169
1170 struct GetContext
1171 {
1172   GNUNET_HashCode key;
1173   GNUNET_HashCode vhash;
1174
1175   unsigned int prio;
1176   unsigned int anonymity;
1177   unsigned long long expiration;
1178   unsigned long long vkey;
1179   unsigned long long total;
1180   unsigned int off;
1181   unsigned int count;
1182   int have_vhash;
1183 };
1184
1185
1186 static int
1187 get_statement_prepare (void *cls,
1188                        struct NextRequestClosure *nrc)
1189 {
1190   struct GetContext *gc = cls;
1191   struct Plugin *plugin;
1192   int ret;
1193   unsigned long hashSize;
1194   
1195   if (NULL == nrc)
1196     {
1197       GNUNET_free (gc);
1198       return GNUNET_NO;
1199     }
1200   if (gc->count == gc->total)
1201     return GNUNET_NO;
1202   plugin = nrc->plugin;
1203   hashSize = sizeof (GNUNET_HashCode);
1204   if (++gc->off >= gc->total)
1205     gc->off = 0;
1206 #if DEBUG_MYSQL
1207   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1208               "Obtaining result number %d/%lld at offset %u for GET `%s'\n",
1209               gc->count+1,
1210               gc->total,
1211               gc->off,
1212               GNUNET_h2s (&gc->key));  
1213 #endif
1214   if (nrc->type != 0)
1215     {
1216       if (gc->have_vhash)
1217         {
1218           ret = prepared_statement_run_select (plugin,
1219                                                plugin->select_entry_by_hash_vhash_and_type, 
1220                                                7, nrc->rbind, 
1221                                                &return_ok, NULL, 
1222                                                MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1223                                                MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
1224                                                MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
1225                                                MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1226                                                -1);
1227         }
1228       else
1229         {
1230           ret =
1231             prepared_statement_run_select (plugin,
1232                                            plugin->select_entry_by_hash_and_type, 
1233                                            7, nrc->rbind, 
1234                                            &return_ok, NULL,
1235                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1236                                            MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
1237                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1238                                            -1);
1239         }
1240     }
1241   else
1242     {
1243       if (gc->have_vhash)
1244         {
1245           ret =
1246             prepared_statement_run_select (plugin,
1247                                            plugin->select_entry_by_hash_and_vhash, 
1248                                            7, nrc->rbind, 
1249                                            &return_ok, NULL,
1250                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, 
1251                                            MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize, 
1252                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
1253                                            -1);
1254         }
1255       else
1256         {
1257           ret =
1258             prepared_statement_run_select (plugin,
1259                                            plugin->select_entry_by_hash, 
1260                                            7, nrc->rbind, 
1261                                            &return_ok, NULL,
1262                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1263                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
1264                                            -1);
1265         }
1266     }
1267   gc->count++;
1268   return ret;
1269 }
1270
1271
1272 /**
1273  * Iterate over the results for a particular key in the datastore.
1274  *
1275  * @param cls closure
1276  * @param key maybe NULL (to match all entries)
1277  * @param vhash hash of the value, maybe NULL (to
1278  *        match all values that have the right key).
1279  *        Note that for DBlocks there is no difference
1280  *        betwen key and vhash, but for other blocks
1281  *        there may be!
1282  * @param type entries of which type are relevant?
1283  *        Use 0 for any type.
1284  * @param iter function to call on each matching value;
1285  *        will be called once with a NULL value at the end
1286  * @param iter_cls closure for iter
1287  */
1288 static void
1289 mysql_plugin_get (void *cls,
1290                   const GNUNET_HashCode *key,
1291                   const GNUNET_HashCode *vhash,
1292                   enum GNUNET_BLOCK_Type type,
1293                   PluginIterator iter, void *iter_cls)
1294 {
1295   struct Plugin *plugin = cls;
1296   unsigned int itype = type;
1297   int ret;
1298   MYSQL_BIND cbind[1];
1299   struct GetContext *gc;
1300   struct NextRequestClosure *nrc;
1301   long long total;
1302   unsigned long hashSize;
1303
1304   GNUNET_assert (key != NULL);
1305   if (iter == NULL) 
1306     return;
1307   hashSize = sizeof (GNUNET_HashCode);
1308   memset (cbind, 0, sizeof (cbind));
1309   total = -1;
1310   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
1311   cbind[0].buffer = &total;
1312   cbind[0].is_unsigned = GNUNET_NO;
1313   if (type != 0)
1314     {
1315       if (vhash != NULL)
1316         {
1317           ret =
1318             prepared_statement_run_select (plugin,
1319                                            plugin->count_entry_by_hash_vhash_and_type, 
1320                                            1, cbind, 
1321                                            &return_ok, NULL,
1322                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1323                                            MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, 
1324                                            MYSQL_TYPE_LONG, &itype, GNUNET_YES,
1325                                            -1);
1326         }
1327       else
1328         {
1329           ret =
1330             prepared_statement_run_select (plugin,
1331                                            plugin->count_entry_by_hash_and_type, 
1332                                            1, cbind, 
1333                                            &return_ok, NULL,
1334                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1335                                            MYSQL_TYPE_LONG, &itype, GNUNET_YES,
1336                                            -1);
1337         }
1338     }
1339   else
1340     {
1341       if (vhash != NULL)
1342         {
1343           ret =
1344             prepared_statement_run_select (plugin,
1345                                            plugin->count_entry_by_hash_and_vhash, 
1346                                            1, cbind,
1347                                            &return_ok, NULL,
1348                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1349                                            MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, 
1350                                            -1);
1351
1352         }
1353       else
1354         {
1355           ret =
1356             prepared_statement_run_select (plugin,
1357                                            plugin->count_entry_by_hash,
1358                                            1, cbind, 
1359                                            &return_ok, NULL, 
1360                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1361                                            -1);
1362         }
1363     }
1364   if ((ret != GNUNET_OK) || (0 >= total))
1365     {
1366       iter (iter_cls, 
1367             NULL, NULL, 0, NULL, 0, 0, 0, 
1368             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1369       return;
1370     }
1371 #if DEBUG_MYSQL
1372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1373               "Iterating over %lld results for GET `%s'\n",
1374               total,
1375               GNUNET_h2s (key));
1376 #endif
1377   gc = GNUNET_malloc (sizeof (struct GetContext));
1378   gc->key = *key;
1379   if (vhash != NULL)
1380     {
1381       gc->have_vhash = GNUNET_YES;
1382       gc->vhash = *vhash;
1383     }
1384   gc->total = total;
1385   gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1386   
1387
1388   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1389   nrc->plugin = plugin;
1390   nrc->type = type;  
1391   nrc->dviter = iter;
1392   nrc->dviter_cls = iter_cls;
1393   nrc->prep = &get_statement_prepare;
1394   nrc->prep_cls = gc;
1395   mysql_plugin_next_request (nrc, GNUNET_NO);
1396 }
1397
1398
1399 /**
1400  * Run the prepared statement to get the next data item ready.
1401  * 
1402  * @param cls not used
1403  * @param nrc closure for the next request iterator
1404  * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
1405  */
1406 static int
1407 iterator_zero_prepare (void *cls,
1408                        struct NextRequestClosure *nrc)
1409 {
1410   struct Plugin *plugin;
1411   int ret;
1412
1413   if (nrc == NULL)
1414     return GNUNET_NO;
1415   plugin = nrc->plugin;
1416   ret = prepared_statement_run_select (plugin,
1417                                        plugin->zero_iter,
1418                                        7, nrc->rbind,
1419                                        &return_ok, NULL,
1420                                        MYSQL_TYPE_LONG, &nrc->count, GNUNET_YES,
1421                                        -1);
1422   nrc->count++;
1423   return ret;
1424 }
1425
1426
1427 /**
1428  * Select a subset of the items in the datastore and call
1429  * the given iterator for each of them.
1430  *
1431  * @param cls our "struct Plugin*"
1432  * @param type entries of which type should be considered?
1433  *        Use 0 for any type.
1434  * @param iter function to call on each matching value;
1435  *        will be called once with a NULL value at the end
1436  * @param iter_cls closure for iter
1437  */
1438 static void
1439 mysql_plugin_iter_zero_anonymity (void *cls,
1440                                   enum GNUNET_BLOCK_Type type,
1441                                   PluginIterator iter,
1442                                   void *iter_cls)
1443 {
1444   struct Plugin *plugin = cls;
1445   struct NextRequestClosure *nrc;
1446
1447   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1448   nrc->plugin = plugin;
1449   nrc->type = type;  
1450   nrc->dviter = iter;
1451   nrc->dviter_cls = iter_cls;
1452   nrc->prep = &iterator_zero_prepare;
1453   mysql_plugin_next_request (nrc, GNUNET_NO);
1454 }
1455
1456
1457 /**
1458  * Run the SELECT statement for the replication function.
1459  * 
1460  * @param cls the 'struct Plugin'
1461  * @param nrc the context (not used)
1462  */
1463 static int
1464 replication_prepare (void *cls,
1465                      struct NextRequestClosure *nrc)
1466 {
1467   struct Plugin *plugin = cls;
1468
1469   nrc->end_it = GNUNET_YES;
1470   return prepared_statement_run_select (plugin,
1471                                         plugin->select_replication, 
1472                                         7, nrc->rbind, 
1473                                         &return_ok, NULL,
1474                                         -1);
1475 }
1476
1477
1478 /**
1479  * Get a random item for replication.  Returns a single, not expired, random item
1480  * from those with the highest replication counters.  The item's 
1481  * replication counter is decremented by one IF it was positive before.
1482  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1483  *
1484  * @param cls closure
1485  * @param iter function to call the value (once only).
1486  * @param iter_cls closure for iter
1487  */
1488 static void
1489 mysql_plugin_replication_get (void *cls,
1490                               PluginIterator iter, void *iter_cls)
1491 {
1492   struct Plugin *plugin = cls;
1493   struct NextRequestClosure nrc;
1494
1495   memset (&nrc, 0, sizeof (nrc));
1496   nrc.plugin = plugin;
1497   nrc.now = GNUNET_TIME_absolute_get ();
1498   nrc.prep = &replication_prepare;
1499   nrc.prep_cls = plugin;
1500   nrc.type = 0;
1501   nrc.dviter = iter;
1502   nrc.dviter_cls = iter_cls;
1503   nrc.end_it = GNUNET_NO;
1504   mysql_next_request_cont (&nrc, NULL);
1505 }
1506
1507
1508 /**
1509  * Run the SELECT statement for the expiration function.
1510  * 
1511  * @param cls the 'struct Plugin'
1512  * @param nrc the context (not used)
1513  * @return GNUNET_OK on success, GNUNET_NO if there are
1514  *         no more values, GNUNET_SYSERR on error
1515  */
1516 static int
1517 expiration_prepare (void *cls,
1518                     struct NextRequestClosure *nrc)
1519 {
1520   struct Plugin *plugin = cls;
1521   long long nt;
1522
1523   if (NULL == nrc)
1524     return GNUNET_NO;
1525   nrc->end_it = GNUNET_YES;
1526   nt = (long long) nrc->now.abs_value;
1527   return prepared_statement_run_select
1528     (plugin,
1529      plugin->select_expiration, 
1530      7, nrc->rbind, 
1531      &return_ok, NULL,
1532      MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
1533      -1);
1534 }
1535
1536
1537 /**
1538  * Get a random item for expiration.
1539  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1540  *
1541  * @param cls closure
1542  * @param iter function to call the value (once only).
1543  * @param iter_cls closure for iter
1544  */
1545 static void
1546 mysql_plugin_expiration_get (void *cls,
1547                              PluginIterator iter, void *iter_cls)
1548 {
1549   struct Plugin *plugin = cls;
1550   struct NextRequestClosure nrc;
1551
1552   memset (&nrc, 0, sizeof (nrc));
1553   nrc.plugin = plugin;
1554   nrc.now = GNUNET_TIME_absolute_get ();
1555   nrc.prep = &expiration_prepare;
1556   nrc.prep_cls = plugin;
1557   nrc.type = 0;
1558   nrc.dviter = iter;
1559   nrc.dviter_cls = iter_cls;
1560   nrc.end_it = GNUNET_NO;
1561   mysql_next_request_cont (&nrc, NULL);
1562 }
1563
1564
1565 /**
1566  * Drop database.
1567  *
1568  * @param cls the "struct Plugin*"
1569  */
1570 static void 
1571 mysql_plugin_drop (void *cls)
1572 {
1573   struct Plugin *plugin = cls;
1574
1575   if (GNUNET_OK != run_statement (plugin,
1576                                   "DROP TABLE gn090"))
1577     return;           /* error */
1578   plugin->env->duc (plugin->env->cls, 0);
1579 }
1580
1581
1582 /**
1583  * Entry point for the plugin.
1584  *
1585  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1586  * @return our "struct Plugin*"
1587  */
1588 void *
1589 libgnunet_plugin_datastore_mysql_init (void *cls)
1590 {
1591   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1592   struct GNUNET_DATASTORE_PluginFunctions *api;
1593   struct Plugin *plugin;
1594
1595   plugin = GNUNET_malloc (sizeof (struct Plugin));
1596   plugin->env = env;
1597   plugin->cnffile = get_my_cnf_path (env->cfg);
1598   if (GNUNET_OK != iopen (plugin))
1599     {
1600       iclose (plugin);
1601       GNUNET_free_non_null (plugin->cnffile);
1602       GNUNET_free (plugin);
1603       return NULL;
1604     }
1605 #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
1606 #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
1607   if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
1608              " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1609              " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1610              " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1611              " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1612              " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1613              " hash BINARY(64) NOT NULL DEFAULT '',"
1614              " vhash BINARY(64) NOT NULL DEFAULT '',"
1615              " value BLOB NOT NULL DEFAULT '',"
1616              " uid BIGINT NOT NULL AUTO_INCREMENT,"
1617              " PRIMARY KEY (uid),"
1618              " INDEX idx_hash (hash(64)),"
1619              " INDEX idx_hash_uid (hash(64),uid),"
1620              " INDEX idx_hash_vhash (hash(64),vhash(64)),"
1621              " INDEX idx_hash_type_uid (hash(64),type,uid),"
1622              " INDEX idx_prio (prio),"
1623              " INDEX idx_repl (repl),"
1624              " INDEX idx_expire_prio (expire,prio),"
1625              " INDEX idx_anonLevel_uid (anonLevel,uid)"
1626              ") ENGINE=InnoDB") ||
1627       MRUNS ("SET AUTOCOMMIT = 1") ||
1628       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1629       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1630       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1631       PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH)
1632       || PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE)
1633       || PINIT (plugin->select_entry_by_hash_vhash_and_type,
1634                 SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1635       || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
1636       || PINIT (plugin->get_size, SELECT_SIZE)
1637       || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
1638       || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1639       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1640                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1641       || PINIT (plugin->update_entry, UPDATE_ENTRY)
1642       || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) 
1643       || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) 
1644       || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
1645     {
1646       iclose (plugin);
1647       GNUNET_free_non_null (plugin->cnffile);
1648       GNUNET_free (plugin);
1649       return NULL;
1650     }
1651 #undef PINIT
1652 #undef MRUNS
1653
1654   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1655   api->cls = plugin;
1656   api->get_size = &mysql_plugin_get_size;
1657   api->put = &mysql_plugin_put;
1658   api->next_request = &mysql_plugin_next_request;
1659   api->get = &mysql_plugin_get;
1660   api->replication_get = &mysql_plugin_replication_get;
1661   api->expiration_get = &mysql_plugin_expiration_get;
1662   api->update = &mysql_plugin_update;
1663   api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity;
1664   api->drop = &mysql_plugin_drop;
1665   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1666                    "mysql", _("Mysql database running\n"));
1667   return api;
1668 }
1669
1670
1671 /**
1672  * Exit point from the plugin.
1673  * @param cls our "struct Plugin*"
1674  * @return always NULL
1675  */
1676 void *
1677 libgnunet_plugin_datastore_mysql_done (void *cls)
1678 {
1679   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1680   struct Plugin *plugin = api->cls;
1681
1682   iclose (plugin);
1683   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1684     {
1685       GNUNET_SCHEDULER_cancel (plugin->next_task);
1686       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1687       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1688       GNUNET_free (plugin->next_task_nc);
1689       plugin->next_task_nc = NULL;
1690     }
1691   GNUNET_free_non_null (plugin->cnffile);
1692   GNUNET_free (plugin);
1693   GNUNET_free (api);
1694   mysql_library_end ();
1695   return NULL;
1696 }
1697
1698 /* end of plugin_datastore_mysql.c */