make TODO better grepable
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in /etc/gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109    @endverbatim
110  *
111  * PROBLEMS?
112  *
113  * If you have problems related to the mysql module, your best
114  * friend is probably the mysql manual. The first thing to check
115  * is that mysql is basically operational, that you can connect
116  * to it, create tables, issue queries etc.
117  */
118
119 #include "platform.h"
120 #include "gnunet_datastore_plugin.h"
121 #include "gnunet_util_lib.h"
122 #include <mysql/mysql.h>
123
124 #define DEBUG_MYSQL GNUNET_NO
125
126 #define MAX_DATUM_SIZE 65536
127
128 /**
129  * Maximum number of supported parameters for a prepared
130  * statement.  Increase if needed.
131  */
132 #define MAX_PARAM 16
133
134 /**
135  * Die with an error message that indicates
136  * a failure of the command 'cmd' with the message given
137  * by strerror(errno).
138  */
139 #define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0);
140
141 /**
142  * Log an error message at log-level 'level' that indicates
143  * a failure of the command 'cmd' on file 'filename'
144  * with the message given by strerror(errno).
145  */
146 #define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
147
148
149 struct GNUNET_MysqlStatementHandle
150 {
151   struct GNUNET_MysqlStatementHandle *next;
152
153   struct GNUNET_MysqlStatementHandle *prev;
154
155   char *query;
156
157   MYSQL_STMT *statement;
158
159   int valid;
160
161 };
162
163
164 /**
165  * Context for all functions in this plugin.
166  */
167 struct Plugin 
168 {
169   /**
170    * Our execution environment.
171    */
172   struct GNUNET_DATASTORE_PluginEnvironment *env;
173
174   /**
175    * Handle to talk to MySQL.
176    */
177   MYSQL *dbf;
178   
179   /**
180    * We keep all prepared statements in a DLL.  This is the head.
181    */
182   struct GNUNET_MysqlStatementHandle *shead;
183
184   /**
185    * We keep all prepared statements in a DLL.  This is the tail.
186    */
187   struct GNUNET_MysqlStatementHandle *stail;
188
189   /**
190    * Filename of "my.cnf" (msyql configuration).
191    */
192   char *cnffile;
193
194   /**
195    * Prepared statements.
196    */
197 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?)"
198   struct GNUNET_MysqlStatementHandle *insert_entry;
199   
200 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
201   struct GNUNET_MysqlStatementHandle *delete_entry_by_uid;
202
203 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 WHERE hash=?"
204   struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
205   
206 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
207   struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
208
209 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=?"
210   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
211
212 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
213   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
214  
215 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 WHERE hash=? AND type=?"
216   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
217
218 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
219   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
220  
221 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=? AND type=?"
222   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
223   
224 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
225   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
226
227 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
228   struct GNUNET_MysqlStatementHandle *update_entry;
229
230 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
231   struct GNUNET_MysqlStatementHandle *dec_repl;
232
233 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
234   struct GNUNET_MysqlStatementHandle *get_size;
235
236 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 AND type=? ORDER BY uid DESC LIMIT 1 OFFSET ?"
237   struct GNUNET_MysqlStatementHandle *zero_iter;
238
239 #define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
240   "UNION "\
241   "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 ORDER BY prio ASC LIMIT 1) "\
242   "ORDER BY expire ASC LIMIT 1"
243   struct GNUNET_MysqlStatementHandle *select_expiration;
244
245 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 ORDER BY repl DESC,RAND() LIMIT 1"
246   struct GNUNET_MysqlStatementHandle *select_replication;
247
248 };
249
250
251 /**
252  * Obtain the location of ".my.cnf".
253  *
254  * @param cfg our configuration
255  * @return NULL on error
256  */
257 static char *
258 get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
259 {
260   char *cnffile;
261   char *home_dir;
262   struct stat st;
263 #ifndef WINDOWS
264   struct passwd *pw;
265 #endif
266   int configured;
267
268 #ifndef WINDOWS
269   pw = getpwuid (getuid ());
270   if (!pw)
271     {
272       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, 
273                            "getpwuid");
274       return NULL;
275     }
276   if (GNUNET_YES ==
277       GNUNET_CONFIGURATION_have_value (cfg,
278                                        "datastore-mysql", "CONFIG"))
279     {
280       GNUNET_assert (GNUNET_OK == 
281                      GNUNET_CONFIGURATION_get_value_filename (cfg,
282                                                               "datastore-mysql", "CONFIG", &cnffile));
283       configured = GNUNET_YES;
284     }
285   else
286     {
287       home_dir = GNUNET_strdup (pw->pw_dir);
288 #else
289       home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
290       plibc_conv_to_win_path ("~/", home_dir);
291 #endif
292       GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
293       GNUNET_free (home_dir);
294       configured = GNUNET_NO;
295     }
296   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
297               _("Trying to use file `%s' for MySQL configuration.\n"),
298               cnffile);
299   if ((0 != STAT (cnffile, &st)) ||
300       (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode)))
301     {
302       if (configured == GNUNET_YES)
303         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
304                     _("Could not access file `%s': %s\n"), cnffile,
305                     STRERROR (errno));
306       GNUNET_free (cnffile);
307       return NULL;
308     }
309   return cnffile;
310 }
311
312
313 /**
314  * Free a prepared statement.
315  *
316  * @param plugin plugin context
317  * @param s prepared statement
318  */
319 static void
320 prepared_statement_destroy (struct Plugin *plugin, 
321                             struct GNUNET_MysqlStatementHandle *s)
322 {
323   GNUNET_CONTAINER_DLL_remove (plugin->shead,
324                                plugin->stail,
325                                s);
326   if (s->valid)
327     mysql_stmt_close (s->statement);
328   GNUNET_free (s->query);
329   GNUNET_free (s);
330 }
331
332
333 /**
334  * Close database connection and all prepared statements (we got a DB
335  * disconnect error).
336  * 
337  * @param plugin plugin context
338  */
339 static int
340 iclose (struct Plugin *plugin)
341 {
342   struct GNUNET_MysqlStatementHandle *spos;
343
344   spos = plugin->shead;
345   while (NULL != plugin->shead)
346     prepared_statement_destroy (plugin,
347                                 plugin->shead);
348   if (plugin->dbf != NULL)
349     {
350       mysql_close (plugin->dbf);
351       plugin->dbf = NULL;
352     }
353   return GNUNET_OK;
354 }
355
356
357 /**
358  * Open the connection with the database (and initialize
359  * our default options).
360  *
361  * @param plugin plugin context
362  * @return GNUNET_OK on success
363  */
364 static int
365 iopen (struct Plugin *plugin)
366 {
367   char *mysql_dbname;
368   char *mysql_server;
369   char *mysql_user;
370   char *mysql_password;
371   unsigned long long mysql_port;
372   my_bool reconnect;
373   unsigned int timeout;
374
375   plugin->dbf = mysql_init (NULL);
376   if (plugin->dbf == NULL)
377     return GNUNET_SYSERR;
378   if (plugin->cnffile != NULL)
379     mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_FILE, plugin->cnffile);
380   mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
381   reconnect = 0;
382   mysql_options (plugin->dbf, MYSQL_OPT_RECONNECT, &reconnect);
383   mysql_options (plugin->dbf,
384                  MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
385   mysql_options(plugin->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
386   timeout = 60; /* in seconds */
387   mysql_options (plugin->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
388   mysql_options (plugin->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
389   mysql_dbname = NULL;
390   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
391                                                      "datastore-mysql", "DATABASE"))
392     GNUNET_assert (GNUNET_OK == 
393                    GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
394                                                           "datastore-mysql", "DATABASE", 
395                                                           &mysql_dbname));
396   else
397     mysql_dbname = GNUNET_strdup ("gnunet");
398   mysql_user = NULL;
399   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
400                                                      "datastore-mysql", "USER"))
401     {
402       GNUNET_assert (GNUNET_OK == 
403                     GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
404                                                            "datastore-mysql", "USER", 
405                                                            &mysql_user));
406     }
407   mysql_password = NULL;
408   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
409                                                      "datastore-mysql", "PASSWORD"))
410     {
411       GNUNET_assert (GNUNET_OK ==
412                     GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
413                                                            "datastore-mysql", "PASSWORD",
414                                                            &mysql_password));
415     }
416   mysql_server = NULL;
417   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
418                                                      "datastore-mysql", "HOST"))
419     {
420       GNUNET_assert (GNUNET_OK == 
421                     GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
422                                                            "datastore-mysql", "HOST", 
423                                                            &mysql_server));
424     }
425   mysql_port = 0;
426   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
427                                                      "datastore-mysql", "PORT"))
428     {
429       GNUNET_assert (GNUNET_OK ==
430                     GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg, "datastore-mysql",
431                                                            "PORT", &mysql_port));
432     }
433
434   GNUNET_assert (mysql_dbname != NULL);
435   mysql_real_connect (plugin->dbf, 
436                       mysql_server, 
437                       mysql_user, mysql_password,
438                       mysql_dbname, 
439                       (unsigned int) mysql_port, NULL,
440                       CLIENT_IGNORE_SIGPIPE);
441   GNUNET_free_non_null (mysql_server);
442   GNUNET_free_non_null (mysql_user);
443   GNUNET_free_non_null (mysql_password);
444   GNUNET_free (mysql_dbname);
445   if (mysql_error (plugin->dbf)[0])
446     {
447       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
448                  "mysql_real_connect", plugin);
449       return GNUNET_SYSERR;
450     }
451   return GNUNET_OK;
452 }
453
454
455 /**
456  * Run the given MySQL statement.
457  *
458  * @param plugin plugin context
459  * @param statement SQL statement to run
460  * @return GNUNET_OK on success, GNUNET_SYSERR on error
461  */
462 static int
463 run_statement (struct Plugin *plugin,
464                const char *statement)
465 {
466   if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
467     return GNUNET_SYSERR;
468   mysql_query (plugin->dbf, statement);
469   if (mysql_error (plugin->dbf)[0])
470     {
471       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
472                  "mysql_query", plugin);
473       iclose (plugin);
474       return GNUNET_SYSERR;
475     }
476   return GNUNET_OK;
477 }
478
479
480 /**
481  * Create a prepared statement.
482  *
483  * @param plugin plugin context
484  * @param statement SQL statement text to prepare
485  * @return NULL on error
486  */
487 static struct GNUNET_MysqlStatementHandle *
488 prepared_statement_create (struct Plugin *plugin, 
489                            const char *statement)
490 {
491   struct GNUNET_MysqlStatementHandle *ret;
492
493   ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle));
494   ret->query = GNUNET_strdup (statement);
495   GNUNET_CONTAINER_DLL_insert (plugin->shead,
496                                plugin->stail,
497                                ret);
498   return ret;
499 }
500
501
502 /**
503  * Prepare a statement for running.
504  *
505  * @param plugin plugin context
506  * @param ret handle to prepared statement
507  * @return GNUNET_OK on success
508  */
509 static int
510 prepare_statement (struct Plugin *plugin, 
511                    struct GNUNET_MysqlStatementHandle *ret)
512 {
513   if (GNUNET_YES == ret->valid)
514     return GNUNET_OK;
515   if ((NULL == plugin->dbf) && 
516       (GNUNET_OK != iopen (plugin)))
517     return GNUNET_SYSERR;
518   ret->statement = mysql_stmt_init (plugin->dbf);
519   if (ret->statement == NULL)
520     {
521       iclose (plugin);
522       return GNUNET_SYSERR;
523     }
524   if (mysql_stmt_prepare (ret->statement, 
525                           ret->query,
526                           strlen (ret->query)))
527     {
528       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
529                  "mysql_stmt_prepare", 
530                  plugin);
531       mysql_stmt_close (ret->statement);
532       ret->statement = NULL;
533       iclose (plugin);
534       return GNUNET_SYSERR;
535     }
536   ret->valid = GNUNET_YES;
537   return GNUNET_OK;
538
539 }
540
541
542 /**
543  * Bind the parameters for the given MySQL statement
544  * and run it.
545  *
546  * @param plugin plugin context
547  * @param s statement to bind and run
548  * @param ap arguments for the binding
549  * @return GNUNET_SYSERR on error, GNUNET_OK on success
550  */
551 static int
552 init_params (struct Plugin *plugin,
553              struct GNUNET_MysqlStatementHandle *s,
554              va_list ap)
555 {
556   MYSQL_BIND qbind[MAX_PARAM];
557   unsigned int pc;
558   unsigned int off;
559   enum enum_field_types ft;
560
561   pc = mysql_stmt_param_count (s->statement);
562   if (pc > MAX_PARAM)
563     {
564       /* increase internal constant! */
565       GNUNET_break (0);
566       return GNUNET_SYSERR;
567     }
568   memset (qbind, 0, sizeof (qbind));
569   off = 0;
570   ft = 0;
571   while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types))))
572     {
573       qbind[off].buffer_type = ft;
574       switch (ft)
575         {
576         case MYSQL_TYPE_FLOAT:
577           qbind[off].buffer = va_arg (ap, float *);
578           break;
579         case MYSQL_TYPE_LONGLONG:
580           qbind[off].buffer = va_arg (ap, unsigned long long *);
581           qbind[off].is_unsigned = va_arg (ap, int);
582           break;
583         case MYSQL_TYPE_LONG:
584           qbind[off].buffer = va_arg (ap, unsigned int *);
585           qbind[off].is_unsigned = va_arg (ap, int);
586           break;
587         case MYSQL_TYPE_VAR_STRING:
588         case MYSQL_TYPE_STRING:
589         case MYSQL_TYPE_BLOB:
590           qbind[off].buffer = va_arg (ap, void *);
591           qbind[off].buffer_length = va_arg (ap, unsigned long);
592           qbind[off].length = va_arg (ap, unsigned long *);
593           break;
594         default:
595           /* unsupported type */
596           GNUNET_break (0);
597           return GNUNET_SYSERR;
598         }
599       pc--;
600       off++;
601     }
602   if (! ( (pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1) ) )
603     {
604       GNUNET_assert (0);
605       return GNUNET_SYSERR;
606     }
607   if (mysql_stmt_bind_param (s->statement, qbind))
608     {
609       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
610                   _("`%s' failed at %s:%d with error: %s\n"),
611                   "mysql_stmt_bind_param",
612                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
613       iclose (plugin);
614       return GNUNET_SYSERR;
615     }
616   if (mysql_stmt_execute (s->statement))
617     {
618       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
619                   _("`%s' failed at %s:%d with error: %s\n"),
620                   "mysql_stmt_execute",
621                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
622       iclose (plugin);
623       return GNUNET_SYSERR;
624     }
625   return GNUNET_OK;
626 }
627
628
629 /**
630  * Run a prepared SELECT statement.
631  *
632  * @param plugin plugin context
633  * @param s statement to run
634  * @param result_size number of elements in results array
635  * @param results pointer to already initialized MYSQL_BIND
636  *        array (of sufficient size) for passing results
637  * @param ap pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
638  *        values (size + buffer-reference for pointers); terminated
639  *        with "-1"
640  * @return GNUNET_SYSERR on error, otherwise GNUNET_OK or GNUNET_NO (no result)
641  */
642 static int
643 prepared_statement_run_select_va (struct Plugin *plugin,
644                                   struct GNUNET_MysqlStatementHandle *s,
645                                   unsigned int result_size,
646                                   MYSQL_BIND *results,
647                                   va_list ap)
648 {
649   int ret;
650   unsigned int rsize;
651
652   if (GNUNET_OK != prepare_statement (plugin, s))
653     {
654       GNUNET_break (0);
655       return GNUNET_SYSERR;
656     }
657   if (GNUNET_OK != init_params (plugin, s, ap))
658     {
659       GNUNET_break (0);
660       return GNUNET_SYSERR;
661     }
662   rsize = mysql_stmt_field_count (s->statement);
663   if (rsize > result_size)
664     {
665       GNUNET_break (0);
666       return GNUNET_SYSERR;
667     }
668   if (mysql_stmt_bind_result (s->statement, results))
669     {
670       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
671                   _("`%s' failed at %s:%d with error: %s\n"),
672                   "mysql_stmt_bind_result",
673                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
674       iclose (plugin);
675       return GNUNET_SYSERR;
676     }
677   ret = mysql_stmt_fetch (s->statement);
678   if (ret == MYSQL_NO_DATA)
679     return GNUNET_NO;
680   if (ret != 0)
681     {
682       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
683                   _("`%s' failed at %s:%d with error: %s\n"),
684                   "mysql_stmt_fetch",
685                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
686       iclose (plugin);
687       return GNUNET_SYSERR;
688     }
689   mysql_stmt_reset (s->statement);
690   return GNUNET_OK;
691 }
692
693
694 /**
695  * Run a prepared SELECT statement.
696  *
697  * @param plugin plugin context
698  * @param s statement to run
699  * @param result_size number of elements in results array
700  * @param results pointer to already initialized MYSQL_BIND
701  *        array (of sufficient size) for passing results
702  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
703  *        values (size + buffer-reference for pointers); terminated
704  *        with "-1"
705  * @return GNUNET_SYSERR on error, otherwise
706  *         the number of successfully affected (or queried) rows
707  */
708 static int
709 prepared_statement_run_select (struct Plugin *plugin,
710                                struct GNUNET_MysqlStatementHandle *s,
711                                unsigned int result_size,
712                                MYSQL_BIND *results,
713                                ...)
714 {
715   va_list ap;
716   int ret;
717
718   va_start (ap, results);
719   ret = prepared_statement_run_select_va (plugin, s, 
720                                           result_size, results,
721                                           ap);
722   va_end (ap);
723   return ret;
724 }
725
726
727 /**
728  * Run a prepared statement that does NOT produce results.
729  *
730  * @param plugin plugin context
731  * @param s statement to run
732  * @param insert_id NULL or address where to store the row ID of whatever
733  *        was inserted (only for INSERT statements!)
734  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
735  *        values (size + buffer-reference for pointers); terminated
736  *        with "-1"
737  * @return GNUNET_SYSERR on error, otherwise
738  *         the number of successfully affected rows
739  */
740 static int
741 prepared_statement_run (struct Plugin *plugin,
742                         struct GNUNET_MysqlStatementHandle *s,
743                         unsigned long long *insert_id, ...)
744 {
745   va_list ap;
746   int affected;
747
748   if (GNUNET_OK != prepare_statement (plugin, s))
749     return GNUNET_SYSERR;
750   va_start (ap, insert_id);
751   if (GNUNET_OK != init_params (plugin, s, ap))
752     {
753       va_end (ap);
754       return GNUNET_SYSERR;
755     }
756   va_end (ap);
757   affected = mysql_stmt_affected_rows (s->statement);
758   if (NULL != insert_id)
759     *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement);
760   mysql_stmt_reset (s->statement);
761   return affected;
762 }
763
764
765 /**
766  * Delete an entry from the gn090 table.
767  *
768  * @param plugin plugin context
769  * @param uid unique ID of the entry to delete
770  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
771  */
772 static int
773 do_delete_entry (struct Plugin *plugin,
774                  unsigned long long uid)
775 {
776   int ret;
777  
778 #if DEBUG_MYSQL
779   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
780               "Deleting value %llu from gn090 table\n",
781               uid);
782 #endif
783   ret = prepared_statement_run (plugin,
784                                 plugin->delete_entry_by_uid,
785                                 NULL,
786                                 MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES,
787                                 -1);
788   if (ret >= 0)
789     return GNUNET_OK;
790   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
791               "Deleting value %llu from gn090 table failed\n",
792               uid);
793   return ret;
794 }
795
796
797 /**
798  * Get an estimate of how much space the database is
799  * currently using.
800  *
801  * @param cls our "struct Plugin *"
802  * @return number of bytes used on disk
803  */
804 static unsigned long long
805 mysql_plugin_estimate_size (void *cls)
806 {
807   struct Plugin *plugin = cls;
808   MYSQL_BIND cbind[1];
809   long long total;
810
811   memset (cbind, 0, sizeof (cbind));
812   total = 0;
813   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
814   cbind[0].buffer = &total;
815   cbind[0].is_unsigned = GNUNET_NO;
816   if (GNUNET_OK != 
817       prepared_statement_run_select (plugin,
818                                      plugin->get_size,
819                                      1, cbind, 
820                                      -1))
821     return 0;
822   return total;
823 }
824
825
826 /**
827  * Store an item in the datastore.
828  *
829  * @param cls closure
830  * @param key key for the item
831  * @param size number of bytes in data
832  * @param data content stored
833  * @param type type of the content
834  * @param priority priority of the content
835  * @param anonymity anonymity-level for the content
836  * @param replication replication-level for the content
837  * @param expiration expiration time for the content
838  * @param msg set to error message
839  * @return GNUNET_OK on success
840  */
841 static int
842 mysql_plugin_put (void *cls,
843                   const GNUNET_HashCode * key,
844                   uint32_t size,
845                   const void *data,
846                   enum GNUNET_BLOCK_Type type,
847                   uint32_t priority,
848                   uint32_t anonymity,
849                   uint32_t replication,
850                   struct GNUNET_TIME_Absolute expiration,
851                   char **msg)
852 {
853   struct Plugin *plugin = cls;
854   unsigned int irepl = replication;
855   unsigned int ipriority = priority;
856   unsigned int ianonymity = anonymity;
857   unsigned long long lexpiration = expiration.abs_value;
858   unsigned long hashSize;
859   unsigned long hashSize2;
860   unsigned long lsize;
861   GNUNET_HashCode vhash;
862
863   if (size > MAX_DATUM_SIZE)
864     {
865       GNUNET_break (0);
866       return GNUNET_SYSERR;
867     }
868   hashSize = sizeof (GNUNET_HashCode);
869   hashSize2 = sizeof (GNUNET_HashCode);
870   lsize = size;
871   GNUNET_CRYPTO_hash (data, size, &vhash);
872   if (GNUNET_OK !=
873       prepared_statement_run (plugin,
874                               plugin->insert_entry,
875                               NULL,
876                               MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
877                               MYSQL_TYPE_LONG, &type, GNUNET_YES,
878                               MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
879                               MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
880                               MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
881                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
882                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
883                               MYSQL_TYPE_BLOB, data, lsize, &lsize, 
884                               -1))
885     return GNUNET_SYSERR;    
886 #if DEBUG_MYSQL
887   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
888               "Inserted value `%s' with size %u into gn090 table\n",
889               GNUNET_h2s (key),
890               (unsigned int) size);
891 #endif
892   if (size > 0)
893     plugin->env->duc (plugin->env->cls,
894                       size);
895   return GNUNET_OK;
896 }
897
898
899 /**
900  * Update the priority for a particular key in the datastore.  If
901  * the expiration time in value is different than the time found in
902  * the datastore, the higher value should be kept.  For the
903  * anonymity level, the lower value is to be used.  The specified
904  * priority should be added to the existing priority, ignoring the
905  * priority in value.
906  *
907  * Note that it is possible for multiple values to match this put.
908  * In that case, all of the respective values are updated.
909  *
910  * @param cls our "struct Plugin*"
911  * @param uid unique identifier of the datum
912  * @param delta by how much should the priority
913  *     change?  If priority + delta < 0 the
914  *     priority should be set to 0 (never go
915  *     negative).
916  * @param expire new expiration time should be the
917  *     MAX of any existing expiration time and
918  *     this value
919  * @param msg set to error message
920  * @return GNUNET_OK on success
921  */
922 static int
923 mysql_plugin_update (void *cls,
924                      uint64_t uid,
925                      int delta, 
926                      struct GNUNET_TIME_Absolute expire,
927                      char **msg)
928 {
929   struct Plugin *plugin = cls;
930   unsigned long long vkey = uid;
931   unsigned long long lexpire = expire.abs_value;
932   int ret;
933
934 #if DEBUG_MYSQL
935   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
936               "Updating value %llu adding %d to priority and maxing exp at %llu\n",
937               vkey,
938               delta,
939               lexpire);
940 #endif
941   ret = prepared_statement_run (plugin,
942                                 plugin->update_entry,
943                                 NULL,
944                                 MYSQL_TYPE_LONG, &delta, GNUNET_NO,
945                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
946                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
947                                 MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, 
948                                 -1);
949   if (ret != GNUNET_OK)
950     {
951       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
952                   "Failed to update value %llu\n",
953                   vkey);
954     }
955   return ret;
956 }
957
958
959 /**
960  * Run the given select statement and call 'proc' on the resulting
961  * values (which must be in particular positions).
962  *
963  * @param plugin the plugin handle
964  * @param stmt select statement to run
965  * @param proc function to call on result
966  * @param proc_cls closure for proc
967  * @param ... arguments to initialize stmt
968  */
969 static void 
970 execute_select (struct Plugin *plugin,
971                 struct GNUNET_MysqlStatementHandle *stmt,
972                 PluginDatumProcessor proc, void *proc_cls,
973                 ...)
974 {
975   va_list ap;
976   int ret;
977   unsigned int type;
978   unsigned int priority;
979   unsigned int anonymity;
980   unsigned long long exp;
981   unsigned long hashSize;
982   unsigned long size;
983   unsigned long long uid;
984   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
985   GNUNET_HashCode key;
986   struct GNUNET_TIME_Absolute expiration;
987   MYSQL_BIND rbind[7];
988
989   hashSize = sizeof (GNUNET_HashCode);
990   memset (rbind, 0, sizeof (rbind));
991   rbind[0].buffer_type = MYSQL_TYPE_LONG;
992   rbind[0].buffer = &type;
993   rbind[0].is_unsigned = 1;
994   rbind[1].buffer_type = MYSQL_TYPE_LONG;
995   rbind[1].buffer = &priority;
996   rbind[1].is_unsigned = 1;
997   rbind[2].buffer_type = MYSQL_TYPE_LONG;
998   rbind[2].buffer = &anonymity;
999   rbind[2].is_unsigned = 1;
1000   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
1001   rbind[3].buffer = &exp;
1002   rbind[3].is_unsigned = 1;
1003   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
1004   rbind[4].buffer = &key;
1005   rbind[4].buffer_length = hashSize;
1006   rbind[4].length = &hashSize;
1007   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
1008   rbind[5].buffer = value;
1009   rbind[5].buffer_length = size = sizeof (value);
1010   rbind[5].length = &size;
1011   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
1012   rbind[6].buffer = &uid;
1013   rbind[6].is_unsigned = 1;
1014
1015   va_start (ap, proc_cls);
1016   ret = prepared_statement_run_select_va (plugin,
1017                                           stmt,
1018                                           7, rbind,
1019                                           ap);
1020   va_end (ap);
1021   if (ret <= 0)
1022     {
1023       proc (proc_cls, 
1024             NULL, 0, NULL, 0, 0, 0, 
1025             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1026       return;
1027     }
1028   GNUNET_assert (size <= sizeof(value));
1029   if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
1030        (hashSize != sizeof (GNUNET_HashCode)) )
1031     {
1032       GNUNET_break (0);
1033       proc (proc_cls, 
1034             NULL, 0, NULL, 0, 0, 0, 
1035             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1036       return;
1037     }     
1038 #if DEBUG_MYSQL
1039   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1040               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
1041               (unsigned int) size,
1042               GNUNET_h2s (&key),
1043               priority,
1044               anonymity,
1045               exp);
1046 #endif
1047   GNUNET_assert (size < MAX_DATUM_SIZE);
1048   expiration.abs_value = exp;
1049   ret = proc (proc_cls, 
1050               &key,
1051               size, value,
1052               type, priority, anonymity, expiration,
1053               uid);
1054   if (ret == GNUNET_NO)
1055     {
1056       do_delete_entry (plugin, uid);
1057       if (size != 0)
1058         plugin->env->duc (plugin->env->cls,
1059                           - size);
1060     }
1061 }
1062
1063
1064
1065 /**
1066  * Get one of the results for a particular key in the datastore.
1067  *
1068  * @param cls closure
1069  * @param offset offset of the result (mod #num-results); 
1070  *               specific ordering does not matter for the offset
1071  * @param key key to match, never NULL
1072  * @param vhash hash of the value, maybe NULL (to
1073  *        match all values that have the right key).
1074  *        Note that for DBlocks there is no difference
1075  *        betwen key and vhash, but for other blocks
1076  *        there may be!
1077  * @param type entries of which type are relevant?
1078  *     Use 0 for any type.
1079  * @param proc function to call on each matching value; however,
1080  *        after the first call to "proc", the plugin must wait
1081  *        until "NextRequest" was called before giving the processor
1082  *        the next item; finally, the "proc" should be called once
1083  *        once with a NULL value at the end ("next_cls" should be NULL
1084  *        for that last call)
1085  * @param proc_cls closure for proc
1086  */
1087 static void
1088 mysql_plugin_get_key (void *cls,
1089                       uint64_t offset,
1090                       const GNUNET_HashCode *key,
1091                       const GNUNET_HashCode *vhash,
1092                       enum GNUNET_BLOCK_Type type,                    
1093                       PluginDatumProcessor proc, void *proc_cls)
1094 {
1095   struct Plugin *plugin = cls;
1096   int ret;
1097   MYSQL_BIND cbind[1];
1098   long long total;
1099   unsigned long hashSize;
1100   unsigned long hashSize2;
1101   unsigned long long off;
1102
1103   GNUNET_assert (key != NULL);
1104   GNUNET_assert (NULL != proc);
1105   hashSize = sizeof (GNUNET_HashCode);
1106   hashSize2 = sizeof (GNUNET_HashCode);
1107   memset (cbind, 0, sizeof (cbind));
1108   total = -1;
1109   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
1110   cbind[0].buffer = &total;
1111   cbind[0].is_unsigned = GNUNET_NO;
1112   if (type != 0)
1113     {
1114       if (vhash != NULL)
1115         {
1116           ret =
1117             prepared_statement_run_select (plugin,
1118                                            plugin->count_entry_by_hash_vhash_and_type, 
1119                                            1, cbind, 
1120                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1121                                            MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 
1122                                            MYSQL_TYPE_LONG, &type, GNUNET_YES,
1123                                            -1);
1124         }
1125       else
1126         {
1127           ret =
1128             prepared_statement_run_select (plugin,
1129                                            plugin->count_entry_by_hash_and_type, 
1130                                            1, cbind, 
1131                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1132                                            MYSQL_TYPE_LONG, &type, GNUNET_YES,
1133                                            -1);
1134         }
1135     }
1136   else
1137     {
1138       if (vhash != NULL)
1139         {
1140           ret =
1141             prepared_statement_run_select (plugin,
1142                                            plugin->count_entry_by_hash_and_vhash, 
1143                                            1, cbind,
1144                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1145                                            MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 
1146                                            -1);
1147
1148         }
1149       else
1150         {
1151           ret =
1152             prepared_statement_run_select (plugin,
1153                                            plugin->count_entry_by_hash,
1154                                            1, cbind, 
1155                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1156                                            -1);
1157         }
1158     }
1159   if ((ret != GNUNET_OK) || (0 >= total))
1160     {
1161       proc (proc_cls, 
1162             NULL, 0, NULL, 0, 0, 0, 
1163             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1164       return;
1165     }
1166   offset = offset % total;
1167   off = (unsigned long long) offset;
1168 #if DEBUG_MYSQL
1169   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1170               "Obtaining %llu/%lld result for GET `%s'\n",
1171               off,
1172               total,
1173               GNUNET_h2s (key));
1174 #endif
1175
1176   if (type != GNUNET_BLOCK_TYPE_ANY)
1177     {
1178       if (NULL != vhash)
1179         {
1180           execute_select (plugin,
1181                           plugin->select_entry_by_hash_vhash_and_type, 
1182                           proc, proc_cls,
1183                           MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1184                           MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
1185                           MYSQL_TYPE_LONG, &type, GNUNET_YES, 
1186                           MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
1187                           -1);
1188         }
1189       else
1190         {
1191           execute_select (plugin,
1192                           plugin->select_entry_by_hash_and_type, 
1193                           proc, proc_cls,
1194                           MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1195                           MYSQL_TYPE_LONG, &type, GNUNET_YES, 
1196                           MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
1197                           -1);
1198         }
1199     }
1200   else
1201     {
1202       if (NULL != vhash)
1203         {
1204           execute_select (plugin,
1205                           plugin->select_entry_by_hash_and_vhash, 
1206                           proc, proc_cls,
1207                           MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1208                           MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, 
1209                           MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, 
1210                           -1);
1211         }
1212       else
1213         {
1214           execute_select (plugin,
1215                           plugin->select_entry_by_hash, 
1216                           proc, proc_cls,
1217                           MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1218                           MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, 
1219                           -1);
1220         }
1221     }
1222 }
1223
1224
1225 /**
1226  * Get a zero-anonymity datum from the datastore.
1227  *
1228  * @param cls our "struct Plugin*"
1229  * @param offset offset of the result
1230  * @param type entries of which type should be considered?
1231  *        Use 0 for any type.
1232  * @param iter function to call on each matching value;
1233  *        will be called once with a NULL value at the end
1234  * @param iter_cls closure for iter
1235  */
1236 static void
1237 mysql_plugin_get_zero_anonymity (void *cls,
1238                                  uint64_t offset,
1239                                  enum GNUNET_BLOCK_Type type,
1240                                  PluginDatumProcessor proc, void *proc_cls)
1241 {
1242   struct Plugin *plugin = cls;
1243   unsigned long long off;
1244
1245   off = (unsigned long long) offset;
1246   execute_select (plugin,
1247                   plugin->zero_iter,
1248                   proc, proc_cls,
1249                   MYSQL_TYPE_LONG, &type, GNUNET_YES,
1250                   MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
1251                   -1);
1252
1253 }
1254
1255
1256 /**
1257  * Context for 'repl_proc' function.
1258  */
1259 struct ReplCtx
1260 {
1261   
1262   /**
1263    * Plugin handle.
1264    */
1265   struct Plugin *plugin;
1266   
1267   /**
1268    * Function to call for the result (or the NULL).
1269    */
1270   PluginDatumProcessor proc;
1271   
1272   /**
1273    * Closure for proc.
1274    */
1275   void *proc_cls;
1276 };
1277
1278
1279 /**
1280  * Wrapper for the processor for 'mysql_plugin_get_replication'.
1281  * Decrements the replication counter and calls the original
1282  * iterator.
1283  *
1284  * @param cls closure
1285  * @param key key for the content
1286  * @param size number of bytes in data
1287  * @param data content stored
1288  * @param type type of the content
1289  * @param priority priority of the content
1290  * @param anonymity anonymity-level for the content
1291  * @param expiration expiration time for the content
1292  * @param uid unique identifier for the datum;
1293  *        maybe 0 if no unique identifier is available
1294  *
1295  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1296  *         (continue on call to "next", of course),
1297  *         GNUNET_NO to delete the item and continue (if supported)
1298  */
1299 static int
1300 repl_proc (void *cls,
1301            const GNUNET_HashCode *key,
1302            uint32_t size,
1303            const void *data,
1304            enum GNUNET_BLOCK_Type type,
1305            uint32_t priority,
1306            uint32_t anonymity,
1307            struct GNUNET_TIME_Absolute expiration, 
1308            uint64_t uid)
1309 {
1310   struct ReplCtx *rc = cls;
1311   struct Plugin *plugin = rc->plugin;
1312   unsigned long long oid;
1313   int ret;
1314   int iret;
1315
1316   ret = rc->proc (rc->proc_cls,
1317                   key,
1318                   size, data, 
1319                   type, priority, anonymity, expiration,
1320                   uid);
1321   if (NULL != key)
1322     {
1323       oid = (unsigned long long) uid;
1324       iret = prepared_statement_run (plugin,
1325                                      plugin->dec_repl,
1326                                      NULL,
1327                                      MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, 
1328                                      -1);
1329       if (iret == GNUNET_SYSERR)
1330         {
1331           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1332                       "Failed to reduce replication counter\n");
1333           return GNUNET_SYSERR;
1334         }
1335     }
1336   return ret;
1337 }
1338
1339
1340 /**
1341  * Get a random item for replication.  Returns a single, not expired,
1342  * random item from those with the highest replication counters.  The
1343  * item's replication counter is decremented by one IF it was positive
1344  * before.  Call 'proc' with all values ZERO or NULL if the datastore
1345  * is empty.
1346  *
1347  * @param cls closure
1348  * @param proc function to call the value (once only).
1349  * @param iter_cls closure for proc
1350  */
1351 static void
1352 mysql_plugin_get_replication (void *cls,
1353                               PluginDatumProcessor proc, void *proc_cls)
1354 {
1355   struct Plugin *plugin = cls;
1356   struct ReplCtx rc;
1357   
1358   rc.plugin = plugin;
1359   rc.proc = proc;
1360   rc.proc_cls = proc_cls;
1361   execute_select (plugin,
1362                   plugin->select_replication, 
1363                   &repl_proc, &rc,
1364                   -1);
1365
1366 }
1367
1368
1369 /**
1370  * Get a random item for expiration.
1371  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
1372  *
1373  * @param cls closure
1374  * @param proc function to call the value (once only).
1375  * @param proc_cls closure for proc
1376  */
1377 static void
1378 mysql_plugin_get_expiration (void *cls,
1379                              PluginDatumProcessor proc, void *proc_cls)
1380 {
1381   struct Plugin *plugin = cls;
1382   long long nt;
1383
1384   nt = (long long) GNUNET_TIME_absolute_get().abs_value;
1385   execute_select (plugin,
1386                   plugin->select_expiration, 
1387                   proc, proc_cls,
1388                   MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
1389                   -1);
1390
1391 }
1392
1393
1394 /**
1395  * Drop database.
1396  *
1397  * @param cls the "struct Plugin*"
1398  */
1399 static void 
1400 mysql_plugin_drop (void *cls)
1401 {
1402   struct Plugin *plugin = cls;
1403
1404   if (GNUNET_OK != run_statement (plugin,
1405                                   "DROP TABLE gn090"))
1406     return;           /* error */
1407   plugin->env->duc (plugin->env->cls, 0);
1408 }
1409
1410
1411 /**
1412  * Entry point for the plugin.
1413  *
1414  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1415  * @return our "struct Plugin*"
1416  */
1417 void *
1418 libgnunet_plugin_datastore_mysql_init (void *cls)
1419 {
1420   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1421   struct GNUNET_DATASTORE_PluginFunctions *api;
1422   struct Plugin *plugin;
1423
1424   plugin = GNUNET_malloc (sizeof (struct Plugin));
1425   plugin->env = env;
1426   plugin->cnffile = get_my_cnf_path (env->cfg);
1427   if (GNUNET_OK != iopen (plugin))
1428     {
1429       iclose (plugin);
1430       GNUNET_free_non_null (plugin->cnffile);
1431       GNUNET_free (plugin);
1432       return NULL;
1433     }
1434 #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
1435 #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
1436   if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
1437              " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1438              " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1439              " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1440              " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1441              " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1442              " hash BINARY(64) NOT NULL DEFAULT '',"
1443              " vhash BINARY(64) NOT NULL DEFAULT '',"
1444              " value BLOB NOT NULL DEFAULT '',"
1445              " uid BIGINT NOT NULL AUTO_INCREMENT,"
1446              " PRIMARY KEY (uid),"
1447              " INDEX idx_hash (hash(64)),"
1448              " INDEX idx_hash_uid (hash(64),uid),"
1449              " INDEX idx_hash_vhash (hash(64),vhash(64)),"
1450              " INDEX idx_hash_type_uid (hash(64),type,uid),"
1451              " INDEX idx_prio (prio),"
1452              " INDEX idx_repl (repl),"
1453              " INDEX idx_expire_prio (expire,prio),"
1454              " INDEX idx_anonLevel_uid (anonLevel,uid)"
1455              ") ENGINE=InnoDB") ||
1456       MRUNS ("SET AUTOCOMMIT = 1") ||
1457       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1458       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1459       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1460       PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH)
1461       || PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE)
1462       || PINIT (plugin->select_entry_by_hash_vhash_and_type,
1463                 SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1464       || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
1465       || PINIT (plugin->get_size, SELECT_SIZE)
1466       || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
1467       || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1468       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1469                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1470       || PINIT (plugin->update_entry, UPDATE_ENTRY)
1471       || PINIT (plugin->dec_repl, DEC_REPL)
1472       || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) 
1473       || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) 
1474       || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
1475     {
1476       iclose (plugin);
1477       GNUNET_free_non_null (plugin->cnffile);
1478       GNUNET_free (plugin);
1479       return NULL;
1480     }
1481 #undef PINIT
1482 #undef MRUNS
1483
1484   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1485   api->cls = plugin;
1486   api->estimate_size = &mysql_plugin_estimate_size;
1487   api->put = &mysql_plugin_put;
1488   api->update = &mysql_plugin_update;
1489   api->get_key = &mysql_plugin_get_key;
1490   api->get_replication = &mysql_plugin_get_replication;
1491   api->get_expiration = &mysql_plugin_get_expiration;
1492   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1493   api->drop = &mysql_plugin_drop;
1494   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1495                    "mysql", _("Mysql database running\n"));
1496   return api;
1497 }
1498
1499
1500 /**
1501  * Exit point from the plugin.
1502  * @param cls our "struct Plugin*"
1503  * @return always NULL
1504  */
1505 void *
1506 libgnunet_plugin_datastore_mysql_done (void *cls)
1507 {
1508   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1509   struct Plugin *plugin = api->cls;
1510
1511   iclose (plugin);
1512   GNUNET_free_non_null (plugin->cnffile);
1513   GNUNET_free (plugin);
1514   GNUNET_free (api);
1515   mysql_library_end ();
1516   return NULL;
1517 }
1518
1519 /* end of plugin_datastore_mysql.c */