fixes
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in /etc/gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109    @endverbatim
110  *
111  * PROBLEMS?
112  *
113  * If you have problems related to the mysql module, your best
114  * friend is probably the mysql manual. The first thing to check
115  * is that mysql is basically operational, that you can connect
116  * to it, create tables, issue queries etc.
117  */
118
119 #include "platform.h"
120 #include "gnunet_datastore_plugin.h"
121 #include "gnunet_util_lib.h"
122 #include <mysql/mysql.h>
123
124 #define DEBUG_MYSQL GNUNET_NO
125
126 #define MAX_DATUM_SIZE 65536
127
128 /**
129  * Maximum number of supported parameters for a prepared
130  * statement.  Increase if needed.
131  */
132 #define MAX_PARAM 16
133
134 /**
135  * Die with an error message that indicates
136  * a failure of the command 'cmd' with the message given
137  * by strerror(errno).
138  */
139 #define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0);
140
141 /**
142  * Log an error message at log-level 'level' that indicates
143  * a failure of the command 'cmd' on file 'filename'
144  * with the message given by strerror(errno).
145  */
146 #define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
147
148
149 struct GNUNET_MysqlStatementHandle
150 {
151   struct GNUNET_MysqlStatementHandle *next;
152
153   struct GNUNET_MysqlStatementHandle *prev;
154
155   char *query;
156
157   MYSQL_STMT *statement;
158
159   int valid;
160
161 };
162
163 /**
164  * Context for the universal iterator.
165  */
166 struct NextRequestClosure;
167
168 /**
169  * Type of a function that will prepare
170  * the next iteration.
171  *
172  * @param cls closure
173  * @param nc the next context; NULL for the last
174  *         call which gives the callback a chance to
175  *         clean up the closure
176  * @return GNUNET_OK on success, GNUNET_NO if there are
177  *         no more values, GNUNET_SYSERR on error
178  */
179 typedef int (*PrepareFunction)(void *cls,
180                                struct NextRequestClosure *nc);
181
182
183 struct NextRequestClosure
184 {
185   struct Plugin *plugin;
186
187   struct GNUNET_TIME_Absolute now;
188
189   /**
190    * Function to call to prepare the next
191    * iteration.
192    */
193   PrepareFunction prep;
194
195   /**
196    * Closure for prep.
197    */
198   void *prep_cls;
199
200   MYSQL_BIND rbind[7];
201
202   enum GNUNET_BLOCK_Type type;
203
204   PluginIterator dviter;
205
206   void *dviter_cls;
207
208   unsigned int count;
209
210   int end_it;
211
212   int one_shot;
213 };
214
215
216 /**
217  * Context for all functions in this plugin.
218  */
219 struct Plugin 
220 {
221   /**
222    * Our execution environment.
223    */
224   struct GNUNET_DATASTORE_PluginEnvironment *env;
225
226   /**
227    * Handle to talk to MySQL.
228    */
229   MYSQL *dbf;
230   
231   /**
232    * We keep all prepared statements in a DLL.  This is the head.
233    */
234   struct GNUNET_MysqlStatementHandle *shead;
235
236   /**
237    * We keep all prepared statements in a DLL.  This is the tail.
238    */
239   struct GNUNET_MysqlStatementHandle *stail;
240
241   /**
242    * Filename of "my.cnf" (msyql configuration).
243    */
244   char *cnffile;
245
246   /**
247    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
248    */
249   struct NextRequestClosure *next_task_nc;
250
251   /**
252    * Pending task with scheduler for running the next request.
253    */
254   GNUNET_SCHEDULER_TaskIdentifier next_task;
255
256   /**
257    * Prepared statements.
258    */
259 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?)"
260   struct GNUNET_MysqlStatementHandle *insert_entry;
261   
262 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
263   struct GNUNET_MysqlStatementHandle *delete_entry_by_uid;
264
265 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 WHERE hash=?"
266   struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
267   
268 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
269   struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
270
271 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=?"
272   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
273
274 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
275   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
276  
277 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 WHERE hash=? AND type=?"
278   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
279
280 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
281   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
282  
283 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=? AND type=?"
284   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
285   
286 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
287   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
288
289 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
290   struct GNUNET_MysqlStatementHandle *update_entry;
291
292 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
293   struct GNUNET_MysqlStatementHandle *dec_repl;
294
295 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
296   struct GNUNET_MysqlStatementHandle *get_size;
297
298 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 ORDER BY uid DESC LIMIT 1 OFFSET ?"
299   struct GNUNET_MysqlStatementHandle *zero_iter;
300
301 #define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
302   "UNION "\
303   "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 ORDER BY prio ASC LIMIT 1) "\
304   "ORDER BY expire ASC LIMIT 1"
305   struct GNUNET_MysqlStatementHandle *select_expiration;
306
307 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 ORDER BY repl DESC,RAND() LIMIT 1"
308   struct GNUNET_MysqlStatementHandle *select_replication;
309
310 };
311
312
313 /**
314  * Obtain the location of ".my.cnf".
315  *
316  * @param cfg our configuration
317  * @return NULL on error
318  */
319 static char *
320 get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
321 {
322   char *cnffile;
323   char *home_dir;
324   struct stat st;
325 #ifndef WINDOWS
326   struct passwd *pw;
327 #endif
328   int configured;
329
330 #ifndef WINDOWS
331   pw = getpwuid (getuid ());
332   if (!pw)
333     {
334       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, 
335                            "getpwuid");
336       return NULL;
337     }
338   if (GNUNET_YES ==
339       GNUNET_CONFIGURATION_have_value (cfg,
340                                        "datastore-mysql", "CONFIG"))
341     {
342       GNUNET_assert (GNUNET_OK == 
343                      GNUNET_CONFIGURATION_get_value_filename (cfg,
344                                                               "datastore-mysql", "CONFIG", &cnffile));
345       configured = GNUNET_YES;
346     }
347   else
348     {
349       home_dir = GNUNET_strdup (pw->pw_dir);
350 #else
351       home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
352       plibc_conv_to_win_path ("~/", home_dir);
353 #endif
354       GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
355       GNUNET_free (home_dir);
356       configured = GNUNET_NO;
357     }
358   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
359               _("Trying to use file `%s' for MySQL configuration.\n"),
360               cnffile);
361   if ((0 != STAT (cnffile, &st)) ||
362       (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode)))
363     {
364       if (configured == GNUNET_YES)
365         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
366                     _("Could not access file `%s': %s\n"), cnffile,
367                     STRERROR (errno));
368       GNUNET_free (cnffile);
369       return NULL;
370     }
371   return cnffile;
372 }
373
374
375
376 /**
377  * Free a prepared statement.
378  *
379  * @param plugin plugin context
380  * @param s prepared statement
381  */
382 static void
383 prepared_statement_destroy (struct Plugin *plugin, 
384                             struct GNUNET_MysqlStatementHandle
385                             *s)
386 {
387   GNUNET_CONTAINER_DLL_remove (plugin->shead,
388                                plugin->stail,
389                                s);
390   if (s->valid)
391     mysql_stmt_close (s->statement);
392   GNUNET_free (s->query);
393   GNUNET_free (s);
394 }
395
396
397 /**
398  * Close database connection and all prepared statements (we got a DB
399  * disconnect error).
400  */
401 static int
402 iclose (struct Plugin *plugin)
403 {
404   struct GNUNET_MysqlStatementHandle *spos;
405
406   spos = plugin->shead;
407   while (NULL != plugin->shead)
408     prepared_statement_destroy (plugin,
409                                 plugin->shead);
410   if (plugin->dbf != NULL)
411     {
412       mysql_close (plugin->dbf);
413       plugin->dbf = NULL;
414     }
415   return GNUNET_OK;
416 }
417
418
419 /**
420  * Open the connection with the database (and initialize
421  * our default options).
422  *
423  * @return GNUNET_OK on success
424  */
425 static int
426 iopen (struct Plugin *ret)
427 {
428   char *mysql_dbname;
429   char *mysql_server;
430   char *mysql_user;
431   char *mysql_password;
432   unsigned long long mysql_port;
433   my_bool reconnect;
434   unsigned int timeout;
435
436   ret->dbf = mysql_init (NULL);
437   if (ret->dbf == NULL)
438     return GNUNET_SYSERR;
439   if (ret->cnffile != NULL)
440     mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile);
441   mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
442   reconnect = 0;
443   mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect);
444   mysql_options (ret->dbf,
445                  MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
446   mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
447   timeout = 60; /* in seconds */
448   mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
449   mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
450   mysql_dbname = NULL;
451   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
452                                                      "datastore-mysql", "DATABASE"))
453     GNUNET_assert (GNUNET_OK == 
454                    GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
455                                                           "datastore-mysql", "DATABASE", 
456                                                           &mysql_dbname));
457   else
458     mysql_dbname = GNUNET_strdup ("gnunet");
459   mysql_user = NULL;
460   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
461                                                      "datastore-mysql", "USER"))
462     {
463       GNUNET_assert (GNUNET_OK == 
464                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
465                                                            "datastore-mysql", "USER", 
466                                                            &mysql_user));
467     }
468   mysql_password = NULL;
469   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
470                                                      "datastore-mysql", "PASSWORD"))
471     {
472       GNUNET_assert (GNUNET_OK ==
473                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
474                                                            "datastore-mysql", "PASSWORD",
475                                                            &mysql_password));
476     }
477   mysql_server = NULL;
478   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
479                                                      "datastore-mysql", "HOST"))
480     {
481       GNUNET_assert (GNUNET_OK == 
482                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
483                                                            "datastore-mysql", "HOST", 
484                                                            &mysql_server));
485     }
486   mysql_port = 0;
487   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
488                                                      "datastore-mysql", "PORT"))
489     {
490       GNUNET_assert (GNUNET_OK ==
491                     GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql",
492                                                            "PORT", &mysql_port));
493     }
494
495   GNUNET_assert (mysql_dbname != NULL);
496   mysql_real_connect (ret->dbf, 
497                       mysql_server, 
498                       mysql_user, mysql_password,
499                       mysql_dbname, 
500                       (unsigned int) mysql_port, NULL,
501                       CLIENT_IGNORE_SIGPIPE);
502   GNUNET_free_non_null (mysql_server);
503   GNUNET_free_non_null (mysql_user);
504   GNUNET_free_non_null (mysql_password);
505   GNUNET_free (mysql_dbname);
506   if (mysql_error (ret->dbf)[0])
507     {
508       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
509                  "mysql_real_connect", ret);
510       return GNUNET_SYSERR;
511     }
512   return GNUNET_OK;
513 }
514
515
516 /**
517  * Run the given MySQL statement.
518  *
519  * @param plugin plugin context
520  * @param statement SQL statement to run
521  * @return GNUNET_OK on success, GNUNET_SYSERR on error
522  */
523 static int
524 run_statement (struct Plugin *plugin,
525                const char *statement)
526 {
527   if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
528     return GNUNET_SYSERR;
529   mysql_query (plugin->dbf, statement);
530   if (mysql_error (plugin->dbf)[0])
531     {
532       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
533                  "mysql_query", plugin);
534       iclose (plugin);
535       return GNUNET_SYSERR;
536     }
537   return GNUNET_OK;
538 }
539
540
541 /**
542  * Create a prepared statement.
543  *
544  * @param plugin plugin context
545  * @param statement SQL statement text to prepare
546  * @return NULL on error
547  */
548 static struct GNUNET_MysqlStatementHandle *
549 prepared_statement_create (struct Plugin *plugin, 
550                            const char *statement)
551 {
552   struct GNUNET_MysqlStatementHandle *ret;
553
554   ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle));
555   ret->query = GNUNET_strdup (statement);
556   GNUNET_CONTAINER_DLL_insert (plugin->shead,
557                                plugin->stail,
558                                ret);
559   return ret;
560 }
561
562
563 /**
564  * Prepare a statement for running.
565  *
566  * @param plugin plugin context
567  * @param ret handle to prepared statement
568  * @return GNUNET_OK on success
569  */
570 static int
571 prepare_statement (struct Plugin *plugin, 
572                    struct GNUNET_MysqlStatementHandle *ret)
573 {
574   if (GNUNET_YES == ret->valid)
575     return GNUNET_OK;
576   if ((NULL == plugin->dbf) && 
577       (GNUNET_OK != iopen (plugin)))
578     return GNUNET_SYSERR;
579   ret->statement = mysql_stmt_init (plugin->dbf);
580   if (ret->statement == NULL)
581     {
582       iclose (plugin);
583       return GNUNET_SYSERR;
584     }
585   if (mysql_stmt_prepare (ret->statement, 
586                           ret->query,
587                           strlen (ret->query)))
588     {
589       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
590                  "mysql_stmt_prepare", 
591                  plugin);
592       mysql_stmt_close (ret->statement);
593       ret->statement = NULL;
594       iclose (plugin);
595       return GNUNET_SYSERR;
596     }
597   ret->valid = GNUNET_YES;
598   return GNUNET_OK;
599
600 }
601
602
603 /**
604  * Bind the parameters for the given MySQL statement
605  * and run it.
606  *
607  * @param plugin plugin context
608  * @param s statement to bind and run
609  * @param ap arguments for the binding
610  * @return GNUNET_SYSERR on error, GNUNET_OK on success
611  */
612 static int
613 init_params (struct Plugin *plugin,
614              struct GNUNET_MysqlStatementHandle *s,
615              va_list ap)
616 {
617   MYSQL_BIND qbind[MAX_PARAM];
618   unsigned int pc;
619   unsigned int off;
620   enum enum_field_types ft;
621
622   pc = mysql_stmt_param_count (s->statement);
623   if (pc > MAX_PARAM)
624     {
625       /* increase internal constant! */
626       GNUNET_break (0);
627       return GNUNET_SYSERR;
628     }
629   memset (qbind, 0, sizeof (qbind));
630   off = 0;
631   ft = 0;
632   while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types))))
633     {
634       qbind[off].buffer_type = ft;
635       switch (ft)
636         {
637         case MYSQL_TYPE_FLOAT:
638           qbind[off].buffer = va_arg (ap, float *);
639           break;
640         case MYSQL_TYPE_LONGLONG:
641           qbind[off].buffer = va_arg (ap, unsigned long long *);
642           qbind[off].is_unsigned = va_arg (ap, int);
643           break;
644         case MYSQL_TYPE_LONG:
645           qbind[off].buffer = va_arg (ap, unsigned int *);
646           qbind[off].is_unsigned = va_arg (ap, int);
647           break;
648         case MYSQL_TYPE_VAR_STRING:
649         case MYSQL_TYPE_STRING:
650         case MYSQL_TYPE_BLOB:
651           qbind[off].buffer = va_arg (ap, void *);
652           qbind[off].buffer_length = va_arg (ap, unsigned long);
653           qbind[off].length = va_arg (ap, unsigned long *);
654           break;
655         default:
656           /* unsupported type */
657           GNUNET_break (0);
658           return GNUNET_SYSERR;
659         }
660       pc--;
661       off++;
662     }
663   if (! ( (pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1) ) )
664     {
665       GNUNET_assert (0);
666       return GNUNET_SYSERR;
667     }
668   if (mysql_stmt_bind_param (s->statement, qbind))
669     {
670       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
671                   _("`%s' failed at %s:%d with error: %s\n"),
672                   "mysql_stmt_bind_param",
673                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
674       iclose (plugin);
675       return GNUNET_SYSERR;
676     }
677   if (mysql_stmt_execute (s->statement))
678     {
679       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
680                   _("`%s' failed at %s:%d with error: %s\n"),
681                   "mysql_stmt_execute",
682                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
683       iclose (plugin);
684       return GNUNET_SYSERR;
685     }
686   return GNUNET_OK;
687 }
688
689 /**
690  * Type of a callback that will be called for each
691  * data set returned from MySQL.
692  *
693  * @param cls user-defined argument
694  * @param num_values number of elements in values
695  * @param values values returned by MySQL
696  * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort
697  */
698 typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
699                                           unsigned int num_values,
700                                           MYSQL_BIND *values);
701
702
703 /**
704  * Run a prepared SELECT statement.
705  *
706  * @param plugin plugin context
707  * @param s statement to run
708  * @param result_size number of elements in results array
709  * @param results pointer to already initialized MYSQL_BIND
710  *        array (of sufficient size) for passing results
711  * @param processor function to call on each result
712  * @param processor_cls extra argument to processor
713  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
714  *        values (size + buffer-reference for pointers); terminated
715  *        with "-1"
716  * @return GNUNET_SYSERR on error, otherwise
717  *         the number of successfully affected (or queried) rows
718  */
719 static int
720 prepared_statement_run_select (struct Plugin *plugin,
721                                struct GNUNET_MysqlStatementHandle *s,
722                                unsigned int result_size,
723                                MYSQL_BIND *results,
724                                GNUNET_MysqlDataProcessor processor, void *processor_cls,
725                                ...)
726 {
727   va_list ap;
728   int ret;
729   unsigned int rsize;
730   int total;
731
732   if (GNUNET_OK != prepare_statement (plugin, s))
733     {
734       GNUNET_break (0);
735       return GNUNET_SYSERR;
736     }
737   va_start (ap, processor_cls);
738   if (GNUNET_OK != init_params (plugin, s, ap))
739     {
740       GNUNET_break (0);
741       va_end (ap);
742       return GNUNET_SYSERR;
743     }
744   va_end (ap);
745   rsize = mysql_stmt_field_count (s->statement);
746   if (rsize > result_size)
747     {
748       GNUNET_break (0);
749       return GNUNET_SYSERR;
750     }
751   if (mysql_stmt_bind_result (s->statement, results))
752     {
753       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
754                   _("`%s' failed at %s:%d with error: %s\n"),
755                   "mysql_stmt_bind_result",
756                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
757       iclose (plugin);
758       return GNUNET_SYSERR;
759     }
760
761   total = 0;
762   while (1)
763     {
764       ret = mysql_stmt_fetch (s->statement);
765       if (ret == MYSQL_NO_DATA)
766         break;
767       if (ret != 0)
768         {
769           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
770                       _("`%s' failed at %s:%d with error: %s\n"),
771                       "mysql_stmt_fetch",
772                       __FILE__, __LINE__, mysql_stmt_error (s->statement));
773           iclose (plugin);
774           return GNUNET_SYSERR;
775         }
776       if (processor != NULL)
777         if (GNUNET_OK != processor (processor_cls, rsize, results))
778           break;
779       total++;
780     }
781   mysql_stmt_reset (s->statement);
782   return total;
783 }
784
785
786 /**
787  * Run a prepared statement that does NOT produce results.
788  *
789  * @param plugin plugin context
790  * @param s statement to run
791  * @param insert_id NULL or address where to store the row ID of whatever
792  *        was inserted (only for INSERT statements!)
793  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
794  *        values (size + buffer-reference for pointers); terminated
795  *        with "-1"
796  * @return GNUNET_SYSERR on error, otherwise
797  *         the number of successfully affected rows
798  */
799 static int
800 prepared_statement_run (struct Plugin *plugin,
801                         struct GNUNET_MysqlStatementHandle *s,
802                         unsigned long long *insert_id, ...)
803 {
804   va_list ap;
805   int affected;
806
807   if (GNUNET_OK != prepare_statement (plugin, s))
808     return GNUNET_SYSERR;
809   va_start (ap, insert_id);
810   if (GNUNET_OK != init_params (plugin, s, ap))
811     {
812       va_end (ap);
813       return GNUNET_SYSERR;
814     }
815   va_end (ap);
816   affected = mysql_stmt_affected_rows (s->statement);
817   if (NULL != insert_id)
818     *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement);
819   mysql_stmt_reset (s->statement);
820   return affected;
821 }
822
823
824 /**
825  * Delete an entry from the gn090 table.
826  *
827  * @param plugin plugin context
828  * @param uid unique ID of the entry to delete
829  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
830  */
831 static int
832 do_delete_entry (struct Plugin *plugin,
833                  unsigned long long uid)
834 {
835   int ret;
836  
837 #if DEBUG_MYSQL
838   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
839               "Deleting value %llu from gn090 table\n",
840               uid);
841 #endif
842   ret = prepared_statement_run (plugin,
843                                 plugin->delete_entry_by_uid,
844                                 NULL,
845                                 MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES,
846                                 -1);
847   if (ret >= 0)
848     return GNUNET_OK;
849   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
850               "Deleting value %llu from gn090 table failed\n",
851               uid);
852   return ret;
853 }
854
855
856 /**
857  * Function that simply returns GNUNET_OK
858  *
859  * @param cls closure, not used
860  * @param num_values not used
861  * @param values not used
862  * @return GNUNET_OK
863  */
864 static int
865 return_ok (void *cls, 
866            unsigned int num_values, 
867            MYSQL_BIND *values)
868 {
869   return GNUNET_OK;
870 }
871
872
873 /**
874  * Get an estimate of how much space the database is
875  * currently using.
876  *
877  * @param cls our "struct Plugin *"
878  * @return number of bytes used on disk
879  */
880 static unsigned long long
881 mysql_plugin_get_size (void *cls)
882 {
883   struct Plugin *plugin = cls;
884   MYSQL_BIND cbind[1];
885   long long total;
886
887   memset (cbind, 0, sizeof (cbind));
888   total = 0;
889   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
890   cbind[0].buffer = &total;
891   cbind[0].is_unsigned = GNUNET_NO;
892   if (GNUNET_OK != 
893       prepared_statement_run_select (plugin,
894                                      plugin->get_size,
895                                      1, cbind, 
896                                      &return_ok, NULL,
897                                      -1))
898     return 0;
899   return total;
900 }
901
902
903 /**
904  * Store an item in the datastore.
905  *
906  * @param cls closure
907  * @param key key for the item
908  * @param size number of bytes in data
909  * @param data content stored
910  * @param type type of the content
911  * @param priority priority of the content
912  * @param anonymity anonymity-level for the content
913  * @param replication replication-level for the content
914  * @param expiration expiration time for the content
915  * @param msg set to error message
916  * @return GNUNET_OK on success
917  */
918 static int
919 mysql_plugin_put (void *cls,
920                   const GNUNET_HashCode * key,
921                   uint32_t size,
922                   const void *data,
923                   enum GNUNET_BLOCK_Type type,
924                   uint32_t priority,
925                   uint32_t anonymity,
926                   uint32_t replication,
927                   struct GNUNET_TIME_Absolute expiration,
928                   char **msg)
929 {
930   struct Plugin *plugin = cls;
931   unsigned int irepl = replication;
932   unsigned int itype = type;
933   unsigned int ipriority = priority;
934   unsigned int ianonymity = anonymity;
935   unsigned long long lexpiration = expiration.abs_value;
936   unsigned long hashSize;
937   unsigned long hashSize2;
938   unsigned long lsize;
939   GNUNET_HashCode vhash;
940
941   if (size > MAX_DATUM_SIZE)
942     {
943       GNUNET_break (0);
944       return GNUNET_SYSERR;
945     }
946   hashSize = sizeof (GNUNET_HashCode);
947   hashSize2 = sizeof (GNUNET_HashCode);
948   lsize = size;
949   GNUNET_CRYPTO_hash (data, size, &vhash);
950   if (GNUNET_OK !=
951       prepared_statement_run (plugin,
952                               plugin->insert_entry,
953                               NULL,
954                               MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
955                               MYSQL_TYPE_LONG, &itype, GNUNET_YES,
956                               MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
957                               MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
958                               MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
959                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
960                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
961                               MYSQL_TYPE_BLOB, data, lsize, &lsize, 
962                               -1))
963     return GNUNET_SYSERR;    
964 #if DEBUG_MYSQL
965   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
966               "Inserted value `%s' with size %u into gn090 table\n",
967               GNUNET_h2s (key),
968               (unsigned int) size);
969 #endif
970   if (size > 0)
971     plugin->env->duc (plugin->env->cls,
972                       size);
973   return GNUNET_OK;
974 }
975
976
977 /**
978  * Update the priority for a particular key in the datastore.  If
979  * the expiration time in value is different than the time found in
980  * the datastore, the higher value should be kept.  For the
981  * anonymity level, the lower value is to be used.  The specified
982  * priority should be added to the existing priority, ignoring the
983  * priority in value.
984  *
985  * Note that it is possible for multiple values to match this put.
986  * In that case, all of the respective values are updated.
987  *
988  * @param cls our "struct Plugin*"
989  * @param uid unique identifier of the datum
990  * @param delta by how much should the priority
991  *     change?  If priority + delta < 0 the
992  *     priority should be set to 0 (never go
993  *     negative).
994  * @param expire new expiration time should be the
995  *     MAX of any existing expiration time and
996  *     this value
997  * @param msg set to error message
998  * @return GNUNET_OK on success
999  */
1000 static int
1001 mysql_plugin_update (void *cls,
1002                      uint64_t uid,
1003                      int delta, 
1004                      struct GNUNET_TIME_Absolute expire,
1005                      char **msg)
1006 {
1007   struct Plugin *plugin = cls;
1008   unsigned long long vkey = uid;
1009   unsigned long long lexpire = expire.abs_value;
1010   int ret;
1011
1012 #if DEBUG_MYSQL
1013   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1014               "Updating value %llu adding %d to priority and maxing exp at %llu\n",
1015               vkey,
1016               delta,
1017               lexpire);
1018 #endif
1019   ret = prepared_statement_run (plugin,
1020                                 plugin->update_entry,
1021                                 NULL,
1022                                 MYSQL_TYPE_LONG, &delta, GNUNET_NO,
1023                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
1024                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
1025                                 MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, 
1026                                 -1);
1027   if (ret != GNUNET_OK)
1028     {
1029       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1030                   "Failed to update value %llu\n",
1031                   vkey);
1032     }
1033   return ret;
1034 }
1035
1036
1037
1038
1039 /**
1040  * Continuation of "mysql_next_request".
1041  *
1042  * @param next_cls the next context
1043  * @param tc the task context (unused)
1044  */
1045 static void 
1046 mysql_next_request_cont (void *next_cls,
1047                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1048 {
1049   struct NextRequestClosure *nrc = next_cls;
1050   struct Plugin *plugin;
1051   int ret;
1052   unsigned int type;
1053   unsigned int priority;
1054   unsigned int anonymity;
1055   unsigned long long exp;
1056   unsigned long hashSize;
1057   unsigned long size;
1058   unsigned long long uid;
1059   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
1060   GNUNET_HashCode key;
1061   struct GNUNET_TIME_Absolute expiration;
1062   MYSQL_BIND *rbind = nrc->rbind;
1063
1064   plugin = nrc->plugin;
1065   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1066   plugin->next_task_nc = NULL;
1067
1068   if (GNUNET_YES == nrc->end_it) 
1069     goto END_SET;
1070   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1071   nrc->now = GNUNET_TIME_absolute_get ();
1072   hashSize = sizeof (GNUNET_HashCode);
1073   memset (nrc->rbind, 0, sizeof (nrc->rbind));
1074   rbind = nrc->rbind;
1075   rbind[0].buffer_type = MYSQL_TYPE_LONG;
1076   rbind[0].buffer = &type;
1077   rbind[0].is_unsigned = 1;
1078   rbind[1].buffer_type = MYSQL_TYPE_LONG;
1079   rbind[1].buffer = &priority;
1080   rbind[1].is_unsigned = 1;
1081   rbind[2].buffer_type = MYSQL_TYPE_LONG;
1082   rbind[2].buffer = &anonymity;
1083   rbind[2].is_unsigned = 1;
1084   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
1085   rbind[3].buffer = &exp;
1086   rbind[3].is_unsigned = 1;
1087   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
1088   rbind[4].buffer = &key;
1089   rbind[4].buffer_length = hashSize;
1090   rbind[4].length = &hashSize;
1091   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
1092   rbind[5].buffer = value;
1093   rbind[5].buffer_length = size = sizeof (value);
1094   rbind[5].length = &size;
1095   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
1096   rbind[6].buffer = &uid;
1097   rbind[6].is_unsigned = 1;
1098
1099   if (GNUNET_OK != nrc->prep (nrc->prep_cls,
1100                               nrc))
1101     goto END_SET;
1102   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1103   GNUNET_assert (size <= sizeof(value));
1104   if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
1105        (hashSize != sizeof (GNUNET_HashCode)) )
1106     {
1107       GNUNET_break (0);
1108       goto END_SET;
1109     }     
1110 #if DEBUG_MYSQL
1111   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1112               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
1113               (unsigned int) size,
1114               GNUNET_h2s (&key),
1115               priority,
1116               anonymity,
1117               exp);
1118 #endif
1119   expiration.abs_value = exp;
1120   ret = nrc->dviter (nrc->dviter_cls, 
1121                      (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
1122                      &key,
1123                      size, value,
1124                      type, priority, anonymity, expiration,
1125                      uid);
1126   if (ret == GNUNET_SYSERR)
1127     {
1128       nrc->end_it = GNUNET_YES;
1129       return;
1130     }
1131   if (ret == GNUNET_NO)
1132     {
1133       do_delete_entry (plugin, uid);
1134       if (size != 0)
1135         plugin->env->duc (plugin->env->cls,
1136                           - size);
1137     }
1138   if (nrc->one_shot == GNUNET_YES)
1139     GNUNET_free (nrc);
1140   return;
1141  END_SET:
1142   /* call dviter with "end of set" */
1143   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1144   nrc->dviter (nrc->dviter_cls, 
1145                NULL, NULL, 0, NULL, 0, 0, 0, 
1146                GNUNET_TIME_UNIT_ZERO_ABS, 0);
1147   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1148   nrc->prep (nrc->prep_cls, NULL);
1149   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1150   GNUNET_free (nrc);
1151 }
1152
1153
1154 /**
1155  * Function invoked on behalf of a "PluginIterator"
1156  * asking the database plugin to call the iterator
1157  * with the next item.
1158  *
1159  * @param next_cls whatever argument was given
1160  *        to the PluginIterator as "next_cls".
1161  * @param end_it set to GNUNET_YES if we
1162  *        should terminate the iteration early
1163  *        (iterator should be still called once more
1164  *         to signal the end of the iteration).
1165  */
1166 static void 
1167 mysql_plugin_next_request (void *next_cls,
1168                            int end_it)
1169 {
1170   struct NextRequestClosure *nrc = next_cls;
1171
1172   if (GNUNET_YES == end_it)
1173     nrc->end_it = GNUNET_YES;
1174   nrc->plugin->next_task_nc = nrc;
1175   nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
1176                                                      nrc);
1177 }  
1178
1179
1180 /**
1181  * Context for 'get_statement_prepare'.
1182  */
1183 struct GetContext
1184 {
1185   GNUNET_HashCode key;
1186   GNUNET_HashCode vhash;
1187
1188   unsigned int prio;
1189   unsigned int anonymity;
1190   unsigned long long expiration;
1191   unsigned long long vkey;
1192   unsigned long long total;
1193   unsigned int off;
1194   unsigned int count;
1195   int have_vhash;
1196 };
1197
1198
1199 static int
1200 get_statement_prepare (void *cls,
1201                        struct NextRequestClosure *nrc)
1202 {
1203   struct GetContext *gc = cls;
1204   struct Plugin *plugin;
1205   int ret;
1206   unsigned long hashSize;
1207   
1208   if (NULL == nrc)
1209     {
1210       GNUNET_free (gc);
1211       return GNUNET_NO;
1212     }
1213   if (gc->count == gc->total)
1214     return GNUNET_NO;
1215   plugin = nrc->plugin;
1216   hashSize = sizeof (GNUNET_HashCode);
1217   if (++gc->off >= gc->total)
1218     gc->off = 0;
1219 #if DEBUG_MYSQL
1220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221               "Obtaining result number %d/%lld at offset %u for GET `%s'\n",
1222               gc->count+1,
1223               gc->total,
1224               gc->off,
1225               GNUNET_h2s (&gc->key));  
1226 #endif
1227   if (nrc->type != 0)
1228     {
1229       if (gc->have_vhash)
1230         {
1231           ret = prepared_statement_run_select (plugin,
1232                                                plugin->select_entry_by_hash_vhash_and_type, 
1233                                                7, nrc->rbind, 
1234                                                &return_ok, NULL, 
1235                                                MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1236                                                MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
1237                                                MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
1238                                                MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1239                                                -1);
1240         }
1241       else
1242         {
1243           ret =
1244             prepared_statement_run_select (plugin,
1245                                            plugin->select_entry_by_hash_and_type, 
1246                                            7, nrc->rbind, 
1247                                            &return_ok, NULL,
1248                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1249                                            MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
1250                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1251                                            -1);
1252         }
1253     }
1254   else
1255     {
1256       if (gc->have_vhash)
1257         {
1258           ret =
1259             prepared_statement_run_select (plugin,
1260                                            plugin->select_entry_by_hash_and_vhash, 
1261                                            7, nrc->rbind, 
1262                                            &return_ok, NULL,
1263                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, 
1264                                            MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize, 
1265                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
1266                                            -1);
1267         }
1268       else
1269         {
1270           ret =
1271             prepared_statement_run_select (plugin,
1272                                            plugin->select_entry_by_hash, 
1273                                            7, nrc->rbind, 
1274                                            &return_ok, NULL,
1275                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1276                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
1277                                            -1);
1278         }
1279     }
1280   gc->count++;
1281   return ret;
1282 }
1283
1284
1285 /**
1286  * Iterate over the results for a particular key in the datastore.
1287  *
1288  * @param cls closure
1289  * @param key maybe NULL (to match all entries)
1290  * @param vhash hash of the value, maybe NULL (to
1291  *        match all values that have the right key).
1292  *        Note that for DBlocks there is no difference
1293  *        betwen key and vhash, but for other blocks
1294  *        there may be!
1295  * @param type entries of which type are relevant?
1296  *        Use 0 for any type.
1297  * @param iter function to call on each matching value;
1298  *        will be called once with a NULL value at the end
1299  * @param iter_cls closure for iter
1300  */
1301 static void
1302 mysql_plugin_get (void *cls,
1303                   const GNUNET_HashCode *key,
1304                   const GNUNET_HashCode *vhash,
1305                   enum GNUNET_BLOCK_Type type,
1306                   PluginIterator iter, void *iter_cls)
1307 {
1308   struct Plugin *plugin = cls;
1309   unsigned int itype = type;
1310   int ret;
1311   MYSQL_BIND cbind[1];
1312   struct GetContext *gc;
1313   struct NextRequestClosure *nrc;
1314   long long total;
1315   unsigned long hashSize;
1316   unsigned long hashSize2;
1317
1318   GNUNET_assert (key != NULL);
1319   if (iter == NULL) 
1320     return;
1321   hashSize = sizeof (GNUNET_HashCode);
1322   hashSize2 = sizeof (GNUNET_HashCode);
1323   memset (cbind, 0, sizeof (cbind));
1324   total = -1;
1325   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
1326   cbind[0].buffer = &total;
1327   cbind[0].is_unsigned = GNUNET_NO;
1328   if (type != 0)
1329     {
1330       if (vhash != NULL)
1331         {
1332           ret =
1333             prepared_statement_run_select (plugin,
1334                                            plugin->count_entry_by_hash_vhash_and_type, 
1335                                            1, cbind, 
1336                                            &return_ok, NULL,
1337                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1338                                            MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 
1339                                            MYSQL_TYPE_LONG, &itype, GNUNET_YES,
1340                                            -1);
1341         }
1342       else
1343         {
1344           ret =
1345             prepared_statement_run_select (plugin,
1346                                            plugin->count_entry_by_hash_and_type, 
1347                                            1, cbind, 
1348                                            &return_ok, NULL,
1349                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1350                                            MYSQL_TYPE_LONG, &itype, GNUNET_YES,
1351                                            -1);
1352         }
1353     }
1354   else
1355     {
1356       if (vhash != NULL)
1357         {
1358           ret =
1359             prepared_statement_run_select (plugin,
1360                                            plugin->count_entry_by_hash_and_vhash, 
1361                                            1, cbind,
1362                                            &return_ok, NULL,
1363                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1364                                            MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 
1365                                            -1);
1366
1367         }
1368       else
1369         {
1370           ret =
1371             prepared_statement_run_select (plugin,
1372                                            plugin->count_entry_by_hash,
1373                                            1, cbind, 
1374                                            &return_ok, NULL, 
1375                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1376                                            -1);
1377         }
1378     }
1379   if ((ret != GNUNET_OK) || (0 >= total))
1380     {
1381       iter (iter_cls, 
1382             NULL, NULL, 0, NULL, 0, 0, 0, 
1383             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1384       return;
1385     }
1386 #if DEBUG_MYSQL
1387   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1388               "Iterating over %lld results for GET `%s'\n",
1389               total,
1390               GNUNET_h2s (key));
1391 #endif
1392   gc = GNUNET_malloc (sizeof (struct GetContext));
1393   gc->key = *key;
1394   if (vhash != NULL)
1395     {
1396       gc->have_vhash = GNUNET_YES;
1397       gc->vhash = *vhash;
1398     }
1399   gc->total = total;
1400   gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1401   
1402
1403   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1404   nrc->plugin = plugin;
1405   nrc->type = type;  
1406   nrc->dviter = iter;
1407   nrc->dviter_cls = iter_cls;
1408   nrc->prep = &get_statement_prepare;
1409   nrc->prep_cls = gc;
1410   mysql_plugin_next_request (nrc, GNUNET_NO);
1411 }
1412
1413
1414 /**
1415  * Run the prepared statement to get the next data item ready.
1416  * 
1417  * @param cls not used
1418  * @param nrc closure for the next request iterator
1419  * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
1420  */
1421 static int
1422 iterator_zero_prepare (void *cls,
1423                        struct NextRequestClosure *nrc)
1424 {
1425   struct Plugin *plugin;
1426   int ret;
1427
1428   if (nrc == NULL)
1429     return GNUNET_NO;
1430   plugin = nrc->plugin;
1431   ret = prepared_statement_run_select (plugin,
1432                                        plugin->zero_iter,
1433                                        7, nrc->rbind,
1434                                        &return_ok, NULL,
1435                                        MYSQL_TYPE_LONG, &nrc->count, GNUNET_YES,
1436                                        -1);
1437   nrc->count++;
1438   return ret;
1439 }
1440
1441
1442 /**
1443  * Select a subset of the items in the datastore and call
1444  * the given iterator for each of them.
1445  *
1446  * @param cls our "struct Plugin*"
1447  * @param type entries of which type should be considered?
1448  *        Use 0 for any type.
1449  * @param iter function to call on each matching value;
1450  *        will be called once with a NULL value at the end
1451  * @param iter_cls closure for iter
1452  */
1453 static void
1454 mysql_plugin_iter_zero_anonymity (void *cls,
1455                                   enum GNUNET_BLOCK_Type type,
1456                                   PluginIterator iter,
1457                                   void *iter_cls)
1458 {
1459   struct Plugin *plugin = cls;
1460   struct NextRequestClosure *nrc;
1461
1462   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1463   nrc->plugin = plugin;
1464   nrc->type = type;  
1465   nrc->dviter = iter;
1466   nrc->dviter_cls = iter_cls;
1467   nrc->prep = &iterator_zero_prepare;
1468   mysql_plugin_next_request (nrc, GNUNET_NO);
1469 }
1470
1471
1472 /**
1473  * Run the SELECT statement for the replication function.
1474  * 
1475  * @param cls the 'struct Plugin'
1476  * @param nrc the context (not used)
1477  */
1478 static int
1479 replication_prepare (void *cls,
1480                      struct NextRequestClosure *nrc)
1481 {
1482   struct Plugin *plugin = cls;
1483
1484   return prepared_statement_run_select (plugin,
1485                                         plugin->select_replication, 
1486                                         7, nrc->rbind, 
1487                                         &return_ok, NULL,
1488                                         -1);
1489 }
1490
1491
1492
1493 /**
1494  * Context for 'repl_iter' function.
1495  */
1496 struct ReplCtx
1497 {
1498   
1499   /**
1500    * Plugin handle.
1501    */
1502   struct Plugin *plugin;
1503   
1504   /**
1505    * Function to call for the result (or the NULL).
1506    */
1507   PluginIterator iter;
1508   
1509   /**
1510    * Closure for iter.
1511    */
1512   void *iter_cls;
1513 };
1514
1515
1516 /**
1517  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
1518  * Decrements the replication counter and calls the original
1519  * iterator.
1520  *
1521  * @param cls closure
1522  * @param next_cls closure to pass to the "next" function.
1523  * @param key key for the content
1524  * @param size number of bytes in data
1525  * @param data content stored
1526  * @param type type of the content
1527  * @param priority priority of the content
1528  * @param anonymity anonymity-level for the content
1529  * @param expiration expiration time for the content
1530  * @param uid unique identifier for the datum;
1531  *        maybe 0 if no unique identifier is available
1532  *
1533  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1534  *         (continue on call to "next", of course),
1535  *         GNUNET_NO to delete the item and continue (if supported)
1536  */
1537 static int
1538 repl_iter (void *cls,
1539            void *next_cls,
1540            const GNUNET_HashCode *key,
1541            uint32_t size,
1542            const void *data,
1543            enum GNUNET_BLOCK_Type type,
1544            uint32_t priority,
1545            uint32_t anonymity,
1546            struct GNUNET_TIME_Absolute expiration, 
1547            uint64_t uid)
1548 {
1549   struct ReplCtx *rc = cls;
1550   struct Plugin *plugin = rc->plugin;
1551   unsigned long long oid;
1552   int ret;
1553   int iret;
1554
1555   ret = rc->iter (rc->iter_cls,
1556                   next_cls, key,
1557                   size, data, 
1558                   type, priority, anonymity, expiration,
1559                   uid);
1560   if (NULL != key)
1561     {
1562       oid = (unsigned long long) uid;
1563       iret = prepared_statement_run (plugin,
1564                                     plugin->dec_repl,
1565                                     NULL,
1566                                     MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, 
1567                                     -1);
1568       if (iret == GNUNET_SYSERR)
1569         {
1570           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1571                       "Failed to reduce replication counter\n");
1572           return GNUNET_SYSERR;
1573         }
1574     }
1575   return ret;
1576 }
1577
1578
1579 /**
1580  * Get a random item for replication.  Returns a single, not expired, random item
1581  * from those with the highest replication counters.  The item's 
1582  * replication counter is decremented by one IF it was positive before.
1583  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1584  *
1585  * @param cls closure
1586  * @param iter function to call the value (once only).
1587  * @param iter_cls closure for iter
1588  */
1589 static void
1590 mysql_plugin_replication_get (void *cls,
1591                               PluginIterator iter, void *iter_cls)
1592 {
1593   struct Plugin *plugin = cls;
1594   struct NextRequestClosure *nrc;
1595   struct ReplCtx rc;
1596   
1597   rc.plugin = plugin;
1598   rc.iter = iter;
1599   rc.iter_cls = iter_cls;
1600   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1601   nrc->plugin = plugin;
1602   nrc->now = GNUNET_TIME_absolute_get ();
1603   nrc->prep = &replication_prepare;
1604   nrc->prep_cls = plugin;
1605   nrc->type = 0;
1606   nrc->dviter = &repl_iter;
1607   nrc->dviter_cls = &rc;
1608   nrc->end_it = GNUNET_NO;
1609   nrc->one_shot = GNUNET_YES;
1610   mysql_next_request_cont (nrc, NULL);
1611 }
1612
1613
1614 /**
1615  * Run the SELECT statement for the expiration function.
1616  * 
1617  * @param cls the 'struct Plugin'
1618  * @param nrc the context (not used)
1619  * @return GNUNET_OK on success, GNUNET_NO if there are
1620  *         no more values, GNUNET_SYSERR on error
1621  */
1622 static int
1623 expiration_prepare (void *cls,
1624                     struct NextRequestClosure *nrc)
1625 {
1626   struct Plugin *plugin = cls;
1627   long long nt;
1628
1629   if (NULL == nrc)
1630     return GNUNET_NO;
1631   nt = (long long) nrc->now.abs_value;
1632   return prepared_statement_run_select
1633     (plugin,
1634      plugin->select_expiration, 
1635      7, nrc->rbind, 
1636      &return_ok, NULL,
1637      MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
1638      -1);
1639 }
1640
1641
1642 /**
1643  * Get a random item for expiration.
1644  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1645  *
1646  * @param cls closure
1647  * @param iter function to call the value (once only).
1648  * @param iter_cls closure for iter
1649  */
1650 static void
1651 mysql_plugin_expiration_get (void *cls,
1652                              PluginIterator iter, void *iter_cls)
1653 {
1654   struct Plugin *plugin = cls;
1655   struct NextRequestClosure *nrc;
1656
1657   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1658   nrc->plugin = plugin;
1659   nrc->now = GNUNET_TIME_absolute_get ();
1660   nrc->prep = &expiration_prepare;
1661   nrc->prep_cls = plugin;
1662   nrc->type = 0;
1663   nrc->dviter = iter;
1664   nrc->dviter_cls = iter_cls;
1665   nrc->end_it = GNUNET_NO;
1666   nrc->one_shot = GNUNET_YES;
1667   mysql_next_request_cont (nrc, NULL);
1668 }
1669
1670
1671 /**
1672  * Drop database.
1673  *
1674  * @param cls the "struct Plugin*"
1675  */
1676 static void 
1677 mysql_plugin_drop (void *cls)
1678 {
1679   struct Plugin *plugin = cls;
1680
1681   if (GNUNET_OK != run_statement (plugin,
1682                                   "DROP TABLE gn090"))
1683     return;           /* error */
1684   plugin->env->duc (plugin->env->cls, 0);
1685 }
1686
1687
1688 /**
1689  * Entry point for the plugin.
1690  *
1691  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1692  * @return our "struct Plugin*"
1693  */
1694 void *
1695 libgnunet_plugin_datastore_mysql_init (void *cls)
1696 {
1697   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1698   struct GNUNET_DATASTORE_PluginFunctions *api;
1699   struct Plugin *plugin;
1700
1701   plugin = GNUNET_malloc (sizeof (struct Plugin));
1702   plugin->env = env;
1703   plugin->cnffile = get_my_cnf_path (env->cfg);
1704   if (GNUNET_OK != iopen (plugin))
1705     {
1706       iclose (plugin);
1707       GNUNET_free_non_null (plugin->cnffile);
1708       GNUNET_free (plugin);
1709       return NULL;
1710     }
1711 #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
1712 #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
1713   if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
1714              " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1715              " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1716              " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1717              " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1718              " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1719              " hash BINARY(64) NOT NULL DEFAULT '',"
1720              " vhash BINARY(64) NOT NULL DEFAULT '',"
1721              " value BLOB NOT NULL DEFAULT '',"
1722              " uid BIGINT NOT NULL AUTO_INCREMENT,"
1723              " PRIMARY KEY (uid)"
1724 #if 0
1725              " INDEX idx_hash (hash(64)),"
1726              " INDEX idx_hash_uid (hash(64),uid),"
1727              " INDEX idx_hash_vhash (hash(64),vhash(64)),"
1728              " INDEX idx_hash_type_uid (hash(64),type,uid),"
1729              " INDEX idx_prio (prio),"
1730              " INDEX idx_repl (repl),"
1731              " INDEX idx_expire_prio (expire,prio),"
1732              " INDEX idx_anonLevel_uid (anonLevel,uid)"
1733 #endif
1734              ") ENGINE=InnoDB") ||
1735       MRUNS ("SET AUTOCOMMIT = 1") ||
1736       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1737       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1738       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1739       PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH)
1740       || PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE)
1741       || PINIT (plugin->select_entry_by_hash_vhash_and_type,
1742                 SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1743       || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
1744       || PINIT (plugin->get_size, SELECT_SIZE)
1745       || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
1746       || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1747       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1748                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1749       || PINIT (plugin->update_entry, UPDATE_ENTRY)
1750       || PINIT (plugin->dec_repl, DEC_REPL)
1751       || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) 
1752       || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) 
1753       || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
1754     {
1755       iclose (plugin);
1756       GNUNET_free_non_null (plugin->cnffile);
1757       GNUNET_free (plugin);
1758       return NULL;
1759     }
1760 #undef PINIT
1761 #undef MRUNS
1762
1763   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1764   api->cls = plugin;
1765   api->get_size = &mysql_plugin_get_size;
1766   api->put = &mysql_plugin_put;
1767   api->next_request = &mysql_plugin_next_request;
1768   api->get = &mysql_plugin_get;
1769   api->replication_get = &mysql_plugin_replication_get;
1770   api->expiration_get = &mysql_plugin_expiration_get;
1771   api->update = &mysql_plugin_update;
1772   api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity;
1773   api->drop = &mysql_plugin_drop;
1774   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1775                    "mysql", _("Mysql database running\n"));
1776   return api;
1777 }
1778
1779
1780 /**
1781  * Exit point from the plugin.
1782  * @param cls our "struct Plugin*"
1783  * @return always NULL
1784  */
1785 void *
1786 libgnunet_plugin_datastore_mysql_done (void *cls)
1787 {
1788   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1789   struct Plugin *plugin = api->cls;
1790
1791   iclose (plugin);
1792   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1793     {
1794       GNUNET_SCHEDULER_cancel (plugin->next_task);
1795       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1796       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1797       GNUNET_free (plugin->next_task_nc);
1798       plugin->next_task_nc = NULL;
1799     }
1800   GNUNET_free_non_null (plugin->cnffile);
1801   GNUNET_free (plugin);
1802   GNUNET_free (api);
1803   mysql_library_end ();
1804   return NULL;
1805 }
1806
1807 /* end of plugin_datastore_mysql.c */