debugging
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in /etc/gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109    @endverbatim
110  *
111  * PROBLEMS?
112  *
113  * If you have problems related to the mysql module, your best
114  * friend is probably the mysql manual. The first thing to check
115  * is that mysql is basically operational, that you can connect
116  * to it, create tables, issue queries etc.
117  */
118
119 #include "platform.h"
120 #include "gnunet_datastore_plugin.h"
121 #include "gnunet_util_lib.h"
122 #include <mysql/mysql.h>
123
124 #define DEBUG_MYSQL GNUNET_NO
125
126 #define MAX_DATUM_SIZE 65536
127
128 /**
129  * Maximum number of supported parameters for a prepared
130  * statement.  Increase if needed.
131  */
132 #define MAX_PARAM 16
133
134 /**
135  * Die with an error message that indicates
136  * a failure of the command 'cmd' with the message given
137  * by strerror(errno).
138  */
139 #define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0);
140
141 /**
142  * Log an error message at log-level 'level' that indicates
143  * a failure of the command 'cmd' on file 'filename'
144  * with the message given by strerror(errno).
145  */
146 #define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
147
148
149 struct GNUNET_MysqlStatementHandle
150 {
151   struct GNUNET_MysqlStatementHandle *next;
152
153   struct GNUNET_MysqlStatementHandle *prev;
154
155   char *query;
156
157   MYSQL_STMT *statement;
158
159   int valid;
160
161 };
162
163 /**
164  * Context for the universal iterator.
165  */
166 struct NextRequestClosure;
167
168 /**
169  * Type of a function that will prepare
170  * the next iteration.
171  *
172  * @param cls closure
173  * @param nc the next context; NULL for the last
174  *         call which gives the callback a chance to
175  *         clean up the closure
176  * @return GNUNET_OK on success, GNUNET_NO if there are
177  *         no more values, GNUNET_SYSERR on error
178  */
179 typedef int (*PrepareFunction)(void *cls,
180                                struct NextRequestClosure *nc);
181
182
183 struct NextRequestClosure
184 {
185   struct Plugin *plugin;
186
187   struct GNUNET_TIME_Absolute now;
188
189   /**
190    * Function to call to prepare the next
191    * iteration.
192    */
193   PrepareFunction prep;
194
195   /**
196    * Closure for prep.
197    */
198   void *prep_cls;
199
200   MYSQL_BIND rbind[7];
201
202   enum GNUNET_BLOCK_Type type;
203
204   PluginIterator dviter;
205
206   void *dviter_cls;
207
208   unsigned int count;
209
210   int end_it;
211
212   int one_shot;
213 };
214
215
216 /**
217  * Context for all functions in this plugin.
218  */
219 struct Plugin 
220 {
221   /**
222    * Our execution environment.
223    */
224   struct GNUNET_DATASTORE_PluginEnvironment *env;
225
226   /**
227    * Handle to talk to MySQL.
228    */
229   MYSQL *dbf;
230   
231   /**
232    * We keep all prepared statements in a DLL.  This is the head.
233    */
234   struct GNUNET_MysqlStatementHandle *shead;
235
236   /**
237    * We keep all prepared statements in a DLL.  This is the tail.
238    */
239   struct GNUNET_MysqlStatementHandle *stail;
240
241   /**
242    * Filename of "my.cnf" (msyql configuration).
243    */
244   char *cnffile;
245
246   /**
247    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
248    */
249   struct NextRequestClosure *next_task_nc;
250
251   /**
252    * Pending task with scheduler for running the next request.
253    */
254   GNUNET_SCHEDULER_TaskIdentifier next_task;
255
256   /**
257    * Prepared statements.
258    */
259 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?)"
260   struct GNUNET_MysqlStatementHandle *insert_entry;
261   
262 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
263   struct GNUNET_MysqlStatementHandle *delete_entry_by_uid;
264
265 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
266   struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
267   
268 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_uid) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
269   struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
270
271 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
272   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
273
274 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
275   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
276  
277 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
278   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
279
280 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
281   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
282  
283 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
284   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
285   
286 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
287   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
288
289 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
290   struct GNUNET_MysqlStatementHandle *update_entry;
291
292 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
293   struct GNUNET_MysqlStatementHandle *dec_repl;
294
295 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
296   struct GNUNET_MysqlStatementHandle *get_size;
297
298 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_anonLevel_uid) WHERE anonLevel=0 ORDER BY uid DESC LIMIT 1 OFFSET ?"
299   struct GNUNET_MysqlStatementHandle *zero_iter;
300
301 #define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_expire_prio) WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
302   "UNION "\
303   "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_prio) ORDER BY prio ASC LIMIT 1) "\
304   "ORDER BY expire ASC LIMIT 1"
305   struct GNUNET_MysqlStatementHandle *select_expiration;
306
307 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX(idx_repl) ORDER BY repl DESC,RAND() LIMIT 1"
308   struct GNUNET_MysqlStatementHandle *select_replication;
309
310 };
311
312
313 /**
314  * Obtain the location of ".my.cnf".
315  *
316  * @param cfg our configuration
317  * @return NULL on error
318  */
319 static char *
320 get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
321 {
322   char *cnffile;
323   char *home_dir;
324   struct stat st;
325 #ifndef WINDOWS
326   struct passwd *pw;
327 #endif
328   int configured;
329
330 #ifndef WINDOWS
331   pw = getpwuid (getuid ());
332   if (!pw)
333     {
334       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, 
335                            "getpwuid");
336       return NULL;
337     }
338   if (GNUNET_YES ==
339       GNUNET_CONFIGURATION_have_value (cfg,
340                                        "datastore-mysql", "CONFIG"))
341     {
342       GNUNET_assert (GNUNET_OK == 
343                      GNUNET_CONFIGURATION_get_value_filename (cfg,
344                                                               "datastore-mysql", "CONFIG", &cnffile));
345       configured = GNUNET_YES;
346     }
347   else
348     {
349       home_dir = GNUNET_strdup (pw->pw_dir);
350 #else
351       home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
352       plibc_conv_to_win_path ("~/", home_dir);
353 #endif
354       GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
355       GNUNET_free (home_dir);
356       configured = GNUNET_NO;
357     }
358   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
359               _("Trying to use file `%s' for MySQL configuration.\n"),
360               cnffile);
361   if ((0 != STAT (cnffile, &st)) ||
362       (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode)))
363     {
364       if (configured == GNUNET_YES)
365         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
366                     _("Could not access file `%s': %s\n"), cnffile,
367                     STRERROR (errno));
368       GNUNET_free (cnffile);
369       return NULL;
370     }
371   return cnffile;
372 }
373
374
375
376 /**
377  * Free a prepared statement.
378  *
379  * @param plugin plugin context
380  * @param s prepared statement
381  */
382 static void
383 prepared_statement_destroy (struct Plugin *plugin, 
384                             struct GNUNET_MysqlStatementHandle
385                             *s)
386 {
387   GNUNET_CONTAINER_DLL_remove (plugin->shead,
388                                plugin->stail,
389                                s);
390   if (s->valid)
391     mysql_stmt_close (s->statement);
392   GNUNET_free (s->query);
393   GNUNET_free (s);
394 }
395
396
397 /**
398  * Close database connection and all prepared statements (we got a DB
399  * disconnect error).
400  */
401 static int
402 iclose (struct Plugin *plugin)
403 {
404   struct GNUNET_MysqlStatementHandle *spos;
405
406   spos = plugin->shead;
407   while (NULL != plugin->shead)
408     prepared_statement_destroy (plugin,
409                                 plugin->shead);
410   if (plugin->dbf != NULL)
411     {
412       mysql_close (plugin->dbf);
413       plugin->dbf = NULL;
414     }
415   return GNUNET_OK;
416 }
417
418
419 /**
420  * Open the connection with the database (and initialize
421  * our default options).
422  *
423  * @return GNUNET_OK on success
424  */
425 static int
426 iopen (struct Plugin *ret)
427 {
428   char *mysql_dbname;
429   char *mysql_server;
430   char *mysql_user;
431   char *mysql_password;
432   unsigned long long mysql_port;
433   my_bool reconnect;
434   unsigned int timeout;
435
436   ret->dbf = mysql_init (NULL);
437   if (ret->dbf == NULL)
438     return GNUNET_SYSERR;
439   if (ret->cnffile != NULL)
440     mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile);
441   mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
442   reconnect = 0;
443   mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect);
444   mysql_options (ret->dbf,
445                  MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
446   mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
447   timeout = 60; /* in seconds */
448   mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
449   mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
450   mysql_dbname = NULL;
451   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
452                                                      "datastore-mysql", "DATABASE"))
453     GNUNET_assert (GNUNET_OK == 
454                    GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
455                                                           "datastore-mysql", "DATABASE", 
456                                                           &mysql_dbname));
457   else
458     mysql_dbname = GNUNET_strdup ("gnunet");
459   mysql_user = NULL;
460   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
461                                                      "datastore-mysql", "USER"))
462     {
463       GNUNET_assert (GNUNET_OK == 
464                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
465                                                            "datastore-mysql", "USER", 
466                                                            &mysql_user));
467     }
468   mysql_password = NULL;
469   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
470                                                      "datastore-mysql", "PASSWORD"))
471     {
472       GNUNET_assert (GNUNET_OK ==
473                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
474                                                            "datastore-mysql", "PASSWORD",
475                                                            &mysql_password));
476     }
477   mysql_server = NULL;
478   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
479                                                      "datastore-mysql", "HOST"))
480     {
481       GNUNET_assert (GNUNET_OK == 
482                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
483                                                            "datastore-mysql", "HOST", 
484                                                            &mysql_server));
485     }
486   mysql_port = 0;
487   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
488                                                      "datastore-mysql", "PORT"))
489     {
490       GNUNET_assert (GNUNET_OK ==
491                     GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql",
492                                                            "PORT", &mysql_port));
493     }
494
495   GNUNET_assert (mysql_dbname != NULL);
496   mysql_real_connect (ret->dbf, 
497                       mysql_server, 
498                       mysql_user, mysql_password,
499                       mysql_dbname, 
500                       (unsigned int) mysql_port, NULL,
501                       CLIENT_IGNORE_SIGPIPE);
502   GNUNET_free_non_null (mysql_server);
503   GNUNET_free_non_null (mysql_user);
504   GNUNET_free_non_null (mysql_password);
505   GNUNET_free (mysql_dbname);
506   if (mysql_error (ret->dbf)[0])
507     {
508       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
509                  "mysql_real_connect", ret);
510       return GNUNET_SYSERR;
511     }
512   return GNUNET_OK;
513 }
514
515
516 /**
517  * Run the given MySQL statement.
518  *
519  * @param plugin plugin context
520  * @param statement SQL statement to run
521  * @return GNUNET_OK on success, GNUNET_SYSERR on error
522  */
523 static int
524 run_statement (struct Plugin *plugin,
525                const char *statement)
526 {
527   if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
528     return GNUNET_SYSERR;
529   mysql_query (plugin->dbf, statement);
530   if (mysql_error (plugin->dbf)[0])
531     {
532       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
533                  "mysql_query", plugin);
534       iclose (plugin);
535       return GNUNET_SYSERR;
536     }
537   return GNUNET_OK;
538 }
539
540
541 /**
542  * Create a prepared statement.
543  *
544  * @param plugin plugin context
545  * @param statement SQL statement text to prepare
546  * @return NULL on error
547  */
548 static struct GNUNET_MysqlStatementHandle *
549 prepared_statement_create (struct Plugin *plugin, 
550                            const char *statement)
551 {
552   struct GNUNET_MysqlStatementHandle *ret;
553
554   ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle));
555   ret->query = GNUNET_strdup (statement);
556   GNUNET_CONTAINER_DLL_insert (plugin->shead,
557                                plugin->stail,
558                                ret);
559   return ret;
560 }
561
562
563 /**
564  * Prepare a statement for running.
565  *
566  * @param plugin plugin context
567  * @param ret handle to prepared statement
568  * @return GNUNET_OK on success
569  */
570 static int
571 prepare_statement (struct Plugin *plugin, 
572                    struct GNUNET_MysqlStatementHandle *ret)
573 {
574   if (GNUNET_YES == ret->valid)
575     return GNUNET_OK;
576   if ((NULL == plugin->dbf) && 
577       (GNUNET_OK != iopen (plugin)))
578     return GNUNET_SYSERR;
579   ret->statement = mysql_stmt_init (plugin->dbf);
580   if (ret->statement == NULL)
581     {
582       iclose (plugin);
583       return GNUNET_SYSERR;
584     }
585   if (mysql_stmt_prepare (ret->statement, 
586                           ret->query,
587                           strlen (ret->query)))
588     {
589       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
590                  "mysql_stmt_prepare", 
591                  plugin);
592       mysql_stmt_close (ret->statement);
593       ret->statement = NULL;
594       iclose (plugin);
595       return GNUNET_SYSERR;
596     }
597   ret->valid = GNUNET_YES;
598   return GNUNET_OK;
599
600 }
601
602
603 /**
604  * Bind the parameters for the given MySQL statement
605  * and run it.
606  *
607  * @param plugin plugin context
608  * @param s statement to bind and run
609  * @param ap arguments for the binding
610  * @return GNUNET_SYSERR on error, GNUNET_OK on success
611  */
612 static int
613 init_params (struct Plugin *plugin,
614              struct GNUNET_MysqlStatementHandle *s,
615              va_list ap)
616 {
617   MYSQL_BIND qbind[MAX_PARAM];
618   unsigned int pc;
619   unsigned int off;
620   enum enum_field_types ft;
621
622   pc = mysql_stmt_param_count (s->statement);
623   if (pc > MAX_PARAM)
624     {
625       /* increase internal constant! */
626       GNUNET_break (0);
627       return GNUNET_SYSERR;
628     }
629   memset (qbind, 0, sizeof (qbind));
630   off = 0;
631   ft = 0;
632   while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types))))
633     {
634       qbind[off].buffer_type = ft;
635       switch (ft)
636         {
637         case MYSQL_TYPE_FLOAT:
638           qbind[off].buffer = va_arg (ap, float *);
639           break;
640         case MYSQL_TYPE_LONGLONG:
641           qbind[off].buffer = va_arg (ap, unsigned long long *);
642           qbind[off].is_unsigned = va_arg (ap, int);
643           break;
644         case MYSQL_TYPE_LONG:
645           qbind[off].buffer = va_arg (ap, unsigned int *);
646           qbind[off].is_unsigned = va_arg (ap, int);
647           break;
648         case MYSQL_TYPE_VAR_STRING:
649         case MYSQL_TYPE_STRING:
650         case MYSQL_TYPE_BLOB:
651           qbind[off].buffer = va_arg (ap, void *);
652           qbind[off].buffer_length = va_arg (ap, unsigned long);
653           qbind[off].length = va_arg (ap, unsigned long *);
654           break;
655         default:
656           /* unsupported type */
657           GNUNET_break (0);
658           return GNUNET_SYSERR;
659         }
660       pc--;
661       off++;
662     }
663   if (! ( (pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1) ) )
664     {
665       GNUNET_assert (0);
666       return GNUNET_SYSERR;
667     }
668   if (mysql_stmt_bind_param (s->statement, qbind))
669     {
670       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
671                   _("`%s' failed at %s:%d with error: %s\n"),
672                   "mysql_stmt_bind_param",
673                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
674       iclose (plugin);
675       return GNUNET_SYSERR;
676     }
677   if (mysql_stmt_execute (s->statement))
678     {
679       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
680                   _("`%s' failed at %s:%d with error: %s\n"),
681                   "mysql_stmt_execute",
682                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
683       iclose (plugin);
684       return GNUNET_SYSERR;
685     }
686   return GNUNET_OK;
687 }
688
689 /**
690  * Type of a callback that will be called for each
691  * data set returned from MySQL.
692  *
693  * @param cls user-defined argument
694  * @param num_values number of elements in values
695  * @param values values returned by MySQL
696  * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort
697  */
698 typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
699                                           unsigned int num_values,
700                                           MYSQL_BIND *values);
701
702
703 /**
704  * Run a prepared SELECT statement.
705  *
706  * @param plugin plugin context
707  * @param s statement to run
708  * @param result_size number of elements in results array
709  * @param results pointer to already initialized MYSQL_BIND
710  *        array (of sufficient size) for passing results
711  * @param processor function to call on each result
712  * @param processor_cls extra argument to processor
713  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
714  *        values (size + buffer-reference for pointers); terminated
715  *        with "-1"
716  * @return GNUNET_SYSERR on error, otherwise
717  *         the number of successfully affected (or queried) rows
718  */
719 static int
720 prepared_statement_run_select (struct Plugin *plugin,
721                                struct GNUNET_MysqlStatementHandle *s,
722                                unsigned int result_size,
723                                MYSQL_BIND *results,
724                                GNUNET_MysqlDataProcessor processor, void *processor_cls,
725                                ...)
726 {
727   va_list ap;
728   int ret;
729   unsigned int rsize;
730   int total;
731
732   if (GNUNET_OK != prepare_statement (plugin, s))
733     {
734       GNUNET_break (0);
735       return GNUNET_SYSERR;
736     }
737   va_start (ap, processor_cls);
738   if (GNUNET_OK != init_params (plugin, s, ap))
739     {
740       GNUNET_break (0);
741       va_end (ap);
742       return GNUNET_SYSERR;
743     }
744   va_end (ap);
745   rsize = mysql_stmt_field_count (s->statement);
746   if (rsize > result_size)
747     {
748       GNUNET_break (0);
749       return GNUNET_SYSERR;
750     }
751   if (mysql_stmt_bind_result (s->statement, results))
752     {
753       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
754                   _("`%s' failed at %s:%d with error: %s\n"),
755                   "mysql_stmt_bind_result",
756                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
757       iclose (plugin);
758       return GNUNET_SYSERR;
759     }
760
761   total = 0;
762   while (1)
763     {
764       ret = mysql_stmt_fetch (s->statement);
765       if (ret == MYSQL_NO_DATA)
766         break;
767       if (ret != 0)
768         {
769           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
770                       _("`%s' failed at %s:%d with error: %s\n"),
771                       "mysql_stmt_fetch",
772                       __FILE__, __LINE__, mysql_stmt_error (s->statement));
773           iclose (plugin);
774           return GNUNET_SYSERR;
775         }
776       if (processor != NULL)
777         if (GNUNET_OK != processor (processor_cls, rsize, results))
778           break;
779       total++;
780     }
781   mysql_stmt_reset (s->statement);
782   return total;
783 }
784
785
786 /**
787  * Run a prepared statement that does NOT produce results.
788  *
789  * @param plugin plugin context
790  * @param s statement to run
791  * @param insert_id NULL or address where to store the row ID of whatever
792  *        was inserted (only for INSERT statements!)
793  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
794  *        values (size + buffer-reference for pointers); terminated
795  *        with "-1"
796  * @return GNUNET_SYSERR on error, otherwise
797  *         the number of successfully affected rows
798  */
799 static int
800 prepared_statement_run (struct Plugin *plugin,
801                         struct GNUNET_MysqlStatementHandle *s,
802                         unsigned long long *insert_id, ...)
803 {
804   va_list ap;
805   int affected;
806
807   if (GNUNET_OK != prepare_statement (plugin, s))
808     return GNUNET_SYSERR;
809   va_start (ap, insert_id);
810   if (GNUNET_OK != init_params (plugin, s, ap))
811     {
812       va_end (ap);
813       return GNUNET_SYSERR;
814     }
815   va_end (ap);
816   affected = mysql_stmt_affected_rows (s->statement);
817   if (NULL != insert_id)
818     *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement);
819   mysql_stmt_reset (s->statement);
820   return affected;
821 }
822
823
824 /**
825  * Delete an entry from the gn090 table.
826  *
827  * @param plugin plugin context
828  * @param uid unique ID of the entry to delete
829  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
830  */
831 static int
832 do_delete_entry (struct Plugin *plugin,
833                  unsigned long long uid)
834 {
835   int ret;
836  
837 #if DEBUG_MYSQL
838   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
839               "Deleting value %llu from gn090 table\n",
840               uid);
841 #endif
842   ret = prepared_statement_run (plugin,
843                                 plugin->delete_entry_by_uid,
844                                 NULL,
845                                 MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES,
846                                 -1);
847   if (ret >= 0)
848     return GNUNET_OK;
849   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
850               "Deleting value %llu from gn090 table failed\n",
851               uid);
852   return ret;
853 }
854
855
856 /**
857  * Function that simply returns GNUNET_OK
858  *
859  * @param cls closure, not used
860  * @param num_values not used
861  * @param values not used
862  * @return GNUNET_OK
863  */
864 static int
865 return_ok (void *cls, 
866            unsigned int num_values, 
867            MYSQL_BIND *values)
868 {
869   fprintf (stderr, "Here: %u\n", num_values);
870   return GNUNET_OK;
871 }
872
873
874 /**
875  * Get an estimate of how much space the database is
876  * currently using.
877  *
878  * @param cls our "struct Plugin *"
879  * @return number of bytes used on disk
880  */
881 static unsigned long long
882 mysql_plugin_get_size (void *cls)
883 {
884   struct Plugin *plugin = cls;
885   MYSQL_BIND cbind[1];
886   long long total;
887
888   memset (cbind, 0, sizeof (cbind));
889   total = 0;
890   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
891   cbind[0].buffer = &total;
892   cbind[0].is_unsigned = GNUNET_NO;
893   if (GNUNET_OK != 
894       prepared_statement_run_select (plugin,
895                                      plugin->get_size,
896                                      1, cbind, 
897                                      &return_ok, NULL,
898                                      -1))
899     return 0;
900   return total;
901 }
902
903
904 /**
905  * Store an item in the datastore.
906  *
907  * @param cls closure
908  * @param key key for the item
909  * @param size number of bytes in data
910  * @param data content stored
911  * @param type type of the content
912  * @param priority priority of the content
913  * @param anonymity anonymity-level for the content
914  * @param replication replication-level for the content
915  * @param expiration expiration time for the content
916  * @param msg set to error message
917  * @return GNUNET_OK on success
918  */
919 static int
920 mysql_plugin_put (void *cls,
921                   const GNUNET_HashCode * key,
922                   uint32_t size,
923                   const void *data,
924                   enum GNUNET_BLOCK_Type type,
925                   uint32_t priority,
926                   uint32_t anonymity,
927                   uint32_t replication,
928                   struct GNUNET_TIME_Absolute expiration,
929                   char **msg)
930 {
931   struct Plugin *plugin = cls;
932   unsigned int irepl = replication;
933   unsigned int itype = type;
934   unsigned int ipriority = priority;
935   unsigned int ianonymity = anonymity;
936   unsigned long long lexpiration = expiration.abs_value;
937   unsigned long hashSize;
938   unsigned long hashSize2;
939   unsigned long lsize;
940   GNUNET_HashCode vhash;
941
942   if (size > MAX_DATUM_SIZE)
943     {
944       GNUNET_break (0);
945       return GNUNET_SYSERR;
946     }
947   hashSize = sizeof (GNUNET_HashCode);
948   hashSize2 = sizeof (GNUNET_HashCode);
949   lsize = size;
950   GNUNET_CRYPTO_hash (data, size, &vhash);
951   {
952   fprintf (stderr,
953            "inserting content with key `%s'\n",
954            GNUNET_h2s (key));
955   fprintf (stderr,
956            "inserting %u-byte content with vhash `%s'\n",
957            (unsigned int) size,
958            GNUNET_h2s (&vhash));
959   }
960
961   if (GNUNET_OK !=
962       prepared_statement_run (plugin,
963                               plugin->insert_entry,
964                               NULL,
965                               MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
966                               MYSQL_TYPE_LONG, &itype, GNUNET_YES,
967                               MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
968                               MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
969                               MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
970                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
971                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
972                               MYSQL_TYPE_BLOB, data, lsize, &lsize, 
973                               -1))
974     return GNUNET_SYSERR;    
975 #if DEBUG_MYSQL
976   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
977               "Inserted value `%s' with size %u into gn090 table\n",
978               GNUNET_h2s (key),
979               (unsigned int) size);
980 #endif
981   if (size > 0)
982     plugin->env->duc (plugin->env->cls,
983                       size);
984   return GNUNET_OK;
985 }
986
987
988 /**
989  * Update the priority for a particular key in the datastore.  If
990  * the expiration time in value is different than the time found in
991  * the datastore, the higher value should be kept.  For the
992  * anonymity level, the lower value is to be used.  The specified
993  * priority should be added to the existing priority, ignoring the
994  * priority in value.
995  *
996  * Note that it is possible for multiple values to match this put.
997  * In that case, all of the respective values are updated.
998  *
999  * @param cls our "struct Plugin*"
1000  * @param uid unique identifier of the datum
1001  * @param delta by how much should the priority
1002  *     change?  If priority + delta < 0 the
1003  *     priority should be set to 0 (never go
1004  *     negative).
1005  * @param expire new expiration time should be the
1006  *     MAX of any existing expiration time and
1007  *     this value
1008  * @param msg set to error message
1009  * @return GNUNET_OK on success
1010  */
1011 static int
1012 mysql_plugin_update (void *cls,
1013                      uint64_t uid,
1014                      int delta, 
1015                      struct GNUNET_TIME_Absolute expire,
1016                      char **msg)
1017 {
1018   struct Plugin *plugin = cls;
1019   unsigned long long vkey = uid;
1020   unsigned long long lexpire = expire.abs_value;
1021   int ret;
1022
1023 #if DEBUG_MYSQL
1024   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1025               "Updating value %llu adding %d to priority and maxing exp at %llu\n",
1026               vkey,
1027               delta,
1028               lexpire);
1029 #endif
1030   ret = prepared_statement_run (plugin,
1031                                 plugin->update_entry,
1032                                 NULL,
1033                                 MYSQL_TYPE_LONG, &delta, GNUNET_NO,
1034                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
1035                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
1036                                 MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, 
1037                                 -1);
1038   if (ret != GNUNET_OK)
1039     {
1040       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1041                   "Failed to update value %llu\n",
1042                   vkey);
1043     }
1044   return ret;
1045 }
1046
1047
1048
1049
1050 /**
1051  * Continuation of "mysql_next_request".
1052  *
1053  * @param next_cls the next context
1054  * @param tc the task context (unused)
1055  */
1056 static void 
1057 mysql_next_request_cont (void *next_cls,
1058                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1059 {
1060   struct NextRequestClosure *nrc = next_cls;
1061   struct Plugin *plugin;
1062   int ret;
1063   unsigned int type;
1064   unsigned int priority;
1065   unsigned int anonymity;
1066   unsigned long long exp;
1067   unsigned long hashSize;
1068   unsigned long size;
1069   unsigned long long uid;
1070   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
1071   GNUNET_HashCode key;
1072   struct GNUNET_TIME_Absolute expiration;
1073   MYSQL_BIND *rbind = nrc->rbind;
1074
1075   plugin = nrc->plugin;
1076   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1077   plugin->next_task_nc = NULL;
1078
1079   if (GNUNET_YES == nrc->end_it) 
1080     goto END_SET;
1081   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1082   nrc->now = GNUNET_TIME_absolute_get ();
1083   hashSize = sizeof (GNUNET_HashCode);
1084   memset (nrc->rbind, 0, sizeof (nrc->rbind));
1085   rbind = nrc->rbind;
1086   rbind[0].buffer_type = MYSQL_TYPE_LONG;
1087   rbind[0].buffer = &type;
1088   rbind[0].is_unsigned = 1;
1089   rbind[1].buffer_type = MYSQL_TYPE_LONG;
1090   rbind[1].buffer = &priority;
1091   rbind[1].is_unsigned = 1;
1092   rbind[2].buffer_type = MYSQL_TYPE_LONG;
1093   rbind[2].buffer = &anonymity;
1094   rbind[2].is_unsigned = 1;
1095   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
1096   rbind[3].buffer = &exp;
1097   rbind[3].is_unsigned = 1;
1098   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
1099   rbind[4].buffer = &key;
1100   rbind[4].buffer_length = hashSize;
1101   rbind[4].length = &hashSize;
1102   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
1103   rbind[5].buffer = value;
1104   rbind[5].buffer_length = size = sizeof (value);
1105   rbind[5].length = &size;
1106   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
1107   rbind[6].buffer = &uid;
1108   rbind[6].is_unsigned = 1;
1109
1110   if (GNUNET_OK != nrc->prep (nrc->prep_cls,
1111                               nrc))
1112     goto END_SET;
1113   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1114   GNUNET_assert (size <= sizeof(value));
1115   if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
1116        (hashSize != sizeof (GNUNET_HashCode)) )
1117     {
1118       GNUNET_break (0);
1119       goto END_SET;
1120     }     
1121 #if DEBUG_MYSQL
1122   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1123               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
1124               (unsigned int) size,
1125               GNUNET_h2s (&key),
1126               priority,
1127               anonymity,
1128               exp);
1129 #endif
1130   expiration.abs_value = exp;
1131
1132   {
1133   GNUNET_HashCode vh;
1134   
1135   GNUNET_CRYPTO_hash (value, size, &vh);
1136   fprintf (stderr,
1137            "found content under with key `%s'\n",
1138            GNUNET_h2s (&key));
1139   fprintf (stderr,
1140            "found %u-byte content with vhash `%s'\n",
1141            (unsigned int) size,
1142            GNUNET_h2s (&vh));
1143 }
1144   ret = nrc->dviter (nrc->dviter_cls, 
1145                      (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
1146                      &key,
1147                      size, value,
1148                      type, priority, anonymity, expiration,
1149                      uid);
1150   if (ret == GNUNET_SYSERR)
1151     {
1152       nrc->end_it = GNUNET_YES;
1153       return;
1154     }
1155   if (ret == GNUNET_NO)
1156     {
1157       do_delete_entry (plugin, uid);
1158       if (size != 0)
1159         plugin->env->duc (plugin->env->cls,
1160                           - size);
1161     }
1162   if (nrc->one_shot == GNUNET_YES)
1163     GNUNET_free (nrc);
1164   return;
1165  END_SET:
1166   /* call dviter with "end of set" */
1167   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1168   nrc->dviter (nrc->dviter_cls, 
1169                NULL, NULL, 0, NULL, 0, 0, 0, 
1170                GNUNET_TIME_UNIT_ZERO_ABS, 0);
1171   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1172   nrc->prep (nrc->prep_cls, NULL);
1173   GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1174   GNUNET_free (nrc);
1175 }
1176
1177
1178 /**
1179  * Function invoked on behalf of a "PluginIterator"
1180  * asking the database plugin to call the iterator
1181  * with the next item.
1182  *
1183  * @param next_cls whatever argument was given
1184  *        to the PluginIterator as "next_cls".
1185  * @param end_it set to GNUNET_YES if we
1186  *        should terminate the iteration early
1187  *        (iterator should be still called once more
1188  *         to signal the end of the iteration).
1189  */
1190 static void 
1191 mysql_plugin_next_request (void *next_cls,
1192                            int end_it)
1193 {
1194   struct NextRequestClosure *nrc = next_cls;
1195
1196   if (GNUNET_YES == end_it)
1197     nrc->end_it = GNUNET_YES;
1198   nrc->plugin->next_task_nc = nrc;
1199   nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
1200                                                      nrc);
1201 }  
1202
1203
1204 /**
1205  * Context for 'get_statement_prepare'.
1206  */
1207 struct GetContext
1208 {
1209   GNUNET_HashCode key;
1210   GNUNET_HashCode vhash;
1211
1212   unsigned int prio;
1213   unsigned int anonymity;
1214   unsigned long long expiration;
1215   unsigned long long vkey;
1216   unsigned long long total;
1217   unsigned int off;
1218   unsigned int count;
1219   int have_vhash;
1220 };
1221
1222
1223 static int
1224 get_statement_prepare (void *cls,
1225                        struct NextRequestClosure *nrc)
1226 {
1227   struct GetContext *gc = cls;
1228   struct Plugin *plugin;
1229   int ret;
1230   unsigned long hashSize;
1231   
1232   if (NULL == nrc)
1233     {
1234       GNUNET_free (gc);
1235       return GNUNET_NO;
1236     }
1237   if (gc->count == gc->total)
1238     return GNUNET_NO;
1239   plugin = nrc->plugin;
1240   hashSize = sizeof (GNUNET_HashCode);
1241   if (++gc->off >= gc->total)
1242     gc->off = 0;
1243 #if DEBUG_MYSQL
1244   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1245               "Obtaining result number %d/%lld at offset %u for GET `%s'\n",
1246               gc->count+1,
1247               gc->total,
1248               gc->off,
1249               GNUNET_h2s (&gc->key));  
1250 #endif
1251   if (nrc->type != 0)
1252     {
1253       if (gc->have_vhash)
1254         {
1255           ret = prepared_statement_run_select (plugin,
1256                                                plugin->select_entry_by_hash_vhash_and_type, 
1257                                                7, nrc->rbind, 
1258                                                &return_ok, NULL, 
1259                                                MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1260                                                MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
1261                                                MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
1262                                                MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1263                                                -1);
1264         }
1265       else
1266         {
1267           ret =
1268             prepared_statement_run_select (plugin,
1269                                            plugin->select_entry_by_hash_and_type, 
1270                                            7, nrc->rbind, 
1271                                            &return_ok, NULL,
1272                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1273                                            MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
1274                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1275                                            -1);
1276         }
1277     }
1278   else
1279     {
1280       if (gc->have_vhash)
1281         {
1282           fprintf (stderr,
1283                    "Select by key `%s'\n",
1284                    GNUNET_h2s (&gc->key));
1285           fprintf (stderr,
1286                    "Select by vhash `%s'\n",
1287                    GNUNET_h2s (&gc->vhash));
1288           ret =
1289             prepared_statement_run_select (plugin,
1290                                            plugin->select_entry_by_hash_and_vhash, 
1291                                            7, nrc->rbind, 
1292                                            &return_ok, NULL,
1293                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, 
1294                                            MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize, 
1295                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
1296                                            -1);
1297         }
1298       else
1299         {
1300           ret =
1301             prepared_statement_run_select (plugin,
1302                                            plugin->select_entry_by_hash, 
1303                                            7, nrc->rbind, 
1304                                            &return_ok, NULL,
1305                                            MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1306                                            MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
1307                                            -1);
1308         }
1309     }
1310   gc->count++;
1311   return ret;
1312 }
1313
1314
1315 /**
1316  * Iterate over the results for a particular key in the datastore.
1317  *
1318  * @param cls closure
1319  * @param key maybe NULL (to match all entries)
1320  * @param vhash hash of the value, maybe NULL (to
1321  *        match all values that have the right key).
1322  *        Note that for DBlocks there is no difference
1323  *        betwen key and vhash, but for other blocks
1324  *        there may be!
1325  * @param type entries of which type are relevant?
1326  *        Use 0 for any type.
1327  * @param iter function to call on each matching value;
1328  *        will be called once with a NULL value at the end
1329  * @param iter_cls closure for iter
1330  */
1331 static void
1332 mysql_plugin_get (void *cls,
1333                   const GNUNET_HashCode *key,
1334                   const GNUNET_HashCode *vhash,
1335                   enum GNUNET_BLOCK_Type type,
1336                   PluginIterator iter, void *iter_cls)
1337 {
1338   struct Plugin *plugin = cls;
1339   unsigned int itype = type;
1340   int ret;
1341   MYSQL_BIND cbind[1];
1342   struct GetContext *gc;
1343   struct NextRequestClosure *nrc;
1344   long long total;
1345   unsigned long hashSize;
1346   unsigned long hashSize2;
1347
1348   GNUNET_assert (key != NULL);
1349   if (iter == NULL) 
1350     return;
1351   hashSize = sizeof (GNUNET_HashCode);
1352   hashSize2 = sizeof (GNUNET_HashCode);
1353   memset (cbind, 0, sizeof (cbind));
1354   total = -1;
1355   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
1356   cbind[0].buffer = &total;
1357   cbind[0].is_unsigned = GNUNET_NO;
1358   if (type != 0)
1359     {
1360       if (vhash != NULL)
1361         {
1362           ret =
1363             prepared_statement_run_select (plugin,
1364                                            plugin->count_entry_by_hash_vhash_and_type, 
1365                                            1, cbind, 
1366                                            &return_ok, NULL,
1367                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1368                                            MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 
1369                                            MYSQL_TYPE_LONG, &itype, GNUNET_YES,
1370                                            -1);
1371         }
1372       else
1373         {
1374           ret =
1375             prepared_statement_run_select (plugin,
1376                                            plugin->count_entry_by_hash_and_type, 
1377                                            1, cbind, 
1378                                            &return_ok, NULL,
1379                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1380                                            MYSQL_TYPE_LONG, &itype, GNUNET_YES,
1381                                            -1);
1382         }
1383     }
1384   else
1385     {
1386       if (vhash != NULL)
1387         {
1388           fprintf (stderr,
1389                    "Count by key `%s'\n",
1390                    GNUNET_h2s (key));
1391           fprintf (stderr,
1392                    "Count by vhash `%s'\n",
1393                    GNUNET_h2s (vhash));
1394           ret =
1395             prepared_statement_run_select (plugin,
1396                                            plugin->count_entry_by_hash_and_vhash, 
1397                                            1, cbind,
1398                                            &return_ok, NULL,
1399                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1400                                            MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 
1401                                            -1);
1402
1403         }
1404       else
1405         {
1406           ret =
1407             prepared_statement_run_select (plugin,
1408                                            plugin->count_entry_by_hash,
1409                                            1, cbind, 
1410                                            &return_ok, NULL, 
1411                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1412                                            -1);
1413         }
1414     }
1415   fprintf (stderr,
1416            "Got %u results (ret: %d / `%s')\n",
1417            (unsigned int) total,
1418            ret,
1419            mysql_error (plugin->dbf));
1420
1421   if ((ret != GNUNET_OK) || (0 >= total))
1422     {
1423       iter (iter_cls, 
1424             NULL, NULL, 0, NULL, 0, 0, 0, 
1425             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1426       return;
1427     }
1428 #if DEBUG_MYSQL
1429   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1430               "Iterating over %lld results for GET `%s'\n",
1431               total,
1432               GNUNET_h2s (key));
1433 #endif
1434   gc = GNUNET_malloc (sizeof (struct GetContext));
1435   gc->key = *key;
1436   if (vhash != NULL)
1437     {
1438       gc->have_vhash = GNUNET_YES;
1439       gc->vhash = *vhash;
1440     }
1441   gc->total = total;
1442   gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1443   
1444
1445   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1446   nrc->plugin = plugin;
1447   nrc->type = type;  
1448   nrc->dviter = iter;
1449   nrc->dviter_cls = iter_cls;
1450   nrc->prep = &get_statement_prepare;
1451   nrc->prep_cls = gc;
1452   mysql_plugin_next_request (nrc, GNUNET_NO);
1453 }
1454
1455
1456 /**
1457  * Run the prepared statement to get the next data item ready.
1458  * 
1459  * @param cls not used
1460  * @param nrc closure for the next request iterator
1461  * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
1462  */
1463 static int
1464 iterator_zero_prepare (void *cls,
1465                        struct NextRequestClosure *nrc)
1466 {
1467   struct Plugin *plugin;
1468   int ret;
1469
1470   if (nrc == NULL)
1471     return GNUNET_NO;
1472   plugin = nrc->plugin;
1473   ret = prepared_statement_run_select (plugin,
1474                                        plugin->zero_iter,
1475                                        7, nrc->rbind,
1476                                        &return_ok, NULL,
1477                                        MYSQL_TYPE_LONG, &nrc->count, GNUNET_YES,
1478                                        -1);
1479   nrc->count++;
1480   return ret;
1481 }
1482
1483
1484 /**
1485  * Select a subset of the items in the datastore and call
1486  * the given iterator for each of them.
1487  *
1488  * @param cls our "struct Plugin*"
1489  * @param type entries of which type should be considered?
1490  *        Use 0 for any type.
1491  * @param iter function to call on each matching value;
1492  *        will be called once with a NULL value at the end
1493  * @param iter_cls closure for iter
1494  */
1495 static void
1496 mysql_plugin_iter_zero_anonymity (void *cls,
1497                                   enum GNUNET_BLOCK_Type type,
1498                                   PluginIterator iter,
1499                                   void *iter_cls)
1500 {
1501   struct Plugin *plugin = cls;
1502   struct NextRequestClosure *nrc;
1503
1504   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1505   nrc->plugin = plugin;
1506   nrc->type = type;  
1507   nrc->dviter = iter;
1508   nrc->dviter_cls = iter_cls;
1509   nrc->prep = &iterator_zero_prepare;
1510   mysql_plugin_next_request (nrc, GNUNET_NO);
1511 }
1512
1513
1514 /**
1515  * Run the SELECT statement for the replication function.
1516  * 
1517  * @param cls the 'struct Plugin'
1518  * @param nrc the context (not used)
1519  */
1520 static int
1521 replication_prepare (void *cls,
1522                      struct NextRequestClosure *nrc)
1523 {
1524   struct Plugin *plugin = cls;
1525
1526   return prepared_statement_run_select (plugin,
1527                                         plugin->select_replication, 
1528                                         7, nrc->rbind, 
1529                                         &return_ok, NULL,
1530                                         -1);
1531 }
1532
1533
1534
1535 /**
1536  * Context for 'repl_iter' function.
1537  */
1538 struct ReplCtx
1539 {
1540   
1541   /**
1542    * Plugin handle.
1543    */
1544   struct Plugin *plugin;
1545   
1546   /**
1547    * Function to call for the result (or the NULL).
1548    */
1549   PluginIterator iter;
1550   
1551   /**
1552    * Closure for iter.
1553    */
1554   void *iter_cls;
1555 };
1556
1557
1558 /**
1559  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
1560  * Decrements the replication counter and calls the original
1561  * iterator.
1562  *
1563  * @param cls closure
1564  * @param next_cls closure to pass to the "next" function.
1565  * @param key key for the content
1566  * @param size number of bytes in data
1567  * @param data content stored
1568  * @param type type of the content
1569  * @param priority priority of the content
1570  * @param anonymity anonymity-level for the content
1571  * @param expiration expiration time for the content
1572  * @param uid unique identifier for the datum;
1573  *        maybe 0 if no unique identifier is available
1574  *
1575  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1576  *         (continue on call to "next", of course),
1577  *         GNUNET_NO to delete the item and continue (if supported)
1578  */
1579 static int
1580 repl_iter (void *cls,
1581            void *next_cls,
1582            const GNUNET_HashCode *key,
1583            uint32_t size,
1584            const void *data,
1585            enum GNUNET_BLOCK_Type type,
1586            uint32_t priority,
1587            uint32_t anonymity,
1588            struct GNUNET_TIME_Absolute expiration, 
1589            uint64_t uid)
1590 {
1591   struct ReplCtx *rc = cls;
1592   struct Plugin *plugin = rc->plugin;
1593   unsigned long long oid;
1594   int ret;
1595
1596   ret = rc->iter (rc->iter_cls,
1597                   next_cls, key,
1598                   size, data, 
1599                   type, priority, anonymity, expiration,
1600                   uid);
1601   if (NULL != key)
1602     {
1603       oid = (unsigned long long) uid;
1604       ret = prepared_statement_run (plugin,
1605                                     plugin->dec_repl,
1606                                     NULL,
1607                                     MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, 
1608                                     -1);
1609       if (ret == GNUNET_SYSERR)
1610         {
1611           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1612                       "Failed to reduce replication counter\n");
1613           return GNUNET_SYSERR;
1614         }
1615     }
1616   return ret;
1617 }
1618
1619
1620 /**
1621  * Get a random item for replication.  Returns a single, not expired, random item
1622  * from those with the highest replication counters.  The item's 
1623  * replication counter is decremented by one IF it was positive before.
1624  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1625  *
1626  * @param cls closure
1627  * @param iter function to call the value (once only).
1628  * @param iter_cls closure for iter
1629  */
1630 static void
1631 mysql_plugin_replication_get (void *cls,
1632                               PluginIterator iter, void *iter_cls)
1633 {
1634   struct Plugin *plugin = cls;
1635   struct NextRequestClosure *nrc;
1636   struct ReplCtx rc;
1637   
1638   rc.plugin = plugin;
1639   rc.iter = iter;
1640   rc.iter_cls = iter_cls;
1641   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1642   nrc->plugin = plugin;
1643   nrc->now = GNUNET_TIME_absolute_get ();
1644   nrc->prep = &replication_prepare;
1645   nrc->prep_cls = plugin;
1646   nrc->type = 0;
1647   nrc->dviter = &repl_iter;
1648   nrc->dviter_cls = &rc;
1649   nrc->end_it = GNUNET_NO;
1650   nrc->one_shot = GNUNET_YES;
1651   mysql_next_request_cont (nrc, NULL);
1652 }
1653
1654
1655 /**
1656  * Run the SELECT statement for the expiration function.
1657  * 
1658  * @param cls the 'struct Plugin'
1659  * @param nrc the context (not used)
1660  * @return GNUNET_OK on success, GNUNET_NO if there are
1661  *         no more values, GNUNET_SYSERR on error
1662  */
1663 static int
1664 expiration_prepare (void *cls,
1665                     struct NextRequestClosure *nrc)
1666 {
1667   struct Plugin *plugin = cls;
1668   long long nt;
1669
1670   if (NULL == nrc)
1671     return GNUNET_NO;
1672   nt = (long long) nrc->now.abs_value;
1673   return prepared_statement_run_select
1674     (plugin,
1675      plugin->select_expiration, 
1676      7, nrc->rbind, 
1677      &return_ok, NULL,
1678      MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
1679      -1);
1680 }
1681
1682
1683 /**
1684  * Get a random item for expiration.
1685  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1686  *
1687  * @param cls closure
1688  * @param iter function to call the value (once only).
1689  * @param iter_cls closure for iter
1690  */
1691 static void
1692 mysql_plugin_expiration_get (void *cls,
1693                              PluginIterator iter, void *iter_cls)
1694 {
1695   struct Plugin *plugin = cls;
1696   struct NextRequestClosure *nrc;
1697
1698   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1699   nrc->plugin = plugin;
1700   nrc->now = GNUNET_TIME_absolute_get ();
1701   nrc->prep = &expiration_prepare;
1702   nrc->prep_cls = plugin;
1703   nrc->type = 0;
1704   nrc->dviter = iter;
1705   nrc->dviter_cls = iter_cls;
1706   nrc->end_it = GNUNET_NO;
1707   nrc->one_shot = GNUNET_YES;
1708   mysql_next_request_cont (nrc, NULL);
1709 }
1710
1711
1712 /**
1713  * Drop database.
1714  *
1715  * @param cls the "struct Plugin*"
1716  */
1717 static void 
1718 mysql_plugin_drop (void *cls)
1719 {
1720   struct Plugin *plugin = cls;
1721
1722   if (GNUNET_OK != run_statement (plugin,
1723                                   "DROP TABLE gn090"))
1724     return;           /* error */
1725   plugin->env->duc (plugin->env->cls, 0);
1726 }
1727
1728
1729 /**
1730  * Entry point for the plugin.
1731  *
1732  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1733  * @return our "struct Plugin*"
1734  */
1735 void *
1736 libgnunet_plugin_datastore_mysql_init (void *cls)
1737 {
1738   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1739   struct GNUNET_DATASTORE_PluginFunctions *api;
1740   struct Plugin *plugin;
1741
1742   plugin = GNUNET_malloc (sizeof (struct Plugin));
1743   plugin->env = env;
1744   plugin->cnffile = get_my_cnf_path (env->cfg);
1745   if (GNUNET_OK != iopen (plugin))
1746     {
1747       iclose (plugin);
1748       GNUNET_free_non_null (plugin->cnffile);
1749       GNUNET_free (plugin);
1750       return NULL;
1751     }
1752 #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
1753 #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
1754   if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
1755              " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1756              " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1757              " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1758              " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1759              " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1760              " hash BINARY(64) NOT NULL DEFAULT '',"
1761              " vhash BINARY(64) NOT NULL DEFAULT '',"
1762              " value BLOB NOT NULL DEFAULT '',"
1763              " uid BIGINT NOT NULL AUTO_INCREMENT,"
1764              " PRIMARY KEY (uid),"
1765              " INDEX idx_hash (hash(64)),"
1766              " INDEX idx_hash_uid (hash(64),uid),"
1767              " INDEX idx_hash_vhash (hash(64),vhash(64)),"
1768              " INDEX idx_hash_type_uid (hash(64),type,uid),"
1769              " INDEX idx_prio (prio),"
1770              " INDEX idx_repl (repl),"
1771              " INDEX idx_expire_prio (expire,prio),"
1772              " INDEX idx_anonLevel_uid (anonLevel,uid)"
1773              ") ENGINE=InnoDB") ||
1774       MRUNS ("SET AUTOCOMMIT = 1") ||
1775       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1776       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1777       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1778       PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH)
1779       || PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE)
1780       || PINIT (plugin->select_entry_by_hash_vhash_and_type,
1781                 SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1782       || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
1783       || PINIT (plugin->get_size, SELECT_SIZE)
1784       || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
1785       || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1786       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1787                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1788       || PINIT (plugin->update_entry, UPDATE_ENTRY)
1789       || PINIT (plugin->dec_repl, DEC_REPL)
1790       || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) 
1791       || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) 
1792       || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
1793     {
1794       iclose (plugin);
1795       GNUNET_free_non_null (plugin->cnffile);
1796       GNUNET_free (plugin);
1797       return NULL;
1798     }
1799 #undef PINIT
1800 #undef MRUNS
1801
1802   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1803   api->cls = plugin;
1804   api->get_size = &mysql_plugin_get_size;
1805   api->put = &mysql_plugin_put;
1806   api->next_request = &mysql_plugin_next_request;
1807   api->get = &mysql_plugin_get;
1808   api->replication_get = &mysql_plugin_replication_get;
1809   api->expiration_get = &mysql_plugin_expiration_get;
1810   api->update = &mysql_plugin_update;
1811   api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity;
1812   api->drop = &mysql_plugin_drop;
1813   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1814                    "mysql", _("Mysql database running\n"));
1815   return api;
1816 }
1817
1818
1819 /**
1820  * Exit point from the plugin.
1821  * @param cls our "struct Plugin*"
1822  * @return always NULL
1823  */
1824 void *
1825 libgnunet_plugin_datastore_mysql_done (void *cls)
1826 {
1827   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1828   struct Plugin *plugin = api->cls;
1829
1830   iclose (plugin);
1831   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1832     {
1833       GNUNET_SCHEDULER_cancel (plugin->next_task);
1834       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1835       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1836       GNUNET_free (plugin->next_task_nc);
1837       plugin->next_task_nc = NULL;
1838     }
1839   GNUNET_free_non_null (plugin->cnffile);
1840   GNUNET_free (plugin);
1841   GNUNET_free (api);
1842   mysql_library_end ();
1843   return NULL;
1844 }
1845
1846 /* end of plugin_datastore_mysql.c */