(no commit message)
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in /etc/gnunet.conf, set
51  *    <pre>
52  *
53  *     sqstore = "sqstore_mysql"
54  *
55  *    </pre>
56  * 2) Then access mysql as root,
57  *    <pre>
58  *
59  *    $ mysql -u root -p
60  *
61  *    </pre>
62  *    and do the following. [You should replace $USER with the username
63  *    that will be running the gnunetd process].
64  *    <pre>
65  *
66       CREATE DATABASE gnunet;
67       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
68          ON gnunet.* TO $USER@localhost;
69       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
70       FLUSH PRIVILEGES;
71  *
72  *    </pre>
73  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
74  *    with the following lines
75  *    <pre>
76
77       [client]
78       user=$USER
79       password=$the_password_you_like
80
81  *    </pre>
82  *
83  * Thats it. Note that .my.cnf file is a security risk unless its on
84  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
85  * link. Even greater security risk can be achieved by setting no
86  * password for $USER.  Luckily $USER has only priviledges to mess
87  * up GNUnet's tables, nothing else (unless you give him more,
88  * of course).<p>
89  *
90  * 4) Still, perhaps you should briefly try if the DB connection
91  *    works. First, login as $USER. Then use,
92  *
93  *    <pre>
94  *    $ mysql -u $USER -p $the_password_you_like
95  *    mysql> use gnunet;
96  *    </pre>
97  *
98  *    If you get the message &quot;Database changed&quot; it probably works.
99  *
100  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
101  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
102  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
103  *     so there may be some additional trouble depending on your mysql setup.]
104  *
105  * REPAIRING TABLES
106  *
107  * - Its probably healthy to check your tables for inconsistencies
108  *   every now and then.
109  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
110  *   databases have been corrupted.
111  * - The tables can be verified/fixed in two ways;
112  *   1) by running mysqlcheck -A, or
113  *   2) by executing (inside of mysql using the GNUnet database):
114  *   mysql> REPAIR TABLE gn080;
115  *   mysql> REPAIR TABLE gn072;
116  *
117  * PROBLEMS?
118  *
119  * If you have problems related to the mysql module, your best
120  * friend is probably the mysql manual. The first thing to check
121  * is that mysql is basically operational, that you can connect
122  * to it, create tables, issue queries etc.
123  *
124  */
125
126 #include "platform.h"
127 #include "plugin_datastore.h"
128
129 #define DEBUG_MYSQL GNUNET_NO
130
131 #define MAX_DATUM_SIZE 65536
132
133 /**
134  * Maximum number of supported parameters for a prepared
135  * statement.  Increase if needed.
136  */
137 #define MAX_PARAM 16
138
139 /**
140  * Die with an error message that indicates
141  * a failure of the command 'cmd' with the message given
142  * by strerror(errno).
143  */
144 #define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0);
145
146 /**
147  * Log an error message at log-level 'level' that indicates
148  * a failure of the command 'cmd' on file 'filename'
149  * with the message given by strerror(errno).
150  */
151 #define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
152
153
154 /* warning, slighly crazy mysql statements ahead.  Essentially, MySQL does not handle
155    "OR" very well, so we need to use UNION instead.  And UNION does not
156    automatically apply a LIMIT on the outermost clause, so we need to
157    repeat ourselves quite a bit.  All hail the performance gods (and thanks
158    to #mysql on freenode) */
159 #define SELECT_IT_LOW_PRIORITY "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio = ? AND vkey > ?) "\
160                                "ORDER BY prio ASC,vkey ASC LIMIT 1) "\
161                                "UNION "\
162                                "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio > ? AND vkey != ?)"\
163                                "ORDER BY prio ASC,vkey ASC LIMIT 1)"\
164                                "ORDER BY prio ASC,vkey ASC LIMIT 1"
165
166 #define SELECT_IT_NON_ANONYMOUS "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio = ? AND vkey < ?)"\
167                                 " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
168                                 "UNION "\
169                                 "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio < ? AND vkey != ?)"\
170                                 " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
171                                 "ORDER BY prio DESC,vkey DESC LIMIT 1"
172
173 #define SELECT_IT_EXPIRATION_TIME "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire = ? AND vkey > ?) "\
174                                   "ORDER BY expire ASC,vkey ASC LIMIT 1) "\
175                                   "UNION "\
176                                   "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire > ? AND vkey != ?) "\
177                                   "ORDER BY expire ASC,vkey ASC LIMIT 1)"\
178                                   "ORDER BY expire ASC,vkey ASC LIMIT 1"
179
180
181 #define SELECT_IT_MIGRATION_ORDER "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire = ? AND vkey < ?)"\
182                                   " AND expire > ? AND type!=3"\
183                                   " ORDER BY expire DESC,vkey DESC LIMIT 1) "\
184                                   "UNION "\
185                                   "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire < ? AND vkey != ?)"\
186                                   " AND expire > ? AND type!=3"\
187                                   " ORDER BY expire DESC,vkey DESC LIMIT 1)"\
188                                   "ORDER BY expire DESC,vkey DESC LIMIT 1"
189
190 #define SELECT_SIZE "SELECT sum(size) FROM gn080"
191
192
193 struct GNUNET_MysqlStatementHandle
194 {
195   struct GNUNET_MysqlStatementHandle *next;
196
197   struct GNUNET_MysqlStatementHandle *prev;
198
199   char *query;
200
201   MYSQL_STMT *statement;
202
203   int valid;
204
205 };
206
207
208 /**
209  * Context for all functions in this plugin.
210  */
211 struct Plugin 
212 {
213   /**
214    * Our execution environment.
215    */
216   struct GNUNET_DATASTORE_PluginEnvironment *env;
217
218   MYSQL *dbf;
219   
220   struct GNUNET_MysqlStatementHandle *shead;
221
222   struct GNUNET_MysqlStatementHandle *stail;
223
224   /**
225    * Filename of "my.cnf" (msyql configuration).
226    */
227   char *cnffile;
228
229   /**
230    * Statements dealing with gn072 table 
231    */
232 #define SELECT_VALUE "SELECT value FROM gn072 WHERE vkey=?"
233   struct GNUNET_MysqlStatementHandle *select_value;
234
235 #define DELETE_VALUE "DELETE FROM gn072 WHERE vkey=?"o
236   struct GNUNET_MysqlStatementHandle *delete_value;
237
238 #define INSERT_VALUE "INSERT INTO gn072 (value) VALUES (?)"
239   struct GNUNET_MysqlStatementHandle *insert_value;
240
241   /**
242    * Statements dealing with gn080 table 
243    */
244 #define INSERT_ENTRY "INSERT INTO gn080 (size,type,prio,anonLevel,expire,hash,vhash,vkey) VALUES (?,?,?,?,?,?,?,?)"
245   struct GNUNET_MysqlStatementHandle *insert_entry;
246   
247 #define DELETE_ENTRY_BY_VKEY "DELETE FROM gn080 WHERE vkey=?"
248   struct GNUNET_MysqlStatementHandle *delete_entry_by_vkey;
249   
250 #define SELECT_ENTRY_BY_HASH "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
251   struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
252   
253 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
254   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
255
256 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
257   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
258   
259 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?"
260   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
261   
262 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn080 FORCE INDEX (hash) WHERE hash=?"
263   struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
264
265 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=?"
266   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
267
268 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn080 FORCE INDEX (hash) WHERE hash=? AND type=?"
269   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
270
271 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn080 FORCE INDEX (hash_vhash) WHERE hash=? AND vhash=? AND type=?"
272   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
273
274 #define UPDATE_ENTRY "UPDATE gn080 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE vkey=?"
275   struct GNUNET_MysqlStatementHandle *update_entry;
276
277   struct GNUNET_MysqlStatementHandle *iter[4];
278
279   //static unsigned int stat_size;
280
281   /**
282    * Size of the mysql database on disk.
283    */
284   unsigned long long content_size;
285
286 };
287
288
289 /**
290  * Obtain the location of ".my.cnf".
291  * @return NULL on error
292  */
293 static char *
294 get_my_cnf_path (struct GNUNET_ConfigurationHandle *cfg)
295 {
296   char *cnffile;
297   char *home_dir;
298   struct stat st;
299 #ifndef WINDOWS
300   struct passwd *pw;
301 #endif
302
303 #ifndef WINDOWS
304   pw = getpwuid (getuid ());
305   if (!pw)
306     {
307       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, 
308                            "getpwuid");
309       return NULL;
310     }
311   home_dir = GNUNET_strdup (pw->pw_dir);
312 #else
313   home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
314   plibc_conv_to_win_path ("~/", home_dir);
315 #endif
316   GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
317   GNUNET_free (home_dir);
318   GNUNET_CONFIUGRATION_get_value_filename (cfg,
319                                            "MYSQL", "CONFIG", cnffile,
320                                            &home_dir);
321   GNUNET_free (cnffile);
322   cnffile = home_dir;
323   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
324               _("Trying to use file `%s' for MySQL configuration.\n"),
325               cnffile);
326   if ((0 != STAT (cnffile, &st)) ||
327       (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode)))
328     {
329       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
330                   _("Could not access file `%s': %s\n"), cnffile,
331                   STRERROR (errno));
332       GNUNET_free (cnffile);
333       return NULL;
334     }
335   return cnffile;
336 }
337
338
339 /**
340  * Close database connection and all prepared statements (we got a DB
341  * disconnect error).
342  */
343 static int
344 iclose (struct Plugin *plugin)
345 {
346   struct GNUNET_MysqlStatementHandle *spos;
347
348   spos = plugin->shead;
349   while (spos != NULL)
350     {
351       if (spos->statement != NULL)
352         {
353           mysql_stmt_close (spos->statement);
354           spos->statement = NULL;
355         }
356       spos->valid = GNUNET_NO;
357       spos = spos->next;
358     }
359   if (plugin->dbf != NULL)
360     {
361       mysql_close (plugin->dbf);
362       plugin->dbf = NULL;
363     }
364   return GNUNET_OK;
365 }
366
367
368 /**
369  * Open the connection with the database (and initialize
370  * our default options).
371  *
372  * @return GNUNET_OK on success
373  */
374 static int
375 iopen (struct Plugin *ret)
376 {
377   char *mysql_dbname;
378   char *mysql_server;
379   char *mysql_user;
380   char *mysql_password;
381   unsigned long long mysql_port;
382   my_bool reconnect;
383   unsigned int timeout;
384
385   ret->dbf = mysql_init (NULL);
386   if (ret->dbf == NULL)
387     return GNUNET_SYSERR;
388   if (ret->cnffile != NULL)
389     mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile);
390   mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
391   reconnect = 0;
392   mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect);
393   mysql_options (ret->dbf,
394                  MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
395   timeout = 60; /* in seconds */
396   mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
397   mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
398   mysql_dbname = NULL;
399   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
400                                                      "MYSQL", "DATABASE"))
401     GNUNET_assert (GNUNET_OK == 
402                    GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
403                                                           "MYSQL", "DATABASE", 
404                                                           &mysql_dbname));
405   else
406     mysql_dbname = GNUNET_strdup ("gnunet");
407   mysql_user = NULL;
408   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
409                                                      "MYSQL", "USER"))
410     {
411       GNUNET_break (GNUNET_OK == 
412                     GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
413                                                            "MYSQL", "USER", 
414                                                            &mysql_user));
415     }
416   mysql_password = NULL;
417   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
418                                                      "MYSQL", "PASSWORD"))
419     {
420       GNUNET_break (GNUNET_OK ==
421                     GNUNET_CONFIGURATION_get_value_string (ret->cfg,
422                                                            "MYSQL", "PASSWORD",
423                                                            &mysql_password));
424     }
425   mysql_server = NULL;
426   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->cfg,
427                                                      "MYSQL", "HOST"))
428     {
429       GNUNET_break (GNUNET_OK == 
430                     GNUNET_CONFIGURATION_get_value_string (ret->cfg,
431                                                            "MYSQL", "HOST", "",
432                                                            &mysql_server));
433     }
434   mysql_port = 0;
435   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->cfg,
436                                                      "MYSQL", "PORT"))
437     {
438       GNUNET_break (GNUNET_OK ==
439                     GNUNET_CONFIGURATION_get_value_number (ret->cfg, "MYSQL",
440                                                            "PORT", &mysql_port));
441     }
442
443   GNUNET_assert (mysql_dbname != NULL);
444   mysql_real_connect (ret->dbf, mysql_server, mysql_user, mysql_password,
445                       mysql_dbname, (unsigned int) mysql_port, NULL, 0);
446   GNUNET_free (mysql_dbname);
447   if (mysql_error (ret->dbf)[0])
448     {
449       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
450                  "mysql_real_connect", ret);
451       return GNUNET_SYSERR;
452     }
453   ret->valid = GNUNET_YES;
454   return GNUNET_OK;
455 }
456
457
458 /**
459  * Run the given MySQL statement.
460  *
461  * @return GNUNET_OK on success, GNUNET_SYSERR on error
462  */
463 static int
464 run_statement (struct Plugin *plugin,
465                const char *statement)
466 {
467   if ((NULL == plugin->dbh) && (GNUNET_OK != iopen (plugin)))
468     return GNUNET_SYSERR;
469   mysql_query (plugin->dbf, statement);
470   if (mysql_error (plugin->dbf)[0])
471     {
472       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
473                  "mysql_query", plugin);
474       iclose (plugin);
475       return GNUNET_SYSERR;
476     }
477   return GNUNET_OK;
478 }
479
480
481 /**
482  * Run the given MySQL SELECT statement.  The statement
483  * must have only a single result (one column, one row).
484  *
485  * @return result on success, NULL on error
486  */
487 static char *
488 run_statement_select (struct Plugin *plugin,
489                       const char *statement)
490 {
491   MYSQL_RES *sql_res;
492   MYSQL_ROW sql_row;
493   char *ret;
494   
495   if ((NULL == plugin->dbh) && (GNUNET_OK != iopen (plugin)))
496     return NULL;
497   mysql_query (plugin->dbf, statement);
498   if ((mysql_error (plugin->dbf)[0]) ||
499       (!(sql_res = mysql_use_result (plugin->dbf))) ||
500       (!(sql_row = mysql_fetch_row (sql_res))))
501     {
502       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
503                  "mysql_query", plugin);
504       return NULL;
505     }
506   if ((mysql_num_fields (sql_res) != 1) || (sql_row[0] == NULL))
507     {
508       GNUNET_break (mysql_num_fields (sql_res) == 1);
509       if (sql_res != NULL)
510         mysql_free_result (sql_res);
511       return NULL;
512     }
513   ret = GNUNET_strdup (sql_row[0]);
514   mysql_free_result (sql_res);
515   return ret;
516 }
517
518
519 /**
520  * Create a prepared statement.
521  *
522  * @return NULL on error
523  */
524 static struct GNUNET_MysqlStatementHandle *
525 prepared_statement_create (struct Plugin *plugin, 
526                            const char *statement)
527 {
528   struct GNUNET_MysqlStatementHandle *ret;
529
530   ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle));
531   ret->query = GNUNET_strdup (statement);
532   GNUNET_CONTAINER_DLL_insert (plugin->shead,
533                                plugin->stail,
534                                ret);
535   return ret;
536 }
537
538
539 /**
540  * Prepare a statement for running.
541  *
542  * @return GNUNET_OK on success
543  */
544 static int
545 prepare_statement (struct Plugin *plugin, 
546                    struct GNUNET_MysqlStatementHandle *ret)
547 {
548   if (GNUNET_YES == ret->valid)
549     return GNUNET_OK;
550   if ((NULL == plugin->dbh) && 
551       (GNUNET_OK != iopen (plugin)))
552     return GNUNET_SYSERR;
553   ret->statement = mysql_stmt_init (plugin->dbf);
554   if (ret->statement == NULL)
555     {
556       iclose (plugin);
557       return GNUNET_SYSERR;
558     }
559   if (mysql_stmt_prepare (ret->statement, 
560                           ret->query,
561                           strlen (ret->query)))
562     {
563       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
564                  "mysql_stmt_prepare", 
565                  plugin);
566       mysql_stmt_close (ret->statement);
567       ret->statement = NULL;
568       iclose (plugin);
569       return GNUNET_SYSERR;
570     }
571   ret->valid = GNUNET_YES;
572   return GNUNET_OK;
573 }
574
575
576 /**
577  * Free a prepared statement.
578  */
579 static void
580 prepared_statement_destroy (struct Plugin *plugin, 
581                             struct GNUNET_MysqlStatementHandle
582                             *s)
583 {
584   GNUNET_CONTAINER_DLL_remove (plugin->shead,
585                                plugin->stail,
586                                s);
587   if (s->valid)
588     mysql_stmt_close (s->statement);
589   GNUNET_free (s->query);
590   GNUNET_free (s);
591 }
592
593
594 /**
595  * Bind the parameters for the given MySQL statement
596  * and run it.
597  *
598  * @param s statement to bind and run
599  * @param ap arguments for the binding
600  * @return GNUNET_SYSERR on error, GNUNET_OK on success
601  */
602 static int
603 init_params (struct Plugin *plugin,
604              struct GNUNET_MysqlStatementHandle *s,
605              va_list ap)
606 {
607   MYSQL_BIND qbind[MAX_PARAM];
608   unsigned int pc;
609   unsigned int off;
610   enum enum_field_types ft;
611
612   pc = mysql_stmt_param_count (s->statement);
613   if (pc > MAX_PARAM)
614     {
615       /* increase internal constant! */
616       GNUNET_break (0);
617       return GNUNET_SYSERR;
618     }
619   memset (qbind, 0, sizeof (qbind));
620   off = 0;
621   ft = 0;
622   while ((pc > 0) && (-1 != (ft = va_arg (ap, enum enum_field_types))))
623     {
624       qbind[off].buffer_type = ft;
625       switch (ft)
626         {
627         case MYSQL_TYPE_FLOAT:
628           qbind[off].buffer = va_arg (ap, float *);
629           break;
630         case MYSQL_TYPE_LONGLONG:
631           qbind[off].buffer = va_arg (ap, unsigned long long *);
632           qbind[off].is_unsigned = va_arg (ap, int);
633           break;
634         case MYSQL_TYPE_LONG:
635           qbind[off].buffer = va_arg (ap, unsigned int *);
636           qbind[off].is_unsigned = va_arg (ap, int);
637           break;
638         case MYSQL_TYPE_VAR_STRING:
639         case MYSQL_TYPE_STRING:
640         case MYSQL_TYPE_BLOB:
641           qbind[off].buffer = va_arg (ap, void *);
642           qbind[off].buffer_length = va_arg (ap, unsigned long);
643           qbind[off].length = va_arg (ap, unsigned long *);
644           break;
645         default:
646           /* unsupported type */
647           GNUNET_break (0);
648           return GNUNET_SYSERR;
649         }
650       pc--;
651       off++;
652     }
653   if (!((pc == 0) && (ft != -1) && (va_arg (ap, int) == -1)))
654     {
655       GNUNET_break (0);
656       return GNUNET_SYSERR;
657     }
658   if (mysql_stmt_bind_param (s->statement, qbind))
659     {
660       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
661                   _("`%s' failed at %s:%d with error: %s\n"),
662                   "mysql_stmt_bind_param",
663                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
664       iclose (plugin);
665       return GNUNET_SYSERR;
666     }
667   if (mysql_stmt_execute (s->statement))
668     {
669       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
670                   _("`%s' failed at %s:%d with error: %s\n"),
671                   "mysql_stmt_execute",
672                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
673       iclose (plugin);
674       return GNUNET_SYSERR;
675     }
676   return GNUNET_OK;
677 }
678
679 /**
680  * Type of a callback that will be called for each
681  * data set returned from MySQL.
682  *
683  * @param cls user-defined argument
684  * @param num_values number of elements in values
685  * @param values values returned by MySQL
686  * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort
687  */
688 typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
689                                           unsigned int num_values,
690                                           MYSQL_BIND * values);
691
692
693 /**
694  * Run a prepared SELECT statement.
695  *
696  * @param result_size number of elements in results array
697  * @param results pointer to already initialized MYSQL_BIND
698  *        array (of sufficient size) for passing results
699  * @param processor function to call on each result
700  * @param processor_cls extra argument to processor
701  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
702  *        values (size + buffer-reference for pointers); terminated
703  *        with "-1"
704  * @return GNUNET_SYSERR on error, otherwise
705  *         the number of successfully affected (or queried) rows
706  */
707 static int
708 prepared_statement_run_select (struct Plugin *plugin,
709                                struct GNUNET_MysqlStatementHandle
710                                *s,
711                                unsigned int result_size,
712                                MYSQL_BIND * results,
713                                GNUNET_MysqlDataProcessor
714                                processor, void *processor_cls,
715                                ...)
716 {
717   va_list ap;
718   int ret;
719   unsigned int rsize;
720   int total;
721
722   if (GNUNET_OK != prepare_statement (plugin, s))
723     {
724       GNUNET_break (0);
725       return GNUNET_SYSERR;
726     }
727   va_start (ap, processor_cls);
728   if (GNUNET_OK != init_params (plugin, s, ap))
729     {
730       GNUNET_break (0);
731       va_end (ap);
732       return GNUNET_SYSERR;
733     }
734   va_end (ap);
735   rsize = mysql_stmt_field_count (s->statement);
736   if (rsize > result_size)
737     {
738       GNUNET_break (0);
739       return GNUNET_SYSERR;
740     }
741   if (mysql_stmt_bind_result (s->statement, results))
742     {
743       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
744                   _("`%s' failed at %s:%d with error: %s\n"),
745                   "mysql_stmt_bind_result",
746                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
747       iclose (plugin);
748       return GNUNET_SYSERR;
749     }
750
751   total = 0;
752   while (1)
753     {
754       ret = mysql_stmt_fetch (s->statement);
755       if (ret == MYSQL_NO_DATA)
756         break;
757       if (ret != 0)
758         {
759           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
760                       _("`%s' failed at %s:%d with error: %s\n"),
761                       "mysql_stmt_fetch",
762                       __FILE__, __LINE__, mysql_stmt_error (s->statement));
763           iclose (plugin);
764           return GNUNET_SYSERR;
765         }
766       if (processor != NULL)
767         if (GNUNET_OK != processor (processor_cls, rsize, results))
768           break;
769       total++;
770     }
771   mysql_stmt_reset (s->statement);
772   return total;
773 }
774
775
776 /**
777  * Run a prepared statement that does NOT produce results.
778  *
779  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
780  *        values (size + buffer-reference for pointers); terminated
781  *        with "-1"
782  * @param insert_id NULL or address where to store the row ID of whatever
783  *        was inserted (only for INSERT statements!)
784  * @return GNUNET_SYSERR on error, otherwise
785  *         the number of successfully affected rows
786  */
787 static int
788 prepared_statement_run (struct Plugin *plugin,
789                         struct GNUNET_MysqlStatementHandle *s,
790                         unsigned long long *insert_id, ...)
791 {
792   va_list ap;
793   int affected;
794
795   if (GNUNET_OK != prepare_statement (plugin, s))
796     return GNUNET_SYSERR;
797   va_start (ap, insert_id);
798   if (GNUNET_OK != init_params (plugin, s, ap))
799     {
800       va_end (ap);
801       return GNUNET_SYSERR;
802     }
803   va_end (ap);
804   affected = mysql_stmt_affected_rows (s->statement);
805   if (NULL != insert_id)
806     *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement);
807   mysql_stmt_reset (s->statement);
808   return affected;
809 }
810
811
812 /**
813  * Delete an value from the gn072 table.
814  *
815  * @param vkey vkey identifying the value to delete
816  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
817  */
818 static int
819 do_delete_value (unsigned long long vkey)
820 {
821   int ret;
822
823   ret = GNUNET_MYSQL_prepared_statement_run (delete_value,
824                                              NULL,
825                                              MYSQL_TYPE_LONGLONG,
826                                              &vkey, GNUNET_YES, -1);
827   if (ret > 0)
828     ret = GNUNET_OK;
829   return ret;
830 }
831
832 /**
833  * Insert a value into the gn072 table.
834  *
835  * @param value the value to insert
836  * @param size size of the value
837  * @param vkey vkey identifying the value henceforth (set)
838  * @return GNUNET_OK on success, GNUNET_SYSERR on error
839  */
840 static int
841 do_insert_value (const void *value, unsigned int size,
842                  unsigned long long *vkey)
843 {
844   unsigned long length = size;
845
846   return GNUNET_MYSQL_prepared_statement_run (insert_value,
847                                               vkey,
848                                               MYSQL_TYPE_BLOB,
849                                               value, length, &length, -1);
850 }
851
852 /**
853  * Delete an entry from the gn080 table.
854  *
855  * @param vkey vkey identifying the entry to delete
856  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
857  */
858 static int
859 do_delete_entry_by_vkey (unsigned long long vkey)
860 {
861   int ret;
862
863   ret = GNUNET_MYSQL_prepared_statement_run (delete_entry_by_vkey,
864                                              NULL,
865                                              MYSQL_TYPE_LONGLONG,
866                                              &vkey, GNUNET_YES, -1);
867   if (ret > 0)
868     ret = GNUNET_OK;
869   return ret;
870 }
871
872 static int
873 return_ok (void *cls, unsigned int num_values, MYSQL_BIND * values)
874 {
875   return GNUNET_OK;
876 }
877
878 /**
879  * Given a full (SELECT *) result set from gn080 table,
880  * assemble it into a GNUNET_DatastoreValue representation.
881  *
882  * Call *without* holding the lock, but while within
883  * mysql_thread_start/end.
884  *
885  * @param result location where mysql_stmt_fetch stored the results
886  * @return NULL on error
887  */
888 static GNUNET_DatastoreValue *
889 assembleDatum (MYSQL_BIND * result)
890 {
891   GNUNET_DatastoreValue *datum;
892   unsigned int contentSize;
893   unsigned int type;
894   unsigned int prio;
895   unsigned int level;
896   unsigned long long exp;
897   unsigned long long vkey;
898   unsigned long length;
899   MYSQL_BIND rbind[1];
900   int ret;
901
902   if ((result[0].buffer_type != MYSQL_TYPE_LONG) ||
903       (!result[0].is_unsigned) ||
904       (result[1].buffer_type != MYSQL_TYPE_LONG) ||
905       (!result[1].is_unsigned) ||
906       (result[2].buffer_type != MYSQL_TYPE_LONG) ||
907       (!result[2].is_unsigned) ||
908       (result[3].buffer_type != MYSQL_TYPE_LONG) ||
909       (!result[3].is_unsigned) ||
910       (result[4].buffer_type != MYSQL_TYPE_LONGLONG) ||
911       (!result[4].is_unsigned) ||
912       (result[5].buffer_type != MYSQL_TYPE_BLOB) ||
913       (result[5].buffer_length != sizeof (GNUNET_HashCode)) ||
914       (*result[5].length != sizeof (GNUNET_HashCode)) ||
915       (result[6].buffer_type != MYSQL_TYPE_LONGLONG) ||
916       (!result[6].is_unsigned))
917     {
918       GNUNET_break (0);
919       return NULL;              /* error */
920     }
921
922   contentSize = *(unsigned int *) result[0].buffer;
923   if (contentSize < sizeof (GNUNET_DatastoreValue))
924     return NULL;                /* error */
925   if (contentSize > GNUNET_MAX_BUFFER_SIZE)
926     {
927        GNUNET_break (0); /* far too big */
928        return NULL;
929     }
930   contentSize -= sizeof (GNUNET_DatastoreValue);
931   type = *(unsigned int *) result[1].buffer;
932   prio = *(unsigned int *) result[2].buffer;
933   level = *(unsigned int *) result[3].buffer;
934   exp = *(unsigned long long *) result[4].buffer;
935   vkey = *(unsigned long long *) result[6].buffer;
936   datum = GNUNET_malloc (sizeof (GNUNET_DatastoreValue) + contentSize);
937   datum->size = htonl (contentSize + sizeof (GNUNET_DatastoreValue));
938   datum->type = htonl (type);
939   datum->priority = htonl (prio);
940   datum->anonymity_level = htonl (level);
941   datum->expiration_time = GNUNET_htonll (exp);
942
943   /* now do query on gn072 */
944   length = contentSize;
945   memset (rbind, 0, sizeof (rbind));
946   rbind[0].buffer_type = MYSQL_TYPE_BLOB;
947   rbind[0].buffer_length = contentSize;
948   rbind[0].length = &length;
949   rbind[0].buffer = &datum[1];
950   ret = GNUNET_MYSQL_prepared_statement_run_select (select_value,
951                                                     1,
952                                                     rbind,
953                                                     &return_ok,
954                                                     NULL,
955                                                     MYSQL_TYPE_LONGLONG,
956                                                     &vkey, GNUNET_YES, -1);
957   GNUNET_break (ret <= 1);     /* should only have one result! */
958   if (ret > 0)
959     ret = GNUNET_OK;
960   if ((ret != GNUNET_OK) ||
961       (rbind[0].buffer_length != contentSize) || (length != contentSize))
962     {
963       GNUNET_break (ret != 0); /* should have one result! */
964       GNUNET_break (length == contentSize);    /* length should match! */
965       GNUNET_break (rbind[0].buffer_length == contentSize);    /* length should be internally consistent! */
966       do_delete_value (vkey);
967       if (ret != 0)
968         do_delete_entry_by_vkey (vkey);
969       content_size -= ntohl (datum->size);
970       GNUNET_free (datum);
971       return NULL;
972     }
973   return datum;
974 }
975
976
977 /**
978  * Iterate over the items in the datastore
979  * using the given query to select and order
980  * the items.
981  *
982  * @param type entries of which type should be considered?
983  *        Use 0 for any type.
984  * @param iter never NULL
985  * @param is_asc are we using ascending order?
986  * @return the number of results, GNUNET_SYSERR if the
987  *   iter is non-NULL and aborted the iteration
988  */
989 static int
990 iterateHelper (struct Plugin *plugin,
991                unsigned int type,
992                int is_asc,
993                unsigned int iter_select, GNUNET_DatastoreValueIterator dviter,
994                void *closure)
995 {
996   GNUNET_DatastoreValue *datum;
997   int count;
998   int ret;
999   unsigned int last_prio;
1000   unsigned long long last_expire;
1001   unsigned long long last_vkey;
1002   unsigned int size;
1003   unsigned int rtype;
1004   unsigned int prio;
1005   unsigned int level;
1006   unsigned long long expiration;
1007   unsigned long long vkey;
1008   unsigned long hashSize;
1009   GNUNET_HashCode key;
1010   GNUNET_CronTime now;
1011   MYSQL_BIND rbind[7];
1012
1013   if (is_asc)
1014     {
1015       last_prio = 0;
1016       last_vkey = 0;
1017       last_expire = 0;
1018     }
1019   else
1020     {
1021       last_prio = 0x7FFFFFFFL;
1022       last_vkey = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */
1023       last_expire = 0x7FFFFFFFFFFFFFFFLL;       /* MySQL only supports 63 bits */
1024     }
1025   hashSize = sizeof (GNUNET_HashCode);
1026   memset (rbind, 0, sizeof (rbind));
1027   rbind[0].buffer_type = MYSQL_TYPE_LONG;
1028   rbind[0].buffer = &size;
1029   rbind[0].is_unsigned = 1;
1030   rbind[1].buffer_type = MYSQL_TYPE_LONG;
1031   rbind[1].buffer = &rtype;
1032   rbind[1].is_unsigned = 1;
1033   rbind[2].buffer_type = MYSQL_TYPE_LONG;
1034   rbind[2].buffer = &prio;
1035   rbind[2].is_unsigned = 1;
1036   rbind[3].buffer_type = MYSQL_TYPE_LONG;
1037   rbind[3].buffer = &level;
1038   rbind[3].is_unsigned = 1;
1039   rbind[4].buffer_type = MYSQL_TYPE_LONGLONG;
1040   rbind[4].buffer = &expiration;
1041   rbind[4].is_unsigned = 1;
1042   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
1043   rbind[5].buffer = &key;
1044   rbind[5].buffer_length = hashSize;
1045   rbind[5].length = &hashSize;
1046   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
1047   rbind[6].buffer = &vkey;
1048   rbind[6].is_unsigned = GNUNET_YES;
1049
1050   now = GNUNET_get_time ();
1051   count = 0;
1052   while (1)
1053     {
1054       switch (iter_select)
1055         {
1056         case 0:
1057         case 1:
1058           ret = prepared_statement_run_select (iter[iter_select],
1059                                                7,
1060                                                rbind,
1061                                                &return_ok,
1062                                                NULL,
1063                                                MYSQL_TYPE_LONG,
1064                                                &last_prio,
1065                                                GNUNET_YES,
1066                                                MYSQL_TYPE_LONGLONG,
1067                                                &last_vkey,
1068                                                GNUNET_YES,
1069                                                MYSQL_TYPE_LONG,
1070                                                &last_prio,
1071                                                GNUNET_YES,
1072                                                MYSQL_TYPE_LONGLONG,
1073                                                &last_vkey,
1074                                                GNUNET_YES, -1);
1075           break;
1076         case 2:
1077           ret = prepared_statement_run_select (iter[iter_select],
1078                                                7,
1079                                                rbind,
1080                                                &return_ok,
1081                                                NULL,
1082                                                MYSQL_TYPE_LONGLONG,
1083                                                &last_expire,
1084                                                GNUNET_YES,
1085                                                MYSQL_TYPE_LONGLONG,
1086                                                &last_vkey,
1087                                                GNUNET_YES,
1088                                                MYSQL_TYPE_LONGLONG,
1089                                                &last_expire,
1090                                                GNUNET_YES,
1091                                                MYSQL_TYPE_LONGLONG,
1092                                                &last_vkey,
1093                                                GNUNET_YES, -1);
1094           break;
1095         case 3:
1096           ret = prepared_statement_run_select (iter[iter_select],
1097                                                7,
1098                                                rbind,
1099                                                &return_ok,
1100                                                NULL,
1101                                                MYSQL_TYPE_LONGLONG,
1102                                                &last_expire,
1103                                                GNUNET_YES,
1104                                                MYSQL_TYPE_LONGLONG,
1105                                                &last_vkey,
1106                                                GNUNET_YES,
1107                                                MYSQL_TYPE_LONGLONG,
1108                                                &now,
1109                                                GNUNET_YES,
1110                                                MYSQL_TYPE_LONGLONG,
1111                                                &last_expire,
1112                                                GNUNET_YES,
1113                                                MYSQL_TYPE_LONGLONG,
1114                                                &last_vkey,
1115                                                GNUNET_YES,
1116                                                MYSQL_TYPE_LONGLONG,
1117                                                &now,
1118                                                GNUNET_YES, -1);
1119           break;
1120         default:
1121           GNUNET_break (0);
1122           return GNUNET_SYSERR;
1123         }
1124       if (ret != GNUNET_OK)
1125         break;
1126       last_vkey = vkey;
1127       last_prio = prio;
1128       last_expire = expiration;
1129       count++;
1130       if (dviter != NULL)
1131         {
1132           datum = assembleDatum (rbind);
1133           if (datum == NULL)
1134             continue;
1135           ret = dviter (&key, datum, closure, vkey);
1136           if (ret == GNUNET_SYSERR)
1137             {
1138               GNUNET_free (datum);
1139               break;
1140             }
1141           if (ret == GNUNET_NO)
1142             {
1143               do_delete_value (vkey);
1144               do_delete_entry_by_vkey (vkey);
1145               content_size -= ntohl (datum->size);
1146             }
1147           GNUNET_free (datum);
1148         }
1149     }
1150   return count;
1151 }
1152
1153
1154
1155 /**
1156  * Get an estimate of how much space the database is
1157  * currently using.
1158  *
1159  * @param cls our "struct Plugin*"
1160  * @return number of bytes used on disk
1161  */
1162 static unsigned long long
1163 mysql_plugin_get_size (void *cls)
1164 {
1165   struct Plugin *plugin = cls;
1166   return plugin->content_size;
1167 }
1168
1169
1170 /**
1171  * Store an item in the datastore.
1172  *
1173  * @param cls closure
1174  * @param key key for the item
1175  * @param size number of bytes in data
1176  * @param data content stored
1177  * @param type type of the content
1178  * @param priority priority of the content
1179  * @param anonymity anonymity-level for the content
1180  * @param expiration expiration time for the content
1181  * @param msg set to error message
1182  * @return GNUNET_OK on success
1183  */
1184 static int
1185 mysql_plugin_put (void *cls,
1186                      const GNUNET_HashCode * key,
1187                      uint32_t size,
1188                      const void *data,
1189                      enum GNUNET_BLOCK_Type type,
1190                      uint32_t priority,
1191                      uint32_t anonymity,
1192                      struct GNUNET_TIME_Absolute expiration,
1193                      char **msg)
1194 {
1195   struct Plugin *plugin = cls;
1196
1197   unsigned long contentSize;
1198   unsigned long hashSize;
1199   unsigned long hashSize2;
1200   unsigned int size;
1201   unsigned int type;
1202   unsigned int prio;
1203   unsigned int level;
1204   unsigned long long expiration;
1205   unsigned long long vkey;
1206   GNUNET_HashCode vhash;
1207
1208   if (((ntohl (value->size) < sizeof (GNUNET_DatastoreValue))) ||
1209       ((ntohl (value->size) - sizeof (GNUNET_DatastoreValue)) >
1210        MAX_DATUM_SIZE))
1211     {
1212       GNUNET_break (0);
1213       return GNUNET_SYSERR;
1214     }
1215   hashSize = sizeof (GNUNET_HashCode);
1216   hashSize2 = sizeof (GNUNET_HashCode);
1217   size = ntohl (value->size);
1218   type = ntohl (value->type);
1219   prio = ntohl (value->priority);
1220   level = ntohl (value->anonymity_level);
1221   expiration = GNUNET_ntohll (value->expiration_time);
1222   contentSize = ntohl (value->size) - sizeof (GNUNET_DatastoreValue);
1223   GNUNET_hash (&value[1], contentSize, &vhash);
1224
1225   if (GNUNET_OK != do_insert_value (&value[1], contentSize, &vkey))
1226     return GNUNET_SYSERR;
1227   if (GNUNET_OK !=
1228       prepared_statement_run (insert_entry,
1229                               NULL,
1230                               MYSQL_TYPE_LONG,
1231                               &size,
1232                               GNUNET_YES,
1233                               MYSQL_TYPE_LONG,
1234                               &type,
1235                               GNUNET_YES,
1236                               MYSQL_TYPE_LONG,
1237                               &prio,
1238                               GNUNET_YES,
1239                               MYSQL_TYPE_LONG,
1240                               &level,
1241                               GNUNET_YES,
1242                               MYSQL_TYPE_LONGLONG,
1243                               &expiration,
1244                               GNUNET_YES,
1245                               MYSQL_TYPE_BLOB,
1246                               key,
1247                               hashSize,
1248                               &hashSize,
1249                               MYSQL_TYPE_BLOB,
1250                               &vhash,
1251                               hashSize2,
1252                               &hashSize2,
1253                               MYSQL_TYPE_LONGLONG,
1254                               &vkey, GNUNET_YES, -1))
1255     {
1256       do_delete_value (vkey);
1257       return GNUNET_SYSERR;
1258     }
1259   plugin->content_size += ntohl (value->size);
1260   return GNUNET_OK;
1261 }
1262
1263
1264 /**
1265  * Function invoked on behalf of a "PluginIterator"
1266  * asking the database plugin to call the iterator
1267  * with the next item.
1268  *
1269  * @param next_cls whatever argument was given
1270  *        to the PluginIterator as "next_cls".
1271  * @param end_it set to GNUNET_YES if we
1272  *        should terminate the iteration early
1273  *        (iterator should be still called once more
1274  *         to signal the end of the iteration).
1275  */
1276 static void 
1277 mysql_plugin_next_request (void *next_cls,
1278                        int end_it)
1279 {
1280   struct Plugin *plugin = cls;
1281   GNUNET_break (0);
1282 }
1283
1284
1285 /**
1286  * Iterate over the results for a particular key
1287  * in the datastore.
1288  *
1289  * @param cls closure
1290  * @param key maybe NULL (to match all entries)
1291  * @param vhash hash of the value, maybe NULL (to
1292  *        match all values that have the right key).
1293  *        Note that for DBlocks there is no difference
1294  *        betwen key and vhash, but for other blocks
1295  *        there may be!
1296  * @param type entries of which type are relevant?
1297  *     Use 0 for any type.
1298  * @param iter function to call on each matching value;
1299  *        will be called once with a NULL value at the end
1300  * @param iter_cls closure for iter
1301  */
1302 static void
1303 mysql_plugin_get (void *cls,
1304                   const GNUNET_HashCode * key,
1305                   const GNUNET_HashCode * vhash,
1306                   enum GNUNET_BLOCK_Type type,
1307                   PluginIterator iter, void *iter_cls)
1308 {
1309   struct Plugin *plugin = cls;
1310   int count;
1311   unsigned long long total;
1312   int off;
1313   int ret;
1314   unsigned int size;
1315   unsigned int rtype;
1316   unsigned int prio;
1317   unsigned int level;
1318   unsigned int limit_off;
1319   unsigned long long expiration;
1320   unsigned long long vkey;
1321   unsigned long long last_vkey;
1322   GNUNET_DatastoreValue *datum;
1323   GNUNET_HashCode key;
1324   unsigned long hashSize;
1325   unsigned long hashSize2;
1326   MYSQL_BIND rbind[7];
1327
1328   if (key == NULL)
1329     return iterateLowPriority (type, iter, closure);
1330   hashSize = sizeof (GNUNET_HashCode);
1331   hashSize2 = sizeof (GNUNET_HashCode);
1332   memset (rbind, 0, sizeof (rbind));
1333   total = -1;
1334   rbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
1335   rbind[0].buffer = &total;
1336   rbind[0].is_unsigned = GNUNET_YES;
1337   if (type != 0)
1338     {
1339       if (vhash != NULL)
1340         {
1341           ret =
1342             prepared_statement_run_select
1343             (count_entry_by_hash_vhash_and_type, 1, rbind, &return_ok, NULL,
1344              MYSQL_TYPE_BLOB, key, hashSize2, &hashSize2, MYSQL_TYPE_BLOB,
1345              vhash, hashSize2, &hashSize2, MYSQL_TYPE_LONG, &type, GNUNET_YES,
1346              -1);
1347         }
1348       else
1349         {
1350           ret =
1351             prepared_statement_run_select
1352             (count_entry_by_hash_and_type, 1, rbind, &return_ok, NULL,
1353              MYSQL_TYPE_BLOB, key, hashSize2, &hashSize2, MYSQL_TYPE_LONG,
1354              &type, GNUNET_YES, -1);
1355
1356         }
1357     }
1358   else
1359     {
1360       if (vhash != NULL)
1361         {
1362           ret =
1363             prepared_statement_run_select
1364             (count_entry_by_hash_and_vhash, 1, rbind, &return_ok, NULL,
1365              MYSQL_TYPE_BLOB, key, hashSize2, &hashSize2, MYSQL_TYPE_BLOB,
1366              vhash, hashSize2, &hashSize2, -1);
1367
1368         }
1369       else
1370         {
1371           ret =
1372             GNUNET_MYSQL_prepared_statement_run_select (count_entry_by_hash,
1373                                                         1, rbind, &return_ok,
1374                                                         NULL, MYSQL_TYPE_BLOB,
1375                                                         key, hashSize2,
1376                                                         &hashSize2, -1);
1377         }
1378     }
1379   if ((ret != GNUNET_OK) || (-1 == total))
1380     return GNUNET_SYSERR;
1381   if ((iter == NULL) || (total == 0))
1382     return (int) total;
1383
1384   last_vkey = 0;
1385   count = 0;
1386   off = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, total);
1387
1388   memset (rbind, 0, sizeof (rbind));
1389   rbind[0].buffer_type = MYSQL_TYPE_LONG;
1390   rbind[0].buffer = &size;
1391   rbind[0].is_unsigned = GNUNET_YES;
1392   rbind[1].buffer_type = MYSQL_TYPE_LONG;
1393   rbind[1].buffer = &rtype;
1394   rbind[1].is_unsigned = GNUNET_YES;
1395   rbind[2].buffer_type = MYSQL_TYPE_LONG;
1396   rbind[2].buffer = &prio;
1397   rbind[2].is_unsigned = GNUNET_YES;
1398   rbind[3].buffer_type = MYSQL_TYPE_LONG;
1399   rbind[3].buffer = &level;
1400   rbind[3].is_unsigned = GNUNET_YES;
1401   rbind[4].buffer_type = MYSQL_TYPE_LONGLONG;
1402   rbind[4].buffer = &expiration;
1403   rbind[4].is_unsigned = GNUNET_YES;
1404   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
1405   rbind[5].buffer = &key;
1406   rbind[5].buffer_length = hashSize;
1407   rbind[5].length = &hashSize;
1408   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
1409   rbind[6].buffer = &vkey;
1410   rbind[6].is_unsigned = GNUNET_YES;
1411   while (1)
1412     {
1413       if (count == 0)
1414         limit_off = off;
1415       else
1416         limit_off = 0;
1417       if (type != 0)
1418         {
1419           if (vhash != NULL)
1420             {
1421               ret =
1422                 prepared_statement_run_select
1423                 (select_entry_by_hash_vhash_and_type, 7, rbind, &return_ok,
1424                  NULL, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1425                  MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2,
1426                  MYSQL_TYPE_LONGLONG, &last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
1427                  &type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
1428                  -1);
1429             }
1430           else
1431             {
1432               ret =
1433                 prepared_statement_run_select
1434                 (select_entry_by_hash_and_type, 7, rbind, &return_ok, NULL,
1435                  MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1436                  MYSQL_TYPE_LONGLONG, &last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
1437                  &type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
1438                  -1);
1439             }
1440         }
1441       else
1442         {
1443           if (vhash != NULL)
1444             {
1445               ret =
1446                 prepared_statement_run_select
1447                 (select_entry_by_hash_and_vhash, 7, rbind, &return_ok, NULL,
1448                  MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
1449                  vhash, hashSize2, &hashSize2, MYSQL_TYPE_LONGLONG,
1450                  &last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off,
1451                  GNUNET_YES, -1);
1452             }
1453           else
1454             {
1455               ret =
1456                 prepared_statement_run_select
1457                 (select_entry_by_hash, 7, rbind, &return_ok, NULL,
1458                  MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1459                  MYSQL_TYPE_LONGLONG, &last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
1460                  &limit_off, GNUNET_YES, -1);
1461             }
1462         }
1463       if (ret != GNUNET_OK)
1464         break;
1465       last_vkey = vkey;
1466       datum = assembleDatum (rbind);
1467       if (datum == NULL)
1468         continue;
1469       count++;
1470       ret = iter (&key, datum, closure, vkey);
1471       if (ret == GNUNET_SYSERR)
1472         {
1473           GNUNET_free (datum);
1474           break;
1475         }
1476       if (ret == GNUNET_NO)
1477         {
1478           do_delete_value (vkey);
1479           do_delete_entry_by_vkey (vkey);
1480           content_size -= ntohl (datum->size);
1481         }
1482       GNUNET_free (datum);
1483       if (count + off == total)
1484         last_vkey = 0;          /* back to start */
1485       if (count == total)
1486         break;
1487     }
1488 }
1489
1490
1491 /**
1492  * Update the priority for a particular key in the datastore.  If
1493  * the expiration time in value is different than the time found in
1494  * the datastore, the higher value should be kept.  For the
1495  * anonymity level, the lower value is to be used.  The specified
1496  * priority should be added to the existing priority, ignoring the
1497  * priority in value.
1498  *
1499  * Note that it is possible for multiple values to match this put.
1500  * In that case, all of the respective values are updated.
1501  *
1502  * @param cls our "struct Plugin*"
1503  * @param uid unique identifier of the datum
1504  * @param delta by how much should the priority
1505  *     change?  If priority + delta < 0 the
1506  *     priority should be set to 0 (never go
1507  *     negative).
1508  * @param expire new expiration time should be the
1509  *     MAX of any existing expiration time and
1510  *     this value
1511  * @param msg set to error message
1512  * @return GNUNET_OK on success
1513  */
1514 static int
1515 mysql_plugin_update (void *cls,
1516                      uint64_t uid,
1517                      int delta, 
1518                      struct GNUNET_TIME_Absolute expire,
1519                      char **msg)
1520 {
1521   struct Plugin *plugin = cls;
1522   unsigned long long vkey = uid;
1523
1524   return prepared_statement_run (plugin,
1525                                  plugin->update_entry,
1526                                  NULL,
1527                                  MYSQL_TYPE_LONG,
1528                                  &delta,
1529                                  GNUNET_NO,
1530                                  MYSQL_TYPE_LONGLONG,
1531                                  &expire.value,
1532                                  GNUNET_YES,
1533                                  MYSQL_TYPE_LONGLONG,
1534                                  &expire.value,
1535                                  GNUNET_YES,
1536                                  MYSQL_TYPE_LONGLONG,
1537                                  &vkey,
1538                                  GNUNET_YES, -1);
1539 }
1540
1541
1542 /**
1543  * Select a subset of the items in the datastore and call
1544  * the given iterator for each of them.
1545  *
1546  * @param cls our "struct Plugin*"
1547  * @param type entries of which type should be considered?
1548  *        Use 0 for any type.
1549  * @param iter function to call on each matching value;
1550  *        will be called once with a NULL value at the end
1551  * @param iter_cls closure for iter
1552  */
1553 static void
1554 mysql_plugin_iter_low_priority (void *cls,
1555                                    enum GNUNET_BLOCK_Type type,
1556                                    PluginIterator iter,
1557                                    void *iter_cls)
1558 {
1559   struct Plugin *plugin = cls;
1560   iterateHelper (plugin, type, GNUNET_YES, 
1561                  0, iter, iter_cls); 
1562 }
1563
1564
1565 /**
1566  * Select a subset of the items in the datastore and call
1567  * the given iterator for each of them.
1568  *
1569  * @param cls our "struct Plugin*"
1570  * @param type entries of which type should be considered?
1571  *        Use 0 for any type.
1572  * @param iter function to call on each matching value;
1573  *        will be called once with a NULL value at the end
1574  * @param iter_cls closure for iter
1575  */
1576 static void
1577 mysql_plugin_iter_zero_anonymity (void *cls,
1578                                      enum GNUNET_BLOCK_Type type,
1579                                      PluginIterator iter,
1580                                      void *iter_cls)
1581 {
1582   struct Plugin *plugin = cls;
1583   iterateHelper (plugin, type, GNUNET_NO, 1, iter, closure);
1584 }
1585
1586
1587 /**
1588  * Select a subset of the items in the datastore and call
1589  * the given iterator for each of them.
1590  *
1591  * @param cls our "struct Plugin*"
1592  * @param type entries of which type should be considered?
1593  *        Use 0 for any type.
1594  * @param iter function to call on each matching value;
1595  *        will be called once with a NULL value at the end
1596  * @param iter_cls closure for iter
1597  */
1598 static void
1599 mysql_plugin_iter_ascending_expiration (void *cls,
1600                                            enum GNUNET_BLOCK_Type type,
1601                                            PluginIterator iter,
1602                                         void *iter_cls)
1603 {
1604   struct Plugin *plugin = cls;
1605   iterateHelper (plugin, type, GNUNET_YES, 2, iter, closure);
1606 }
1607
1608
1609 /**
1610  * Select a subset of the items in the datastore and call
1611  * the given iterator for each of them.
1612  *
1613  * @param cls our "struct Plugin*"
1614  * @param type entries of which type should be considered?
1615  *        Use 0 for any type.
1616  * @param iter function to call on each matching value;
1617  *        will be called once with a NULL value at the end
1618  * @param iter_cls closure for iter
1619  */
1620 static void
1621 mysql_plugin_iter_migration_order (void *cls,
1622                                       enum GNUNET_BLOCK_Type type,
1623                                       PluginIterator iter,
1624                                       void *iter_cls)
1625 {
1626   struct Plugin *plugin = cls;
1627   iterateHelper (plugin, 0, GNUNET_NO, 3, iter, closure);
1628 }
1629
1630
1631 /**
1632  * Select a subset of the items in the datastore and call
1633  * the given iterator for each of them.
1634  *
1635  * @param cls our "struct Plugin*"
1636  * @param type entries of which type should be considered?
1637  *        Use 0 for any type.
1638  * @param iter function to call on each matching value;
1639  *        will be called once with a NULL value at the end
1640  * @param iter_cls closure for iter
1641  */
1642 static void
1643 mysql_plugin_iter_all_now (void *cls,
1644                               enum GNUNET_BLOCK_Type type,
1645                               PluginIterator iter,
1646                               void *iter_cls)
1647 {
1648   struct Plugin *plugin = cls;
1649   iterateHelper (plugin, 0, GNUNET_YES, 0, iter, closure);
1650 }
1651
1652
1653 /**
1654  * Drop database.
1655  */
1656 static void 
1657 mysql_plugin_drop (void *cls)
1658 {
1659   struct Plugin *plugin = cls;
1660
1661   if ((GNUNET_OK != run_statement (plugin,
1662                                    "DROP TABLE gn080")) ||
1663       (GNUNET_OK != run_statement (plugin,
1664                                    "DROP TABLE gn072")))
1665     return;                     /* error */
1666   plugin->content_size = 0;
1667 }
1668
1669
1670 /**
1671  * Entry point for the plugin.
1672  *
1673  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1674  * @return our "struct Plugin*"
1675  */
1676 void *
1677 libgnunet_plugin_datastore_mysql_init (void *cls)
1678 {
1679   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1680   struct GNUNET_DATASTORE_PluginFunctions *api;
1681   struct Plugin *plugin;
1682
1683   plugin = GNUNET_malloc (sizeof (struct Plugin));
1684   plugin->env = env;
1685   plugin->cnffile = get_my_cnf_path (env->cfg);
1686   if (GNUNET_OK != iopen (plugin))
1687     {
1688       iclose (plugin);
1689       GNUNET_free_non_null (plugin->cnffile);
1690       GNUNET_free (plugin);
1691       return NULL;
1692     }
1693 #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
1694 #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
1695   if (MRUNS ("CREATE TABLE IF NOT EXISTS gn080 ("
1696              " size INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1697              " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1698              " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1699              " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1700              " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1701              " hash BINARY(64) NOT NULL DEFAULT '',"
1702              " vhash BINARY(64) NOT NULL DEFAULT '',"
1703              " vkey BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1704              " INDEX hash (hash(64)),"
1705              " INDEX hash_vhash_vkey (hash(64),vhash(64),vkey),"
1706              " INDEX hash_vkey (hash(64),vkey),"
1707              " INDEX vkey (vkey),"
1708              " INDEX prio (prio,vkey),"
1709              " INDEX expire (expire,vkey,type),"
1710              " INDEX anonLevel (anonLevel,prio,vkey,type)"
1711              ") ENGINE=InnoDB") ||
1712       MRUNS ("CREATE TABLE IF NOT EXISTS gn072 ("
1713              " vkey BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,"
1714              " value BLOB NOT NULL DEFAULT '') ENGINE=MyISAM") ||
1715       MRUNS ("SET AUTOCOMMIT = 1") ||
1716       PINIT (plugin->select_value, SELECT_VALUE) ||
1717       PINIT (plugin->delete_value, DELETE_VALUE) ||
1718       PINIT (plugin->insert_value, INSERT_VALUE) ||
1719       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1720       PINIT (plugin->delete_entry_by_vkey, DELETE_ENTRY_BY_VKEY) ||
1721       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1722       PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH)
1723       || PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE)
1724       || PINIT (plugin->select_entry_by_hash_vhash_and_type,
1725                 SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1726       || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
1727       || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
1728       || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1729       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1730                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1731       || PINIT (plugin->update_entry, UPDATE_ENTRY)
1732       || PINIT (plugin->iter[0], SELECT_IT_LOW_PRIORITY)
1733       || PINIT (plugin->iter[1], SELECT_IT_NON_ANONYMOUS)
1734       || PINIT (plugin->iter[2], SELECT_IT_EXPIRATION_TIME)
1735       || PINIT (plugin->iter[3], SELECT_IT_MIGRATION_ORDER))
1736     {
1737       GNUNET_MYSQL_database_close (db);
1738       db = NULL;
1739       return GNUNET_SYSERR;
1740     }
1741 #undef PINIT
1742 #undef MRUNS
1743
1744
1745   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1746   api->cls = plugin;
1747   api->get_size = &mysql_plugin_get_size;
1748   api->put = &mysql_plugin_put;
1749   api->next_request = &mysql_plugin_next_request;
1750   api->get = &mysql_plugin_get;
1751   api->update = &mysql_plugin_update;
1752   api->iter_low_priority = &mysql_plugin_iter_low_priority;
1753   api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity;
1754   api->iter_ascending_expiration = &mysql_plugin_iter_ascending_expiration;
1755   api->iter_migration_order = &mysql_plugin_iter_migration_order;
1756   api->iter_all_now = &mysql_plugin_iter_all_now;
1757   api->drop = &mysql_plugin_drop;
1758   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1759                    "mysql", _("Mysql database running\n"));
1760   return api;
1761 }
1762
1763
1764 /**
1765  * Exit point from the plugin.
1766  * @param cls our "struct Plugin*"
1767  * @return always NULL
1768  */
1769 void *
1770 libgnunet_plugin_datastore_mysql_done (void *cls)
1771 {
1772   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1773   struct Plugin *plugin = api->cls;
1774
1775   iclose (plugin);
1776   GNUNET_free_non_null (plugin->cnffile);
1777   GNUNET_free (plugin);
1778   GNUNET_free (plugin);
1779   GNUNET_free (api);
1780   mysql_library_end ();
1781   return NULL;
1782 }
1783
1784 /* end of plugin_datastore_mysql.c */