remove send on connect
[oweals/gnunet.git] / src / datastore / plugin_datastore_mysql.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_mysql.c
23  * @brief mysql-based datastore backend
24  * @author Igor Wronsky
25  * @author Christian Grothoff
26  *
27  * NOTE: This db module does NOT work with mysql prior to 4.1 since
28  * it uses prepared statements.  MySQL 5.0.46 promises to fix a bug
29  * in MyISAM that is causing us grief.  At the time of this writing,
30  * that version is yet to be released.  In anticipation, the code
31  * will use MyISAM with 5.0.46 (and higher).  If you run such a
32  * version, please run "make check" to verify that the MySQL bug
33  * was actually fixed in your version (and if not, change the
34  * code below to use MyISAM for gn071).
35  *
36  * HIGHLIGHTS
37  *
38  * Pros
39  * + On up-to-date hardware where mysql can be used comfortably, this
40  *   module will have better performance than the other db choices
41  *   (according to our tests).
42  * + Its often possible to recover the mysql database from internal
43  *   inconsistencies. The other db choices do not support repair!
44  * Cons
45  * - Memory usage (Comment: "I have 1G and it never caused me trouble")
46  * - Manual setup
47  *
48  * MANUAL SETUP INSTRUCTIONS
49  *
50  * 1) in /etc/gnunet.conf, set
51  * @verbatim
52        [datastore]
53        DATABASE = "mysql"
54    @endverbatim
55  * 2) Then access mysql as root,
56  * @verbatim
57      $ mysql -u root -p
58    @endverbatim
59  *    and do the following. [You should replace $USER with the username
60  *    that will be running the gnunetd process].
61  * @verbatim
62       CREATE DATABASE gnunet;
63       GRANT select,insert,update,delete,create,alter,drop,create temporary tables
64          ON gnunet.* TO $USER@localhost;
65       SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
66       FLUSH PRIVILEGES;
67    @endverbatim
68  * 3) In the $HOME directory of $USER, create a ".my.cnf" file
69  *    with the following lines
70  * @verbatim
71       [client]
72       user=$USER
73       password=$the_password_you_like
74    @endverbatim
75  *
76  * Thats it. Note that .my.cnf file is a security risk unless its on
77  * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
78  * link. Even greater security risk can be achieved by setting no
79  * password for $USER.  Luckily $USER has only priviledges to mess
80  * up GNUnet's tables, nothing else (unless you give him more,
81  * of course).<p>
82  *
83  * 4) Still, perhaps you should briefly try if the DB connection
84  *    works. First, login as $USER. Then use,
85  *
86  * @verbatim
87      $ mysql -u $USER -p $the_password_you_like
88      mysql> use gnunet;
89    @endverbatim
90  *
91  *    If you get the message &quot;Database changed&quot; it probably works.
92  *
93  *    [If you get &quot;ERROR 2002: Can't connect to local MySQL server
94  *     through socket '/tmp/mysql.sock' (2)&quot; it may be resolvable by
95  *     &quot;ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock&quot;
96  *     so there may be some additional trouble depending on your mysql setup.]
97  *
98  * REPAIRING TABLES
99  *
100  * - Its probably healthy to check your tables for inconsistencies
101  *   every now and then.
102  * - If you get odd SEGVs on gnunetd startup, it might be that the mysql
103  *   databases have been corrupted.
104  * - The tables can be verified/fixed in two ways;
105  *   1) by running mysqlcheck -A, or
106  *   2) by executing (inside of mysql using the GNUnet database):
107  * @verbatim
108      mysql> REPAIR TABLE gn090;
109    @endverbatim
110  *
111  * PROBLEMS?
112  *
113  * If you have problems related to the mysql module, your best
114  * friend is probably the mysql manual. The first thing to check
115  * is that mysql is basically operational, that you can connect
116  * to it, create tables, issue queries etc.
117  */
118
119 #include "platform.h"
120 #include "gnunet_datastore_plugin.h"
121 #include "gnunet_util_lib.h"
122 #include <mysql/mysql.h>
123
124 #define DEBUG_MYSQL GNUNET_NO
125
126 #define MAX_DATUM_SIZE 65536
127
128 /**
129  * Maximum number of supported parameters for a prepared
130  * statement.  Increase if needed.
131  */
132 #define MAX_PARAM 16
133
134 /**
135  * Die with an error message that indicates
136  * a failure of the command 'cmd' with the message given
137  * by strerror(errno).
138  */
139 #define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0);
140
141 /**
142  * Log an error message at log-level 'level' that indicates
143  * a failure of the command 'cmd' on file 'filename'
144  * with the message given by strerror(errno).
145  */
146 #define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
147
148
149 struct GNUNET_MysqlStatementHandle
150 {
151   struct GNUNET_MysqlStatementHandle *next;
152
153   struct GNUNET_MysqlStatementHandle *prev;
154
155   char *query;
156
157   MYSQL_STMT *statement;
158
159   int valid;
160
161 };
162
163
164 /**
165  * Context for all functions in this plugin.
166  */
167 struct Plugin 
168 {
169   /**
170    * Our execution environment.
171    */
172   struct GNUNET_DATASTORE_PluginEnvironment *env;
173
174   /**
175    * Handle to talk to MySQL.
176    */
177   MYSQL *dbf;
178   
179   /**
180    * We keep all prepared statements in a DLL.  This is the head.
181    */
182   struct GNUNET_MysqlStatementHandle *shead;
183
184   /**
185    * We keep all prepared statements in a DLL.  This is the tail.
186    */
187   struct GNUNET_MysqlStatementHandle *stail;
188
189   /**
190    * Filename of "my.cnf" (msyql configuration).
191    */
192   char *cnffile;
193
194   /**
195    * Prepared statements.
196    */
197 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,rvalue,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?,?)"
198   struct GNUNET_MysqlStatementHandle *insert_entry;
199   
200 #define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
201   struct GNUNET_MysqlStatementHandle *delete_entry_by_uid;
202
203 #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 WHERE hash=?"
204   struct GNUNET_MysqlStatementHandle *count_entry_by_hash;
205   
206 #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
207   struct GNUNET_MysqlStatementHandle *select_entry_by_hash;
208
209 #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=?"
210   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash;
211
212 #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
213   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash;
214  
215 #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 WHERE hash=? AND type=?"
216   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type;
217
218 #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
219   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type;
220  
221 #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=? AND type=?"
222   struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type;
223   
224 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
225   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
226
227 #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
228   struct GNUNET_MysqlStatementHandle *update_entry;
229
230 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
231   struct GNUNET_MysqlStatementHandle *dec_repl;
232
233 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
234   struct GNUNET_MysqlStatementHandle *get_size;
235
236 #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 AND type=? ORDER BY uid DESC LIMIT 1 OFFSET ?"
237   struct GNUNET_MysqlStatementHandle *zero_iter;
238
239 #define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
240   "UNION "\
241   "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 ORDER BY prio ASC LIMIT 1) "\
242   "ORDER BY expire ASC LIMIT 1"
243   struct GNUNET_MysqlStatementHandle *select_expiration;
244
245 #define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 ORDER BY repl DESC,RAND() LIMIT 1"
246   struct GNUNET_MysqlStatementHandle *select_replication;
247
248 };
249
250
251 /**
252  * Obtain the location of ".my.cnf".
253  *
254  * @param cfg our configuration
255  * @return NULL on error
256  */
257 static char *
258 get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
259 {
260   char *cnffile;
261   char *home_dir;
262   struct stat st;
263 #ifndef WINDOWS
264   struct passwd *pw;
265 #endif
266   int configured;
267
268 #ifndef WINDOWS
269   pw = getpwuid (getuid ());
270   if (!pw)
271     {
272       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, 
273                            "getpwuid");
274       return NULL;
275     }
276   if (GNUNET_YES ==
277       GNUNET_CONFIGURATION_have_value (cfg,
278                                        "datastore-mysql", "CONFIG"))
279     {
280       GNUNET_assert (GNUNET_OK == 
281                      GNUNET_CONFIGURATION_get_value_filename (cfg,
282                                                               "datastore-mysql", "CONFIG", &cnffile));
283       configured = GNUNET_YES;
284     }
285   else
286     {
287       home_dir = GNUNET_strdup (pw->pw_dir);
288 #else
289       home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1);
290       plibc_conv_to_win_path ("~/", home_dir);
291 #endif
292       GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir);
293       GNUNET_free (home_dir);
294       configured = GNUNET_NO;
295     }
296   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
297               _("Trying to use file `%s' for MySQL configuration.\n"),
298               cnffile);
299   if ((0 != STAT (cnffile, &st)) ||
300       (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode)))
301     {
302       if (configured == GNUNET_YES)
303         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
304                     _("Could not access file `%s': %s\n"), cnffile,
305                     STRERROR (errno));
306       GNUNET_free (cnffile);
307       return NULL;
308     }
309   return cnffile;
310 }
311
312
313 /**
314  * Close database connection and all prepared statements (we got a DB
315  * disconnect error).
316  * 
317  * @param plugin plugin context
318  */
319 static int
320 iclose (struct Plugin *plugin)
321 {
322   struct GNUNET_MysqlStatementHandle *s;
323
324   for (s = plugin->shead; s != NULL; s = s->next)
325     {
326       if (s->valid)
327         {
328           mysql_stmt_close (s->statement);
329           s->valid = GNUNET_NO;
330         }
331     }
332   if (plugin->dbf != NULL)
333     {
334       mysql_close (plugin->dbf);
335       plugin->dbf = NULL;
336     }
337   return GNUNET_OK;
338 }
339
340
341 /**
342  * Open the connection with the database (and initialize
343  * our default options).
344  *
345  * @param plugin plugin context
346  * @return GNUNET_OK on success
347  */
348 static int
349 iopen (struct Plugin *plugin)
350 {
351   char *mysql_dbname;
352   char *mysql_server;
353   char *mysql_user;
354   char *mysql_password;
355   unsigned long long mysql_port;
356   my_bool reconnect;
357   unsigned int timeout;
358
359   plugin->dbf = mysql_init (NULL);
360   if (plugin->dbf == NULL)
361     return GNUNET_SYSERR;
362   if (plugin->cnffile != NULL)
363     mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_FILE, plugin->cnffile);
364   mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
365   reconnect = 0;
366   mysql_options (plugin->dbf, MYSQL_OPT_RECONNECT, &reconnect);
367   timeout = 120; /* in seconds */
368   mysql_options (plugin->dbf,
369                  MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
370   mysql_options(plugin->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
371   timeout = 60; /* in seconds */
372   mysql_options (plugin->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
373   mysql_options (plugin->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
374   mysql_dbname = NULL;
375   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
376                                                      "datastore-mysql", "DATABASE"))
377     GNUNET_assert (GNUNET_OK == 
378                    GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
379                                                           "datastore-mysql", "DATABASE", 
380                                                           &mysql_dbname));
381   else
382     mysql_dbname = GNUNET_strdup ("gnunet");
383   mysql_user = NULL;
384   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
385                                                      "datastore-mysql", "USER"))
386     {
387       GNUNET_assert (GNUNET_OK == 
388                     GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
389                                                            "datastore-mysql", "USER", 
390                                                            &mysql_user));
391     }
392   mysql_password = NULL;
393   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
394                                                      "datastore-mysql", "PASSWORD"))
395     {
396       GNUNET_assert (GNUNET_OK ==
397                     GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
398                                                            "datastore-mysql", "PASSWORD",
399                                                            &mysql_password));
400     }
401   mysql_server = NULL;
402   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
403                                                      "datastore-mysql", "HOST"))
404     {
405       GNUNET_assert (GNUNET_OK == 
406                     GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
407                                                            "datastore-mysql", "HOST", 
408                                                            &mysql_server));
409     }
410   mysql_port = 0;
411   if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
412                                                      "datastore-mysql", "PORT"))
413     {
414       GNUNET_assert (GNUNET_OK ==
415                     GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg, "datastore-mysql",
416                                                            "PORT", &mysql_port));
417     }
418
419   GNUNET_assert (mysql_dbname != NULL);
420   mysql_real_connect (plugin->dbf, 
421                       mysql_server, 
422                       mysql_user, mysql_password,
423                       mysql_dbname, 
424                       (unsigned int) mysql_port, NULL,
425                       CLIENT_IGNORE_SIGPIPE);
426   GNUNET_free_non_null (mysql_server);
427   GNUNET_free_non_null (mysql_user);
428   GNUNET_free_non_null (mysql_password);
429   GNUNET_free (mysql_dbname);
430   if (mysql_error (plugin->dbf)[0])
431     {
432       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
433                  "mysql_real_connect", plugin);
434       return GNUNET_SYSERR;
435     }
436   return GNUNET_OK;
437 }
438
439
440 /**
441  * Run the given MySQL statement.
442  *
443  * @param plugin plugin context
444  * @param statement SQL statement to run
445  * @return GNUNET_OK on success, GNUNET_SYSERR on error
446  */
447 static int
448 run_statement (struct Plugin *plugin,
449                const char *statement)
450 {
451   if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin)))
452     return GNUNET_SYSERR;
453   mysql_query (plugin->dbf, statement);
454   if (mysql_error (plugin->dbf)[0])
455     {
456       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
457                  "mysql_query", plugin);
458       iclose (plugin);
459       return GNUNET_SYSERR;
460     }
461   return GNUNET_OK;
462 }
463
464
465 /**
466  * Create a prepared statement.
467  *
468  * @param plugin plugin context
469  * @param statement SQL statement text to prepare
470  * @return NULL on error
471  */
472 static struct GNUNET_MysqlStatementHandle *
473 prepared_statement_create (struct Plugin *plugin, 
474                            const char *statement)
475 {
476   struct GNUNET_MysqlStatementHandle *ret;
477
478   ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle));
479   ret->query = GNUNET_strdup (statement);
480   GNUNET_CONTAINER_DLL_insert (plugin->shead,
481                                plugin->stail,
482                                ret);
483   return ret;
484 }
485
486
487 /**
488  * Prepare a statement for running.
489  *
490  * @param plugin plugin context
491  * @param ret handle to prepared statement
492  * @return GNUNET_OK on success
493  */
494 static int
495 prepare_statement (struct Plugin *plugin, 
496                    struct GNUNET_MysqlStatementHandle *ret)
497 {
498   if (GNUNET_YES == ret->valid)
499     return GNUNET_OK;
500   if ((NULL == plugin->dbf) && 
501       (GNUNET_OK != iopen (plugin)))
502     return GNUNET_SYSERR;
503   ret->statement = mysql_stmt_init (plugin->dbf);
504   if (ret->statement == NULL)
505     {
506       iclose (plugin);
507       return GNUNET_SYSERR;
508     }
509   if (mysql_stmt_prepare (ret->statement, 
510                           ret->query,
511                           strlen (ret->query)))
512     {
513       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
514                        "mysql",
515                        _("Failed to prepare statement `%s'\n"),
516                        ret->query);
517       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
518                  "mysql_stmt_prepare", 
519                  plugin);
520       mysql_stmt_close (ret->statement);
521       ret->statement = NULL;
522       iclose (plugin);
523       return GNUNET_SYSERR;
524     }
525   ret->valid = GNUNET_YES;
526   return GNUNET_OK;
527
528 }
529
530
531 /**
532  * Bind the parameters for the given MySQL statement
533  * and run it.
534  *
535  * @param plugin plugin context
536  * @param s statement to bind and run
537  * @param ap arguments for the binding
538  * @return GNUNET_SYSERR on error, GNUNET_OK on success
539  */
540 static int
541 init_params (struct Plugin *plugin,
542              struct GNUNET_MysqlStatementHandle *s,
543              va_list ap)
544 {
545   MYSQL_BIND qbind[MAX_PARAM];
546   unsigned int pc;
547   unsigned int off;
548   enum enum_field_types ft;
549
550   pc = mysql_stmt_param_count (s->statement);
551   if (pc > MAX_PARAM)
552     {
553       /* increase internal constant! */
554       GNUNET_break (0);
555       return GNUNET_SYSERR;
556     }
557   memset (qbind, 0, sizeof (qbind));
558   off = 0;
559   ft = 0;
560   while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types))))
561     {
562       qbind[off].buffer_type = ft;
563       switch (ft)
564         {
565         case MYSQL_TYPE_FLOAT:
566           qbind[off].buffer = va_arg (ap, float *);
567           break;
568         case MYSQL_TYPE_LONGLONG:
569           qbind[off].buffer = va_arg (ap, unsigned long long *);
570           qbind[off].is_unsigned = va_arg (ap, int);
571           break;
572         case MYSQL_TYPE_LONG:
573           qbind[off].buffer = va_arg (ap, unsigned int *);
574           qbind[off].is_unsigned = va_arg (ap, int);
575           break;
576         case MYSQL_TYPE_VAR_STRING:
577         case MYSQL_TYPE_STRING:
578         case MYSQL_TYPE_BLOB:
579           qbind[off].buffer = va_arg (ap, void *);
580           qbind[off].buffer_length = va_arg (ap, unsigned long);
581           qbind[off].length = va_arg (ap, unsigned long *);
582           break;
583         default:
584           /* unsupported type */
585           GNUNET_break (0);
586           return GNUNET_SYSERR;
587         }
588       pc--;
589       off++;
590     }
591   if (! ( (pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1) ) )
592     {
593       GNUNET_assert (0);
594       return GNUNET_SYSERR;
595     }
596   if (mysql_stmt_bind_param (s->statement, qbind))
597     {
598       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
599                   _("`%s' failed at %s:%d with error: %s\n"),
600                   "mysql_stmt_bind_param",
601                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
602       iclose (plugin);
603       return GNUNET_SYSERR;
604     }
605   if (mysql_stmt_execute (s->statement))
606     {
607       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
608                   _("`%s' for `%s' failed at %s:%d with error: %s\n"),
609                   "mysql_stmt_execute",
610                   s->query,
611                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
612       iclose (plugin);
613       return GNUNET_SYSERR;
614     }
615   return GNUNET_OK;
616 }
617
618
619 /**
620  * Run a prepared SELECT statement.
621  *
622  * @param plugin plugin context
623  * @param s statement to run
624  * @param result_size number of elements in results array
625  * @param results pointer to already initialized MYSQL_BIND
626  *        array (of sufficient size) for passing results
627  * @param ap pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
628  *        values (size + buffer-reference for pointers); terminated
629  *        with "-1"
630  * @return GNUNET_SYSERR on error, otherwise GNUNET_OK or GNUNET_NO (no result)
631  */
632 static int
633 prepared_statement_run_select_va (struct Plugin *plugin,
634                                   struct GNUNET_MysqlStatementHandle *s,
635                                   unsigned int result_size,
636                                   MYSQL_BIND *results,
637                                   va_list ap)
638 {
639   int ret;
640   unsigned int rsize;
641
642   if (GNUNET_OK != prepare_statement (plugin, s))
643     {
644       GNUNET_break (0);
645       return GNUNET_SYSERR;
646     }
647   if (GNUNET_OK != init_params (plugin, s, ap))
648     {
649       GNUNET_break (0);
650       return GNUNET_SYSERR;
651     }
652   rsize = mysql_stmt_field_count (s->statement);
653   if (rsize > result_size)
654     {
655       GNUNET_break (0);
656       return GNUNET_SYSERR;
657     }
658   if (mysql_stmt_bind_result (s->statement, results))
659     {
660       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
661                   _("`%s' failed at %s:%d with error: %s\n"),
662                   "mysql_stmt_bind_result",
663                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
664       iclose (plugin);
665       return GNUNET_SYSERR;
666     }
667   ret = mysql_stmt_fetch (s->statement);
668   if (ret == MYSQL_NO_DATA)
669     return GNUNET_NO;
670   if (ret != 0)
671     {
672       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
673                   _("`%s' failed at %s:%d with error: %s\n"),
674                   "mysql_stmt_fetch",
675                   __FILE__, __LINE__, mysql_stmt_error (s->statement));
676       iclose (plugin);
677       return GNUNET_SYSERR;
678     }
679   mysql_stmt_reset (s->statement);
680   return GNUNET_OK;
681 }
682
683
684 /**
685  * Run a prepared SELECT statement.
686  *
687  * @param plugin plugin context
688  * @param s statement to run
689  * @param result_size number of elements in results array
690  * @param results pointer to already initialized MYSQL_BIND
691  *        array (of sufficient size) for passing results
692  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
693  *        values (size + buffer-reference for pointers); terminated
694  *        with "-1"
695  * @return GNUNET_SYSERR on error, otherwise
696  *         the number of successfully affected (or queried) rows
697  */
698 static int
699 prepared_statement_run_select (struct Plugin *plugin,
700                                struct GNUNET_MysqlStatementHandle *s,
701                                unsigned int result_size,
702                                MYSQL_BIND *results,
703                                ...)
704 {
705   va_list ap;
706   int ret;
707
708   va_start (ap, results);
709   ret = prepared_statement_run_select_va (plugin, s, 
710                                           result_size, results,
711                                           ap);
712   va_end (ap);
713   return ret;
714 }
715
716
717 /**
718  * Run a prepared statement that does NOT produce results.
719  *
720  * @param plugin plugin context
721  * @param s statement to run
722  * @param insert_id NULL or address where to store the row ID of whatever
723  *        was inserted (only for INSERT statements!)
724  * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
725  *        values (size + buffer-reference for pointers); terminated
726  *        with "-1"
727  * @return GNUNET_SYSERR on error, otherwise
728  *         the number of successfully affected rows
729  */
730 static int
731 prepared_statement_run (struct Plugin *plugin,
732                         struct GNUNET_MysqlStatementHandle *s,
733                         unsigned long long *insert_id, ...)
734 {
735   va_list ap;
736   int affected;
737
738   if (GNUNET_OK != prepare_statement (plugin, s))
739     return GNUNET_SYSERR;
740   va_start (ap, insert_id);
741   if (GNUNET_OK != init_params (plugin, s, ap))
742     {
743       va_end (ap);
744       return GNUNET_SYSERR;
745     }
746   va_end (ap);
747   affected = mysql_stmt_affected_rows (s->statement);
748   if (NULL != insert_id)
749     *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement);
750   mysql_stmt_reset (s->statement);
751   return affected;
752 }
753
754
755 /**
756  * Delete an entry from the gn090 table.
757  *
758  * @param plugin plugin context
759  * @param uid unique ID of the entry to delete
760  * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error
761  */
762 static int
763 do_delete_entry (struct Plugin *plugin,
764                  unsigned long long uid)
765 {
766   int ret;
767  
768 #if DEBUG_MYSQL
769   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
770               "Deleting value %llu from gn090 table\n",
771               uid);
772 #endif
773   ret = prepared_statement_run (plugin,
774                                 plugin->delete_entry_by_uid,
775                                 NULL,
776                                 MYSQL_TYPE_LONGLONG, &uid, GNUNET_YES,
777                                 -1);
778   if (ret >= 0)
779     return GNUNET_OK;
780   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
781               "Deleting value %llu from gn090 table failed\n",
782               uid);
783   return ret;
784 }
785
786
787 /**
788  * Get an estimate of how much space the database is
789  * currently using.
790  *
791  * @param cls our "struct Plugin *"
792  * @return number of bytes used on disk
793  */
794 static unsigned long long
795 mysql_plugin_estimate_size (void *cls)
796 {
797   struct Plugin *plugin = cls;
798   MYSQL_BIND cbind[1];
799   long long total;
800
801   memset (cbind, 0, sizeof (cbind));
802   total = 0;
803   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
804   cbind[0].buffer = &total;
805   cbind[0].is_unsigned = GNUNET_NO;
806   if (GNUNET_OK != 
807       prepared_statement_run_select (plugin,
808                                      plugin->get_size,
809                                      1, cbind, 
810                                      -1))
811     return 0;
812   return total;
813 }
814
815
816 /**
817  * Store an item in the datastore.
818  *
819  * @param cls closure
820  * @param key key for the item
821  * @param size number of bytes in data
822  * @param data content stored
823  * @param type type of the content
824  * @param priority priority of the content
825  * @param anonymity anonymity-level for the content
826  * @param replication replication-level for the content
827  * @param expiration expiration time for the content
828  * @param msg set to error message
829  * @return GNUNET_OK on success
830  */
831 static int
832 mysql_plugin_put (void *cls,
833                   const GNUNET_HashCode * key,
834                   uint32_t size,
835                   const void *data,
836                   enum GNUNET_BLOCK_Type type,
837                   uint32_t priority,
838                   uint32_t anonymity,
839                   uint32_t replication,
840                   struct GNUNET_TIME_Absolute expiration,
841                   char **msg)
842 {
843   struct Plugin *plugin = cls;
844   unsigned int irepl = replication;
845   unsigned int ipriority = priority;
846   unsigned int ianonymity = anonymity;
847   unsigned long long lexpiration = expiration.abs_value;
848   unsigned long hashSize;
849   unsigned long hashSize2;
850   unsigned long lsize;
851   unsigned long rvalue;
852   GNUNET_HashCode vhash;
853
854   if (size > MAX_DATUM_SIZE)
855     {
856       GNUNET_break (0);
857       return GNUNET_SYSERR;
858     }
859   hashSize = sizeof (GNUNET_HashCode);
860   hashSize2 = sizeof (GNUNET_HashCode);
861   lsize = size;
862   GNUNET_CRYPTO_hash (data, size, &vhash);
863   rvalue = (unsigned long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
864   if (GNUNET_OK !=
865       prepared_statement_run (plugin,
866                               plugin->insert_entry,
867                               NULL,
868                               MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
869                               MYSQL_TYPE_LONG, &type, GNUNET_YES,
870                               MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
871                               MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
872                               MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
873                               MYSQL_TYPE_LONGLONG, &rvalue, GNUNET_YES,
874                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
875                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
876                               MYSQL_TYPE_BLOB, data, lsize, &lsize, 
877                               -1))
878     return GNUNET_SYSERR;    
879 #if DEBUG_MYSQL
880   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
881               "Inserted value `%s' with size %u into gn090 table\n",
882               GNUNET_h2s (key),
883               (unsigned int) size);
884 #endif
885   if (size > 0)
886     plugin->env->duc (plugin->env->cls,
887                       size);
888   return GNUNET_OK;
889 }
890
891
892 /**
893  * Update the priority for a particular key in the datastore.  If
894  * the expiration time in value is different than the time found in
895  * the datastore, the higher value should be kept.  For the
896  * anonymity level, the lower value is to be used.  The specified
897  * priority should be added to the existing priority, ignoring the
898  * priority in value.
899  *
900  * Note that it is possible for multiple values to match this put.
901  * In that case, all of the respective values are updated.
902  *
903  * @param cls our "struct Plugin*"
904  * @param uid unique identifier of the datum
905  * @param delta by how much should the priority
906  *     change?  If priority + delta < 0 the
907  *     priority should be set to 0 (never go
908  *     negative).
909  * @param expire new expiration time should be the
910  *     MAX of any existing expiration time and
911  *     this value
912  * @param msg set to error message
913  * @return GNUNET_OK on success
914  */
915 static int
916 mysql_plugin_update (void *cls,
917                      uint64_t uid,
918                      int delta, 
919                      struct GNUNET_TIME_Absolute expire,
920                      char **msg)
921 {
922   struct Plugin *plugin = cls;
923   unsigned long long vkey = uid;
924   unsigned long long lexpire = expire.abs_value;
925   int ret;
926
927 #if DEBUG_MYSQL
928   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
929               "Updating value %llu adding %d to priority and maxing exp at %llu\n",
930               vkey,
931               delta,
932               lexpire);
933 #endif
934   ret = prepared_statement_run (plugin,
935                                 plugin->update_entry,
936                                 NULL,
937                                 MYSQL_TYPE_LONG, &delta, GNUNET_NO,
938                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
939                                 MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES,
940                                 MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, 
941                                 -1);
942   if (ret != GNUNET_OK)
943     {
944       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
945                   "Failed to update value %llu\n",
946                   vkey);
947     }
948   return ret;
949 }
950
951
952 /**
953  * Run the given select statement and call 'proc' on the resulting
954  * values (which must be in particular positions).
955  *
956  * @param plugin the plugin handle
957  * @param stmt select statement to run
958  * @param proc function to call on result
959  * @param proc_cls closure for proc
960  * @param ... arguments to initialize stmt
961  */
962 static void 
963 execute_select (struct Plugin *plugin,
964                 struct GNUNET_MysqlStatementHandle *stmt,
965                 PluginDatumProcessor proc, void *proc_cls,
966                 ...)
967 {
968   va_list ap;
969   int ret;
970   unsigned int type;
971   unsigned int priority;
972   unsigned int anonymity;
973   unsigned long long exp;
974   unsigned long hashSize;
975   unsigned long size;
976   unsigned long long uid;
977   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
978   GNUNET_HashCode key;
979   struct GNUNET_TIME_Absolute expiration;
980   MYSQL_BIND rbind[7];
981
982   hashSize = sizeof (GNUNET_HashCode);
983   memset (rbind, 0, sizeof (rbind));
984   rbind[0].buffer_type = MYSQL_TYPE_LONG;
985   rbind[0].buffer = &type;
986   rbind[0].is_unsigned = 1;
987   rbind[1].buffer_type = MYSQL_TYPE_LONG;
988   rbind[1].buffer = &priority;
989   rbind[1].is_unsigned = 1;
990   rbind[2].buffer_type = MYSQL_TYPE_LONG;
991   rbind[2].buffer = &anonymity;
992   rbind[2].is_unsigned = 1;
993   rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
994   rbind[3].buffer = &exp;
995   rbind[3].is_unsigned = 1;
996   rbind[4].buffer_type = MYSQL_TYPE_BLOB;
997   rbind[4].buffer = &key;
998   rbind[4].buffer_length = hashSize;
999   rbind[4].length = &hashSize;
1000   rbind[5].buffer_type = MYSQL_TYPE_BLOB;
1001   rbind[5].buffer = value;
1002   rbind[5].buffer_length = size = sizeof (value);
1003   rbind[5].length = &size;
1004   rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
1005   rbind[6].buffer = &uid;
1006   rbind[6].is_unsigned = 1;
1007
1008   va_start (ap, proc_cls);
1009   ret = prepared_statement_run_select_va (plugin,
1010                                           stmt,
1011                                           7, rbind,
1012                                           ap);
1013   va_end (ap);
1014   if (ret <= 0)
1015     {
1016       proc (proc_cls, 
1017             NULL, 0, NULL, 0, 0, 0, 
1018             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1019       return;
1020     }
1021   GNUNET_assert (size <= sizeof(value));
1022   if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
1023        (hashSize != sizeof (GNUNET_HashCode)) )
1024     {
1025       GNUNET_break (0);
1026       proc (proc_cls, 
1027             NULL, 0, NULL, 0, 0, 0, 
1028             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1029       return;
1030     }     
1031 #if DEBUG_MYSQL
1032   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033               "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
1034               (unsigned int) size,
1035               GNUNET_h2s (&key),
1036               priority,
1037               anonymity,
1038               exp);
1039 #endif
1040   GNUNET_assert (size < MAX_DATUM_SIZE);
1041   expiration.abs_value = exp;
1042   ret = proc (proc_cls, 
1043               &key,
1044               size, value,
1045               type, priority, anonymity, expiration,
1046               uid);
1047   if (ret == GNUNET_NO)
1048     {
1049       do_delete_entry (plugin, uid);
1050       if (size != 0)
1051         plugin->env->duc (plugin->env->cls,
1052                           - size);
1053     }
1054 }
1055
1056
1057
1058 /**
1059  * Get one of the results for a particular key in the datastore.
1060  *
1061  * @param cls closure
1062  * @param offset offset of the result (modulo num-results); 
1063  *               specific ordering does not matter for the offset
1064  * @param key key to match, never NULL
1065  * @param vhash hash of the value, maybe NULL (to
1066  *        match all values that have the right key).
1067  *        Note that for DBlocks there is no difference
1068  *        betwen key and vhash, but for other blocks
1069  *        there may be!
1070  * @param type entries of which type are relevant?
1071  *     Use 0 for any type.
1072  * @param proc function to call on the matching value, 
1073  *        with NULL for if no value matches
1074  * @param proc_cls closure for proc
1075  */
1076 static void
1077 mysql_plugin_get_key (void *cls,
1078                       uint64_t offset,
1079                       const GNUNET_HashCode *key,
1080                       const GNUNET_HashCode *vhash,
1081                       enum GNUNET_BLOCK_Type type,                    
1082                       PluginDatumProcessor proc, void *proc_cls)
1083 {
1084   struct Plugin *plugin = cls;
1085   int ret;
1086   MYSQL_BIND cbind[1];
1087   long long total;
1088   unsigned long hashSize;
1089   unsigned long hashSize2;
1090   unsigned long long off;
1091
1092   GNUNET_assert (key != NULL);
1093   GNUNET_assert (NULL != proc);
1094   hashSize = sizeof (GNUNET_HashCode);
1095   hashSize2 = sizeof (GNUNET_HashCode);
1096   memset (cbind, 0, sizeof (cbind));
1097   total = -1;
1098   cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
1099   cbind[0].buffer = &total;
1100   cbind[0].is_unsigned = GNUNET_NO;
1101   if (type != 0)
1102     {
1103       if (vhash != NULL)
1104         {
1105           ret =
1106             prepared_statement_run_select (plugin,
1107                                            plugin->count_entry_by_hash_vhash_and_type, 
1108                                            1, cbind, 
1109                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1110                                            MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 
1111                                            MYSQL_TYPE_LONG, &type, GNUNET_YES,
1112                                            -1);
1113         }
1114       else
1115         {
1116           ret =
1117             prepared_statement_run_select (plugin,
1118                                            plugin->count_entry_by_hash_and_type, 
1119                                            1, cbind, 
1120                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1121                                            MYSQL_TYPE_LONG, &type, GNUNET_YES,
1122                                            -1);
1123         }
1124     }
1125   else
1126     {
1127       if (vhash != NULL)
1128         {
1129           ret =
1130             prepared_statement_run_select (plugin,
1131                                            plugin->count_entry_by_hash_and_vhash, 
1132                                            1, cbind,
1133                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1134                                            MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 
1135                                            -1);
1136
1137         }
1138       else
1139         {
1140           ret =
1141             prepared_statement_run_select (plugin,
1142                                            plugin->count_entry_by_hash,
1143                                            1, cbind, 
1144                                            MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1145                                            -1);
1146         }
1147     }
1148   if ((ret != GNUNET_OK) || (0 >= total))
1149     {
1150       proc (proc_cls, 
1151             NULL, 0, NULL, 0, 0, 0, 
1152             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1153       return;
1154     }
1155   offset = offset % total;
1156   off = (unsigned long long) offset;
1157 #if DEBUG_MYSQL
1158   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1159               "Obtaining %llu/%lld result for GET `%s'\n",
1160               off,
1161               total,
1162               GNUNET_h2s (key));
1163 #endif
1164
1165   if (type != GNUNET_BLOCK_TYPE_ANY)
1166     {
1167       if (NULL != vhash)
1168         {
1169           execute_select (plugin,
1170                           plugin->select_entry_by_hash_vhash_and_type, 
1171                           proc, proc_cls,
1172                           MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1173                           MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
1174                           MYSQL_TYPE_LONG, &type, GNUNET_YES, 
1175                           MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
1176                           -1);
1177         }
1178       else
1179         {
1180           execute_select (plugin,
1181                           plugin->select_entry_by_hash_and_type, 
1182                           proc, proc_cls,
1183                           MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1184                           MYSQL_TYPE_LONG, &type, GNUNET_YES, 
1185                           MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
1186                           -1);
1187         }
1188     }
1189   else
1190     {
1191       if (NULL != vhash)
1192         {
1193           execute_select (plugin,
1194                           plugin->select_entry_by_hash_and_vhash, 
1195                           proc, proc_cls,
1196                           MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
1197                           MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, 
1198                           MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, 
1199                           -1);
1200         }
1201       else
1202         {
1203           execute_select (plugin,
1204                           plugin->select_entry_by_hash, 
1205                           proc, proc_cls,
1206                           MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1207                           MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, 
1208                           -1);
1209         }
1210     }
1211 }
1212
1213
1214 /**
1215  * Get a zero-anonymity datum from the datastore.
1216  *
1217  * @param cls our "struct Plugin*"
1218  * @param offset offset of the result
1219  * @param type entries of which type should be considered?
1220  *        Use 0 for any type.
1221  * @param proc function to call on a matching value or NULL
1222  * @param proc_cls closure for iter
1223  */
1224 static void
1225 mysql_plugin_get_zero_anonymity (void *cls,
1226                                  uint64_t offset,
1227                                  enum GNUNET_BLOCK_Type type,
1228                                  PluginDatumProcessor proc, void *proc_cls)
1229 {
1230   struct Plugin *plugin = cls;
1231   unsigned long long off;
1232
1233   off = (unsigned long long) offset;
1234   execute_select (plugin,
1235                   plugin->zero_iter,
1236                   proc, proc_cls,
1237                   MYSQL_TYPE_LONG, &type, GNUNET_YES,
1238                   MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
1239                   -1);
1240
1241 }
1242
1243
1244 /**
1245  * Context for 'repl_proc' function.
1246  */
1247 struct ReplCtx
1248 {
1249   
1250   /**
1251    * Plugin handle.
1252    */
1253   struct Plugin *plugin;
1254   
1255   /**
1256    * Function to call for the result (or the NULL).
1257    */
1258   PluginDatumProcessor proc;
1259   
1260   /**
1261    * Closure for proc.
1262    */
1263   void *proc_cls;
1264 };
1265
1266
1267 /**
1268  * Wrapper for the processor for 'mysql_plugin_get_replication'.
1269  * Decrements the replication counter and calls the original
1270  * iterator.
1271  *
1272  * @param cls closure
1273  * @param key key for the content
1274  * @param size number of bytes in data
1275  * @param data content stored
1276  * @param type type of the content
1277  * @param priority priority of the content
1278  * @param anonymity anonymity-level for the content
1279  * @param expiration expiration time for the content
1280  * @param uid unique identifier for the datum;
1281  *        maybe 0 if no unique identifier is available
1282  *
1283  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1284  *         (continue on call to "next", of course),
1285  *         GNUNET_NO to delete the item and continue (if supported)
1286  */
1287 static int
1288 repl_proc (void *cls,
1289            const GNUNET_HashCode *key,
1290            uint32_t size,
1291            const void *data,
1292            enum GNUNET_BLOCK_Type type,
1293            uint32_t priority,
1294            uint32_t anonymity,
1295            struct GNUNET_TIME_Absolute expiration, 
1296            uint64_t uid)
1297 {
1298   struct ReplCtx *rc = cls;
1299   struct Plugin *plugin = rc->plugin;
1300   unsigned long long oid;
1301   int ret;
1302   int iret;
1303
1304   ret = rc->proc (rc->proc_cls,
1305                   key,
1306                   size, data, 
1307                   type, priority, anonymity, expiration,
1308                   uid);
1309   if (NULL != key)
1310     {
1311       oid = (unsigned long long) uid;
1312       iret = prepared_statement_run (plugin,
1313                                      plugin->dec_repl,
1314                                      NULL,
1315                                      MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, 
1316                                      -1);
1317       if (iret == GNUNET_SYSERR)
1318         {
1319           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1320                       "Failed to reduce replication counter\n");
1321           return GNUNET_SYSERR;
1322         }
1323     }
1324   return ret;
1325 }
1326
1327
1328 /**
1329  * Get a random item for replication.  Returns a single, not expired,
1330  * random item from those with the highest replication counters.  The
1331  * item's replication counter is decremented by one IF it was positive
1332  * before.  Call 'proc' with all values ZERO or NULL if the datastore
1333  * is empty.
1334  *
1335  * @param cls closure
1336  * @param proc function to call the value (once only).
1337  * @param proc_cls closure for proc
1338  */
1339 static void
1340 mysql_plugin_get_replication (void *cls,
1341                               PluginDatumProcessor proc, void *proc_cls)
1342 {
1343   struct Plugin *plugin = cls;
1344   struct ReplCtx rc;
1345   
1346   rc.plugin = plugin;
1347   rc.proc = proc;
1348   rc.proc_cls = proc_cls;
1349   execute_select (plugin,
1350                   plugin->select_replication, 
1351                   &repl_proc, &rc,
1352                   -1);
1353
1354 }
1355
1356
1357 /**
1358  * Get a random item for expiration.
1359  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
1360  *
1361  * @param cls closure
1362  * @param proc function to call the value (once only).
1363  * @param proc_cls closure for proc
1364  */
1365 static void
1366 mysql_plugin_get_expiration (void *cls,
1367                              PluginDatumProcessor proc, void *proc_cls)
1368 {
1369   struct Plugin *plugin = cls;
1370   long long nt;
1371
1372   nt = (long long) GNUNET_TIME_absolute_get().abs_value;
1373   execute_select (plugin,
1374                   plugin->select_expiration, 
1375                   proc, proc_cls,
1376                   MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
1377                   -1);
1378
1379 }
1380
1381
1382 /**
1383  * Drop database.
1384  *
1385  * @param cls the "struct Plugin*"
1386  */
1387 static void 
1388 mysql_plugin_drop (void *cls)
1389 {
1390   struct Plugin *plugin = cls;
1391
1392   if (GNUNET_OK != run_statement (plugin,
1393                                   "DROP TABLE gn090"))
1394     return;           /* error */
1395   plugin->env->duc (plugin->env->cls, 0);
1396 }
1397
1398
1399 /**
1400  * Entry point for the plugin.
1401  *
1402  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1403  * @return our "struct Plugin*"
1404  */
1405 void *
1406 libgnunet_plugin_datastore_mysql_init (void *cls)
1407 {
1408   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1409   struct GNUNET_DATASTORE_PluginFunctions *api;
1410   struct Plugin *plugin;
1411
1412   plugin = GNUNET_malloc (sizeof (struct Plugin));
1413   plugin->env = env;
1414   plugin->cnffile = get_my_cnf_path (env->cfg);
1415   if (GNUNET_OK != iopen (plugin))
1416     {
1417       iclose (plugin);
1418       GNUNET_free_non_null (plugin->cnffile);
1419       GNUNET_free (plugin);
1420       return NULL;
1421     }
1422 #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
1423 #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
1424   if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
1425              " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1426              " type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1427              " prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1428              " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
1429              " expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
1430              " rvalue BIGINT UNSIGNED NOT NULL,"
1431              " hash BINARY(64) NOT NULL DEFAULT '',"
1432              " vhash BINARY(64) NOT NULL DEFAULT '',"
1433              " value BLOB NOT NULL DEFAULT '',"
1434              " uid BIGINT NOT NULL AUTO_INCREMENT,"
1435              " PRIMARY KEY (uid),"
1436              " INDEX idx_hash (hash(64)),"
1437              " INDEX idx_hash_uid (hash(64),uid),"
1438              " INDEX idx_hash_vhash (hash(64),vhash(64)),"
1439              " INDEX idx_hash_type_uid (hash(64),type,uid),"
1440              " INDEX idx_prio (prio),"
1441              " INDEX idx_repl_rvalue (repl,rvalue),"
1442              " INDEX idx_expire_prio (expire,prio),"
1443              " INDEX idx_anonLevel_uid (anonLevel,uid)"
1444              ") ENGINE=InnoDB") ||
1445       MRUNS ("SET AUTOCOMMIT = 1") ||
1446       PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1447       PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1448       PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1449       PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH)
1450       || PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE)
1451       || PINIT (plugin->select_entry_by_hash_vhash_and_type,
1452                 SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1453       || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
1454       || PINIT (plugin->get_size, SELECT_SIZE)
1455       || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
1456       || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1457       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1458                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1459       || PINIT (plugin->update_entry, UPDATE_ENTRY)
1460       || PINIT (plugin->dec_repl, DEC_REPL)
1461       || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) 
1462       || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) 
1463       || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
1464     {
1465       iclose (plugin);
1466       GNUNET_free_non_null (plugin->cnffile);
1467       GNUNET_free (plugin);
1468       return NULL;
1469     }
1470 #undef PINIT
1471 #undef MRUNS
1472
1473   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1474   api->cls = plugin;
1475   api->estimate_size = &mysql_plugin_estimate_size;
1476   api->put = &mysql_plugin_put;
1477   api->update = &mysql_plugin_update;
1478   api->get_key = &mysql_plugin_get_key;
1479   api->get_replication = &mysql_plugin_get_replication;
1480   api->get_expiration = &mysql_plugin_get_expiration;
1481   api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1482   api->drop = &mysql_plugin_drop;
1483   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1484                    "mysql", _("Mysql database running\n"));
1485   return api;
1486 }
1487
1488
1489 /**
1490  * Exit point from the plugin.
1491  * @param cls our "struct Plugin*"
1492  * @return always NULL
1493  */
1494 void *
1495 libgnunet_plugin_datastore_mysql_done (void *cls)
1496 {
1497   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1498   struct Plugin *plugin = api->cls;
1499   struct GNUNET_MysqlStatementHandle *s;
1500
1501   iclose (plugin);
1502   while (NULL != (s = plugin->shead))
1503     {
1504       GNUNET_CONTAINER_DLL_remove (plugin->shead,
1505                                    plugin->stail,
1506                                    s);
1507       GNUNET_free (s->query);
1508       GNUNET_free (s);
1509     }
1510   GNUNET_free_non_null (plugin->cnffile);
1511   GNUNET_free (plugin);
1512   GNUNET_free (api);
1513   mysql_library_end ();
1514   return NULL;
1515 }
1516
1517 /* end of plugin_datastore_mysql.c */