getting mysql code to compile again, fixes to sqlite code
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in /etc/gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109    @endverbatim
110  *
111  * PROBLEMS?
112  *
113  * If you have problems related to the mysql module, your best
114  * friend is probably the mysql manual. The first thing to check
115  * is that mysql is basically operational, that you can connect
116  * to it, create tables, issue queries etc.
117  */
118
119 #include "platform.h"
120 #include "gnunet_datastore_plugin.h"
121 #include "gnunet_util_lib.h"
122 #include <mysql/mysql.h>
123
124 #define DEBUG_MYSQL GNUNET_NO
125
126 #define MAX_DATUM_SIZE 65536
127
128 /**
129  * Maximum number of supported parameters for a prepared
130  * statement.  Increase if needed.
131  */
132 #define MAX_PARAM 16
133
134 /**
135  * Die with an error message that indicates
136  * a failure of the command 'cmd' with the message given
137  * by strerror(errno).
138  */
139 #define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0);
140
141 /**
142  * Log an error message at log-level 'level' that indicates
143  * a failure of the command 'cmd' on file 'filename'
144  * with the message given by strerror(errno).
145  */
146 #define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
147
148
149 struct GNUNET_MysqlStatementHandle
150 {
151   struct GNUNET_MysqlStatementHandle *next;
152
153   struct GNUNET_MysqlStatementHandle *prev;
154
155   char *query;
156
157   MYSQL_STMT *statement;
158
159   int valid;
160
161 };
162
163 /**
164  * Context for the universal iterator.
165  */
166 struct NextRequestClosure;
167
168 /**
169  * Type of a function that will prepare
170  * the next iteration.
171  *
172  * @param cls closure
173  * @param nc the next context; NULL for the last
174  *         call which gives the callback a chance to
175  *         clean up the closure
176  * @return GNUNET_OK on success, GNUNET_NO if there are
177  *         no more values, GNUNET_SYSERR on error
178  */
179 typedef int (*PrepareFunction)(void *cls,
180                                struct NextRequestClosure *nc);
181
182
183 struct NextRequestClosure
184 {
185   struct Plugin *plugin;
186
187   struct GNUNET_TIME_Absolute now;
188
189   /**
190    * Function to call to prepare the next
191    * iteration.
192    */
193   PrepareFunction prep;
194
195   /**
196    * Closure for prep.
197    */
198   void *prep_cls;
199
200   MYSQL_BIND rbind[7];
201
202   enum GNUNET_BLOCK_Type type;
203
204   PluginIterator dviter;
205
206   void *dviter_cls;
207
208   unsigned int count;
209
210   int end_it;
211 };
212
213
214 /**
215  * Context for all functions in this plugin.
216  */
217 struct Plugin 
218 {
219   /**
220    * Our execution environment.
221    */
222   struct GNUNET_DATASTORE_PluginEnvironment *env;
223
224   /**
225    * Handle to talk to MySQL.
226    */
227   MYSQL *dbf;
228   
229   /**
230    * We keep all prepared statements in a DLL.  This is the head.
231    */
232   struct GNUNET_MysqlStatementHandle *shead;
233
234   /**
235    * We keep all prepared statements in a DLL.  This is the tail.
236    */
237   struct GNUNET_MysqlStatementHandle *stail;
238
239   /**
240    * Filename of "my.cnf" (msyql configuration).
241    */
242   char *cnffile;
243
244   /**
245    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
246    */
247   struct NextRequestClosure *next_task_nc;
248
249   /**
250    * Pending task with scheduler for running the next request.
251    */
252   GNUNET_SCHEDULER_TaskIdentifier next_task;
253
254   /**
255    * Prepared statements.
256    */
257 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?)"
258   struct GNUNET_MysqlStatementHandle *insert_entry;
259   
260 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
261   struct GNUNET_MysqlStatementHandle *delete_entry_by_uid;
262
263 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
264   struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
265   
266 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_uid) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
267   struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
268
269 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
270   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
271
272 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
273   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
274  
275 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
276   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
277
278 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
279   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
280  
281 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
282   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
283   
284 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
285   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
286
287 #define UPDATE_ENTRY "UPDATE gn090 FORCE INDEX (uid) SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=? LIMIT 1"
288   struct GNUNET_MysqlStatementHandle *update_entry;
289
290 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
291   struct GNUNET_MysqlStatementHandle *get_size;
292
293 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_anonLevel_uid) WHERE anonLevel=0 ORDER BY uid DESC LIMIT 1 OFFSET ?"
294   struct GNUNET_MysqlStatementHandle *zero_iter;
295
296 #define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_expire_prio) WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
297   "UNION "\
298   "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_prio) ORDER BY prio ASC LIMIT 1) "\
299   "ORDER BY expire ASC LIMIT 1"
300   struct GNUNET_MysqlStatementHandle *select_expiration;
301
302 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_repl) ORDER BY repl DESC,RAND() LIMIT 1"
303   struct GNUNET_MysqlStatementHandle *select_replication;
304
305 };
306
307
308 /**
309  * Obtain the location of ".my.cnf".
310  *
311  * @param cfg our configuration
312  * @return NULL on error
313  */
314 static char *
315 get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
316 {
317   char *cnffile;
318   char *home_dir;
319   struct stat st;
320 #ifndef WINDOWS
321   struct passwd *pw;
322 #endif
323   int configured;
324
325 #ifndef WINDOWS
326   pw = getpwuid (getuid ());
327   if (!pw)
328     {
329       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, 
330                            "getpwuid");
331       return NULL;
332     }
333   if (GNUNET_YES ==
334       GNUNET_CONFIGURATION_have_value (cfg,
335                                        "datastore-mysql", "CONFIG"))
336     {
337       GNUNET_assert (GNUNET_OK == 
338                      GNUNET_CONFIGURATION_get_value_filename (cfg,
339                                                               "datastore-mysql", "CONFIG", &cnffile));
340       configured = GNUNET_YES;
341     }
342   else
343     {
344       home_dir = GNUNET_strdup (pw->pw_dir);
345 #else
346       home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
347       plibc_conv_to_win_path ("~/", home_dir);
348 #endif
349       GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
350       GNUNET_free (home_dir);
351       configured = GNUNET_NO;
352     }
353   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
354               _("Trying to use file `%s' for MySQL configuration.\n"),
355               cnffile);
356   if ((0 != STAT (cnffile, &st)) ||
357       (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode)))
358     {
359       if (configured == GNUNET_YES)
360         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
361                     _("Could not access file `%s': %s\n"), cnffile,
362                     STRERROR (errno));
363       GNUNET_free (cnffile);
364       return NULL;
365     }
366   return cnffile;
367 }
368
369
370
371 /**
372  * Free a prepared statement.
373  *
374  * @param plugin plugin context
375  * @param s prepared statement
376  */
377 static void
378 prepared_statement_destroy (struct Plugin *plugin, 
379                             struct GNUNET_MysqlStatementHandle
380                             *s)
381 {
382   GNUNET_CONTAINER_DLL_remove (plugin->shead,
383                                plugin->stail,
384                                s);
385   if (s->valid)
386     mysql_stmt_close (s->statement);
387   GNUNET_free (s->query);
388   GNUNET_free (s);
389 }
390
391
392 /**
393  * Close database connection and all prepared statements (we got a DB
394  * disconnect error).
395  */
396 static int
397 iclose (struct Plugin *plugin)
398 {
399   struct GNUNET_MysqlStatementHandle *spos;
400
401   spos = plugin->shead;
402   while (NULL != plugin->shead)
403     prepared_statement_destroy (plugin,
404                                 plugin->shead);
405   if (plugin->dbf != NULL)
406     {
407       mysql_close (plugin->dbf);
408       plugin->dbf = NULL;
409     }
410   return GNUNET_OK;
411 }
412
413
414 /**
415  * Open the connection with the database (and initialize
416  * our default options).
417  *
418  * @return GNUNET_OK on success
419  */
420 static int
421 iopen (struct Plugin *ret)
422 {
423   char *mysql_dbname;
424   char *mysql_server;
425   char *mysql_user;
426   char *mysql_password;
427   unsigned long long mysql_port;
428   my_bool reconnect;
429   unsigned int timeout;
430
431   ret->dbf = mysql_init (NULL);
432   if (ret->dbf == NULL)
433     return GNUNET_SYSERR;
434   if (ret->cnffile != NULL)
435     mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile);
436   mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
437   reconnect = 0;
438   mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect);
439   mysql_options (ret->dbf,
440                  MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
441   mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
442   timeout = 60; /* in seconds */
443   mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
444   mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
445   mysql_dbname = NULL;
446   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
447                                                      "datastore-mysql", "DATABASE"))
448     GNUNET_assert (GNUNET_OK == 
449                    GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
450                                                           "datastore-mysql", "DATABASE", 
451                                                           &mysql_dbname));
452   else
453     mysql_dbname = GNUNET_strdup ("gnunet");
454   mysql_user = NULL;
455   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
456                                                      "datastore-mysql", "USER"))
457     {
458       GNUNET_assert (GNUNET_OK == 
459                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
460                                                            "datastore-mysql", "USER", 
461                                                            &mysql_user));
462     }
463   mysql_password = NULL;
464   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
465                                                      "datastore-mysql", "PASSWORD"))
466     {
467       GNUNET_assert (GNUNET_OK ==
468                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
469                                                            "datastore-mysql", "PASSWORD",
470                                                            &mysql_password));
471     }
472   mysql_server = NULL;
473   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
474                                                      "datastore-mysql", "HOST"))
475     {
476       GNUNET_assert (GNUNET_OK == 
477                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
478                                                            "datastore-mysql", "HOST", 
479                                                            &mysql_server));
480     }
481   mysql_port = 0;
482   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
483                                                      "datastore-mysql", "PORT"))
484     {
485       GNUNET_assert (GNUNET_OK ==
486                     GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql",
487                                                            "PORT", &mysql_port));
488     }
489
490   GNUNET_assert (mysql_dbname != NULL);
491   mysql_real_connect (ret->dbf, mysql_server, mysql_user, mysql_password,
492                       mysql_dbname, (unsigned int) mysql_port, NULL,
493                       CLIENT_IGNORE_SIGPIPE);
494   GNUNET_free_non_null (mysql_server);
495   GNUNET_free_non_null (mysql_user);
496   GNUNET_free_non_null (mysql_password);
497   GNUNET_free (mysql_dbname);
498   if (mysql_error (ret->dbf)[0])
499     {
500       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
501                  "mysql_real_connect", ret);
502       return GNUNET_SYSERR;
503     }
504   return GNUNET_OK;
505 }
506
507
508 /**
509  * Run the given MySQL statement.
510  *
511  * @param plugin plugin context
512  * @param statement SQL statement to run
513  * @return GNUNET_OK on success, GNUNET_SYSERR on error
514  */
515 static int
516 run_statement (struct Plugin *plugin,
517                const char *statement)
518 {
519   if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
520     return GNUNET_SYSERR;
521   mysql_query (plugin->dbf, statement);
522   if (mysql_error (plugin->dbf)[0])
523     {
524       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
525                  "mysql_query", plugin);
526       iclose (plugin);
527       return GNUNET_SYSERR;
528     }
529   return GNUNET_OK;
530 }
531
532
533 /**
534  * Create a prepared statement.
535  *
536  * @param plugin plugin context
537  * @param statement SQL statement text to prepare
538  * @return NULL on error
539  */
540 static struct GNUNET_MysqlStatementHandle *
541 prepared_statement_create (struct Plugin *plugin, 
542                            const char *statement)
543 {
544   struct GNUNET_MysqlStatementHandle *ret;
545
546   ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle));
547   ret->query = GNUNET_strdup (statement);
548   GNUNET_CONTAINER_DLL_insert (plugin->shead,
549                                plugin->stail,
550                                ret);
551   return ret;
552 }
553
554
555 /**
556  * Prepare a statement for running.
557  *
558  * @param plugin plugin context
559  * @param ret handle to prepared statement
560  * @return GNUNET_OK on success
561  */
562 static int
563 prepare_statement (struct Plugin *plugin, 
564                    struct GNUNET_MysqlStatementHandle *ret)
565 {
566   if (GNUNET_YES == ret->valid)
567     return GNUNET_OK;
568   if ((NULL == plugin->dbf) && 
569       (GNUNET_OK != iopen (plugin)))
570     return GNUNET_SYSERR;
571   ret->statement = mysql_stmt_init (plugin->dbf);
572   if (ret->statement == NULL)
573     {
574       iclose (plugin);
575       return GNUNET_SYSERR;
576     }
577   if (mysql_stmt_prepare (ret->statement, 
578                           ret->query,
579                           strlen (ret->query)))
580     {
581       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
582                  "mysql_stmt_prepare", 
583                  plugin);
584       mysql_stmt_close (ret->statement);
585       ret->statement = NULL;
586       iclose (plugin);
587       return GNUNET_SYSERR;
588     }
589   ret->valid = GNUNET_YES;
590   return GNUNET_OK;
591
592 }
593
594
595 /**
596  * Bind the parameters for the given MySQL statement
597  * and run it.
598  *
599  * @param plugin plugin context
600  * @param s statement to bind and run
601  * @param ap arguments for the binding
602  * @return GNUNET_SYSERR on error, GNUNET_OK on success
603  */
604 static int
605 init_params (struct Plugin *plugin,
606              struct GNUNET_MysqlStatementHandle *s,
607              va_list ap)
608 {
609   MYSQL_BIND qbind[MAX_PARAM];
610   unsigned int pc;
611   unsigned int off;
612   enum enum_field_types ft;
613
614   pc = mysql_stmt_param_count (s->statement);
615   if (pc > MAX_PARAM)
616     {
617       /* increase internal constant! */
618       GNUNET_break (0);
619       return GNUNET_SYSERR;
620     }
621   memset (qbind, 0, sizeof (qbind));
622   off = 0;
623   ft = 0;
624   while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types))))
625     {
626       qbind[off].buffer_type = ft;
627       switch (ft)
628         {
629         case MYSQL_TYPE_FLOAT:
630           qbind[off].buffer = va_arg (ap, float *);
631           break;
632         case MYSQL_TYPE_LONGLONG:
633           qbind[off].buffer = va_arg (ap, unsigned long long *);
634           qbind[off].is_unsigned = va_arg (ap, int);
635           break;
636         case MYSQL_TYPE_LONG:
637           qbind[off].buffer = va_arg (ap, unsigned int *);
638           qbind[off].is_unsigned = va_arg (ap, int);
639           break;
640         case MYSQL_TYPE_VAR_STRING:
641         case MYSQL_TYPE_STRING:
642         case MYSQL_TYPE_BLOB:
643           qbind[off].buffer = va_arg (ap, void *);
644           qbind[off].buffer_length = va_arg (ap, unsigned long);
645           qbind[off].length = va_arg (ap, unsigned long *);
646           break;
647         default:
648           /* unsupported type */
649           GNUNET_break (0);
650           return GNUNET_SYSERR;
651         }
652       pc--;
653       off++;
654     }
655   if (! ( (pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1) ) )
656     {
657       GNUNET_break (0);
658       return GNUNET_SYSERR;
659     }
660   if (mysql_stmt_bind_param (s->statement, qbind))
661     {
662       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
663                   _("`%s' failed at %s:%d with error: %s\n"),
664                   "mysql_stmt_bind_param",
665                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
666       iclose (plugin);
667       return GNUNET_SYSERR;
668     }
669   if (mysql_stmt_execute (s->statement))
670     {
671       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
672                   _("`%s' failed at %s:%d with error: %s\n"),
673                   "mysql_stmt_execute",
674                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
675       iclose (plugin);
676       return GNUNET_SYSERR;
677     }
678   return GNUNET_OK;
679 }
680
681 /**
682  * Type of a callback that will be called for each
683  * data set returned from MySQL.
684  *
685  * @param cls user-defined argument
686  * @param num_values number of elements in values
687  * @param values values returned by MySQL
688  * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort
689  */
690 typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
691                                           unsigned int num_values,
692                                           MYSQL_BIND *values);
693
694
695 /**
696  * Run a prepared SELECT statement.
697  *
698  * @param plugin plugin context
699  * @param s statement to run
700  * @param result_size number of elements in results array
701  * @param results pointer to already initialized MYSQL_BIND
702  *        array (of sufficient size) for passing results
703  * @param processor function to call on each result
704  * @param processor_cls extra argument to processor
705  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
706  *        values (size + buffer-reference for pointers); terminated
707  *        with "-1"
708  * @return GNUNET_SYSERR on error, otherwise
709  *         the number of successfully affected (or queried) rows
710  */
711 static int
712 prepared_statement_run_select (struct Plugin *plugin,
713                                struct GNUNET_MysqlStatementHandle *s,
714                                unsigned int result_size,
715                                MYSQL_BIND *results,
716                                GNUNET_MysqlDataProcessor processor, void *processor_cls,
717                                ...)
718 {
719   va_list ap;
720   int ret;
721   unsigned int rsize;
722   int total;
723
724   if (GNUNET_OK != prepare_statement (plugin, s))
725     {
726       GNUNET_break (0);
727       return GNUNET_SYSERR;
728     }
729   va_start (ap, processor_cls);
730   if (GNUNET_OK != init_params (plugin, s, ap))
731     {
732       GNUNET_break (0);
733       va_end (ap);
734       return GNUNET_SYSERR;
735     }
736   va_end (ap);
737   rsize = mysql_stmt_field_count (s->statement);
738   if (rsize > result_size)
739     {
740       GNUNET_break (0);
741       return GNUNET_SYSERR;
742     }
743   if (mysql_stmt_bind_result (s->statement, results))
744     {
745       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
746                   _("`%s' failed at %s:%d with error: %s\n"),
747                   "mysql_stmt_bind_result",
748                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
749       iclose (plugin);
750       return GNUNET_SYSERR;
751     }
752
753   total = 0;
754   while (1)
755     {
756       ret = mysql_stmt_fetch (s->statement);
757       if (ret == MYSQL_NO_DATA)
758         break;
759       if (ret != 0)
760         {
761           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
762                       _("`%s' failed at %s:%d with error: %s\n"),
763                       "mysql_stmt_fetch",
764                       __FILE__, __LINE__, mysql_stmt_error (s->statement));
765           iclose (plugin);
766           return GNUNET_SYSERR;
767         }
768       if (processor != NULL)
769         if (GNUNET_OK != processor (processor_cls, rsize, results))
770           break;
771       total++;
772     }
773   mysql_stmt_reset (s->statement);
774   return total;
775 }
776
777
778 /**
779  * Run a prepared statement that does NOT produce results.
780  *
781  * @param plugin plugin context
782  * @param s statement to run
783  * @param insert_id NULL or address where to store the row ID of whatever
784  *        was inserted (only for INSERT statements!)
785  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
786  *        values (size + buffer-reference for pointers); terminated
787  *        with "-1"
788  * @return GNUNET_SYSERR on error, otherwise
789  *         the number of successfully affected rows
790  */
791 static int
792 prepared_statement_run (struct Plugin *plugin,
793                         struct GNUNET_MysqlStatementHandle *s,
794                         unsigned long long *insert_id, ...)
795 {
796   va_list ap;
797   int affected;
798
799   if (GNUNET_OK != prepare_statement (plugin, s))
800     return GNUNET_SYSERR;
801   va_start (ap, insert_id);
802   if (GNUNET_OK != init_params (plugin, s, ap))
803     {
804       va_end (ap);
805       return GNUNET_SYSERR;
806     }
807   va_end (ap);
808   affected = mysql_stmt_affected_rows (s->statement);
809   if (NULL != insert_id)
810     *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement);
811   mysql_stmt_reset (s->statement);
812   return affected;
813 }
814
815
816 /**
817  * Delete an entry from the gn090 table.
818  *
819  * @param plugin plugin context
820  * @param uid unique ID of the entry to delete
821  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
822  */
823 static int
824 do_delete_entry (struct Plugin *plugin,
825                  unsigned long long uid)
826 {
827   int ret;
828  
829 #if DEBUG_MYSQL
830   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
831               "Deleting value %llu from gn090 table\n",
832               uid);
833 #endif
834   ret = prepared_statement_run (plugin,
835                                 plugin->delete_entry_by_uid,
836                                 NULL,
837                                 MYSQL_TYPE_LONGLONG, uid, GNUNET_YES,
838                                 -1);
839   if (ret > 0)
840     return GNUNET_OK;
841   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
842               "Deleting value %llu from gn090 table failed\n",
843               uid);
844   return ret;
845 }
846
847
848 /**
849  * Function that simply returns GNUNET_OK
850  *
851  * @param cls closure, not used
852  * @param num_values not used
853  * @param values not used
854  * @return GNUNET_OK
855  */
856 static int
857 return_ok (void *cls, 
858            unsigned int num_values, 
859            MYSQL_BIND * values)
860 {
861   return GNUNET_OK;
862 }
863
864
865 /**
866  * Continuation of "mysql_next_request".
867  *
868  * @param next_cls the next context
869  * @param tc the task context (unused)
870  */
871 static void 
872 mysql_next_request_cont (void *next_cls,
873                          const struct GNUNET_SCHEDULER_TaskContext *tc)
874 {
875   struct NextRequestClosure *nrc = next_cls;
876   struct Plugin *plugin;
877   int ret;
878   unsigned int type;
879   unsigned int priority;
880   unsigned int anonymity;
881   unsigned long long exp;
882   unsigned long hashSize;
883   unsigned long size;
884   unsigned long long uid;
885   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
886   GNUNET_HashCode key;
887   struct GNUNET_TIME_Absolute expiration;
888   MYSQL_BIND *rbind = nrc->rbind;
889
890   plugin = nrc->plugin;
891   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
892   plugin->next_task_nc = NULL;
893
894   if (GNUNET_YES == nrc->end_it) 
895     goto END_SET;
896   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
897   nrc->now = GNUNET_TIME_absolute_get ();
898   hashSize = sizeof (GNUNET_HashCode);
899   memset (nrc->rbind, 0, sizeof (nrc->rbind));
900   rbind = nrc->rbind;
901   rbind[0].buffer_type = MYSQL_TYPE_LONG;
902   rbind[0].buffer = &type;
903   rbind[0].is_unsigned = 1;
904   rbind[1].buffer_type = MYSQL_TYPE_LONG;
905   rbind[1].buffer = &priority;
906   rbind[1].is_unsigned = 1;
907   rbind[2].buffer_type = MYSQL_TYPE_LONG;
908   rbind[2].buffer = &anonymity;
909   rbind[2].is_unsigned = 1;
910   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
911   rbind[3].buffer = &exp;
912   rbind[3].is_unsigned = 1;
913   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
914   rbind[4].buffer = &key;
915   rbind[4].buffer_length = hashSize;
916   rbind[4].length = &hashSize;
917   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
918   rbind[5].buffer = value;
919   rbind[5].buffer_length = size = sizeof (value);
920   rbind[5].length = &size;
921   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
922   rbind[6].buffer = &uid;
923   rbind[6].is_unsigned = 1;
924
925   if (GNUNET_OK != nrc->prep (nrc->prep_cls,
926                               nrc))
927     goto END_SET;
928   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
929   GNUNET_assert (size <= sizeof(value));
930   if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
931        (hashSize != sizeof (GNUNET_HashCode)) )
932     {
933       GNUNET_break (0);
934       goto END_SET;
935     }     
936 #if DEBUG_MYSQL
937   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
939               (unsigned int) size,
940               GNUNET_h2s (&key),
941               priority,
942               anonymity,
943               exp);
944 #endif
945   expiration.abs_value = exp;
946   ret = nrc->dviter (nrc->dviter_cls, nrc,
947                      &key,
948                      size, value,
949                      type, priority, anonymity, expiration,
950                      uid);
951   if (ret == GNUNET_SYSERR)
952     {
953       nrc->end_it = GNUNET_YES;
954       return;
955     }
956   if (ret == GNUNET_NO)
957     {
958       do_delete_entry (plugin, uid);
959       if (size != 0)
960         plugin->env->duc (plugin->env->cls,
961                           - size);
962     }
963   return;
964  END_SET:
965   /* call dviter with "end of set" */
966   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
967   nrc->dviter (nrc->dviter_cls, 
968                NULL, NULL, 0, NULL, 0, 0, 0, 
969                GNUNET_TIME_UNIT_ZERO_ABS, 0);
970   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
971   nrc->prep (nrc->prep_cls, NULL);
972   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
973   GNUNET_free (nrc);
974 }
975
976
977 /**
978  * Function invoked on behalf of a "PluginIterator"
979  * asking the database plugin to call the iterator
980  * with the next item.
981  *
982  * @param next_cls whatever argument was given
983  *        to the PluginIterator as "next_cls".
984  * @param end_it set to GNUNET_YES if we
985  *        should terminate the iteration early
986  *        (iterator should be still called once more
987  *         to signal the end of the iteration).
988  */
989 static void 
990 mysql_plugin_next_request (void *next_cls,
991                            int end_it)
992 {
993   struct NextRequestClosure *nrc = next_cls;
994
995   if (GNUNET_YES == end_it)
996     nrc->end_it = GNUNET_YES;
997   nrc->plugin->next_task_nc = nrc;
998   nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
999                                                      nrc);
1000 }  
1001
1002
1003 /**
1004  * Get an estimate of how much space the database is
1005  * currently using.
1006  *
1007  * @param cls our "struct Plugin *"
1008  * @return number of bytes used on disk
1009  */
1010 static unsigned long long
1011 mysql_plugin_get_size (void *cls)
1012 {
1013   struct Plugin *plugin = cls;
1014   MYSQL_BIND cbind[1];
1015   long long total;
1016
1017   memset (cbind, 0, sizeof (cbind));
1018   total = 0;
1019   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
1020   cbind[0].buffer = &total;
1021   cbind[0].is_unsigned = GNUNET_NO;
1022   if (GNUNET_OK != 
1023       prepared_statement_run_select (plugin,
1024                                      plugin->get_size,
1025                                      1, cbind, 
1026                                      &return_ok, NULL,
1027                                      -1))
1028     return 0;
1029   return total;
1030 }
1031
1032
1033 /**
1034  * Store an item in the datastore.
1035  *
1036  * @param cls closure
1037  * @param key key for the item
1038  * @param size number of bytes in data
1039  * @param data content stored
1040  * @param type type of the content
1041  * @param priority priority of the content
1042  * @param anonymity anonymity-level for the content
1043  * @param replication replication-level for the content
1044  * @param expiration expiration time for the content
1045  * @param msg set to error message
1046  * @return GNUNET_OK on success
1047  */
1048 static int
1049 mysql_plugin_put (void *cls,
1050                   const GNUNET_HashCode * key,
1051                   uint32_t size,
1052                   const void *data,
1053                   enum GNUNET_BLOCK_Type type,
1054                   uint32_t priority,
1055                   uint32_t anonymity,
1056                   uint32_t replication,
1057                   struct GNUNET_TIME_Absolute expiration,
1058                   char **msg)
1059 {
1060   struct Plugin *plugin = cls;
1061   unsigned int irepl = replication;
1062   unsigned int itype = type;
1063   unsigned int ipriority = priority;
1064   unsigned int ianonymity = anonymity;
1065   unsigned long long lexpiration = expiration.abs_value;
1066   unsigned long hashSize;
1067   unsigned long hashSize2;
1068   unsigned long lsize;
1069   GNUNET_HashCode vhash;
1070
1071   if (size > MAX_DATUM_SIZE)
1072     {
1073       GNUNET_break (0);
1074       return GNUNET_SYSERR;
1075     }
1076   hashSize = sizeof (GNUNET_HashCode);
1077   hashSize2 = sizeof (GNUNET_HashCode);
1078   lsize = size;
1079   GNUNET_CRYPTO_hash (data, size, &vhash);
1080   if (GNUNET_OK !=
1081       prepared_statement_run (plugin,
1082                               plugin->insert_entry,
1083                               NULL,
1084                               MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
1085                               MYSQL_TYPE_LONG, &itype, GNUNET_YES,
1086                               MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
1087                               MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
1088                               MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
1089                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1090                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
1091                               MYSQL_TYPE_BLOB, data, lsize, &lsize, 
1092                               -1))
1093     return GNUNET_SYSERR;    
1094 #if DEBUG_MYSQL
1095   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1096               "Inserted value `%s' with size %u into gn090 table\n",
1097               GNUNET_h2s (key),
1098               (unsigned int) size);
1099 #endif
1100   if (size > 0)
1101     plugin->env->duc (plugin->env->cls,
1102                       size);
1103   return GNUNET_OK;
1104 }
1105
1106
1107 /**
1108  * Update the priority for a particular key in the datastore.  If
1109  * the expiration time in value is different than the time found in
1110  * the datastore, the higher value should be kept.  For the
1111  * anonymity level, the lower value is to be used.  The specified
1112  * priority should be added to the existing priority, ignoring the
1113  * priority in value.
1114  *
1115  * Note that it is possible for multiple values to match this put.
1116  * In that case, all of the respective values are updated.
1117  *
1118  * @param cls our "struct Plugin*"
1119  * @param uid unique identifier of the datum
1120  * @param delta by how much should the priority
1121  *     change?  If priority + delta < 0 the
1122  *     priority should be set to 0 (never go
1123  *     negative).
1124  * @param expire new expiration time should be the
1125  *     MAX of any existing expiration time and
1126  *     this value
1127  * @param msg set to error message
1128  * @return GNUNET_OK on success
1129  */
1130 static int
1131 mysql_plugin_update (void *cls,
1132                      uint64_t uid,
1133                      int delta, 
1134                      struct GNUNET_TIME_Absolute expire,
1135                      char **msg)
1136 {
1137   struct Plugin *plugin = cls;
1138   unsigned long long vkey = uid;
1139   unsigned long long lexpire = expire.abs_value;
1140   int ret;
1141
1142 #if DEBUG_MYSQL
1143   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1144               "Updating value %llu adding %d to priority and maxing exp at %llu\n",
1145               vkey,
1146               delta,
1147               lexpire);
1148 #endif
1149   ret = prepared_statement_run (plugin,
1150                                 plugin->update_entry,
1151                                 NULL,
1152                                 MYSQL_TYPE_LONG, &delta, GNUNET_NO,
1153                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
1154                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
1155                                 MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, 
1156                                 -1);
1157   if (ret != GNUNET_OK)
1158     {
1159       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1160                   "Failed to update value %llu\n",
1161                   vkey);
1162     }
1163   return ret;
1164 }
1165
1166
1167 struct GetContext
1168 {
1169   GNUNET_HashCode key;
1170   GNUNET_HashCode vhash;
1171
1172   unsigned int prio;
1173   unsigned int anonymity;
1174   unsigned long long expiration;
1175   unsigned long long vkey;
1176   unsigned long long total;
1177   unsigned int off;
1178   unsigned int count;
1179   int have_vhash;
1180 };
1181
1182
1183 static int
1184 get_statement_prepare (void *cls,
1185                        struct NextRequestClosure *nrc)
1186 {
1187   struct GetContext *gc = cls;
1188   struct Plugin *plugin;
1189   int ret;
1190   unsigned long hashSize;
1191   
1192   if (NULL == nrc)
1193     {
1194       GNUNET_free (gc);
1195       return GNUNET_NO;
1196     }
1197   if (gc->count == gc->total)
1198     return GNUNET_NO;
1199   plugin = nrc->plugin;
1200   hashSize = sizeof (GNUNET_HashCode);
1201   if (++gc->off >= gc->total)
1202     gc->off = 0;
1203 #if DEBUG_MYSQL
1204   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1205               "Obtaining result number %d/%lld at offset %u for GET `%s'\n",
1206               gc->count+1,
1207               gc->total,
1208               gc->off,
1209               GNUNET_h2s (&gc->key));  
1210 #endif
1211   if (nrc->type != 0)
1212     {
1213       if (gc->have_vhash)
1214         {
1215           ret = prepared_statement_run_select (plugin,
1216                                                plugin->select_entry_by_hash_vhash_and_type, 
1217                                                7, nrc->rbind, 
1218                                                &return_ok, NULL, 
1219                                                MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1220                                                MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
1221                                                MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
1222                                                MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1223                                                -1);
1224         }
1225       else
1226         {
1227           ret =
1228             prepared_statement_run_select (plugin,
1229                                            plugin->select_entry_by_hash_and_type, 
1230                                            7, nrc->rbind, 
1231                                            &return_ok, NULL,
1232                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1233                                            MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
1234                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1235                                            -1);
1236         }
1237     }
1238   else
1239     {
1240       if (gc->have_vhash)
1241         {
1242           ret =
1243             prepared_statement_run_select (plugin,
1244                                            plugin->select_entry_by_hash_and_vhash, 
1245                                            7, nrc->rbind, 
1246                                            &return_ok, NULL,
1247                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, 
1248                                            MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize, 
1249                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
1250                                            -1);
1251         }
1252       else
1253         {
1254           ret =
1255             prepared_statement_run_select (plugin,
1256                                            plugin->select_entry_by_hash, 
1257                                            7, nrc->rbind, 
1258                                            &return_ok, NULL,
1259                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1260                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
1261                                            -1);
1262         }
1263     }
1264   gc->count++;
1265   return ret;
1266 }
1267
1268
1269 /**
1270  * Iterate over the results for a particular key in the datastore.
1271  *
1272  * @param cls closure
1273  * @param key maybe NULL (to match all entries)
1274  * @param vhash hash of the value, maybe NULL (to
1275  *        match all values that have the right key).
1276  *        Note that for DBlocks there is no difference
1277  *        betwen key and vhash, but for other blocks
1278  *        there may be!
1279  * @param type entries of which type are relevant?
1280  *        Use 0 for any type.
1281  * @param iter function to call on each matching value;
1282  *        will be called once with a NULL value at the end
1283  * @param iter_cls closure for iter
1284  */
1285 static void
1286 mysql_plugin_get (void *cls,
1287                   const GNUNET_HashCode *key,
1288                   const GNUNET_HashCode *vhash,
1289                   enum GNUNET_BLOCK_Type type,
1290                   PluginIterator iter, void *iter_cls)
1291 {
1292   struct Plugin *plugin = cls;
1293   unsigned int itype = type;
1294   int ret;
1295   MYSQL_BIND cbind[1];
1296   struct GetContext *gc;
1297   struct NextRequestClosure *nrc;
1298   long long total;
1299   unsigned long hashSize;
1300
1301   GNUNET_assert (key != NULL);
1302   if (iter == NULL) 
1303     return;
1304   hashSize = sizeof (GNUNET_HashCode);
1305   memset (cbind, 0, sizeof (cbind));
1306   total = -1;
1307   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
1308   cbind[0].buffer = &total;
1309   cbind[0].is_unsigned = GNUNET_NO;
1310   if (type != 0)
1311     {
1312       if (vhash != NULL)
1313         {
1314           ret =
1315             prepared_statement_run_select (plugin,
1316                                            plugin->count_entry_by_hash_vhash_and_type, 
1317                                            1, cbind, 
1318                                            &return_ok, NULL,
1319                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1320                                            MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, 
1321                                            MYSQL_TYPE_LONG, &itype, GNUNET_YES,
1322                                            -1);
1323         }
1324       else
1325         {
1326           ret =
1327             prepared_statement_run_select (plugin,
1328                                            plugin->count_entry_by_hash_and_type, 
1329                                            1, cbind, 
1330                                            &return_ok, NULL,
1331                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1332                                            MYSQL_TYPE_LONG, &itype, GNUNET_YES,
1333                                            -1);
1334         }
1335     }
1336   else
1337     {
1338       if (vhash != NULL)
1339         {
1340           ret =
1341             prepared_statement_run_select (plugin,
1342                                            plugin->count_entry_by_hash_and_vhash, 
1343                                            1, cbind,
1344                                            &return_ok, NULL,
1345                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1346                                            MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, 
1347                                            -1);
1348
1349         }
1350       else
1351         {
1352           ret =
1353             prepared_statement_run_select (plugin,
1354                                            plugin->count_entry_by_hash,
1355                                            1, cbind, 
1356                                            &return_ok, NULL, 
1357                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1358                                            -1);
1359         }
1360     }
1361   if ((ret != GNUNET_OK) || (0 >= total))
1362     {
1363       iter (iter_cls, 
1364             NULL, NULL, 0, NULL, 0, 0, 0, 
1365             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1366       return;
1367     }
1368 #if DEBUG_MYSQL
1369   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1370               "Iterating over %lld results for GET `%s'\n",
1371               total,
1372               GNUNET_h2s (key));
1373 #endif
1374   gc = GNUNET_malloc (sizeof (struct GetContext));
1375   gc->key = *key;
1376   if (vhash != NULL)
1377     {
1378       gc->have_vhash = GNUNET_YES;
1379       gc->vhash = *vhash;
1380     }
1381   gc->total = total;
1382   gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1383   
1384
1385   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1386   nrc->plugin = plugin;
1387   nrc->type = type;  
1388   nrc->dviter = iter;
1389   nrc->dviter_cls = iter_cls;
1390   nrc->prep = &get_statement_prepare;
1391   nrc->prep_cls = gc;
1392   mysql_plugin_next_request (nrc, GNUNET_NO);
1393 }
1394
1395
1396 /**
1397  * Run the prepared statement to get the next data item ready.
1398  * 
1399  * @param cls not used
1400  * @param nrc closure for the next request iterator
1401  * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
1402  */
1403 static int
1404 iterator_zero_prepare (void *cls,
1405                        struct NextRequestClosure *nrc)
1406 {
1407   struct Plugin *plugin;
1408   int ret;
1409
1410   if (nrc == NULL)
1411     return GNUNET_NO;
1412   plugin = nrc->plugin;
1413   ret = prepared_statement_run_select (plugin,
1414                                        plugin->zero_iter,
1415                                        7, nrc->rbind,
1416                                        &return_ok, NULL,
1417                                        MYSQL_TYPE_LONG, &nrc->count, GNUNET_YES,
1418                                        -1);
1419   nrc->count++;
1420   return ret;
1421 }
1422
1423
1424 /**
1425  * Select a subset of the items in the datastore and call
1426  * the given iterator for each of them.
1427  *
1428  * @param cls our "struct Plugin*"
1429  * @param type entries of which type should be considered?
1430  *        Use 0 for any type.
1431  * @param iter function to call on each matching value;
1432  *        will be called once with a NULL value at the end
1433  * @param iter_cls closure for iter
1434  */
1435 static void
1436 mysql_plugin_iter_zero_anonymity (void *cls,
1437                                   enum GNUNET_BLOCK_Type type,
1438                                   PluginIterator iter,
1439                                   void *iter_cls)
1440 {
1441   struct Plugin *plugin = cls;
1442   struct NextRequestClosure *nrc;
1443
1444   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1445   nrc->plugin = plugin;
1446   nrc->type = type;  
1447   nrc->dviter = iter;
1448   nrc->dviter_cls = iter_cls;
1449   nrc->prep = &iterator_zero_prepare;
1450   mysql_plugin_next_request (nrc, GNUNET_NO);
1451 }
1452
1453
1454 /**
1455  * Run the SELECT statement for the replication function.
1456  * 
1457  * @param cls the 'struct Plugin'
1458  * @param nrc the context (not used)
1459  */
1460 static int
1461 replication_prepare (void *cls,
1462                      struct NextRequestClosure *nrc)
1463 {
1464   struct Plugin *plugin = cls;
1465   unsigned long long nt;
1466
1467   nt = (unsigned long long) nrc->now.abs_value;
1468   return prepared_statement_run_select (plugin,
1469                                         plugin->select_replication, 
1470                                         6, nrc->rbind, 
1471                                         &return_ok, NULL,
1472                                         MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
1473                                         -1);
1474 }
1475
1476
1477 /**
1478  * Get a random item for replication.  Returns a single, not expired, random item
1479  * from those with the highest replication counters.  The item's 
1480  * replication counter is decremented by one IF it was positive before.
1481  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1482  *
1483  * @param cls closure
1484  * @param iter function to call the value (once only).
1485  * @param iter_cls closure for iter
1486  */
1487 static void
1488 mysql_plugin_replication_get (void *cls,
1489                               PluginIterator iter, void *iter_cls)
1490 {
1491   struct Plugin *plugin = cls;
1492   struct NextRequestClosure nrc;
1493
1494   memset (&nrc, 0, sizeof (nrc));
1495   nrc.plugin = plugin;
1496   nrc.now = GNUNET_TIME_absolute_get ();
1497   nrc.prep = &replication_prepare;
1498   nrc.prep_cls = plugin;
1499   nrc.type = 0;
1500   nrc.dviter = iter;
1501   nrc.dviter_cls = iter_cls;
1502   nrc.end_it = GNUNET_NO;
1503   mysql_next_request_cont (&nrc, NULL);
1504 }
1505
1506
1507 /**
1508  * Run the SELECT statement for the expiration function.
1509  * 
1510  * @param cls the 'struct Plugin'
1511  * @param nrc the context (not used)
1512  */
1513 static int
1514 expiration_prepare (void *cls,
1515                     struct NextRequestClosure *nrc)
1516 {
1517   struct Plugin *plugin = cls;
1518   long long nt;
1519
1520   nt = (long long) nrc->now.abs_value;
1521   return prepared_statement_run_select
1522     (plugin,
1523      plugin->select_expiration, 
1524      6, nrc->rbind, 
1525      &return_ok, NULL,
1526      MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
1527      -1);
1528 }
1529
1530
1531 /**
1532  * Get a random item for expiration.
1533  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1534  *
1535  * @param cls closure
1536  * @param iter function to call the value (once only).
1537  * @param iter_cls closure for iter
1538  */
1539 static void
1540 mysql_plugin_expiration_get (void *cls,
1541                              PluginIterator iter, void *iter_cls)
1542 {
1543   struct Plugin *plugin = cls;
1544   struct NextRequestClosure nrc;
1545
1546   memset (&nrc, 0, sizeof (nrc));
1547   nrc.plugin = plugin;
1548   nrc.now = GNUNET_TIME_absolute_get ();
1549   nrc.prep = &expiration_prepare;
1550   nrc.prep_cls = plugin;
1551   nrc.type = 0;
1552   nrc.dviter = iter;
1553   nrc.dviter_cls = iter_cls;
1554   nrc.end_it = GNUNET_NO;
1555   mysql_next_request_cont (&nrc, NULL);
1556 }
1557
1558
1559 /**
1560  * Drop database.
1561  *
1562  * @param cls the "struct Plugin*"
1563  */
1564 static void 
1565 mysql_plugin_drop (void *cls)
1566 {
1567   struct Plugin *plugin = cls;
1568
1569   if ((GNUNET_OK != run_statement (plugin,
1570                                    "DROP TABLE gn090")) ||
1571       (GNUNET_OK != run_statement (plugin,
1572                                    "DROP TABLE gn072")))
1573     return;           /* error */
1574   plugin->env->duc (plugin->env->cls, 0);
1575 }
1576
1577
1578 /**
1579  * Entry point for the plugin.
1580  *
1581  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1582  * @return our "struct Plugin*"
1583  */
1584 void *
1585 libgnunet_plugin_datastore_mysql_init (void *cls)
1586 {
1587   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1588   struct GNUNET_DATASTORE_PluginFunctions *api;
1589   struct Plugin *plugin;
1590
1591   plugin = GNUNET_malloc (sizeof (struct Plugin));
1592   plugin->env = env;
1593   plugin->cnffile = get_my_cnf_path (env->cfg);
1594   if (GNUNET_OK != iopen (plugin))
1595     {
1596       iclose (plugin);
1597       GNUNET_free_non_null (plugin->cnffile);
1598       GNUNET_free (plugin);
1599       return NULL;
1600     }
1601 #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
1602 #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
1603   if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
1604              " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1605              " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1606              " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1607              " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1608              " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1609              " hash BINARY(64) NOT NULL DEFAULT '',"
1610              " vhash BINARY(64) NOT NULL DEFAULT '',"
1611              " value BLOB NOT NULL DEFAULT ''"
1612              " uid BIGINT NOT NULL AUTO_INCREMENT"
1613              " PRIMARY KEY (uid)"
1614              " INDEX idx_hash (hash(64)),"
1615              " INDEX idx_hash_uid (hash(64),uid),"
1616              " INDEX idx_hash_vhash (hash(64),vhash(64)),"
1617              " INDEX idx_hash_type_uid (hash(64),type,uid),"
1618              " INDEX idx_prio (prio),"
1619              " INDEX idx_repl (repl),"
1620              " INDEX idx_expire_prio (expire,prio),"
1621              " INDEX idx_anonLevel_uid (anonLevel,uid)"
1622              ") ENGINE=InnoDB") ||
1623       MRUNS ("SET AUTOCOMMIT = 1") ||
1624       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1625       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1626       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1627       PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH)
1628       || PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE)
1629       || PINIT (plugin->select_entry_by_hash_vhash_and_type,
1630                 SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1631       || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
1632       || PINIT (plugin->get_size, SELECT_SIZE)
1633       || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
1634       || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1635       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1636                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1637       || PINIT (plugin->update_entry, UPDATE_ENTRY)
1638       || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) 
1639       || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) 
1640       || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
1641     {
1642       iclose (plugin);
1643       GNUNET_free_non_null (plugin->cnffile);
1644       GNUNET_free (plugin);
1645       return NULL;
1646     }
1647 #undef PINIT
1648 #undef MRUNS
1649
1650   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1651   api->cls = plugin;
1652   api->get_size = &mysql_plugin_get_size;
1653   api->put = &mysql_plugin_put;
1654   api->next_request = &mysql_plugin_next_request;
1655   api->get = &mysql_plugin_get;
1656   api->replication_get = &mysql_plugin_replication_get;
1657   api->expiration_get = &mysql_plugin_expiration_get;
1658   api->update = &mysql_plugin_update;
1659   api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity;
1660   api->drop = &mysql_plugin_drop;
1661   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1662                    "mysql", _("Mysql database running\n"));
1663   return api;
1664 }
1665
1666
1667 /**
1668  * Exit point from the plugin.
1669  * @param cls our "struct Plugin*"
1670  * @return always NULL
1671  */
1672 void *
1673 libgnunet_plugin_datastore_mysql_done (void *cls)
1674 {
1675   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1676   struct Plugin *plugin = api->cls;
1677
1678   iclose (plugin);
1679   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1680     {
1681       GNUNET_SCHEDULER_cancel (plugin->next_task);
1682       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1683       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1684       GNUNET_free (plugin->next_task_nc);
1685       plugin->next_task_nc = NULL;
1686     }
1687   GNUNET_free_non_null (plugin->cnffile);
1688   GNUNET_free (plugin);
1689   GNUNET_free (api);
1690   mysql_library_end ();
1691   return NULL;
1692 }
1693
1694 /* end of plugin_datastore_mysql.c */