fixing datastore schema for future change for improved performance
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <sqlite3.h>
30
31 /**
32  * Enable or disable logging debug messages.
33  */
34 #define DEBUG_SQLITE GNUNET_NO
35
36 /**
37  * We allocate items on the stack at times.  To prevent a stack
38  * overflow, we impose a limit on the maximum size for the data per
39  * item.  64k should be enough.
40  */
41 #define MAX_ITEM_SIZE 65536
42
43 /**
44  * After how many ms "busy" should a DB operation fail for good?
45  * A low value makes sure that we are more responsive to requests
46  * (especially PUTs).  A high value guarantees a higher success
47  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
48  *
49  * The default value of 250ms should ensure that users do not experience
50  * huge latencies while at the same time allowing operations to succeed
51  * with reasonable probability.
52  */
53 #define BUSY_TIMEOUT_MS 250
54
55
56 /**
57  * Log an error message at log-level 'level' that indicates
58  * a failure of the command 'cmd' on file 'filename'
59  * with the message given by strerror(errno).
60  */
61 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0)
62
63
64
65 /**
66  * Context for all functions in this plugin.
67  */
68 struct Plugin 
69 {
70   /**
71    * Our execution environment.
72    */
73   struct GNUNET_DATASTORE_PluginEnvironment *env;
74
75   /**
76    * Database filename.
77    */
78   char *fn;
79
80   /**
81    * Native SQLite database handle.
82    */
83   sqlite3 *dbh;
84
85   /**
86    * Precompiled SQL for deletion.
87    */
88   sqlite3_stmt *delRow;
89
90   /**
91    * Precompiled SQL for update.
92    */
93   sqlite3_stmt *updPrio;
94
95   /**
96    * Precompiled SQL for replication decrement.
97    */
98   sqlite3_stmt *updRepl;
99
100   /**
101    * Precompiled SQL for replication selection.
102    */
103   sqlite3_stmt *selRepl;
104
105   /**
106    * Precompiled SQL for expiration selection.
107    */
108   sqlite3_stmt *selExpi;
109
110   /**
111    * Precompiled SQL for expiration selection.
112    */
113   sqlite3_stmt *selZeroAnon;
114
115   /**
116    * Precompiled SQL for insertion.
117    */
118   sqlite3_stmt *insertContent;
119
120   /**
121    * Should the database be dropped on shutdown?
122    */
123   int drop_on_shutdown;
124
125 };
126
127
128 /**
129  * @brief Prepare a SQL statement
130  *
131  * @param dbh handle to the database
132  * @param zSql SQL statement, UTF-8 encoded
133  * @param ppStmt set to the prepared statement
134  * @return 0 on success
135  */
136 static int
137 sq_prepare (sqlite3 * dbh, 
138             const char *zSql,
139             sqlite3_stmt ** ppStmt)
140 {
141   char *dummy;
142   int result;
143
144   result = sqlite3_prepare_v2 (dbh,
145                                zSql,
146                                strlen (zSql), 
147                                ppStmt,
148                                (const char **) &dummy);
149 #if DEBUG_SQLITE && 0
150   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
151                    "sqlite",
152                    "Prepared `%s' / %p: %d\n",
153                    zSql,
154                    *ppStmt, 
155                    result);
156 #endif
157   return result;
158 }
159
160
161 /**
162  * Create our database indices.
163  * 
164  * @param dbh handle to the database
165  */
166 static void
167 create_indices (sqlite3 * dbh)
168 {
169   /* create indices */
170   sqlite3_exec (dbh,
171                 "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL);
172   sqlite3_exec (dbh,
173                 "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL,
174                 NULL, NULL);
175   sqlite3_exec (dbh, "CREATE INDEX idx_expire_repl ON gn090 (expire ASC,repl DESC)", NULL, NULL,
176                 NULL);
177   sqlite3_exec (dbh, "CREATE INDEX idx_comb ON gn090 (anonLevel ASC,expire ASC,prio,type,hash)",
178                 NULL, NULL, NULL);
179   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn090 (expire)",
180                 NULL, NULL, NULL);
181   sqlite3_exec (dbh, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)",
182                 NULL, NULL, NULL);
183 }
184
185
186 #if 0
187 #define CHECK(a) GNUNET_break(a)
188 #define ENULL NULL
189 #else
190 #define ENULL &e
191 #define ENULL_DEFINED 1
192 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "%s\n", e); sqlite3_free(e); }
193 #endif
194
195
196 /**
197  * Initialize the database connections and associated
198  * data structures (create tables and indices
199  * as needed as well).
200  *
201  * @param cfg our configuration
202  * @param plugin the plugin context (state for this module)
203  * @return GNUNET_OK on success
204  */
205 static int
206 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
207                 struct Plugin *plugin)
208 {
209   sqlite3_stmt *stmt;
210   char *afsdir;
211 #if ENULL_DEFINED
212   char *e;
213 #endif
214   
215   if (GNUNET_OK != 
216       GNUNET_CONFIGURATION_get_value_filename (cfg,
217                                                "datastore-sqlite",
218                                                "FILENAME",
219                                                &afsdir))
220     {
221       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
222                        "sqlite",
223                        _("Option `%s' in section `%s' missing in configuration!\n"),
224                        "FILENAME",
225                        "datastore-sqlite");
226       return GNUNET_SYSERR;
227     }
228   if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
229     {
230       if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
231         {
232           GNUNET_break (0);
233           GNUNET_free (afsdir);
234           return GNUNET_SYSERR;
235         }
236       /* database is new or got deleted, reset payload to zero! */
237       plugin->env->duc (plugin->env->cls, 0);
238     }
239 #ifdef ENABLE_NLS
240   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
241                                        nl_langinfo (CODESET));
242 #else
243   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
244                                        "UTF-8");   /* good luck */
245 #endif
246   GNUNET_free (afsdir);
247   
248   /* Open database and precompile statements */
249   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
250     {
251       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
252                        "sqlite",
253                        _("Unable to initialize SQLite: %s.\n"),
254                        sqlite3_errmsg (plugin->dbh));
255       return GNUNET_SYSERR;
256     }
257   CHECK (SQLITE_OK ==
258          sqlite3_exec (plugin->dbh,
259                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
260   CHECK (SQLITE_OK ==
261          sqlite3_exec (plugin->dbh,
262                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
263   CHECK (SQLITE_OK ==
264          sqlite3_exec (plugin->dbh,
265                        "PRAGMA legacy_file_format=OFF", NULL, NULL, ENULL));
266   CHECK (SQLITE_OK ==
267          sqlite3_exec (plugin->dbh,
268                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
269   CHECK (SQLITE_OK ==
270          sqlite3_exec (plugin->dbh,
271                        "PRAGMA locking_mode=EXCLUSIVE", NULL, NULL, ENULL));
272   CHECK (SQLITE_OK ==
273          sqlite3_exec (plugin->dbh,
274                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
275   CHECK (SQLITE_OK ==
276          sqlite3_exec (plugin->dbh, 
277                        "PRAGMA page_size=4092", NULL, NULL, ENULL));
278
279   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
280
281
282   /* We have to do it here, because otherwise precompiling SQL might fail */
283   CHECK (SQLITE_OK ==
284          sq_prepare (plugin->dbh,
285                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn090'",
286                      &stmt));
287   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
288        (sqlite3_exec (plugin->dbh,
289                       "CREATE TABLE gn090 ("
290                       "  repl INT4 NOT NULL DEFAULT 0,"
291                       "  type INT4 NOT NULL DEFAULT 0,"
292                       "  prio INT4 NOT NULL DEFAULT 0,"
293                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
294                       "  expire INT8 NOT NULL DEFAULT 0,"
295                       "  rvalue INT8 NOT NULL,"
296                       "  hash TEXT NOT NULL DEFAULT '',"
297                       "  vhash TEXT NOT NULL DEFAULT '',"
298                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
299                       NULL) != SQLITE_OK) )
300     {
301       LOG_SQLITE (plugin, NULL,
302                   GNUNET_ERROR_TYPE_ERROR, 
303                   "sqlite3_exec");
304       sqlite3_finalize (stmt);
305       return GNUNET_SYSERR;
306     }
307   sqlite3_finalize (stmt);
308   create_indices (plugin->dbh);
309
310   if ((sq_prepare (plugin->dbh,
311                    "UPDATE gn090 SET prio = prio + ?, expire = MAX(expire,?) WHERE _ROWID_ = ?",
312                    &plugin->updPrio) != SQLITE_OK) ||
313       (sq_prepare (plugin->dbh,
314                    "UPDATE gn090 SET repl = MAX (0, repl - 1) WHERE _ROWID_ = ?",
315                    &plugin->updRepl) != SQLITE_OK) ||
316       (sq_prepare (plugin->dbh,
317                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090"
318                    " ORDER BY repl DESC, Random() LIMIT 1",
319                    &plugin->selRepl) != SQLITE_OK) ||
320       (sq_prepare (plugin->dbh,
321                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
322                    " WHERE NOT EXISTS (SELECT 1 FROM gn090 WHERE expire < ?1 LIMIT 1) OR expire < ?1 "
323                    " ORDER BY prio ASC LIMIT 1",
324                    &plugin->selExpi) != SQLITE_OK) ||
325       (sq_prepare (plugin->dbh, 
326                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
327                    "WHERE (anonLevel = 0 AND type=?1) "
328                    "ORDER BY hash DESC LIMIT 1 OFFSET ?2",
329                    &plugin->selZeroAnon) != SQLITE_OK) ||
330       (sq_prepare (plugin->dbh,
331                    "INSERT INTO gn090 (repl, type, prio, "
332                    "anonLevel, expire, rvalue, hash, vhash, value) "
333                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
334                    &plugin->insertContent) != SQLITE_OK) ||
335       (sq_prepare (plugin->dbh,
336                    "DELETE FROM gn090 WHERE _ROWID_ = ?",
337                    &plugin->delRow) != SQLITE_OK))
338     {
339       LOG_SQLITE (plugin, NULL,
340                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
341       return GNUNET_SYSERR;
342     }
343
344   return GNUNET_OK;
345 }
346
347
348 /**
349  * Shutdown database connection and associate data
350  * structures.
351  * @param plugin the plugin context (state for this module)
352  */
353 static void
354 database_shutdown (struct Plugin *plugin)
355 {
356   int result;
357 #if SQLITE_VERSION_NUMBER >= 3007000
358   sqlite3_stmt *stmt;
359 #endif
360
361   if (plugin->delRow != NULL)
362     sqlite3_finalize (plugin->delRow);
363   if (plugin->updPrio != NULL)
364     sqlite3_finalize (plugin->updPrio);
365   if (plugin->updRepl != NULL)
366     sqlite3_finalize (plugin->updRepl);
367   if (plugin->selRepl != NULL)
368     sqlite3_finalize (plugin->selRepl);
369   if (plugin->selExpi != NULL)
370     sqlite3_finalize (plugin->selExpi);
371   if (plugin->selZeroAnon != NULL)
372     sqlite3_finalize (plugin->selZeroAnon);
373   if (plugin->insertContent != NULL)
374     sqlite3_finalize (plugin->insertContent);
375   result = sqlite3_close(plugin->dbh);
376 #if SQLITE_VERSION_NUMBER >= 3007000
377   if (result == SQLITE_BUSY)
378     {
379       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
380                        "sqlite",
381                        _("Tried to close sqlite without finalizing all prepared statements.\n"));
382       stmt = sqlite3_next_stmt(plugin->dbh, NULL); 
383       while (stmt != NULL)
384         {
385 #if DEBUG_SQLITE
386           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
387                      "sqlite", "Closing statement %p\n", stmt);
388 #endif
389           result = sqlite3_finalize(stmt);
390 #if DEBUG_SQLITE
391           if (result != SQLITE_OK)
392               GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
393                                "sqlite",
394                                "Failed to close statement %p: %d\n", stmt, result);
395 #endif
396           stmt = sqlite3_next_stmt(plugin->dbh, NULL);
397         }
398       result = sqlite3_close(plugin->dbh);
399     }
400 #endif
401   if (SQLITE_OK != result)
402       LOG_SQLITE (plugin, NULL,
403                   GNUNET_ERROR_TYPE_ERROR, 
404                   "sqlite3_close");
405
406   GNUNET_free_non_null (plugin->fn);
407 }
408
409
410 /**
411  * Delete the database entry with the given
412  * row identifier.
413  *
414  * @param plugin the plugin context (state for this module)
415  * @param rid the ID of the row to delete
416  */
417 static int
418 delete_by_rowid (struct Plugin* plugin, 
419                  unsigned long long rid)
420 {
421   sqlite3_bind_int64 (plugin->delRow, 1, rid);
422   if (SQLITE_DONE != sqlite3_step (plugin->delRow))
423     {
424       LOG_SQLITE (plugin, NULL,
425                   GNUNET_ERROR_TYPE_ERROR |
426                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
427       if (SQLITE_OK != sqlite3_reset (plugin->delRow))
428           LOG_SQLITE (plugin, NULL,
429                       GNUNET_ERROR_TYPE_ERROR |
430                       GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
431       return GNUNET_SYSERR;
432     }
433   if (SQLITE_OK != sqlite3_reset (plugin->delRow))
434     LOG_SQLITE (plugin, NULL,
435                 GNUNET_ERROR_TYPE_ERROR |
436                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
437   return GNUNET_OK;
438 }
439
440
441 /**
442  * Store an item in the datastore.
443  *
444  * @param cls closure
445  * @param key key for the item
446  * @param size number of bytes in data
447  * @param data content stored
448  * @param type type of the content
449  * @param priority priority of the content
450  * @param anonymity anonymity-level for the content
451  * @param replication replication-level for the content
452  * @param expiration expiration time for the content
453  * @param msg set to an error message
454  * @return GNUNET_OK on success
455  */
456 static int
457 sqlite_plugin_put (void *cls,
458                    const GNUNET_HashCode *key,
459                    uint32_t size,
460                    const void *data,
461                    enum GNUNET_BLOCK_Type type,
462                    uint32_t priority,
463                    uint32_t anonymity,
464                    uint32_t replication,
465                    struct GNUNET_TIME_Absolute expiration,
466                    char ** msg)
467 {
468   struct Plugin *plugin = cls;
469   int n;
470   int ret;
471   sqlite3_stmt *stmt;
472   GNUNET_HashCode vhash;
473   uint64_t rvalue;
474
475   if (size > MAX_ITEM_SIZE)
476     return GNUNET_SYSERR;
477 #if DEBUG_SQLITE
478   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
479                    "sqlite",
480                    "Storing in database block with type %u/key `%s'/priority %u/expiration in %llu ms (%lld).\n",
481                    type, 
482                    GNUNET_h2s(key),
483                    priority,
484                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
485                    (long long) expiration.abs_value);
486 #endif
487   GNUNET_CRYPTO_hash (data, size, &vhash);
488   stmt = plugin->insertContent;
489   rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
490   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, replication)) ||
491       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
492       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
493       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
494       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, expiration.abs_value)) ||
495       (SQLITE_OK != sqlite3_bind_int64 (stmt, 6, rvalue)) ||
496       (SQLITE_OK !=
497        sqlite3_bind_blob (stmt, 7, key, sizeof (GNUNET_HashCode),
498                           SQLITE_TRANSIENT)) ||
499       (SQLITE_OK !=
500        sqlite3_bind_blob (stmt, 8, &vhash, sizeof (GNUNET_HashCode),
501                           SQLITE_TRANSIENT))
502       || (SQLITE_OK !=
503           sqlite3_bind_blob (stmt, 9, data, size,
504                              SQLITE_TRANSIENT)))
505     {
506       LOG_SQLITE (plugin,
507                   msg,
508                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
509       if (SQLITE_OK != sqlite3_reset (stmt))
510         LOG_SQLITE (plugin, NULL,
511                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
512       return GNUNET_SYSERR;
513     }
514   n = sqlite3_step (stmt);
515   switch (n)
516     {
517     case SQLITE_DONE:
518       plugin->env->duc (plugin->env->cls,
519                         size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
520 #if DEBUG_SQLITE
521       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
522                        "sqlite",
523                        "Stored new entry (%u bytes)\n",
524                        size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
525 #endif
526       ret = GNUNET_OK;
527       break;
528     case SQLITE_BUSY:      
529       GNUNET_break (0);
530       LOG_SQLITE (plugin, msg,
531                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
532                   "sqlite3_step");
533       ret = GNUNET_SYSERR;
534       break;
535     default:
536       LOG_SQLITE (plugin, msg,
537                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
538                   "sqlite3_step");
539       if (SQLITE_OK != sqlite3_reset (stmt))
540         LOG_SQLITE (plugin, NULL,
541                     GNUNET_ERROR_TYPE_ERROR |
542                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
543       database_shutdown (plugin);
544       database_setup (plugin->env->cfg,
545                       plugin);
546       return GNUNET_SYSERR;    
547     }
548   if (SQLITE_OK != sqlite3_reset (stmt))
549     LOG_SQLITE (plugin, NULL,
550                 GNUNET_ERROR_TYPE_ERROR |
551                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
552   return ret;
553 }
554
555
556 /**
557  * Update the priority for a particular key in the datastore.  If
558  * the expiration time in value is different than the time found in
559  * the datastore, the higher value should be kept.  For the
560  * anonymity level, the lower value is to be used.  The specified
561  * priority should be added to the existing priority, ignoring the
562  * priority in value.
563  *
564  * Note that it is possible for multiple values to match this put.
565  * In that case, all of the respective values are updated.
566  *
567  * @param cls the plugin context (state for this module)
568  * @param uid unique identifier of the datum
569  * @param delta by how much should the priority
570  *     change?  If priority + delta < 0 the
571  *     priority should be set to 0 (never go
572  *     negative).
573  * @param expire new expiration time should be the
574  *     MAX of any existing expiration time and
575  *     this value
576  * @param msg set to an error message
577  * @return GNUNET_OK on success
578  */
579 static int
580 sqlite_plugin_update (void *cls,
581                       uint64_t uid,
582                       int delta, struct GNUNET_TIME_Absolute expire,
583                       char **msg)
584 {
585   struct Plugin *plugin = cls;
586   int n;
587
588   sqlite3_bind_int (plugin->updPrio, 1, delta);
589   sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value);
590   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
591   n = sqlite3_step (plugin->updPrio);
592   sqlite3_reset (plugin->updPrio);
593   switch (n)
594     {
595     case SQLITE_DONE:
596 #if DEBUG_SQLITE
597       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
598                        "sqlite",
599                        "Block updated\n");
600 #endif
601       return GNUNET_OK;
602     case SQLITE_BUSY:
603       LOG_SQLITE (plugin, msg,
604                   GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
605                   "sqlite3_step");
606       return GNUNET_NO;
607     default:
608       LOG_SQLITE (plugin, msg,
609                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
610                   "sqlite3_step");
611       return GNUNET_SYSERR;
612     }
613 }
614
615
616 /**
617  * Execute statement that gets a row and call the callback
618  * with the result.  Resets the statement afterwards.
619  *
620  * @param plugin the plugin
621  * @param stmt the statement
622  * @param proc processor to call
623  * @param proc_cls closure for 'proc'
624  */
625 static void
626 execute_get (struct Plugin *plugin,
627              sqlite3_stmt *stmt,
628              PluginDatumProcessor proc, void *proc_cls)
629 {
630   int n;
631   struct GNUNET_TIME_Absolute expiration;
632   unsigned long long rowid;
633   unsigned int size;
634   int ret;
635
636   n = sqlite3_step (stmt);
637   switch (n)
638     {
639     case SQLITE_ROW:
640       size = sqlite3_column_bytes (stmt, 5);
641       rowid = sqlite3_column_int64 (stmt, 6);
642       if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
643         {
644           GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
645                            "sqlite",
646                            _("Invalid data in database.  Trying to fix (by deletion).\n"));
647           if (SQLITE_OK != sqlite3_reset (stmt))
648             LOG_SQLITE (plugin, NULL,
649                         GNUNET_ERROR_TYPE_ERROR |
650                         GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
651           if (GNUNET_OK == delete_by_rowid (plugin, rowid))
652             plugin->env->duc (plugin->env->cls,
653                               - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));        
654           break;
655         }
656       expiration.abs_value = sqlite3_column_int64 (stmt, 3);
657 #if DEBUG_SQLITE
658       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 
659                        "sqlite",
660                        "Found reply in database with expiration %llu\n",
661                        (unsigned long long) expiration.abs_value);
662 #endif
663       ret = proc (proc_cls,
664                   sqlite3_column_blob (stmt, 4) /* key */,
665                   size,
666                   sqlite3_column_blob (stmt, 5) /* data */, 
667                   sqlite3_column_int (stmt, 0) /* type */,
668                   sqlite3_column_int (stmt, 1) /* priority */,
669                   sqlite3_column_int (stmt, 2) /* anonymity */,
670                   expiration,
671                   rowid);
672       if (SQLITE_OK != sqlite3_reset (stmt))
673         LOG_SQLITE (plugin, NULL,
674                     GNUNET_ERROR_TYPE_ERROR |
675                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
676       if ( (GNUNET_NO == ret) &&
677            (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
678         plugin->env->duc (plugin->env->cls,
679                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));  
680       return;
681     case SQLITE_DONE:
682       /* database must be empty */
683       if (SQLITE_OK != sqlite3_reset (stmt))
684         LOG_SQLITE (plugin, NULL,
685                     GNUNET_ERROR_TYPE_ERROR |
686                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
687       break;
688     case SQLITE_BUSY:    
689     case SQLITE_ERROR:
690     case SQLITE_MISUSE:
691     default:
692       LOG_SQLITE (plugin, NULL,
693                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
694                   "sqlite3_step");
695       if (SQLITE_OK != sqlite3_reset (stmt))
696         LOG_SQLITE (plugin, NULL,
697                     GNUNET_ERROR_TYPE_ERROR |
698                     GNUNET_ERROR_TYPE_BULK,
699                     "sqlite3_reset");
700       GNUNET_break (0);
701       database_shutdown (plugin);
702       database_setup (plugin->env->cfg,
703                       plugin);
704       break;
705     }
706   if (SQLITE_OK != sqlite3_reset (stmt))
707     LOG_SQLITE (plugin, NULL,
708                 GNUNET_ERROR_TYPE_ERROR |
709                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
710   proc (proc_cls, NULL, 0, NULL, 0, 0, 0,           
711         GNUNET_TIME_UNIT_ZERO_ABS, 0);
712 }
713
714
715
716 /**
717  * Select a subset of the items in the datastore and call
718  * the given processor for the item.
719  *
720  * @param cls our plugin context
721  * @param type entries of which type should be considered?
722  *        Use 0 for any type.
723  * @param proc function to call on each matching value;
724  *        will be called once with a NULL value at the end
725  * @param proc_cls closure for proc
726  */
727 static void
728 sqlite_plugin_get_zero_anonymity (void *cls,
729                                   uint64_t offset,
730                                   enum GNUNET_BLOCK_Type type,
731                                   PluginDatumProcessor proc,
732                                   void *proc_cls)
733 {
734   struct Plugin *plugin = cls;
735   sqlite3_stmt *stmt;
736
737   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
738   stmt = plugin->selZeroAnon;
739   if ( (SQLITE_OK != sqlite3_bind_int (stmt, 1, type)) ||
740        (SQLITE_OK != sqlite3_bind_int64 (stmt, 2, offset)) )
741     {
742       LOG_SQLITE (plugin, NULL,
743                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
744                   "sqlite3_bind_XXXX");
745       if (SQLITE_OK != sqlite3_reset (stmt))
746         LOG_SQLITE (plugin, NULL,
747                     GNUNET_ERROR_TYPE_ERROR | 
748                     GNUNET_ERROR_TYPE_BULK, 
749                     "sqlite3_reset");
750       proc (proc_cls, NULL, 0, NULL, 0, 0, 0,       
751             GNUNET_TIME_UNIT_ZERO_ABS, 0);
752       return;
753     }
754   execute_get (plugin, stmt, proc, proc_cls);
755 }
756
757
758
759 /**
760  * Get results for a particular key in the datastore.
761  *
762  * @param cls closure
763  * @param offset offset (mod count).
764  * @param key key to match, never NULL
765  * @param vhash hash of the value, maybe NULL (to
766  *        match all values that have the right key).
767  *        Note that for DBlocks there is no difference
768  *        betwen key and vhash, but for other blocks
769  *        there may be!
770  * @param type entries of which type are relevant?
771  *     Use 0 for any type.
772  * @param proc function to call on each matching value;
773  *        will be called once with a NULL value at the end
774  * @param proc_cls closure for proc
775  */
776 static void
777 sqlite_plugin_get_key (void *cls,
778                        uint64_t offset,
779                        const GNUNET_HashCode *key,
780                        const GNUNET_HashCode *vhash,
781                        enum GNUNET_BLOCK_Type type,
782                        PluginDatumProcessor proc, void *proc_cls)
783 {
784   struct Plugin *plugin = cls;
785   int ret;
786   int total;
787   int limit_off;
788   unsigned int sqoff;
789   sqlite3_stmt *stmt;
790   char scratch[256];
791
792   GNUNET_assert (proc != NULL);
793   GNUNET_assert (key != NULL);
794   GNUNET_snprintf (scratch, sizeof (scratch),
795                    "SELECT count(*) FROM gn090 WHERE hash=?%s%s",
796                    vhash == NULL ? "" : " AND vhash=?",
797                    type  == 0    ? "" : " AND type=?");
798   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
799     {
800       LOG_SQLITE (plugin, NULL,
801                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
802       proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
803       return;
804     }
805   sqoff = 1;
806   ret = sqlite3_bind_blob (stmt, sqoff++,
807                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
808   if ((vhash != NULL) && (ret == SQLITE_OK))
809     ret = sqlite3_bind_blob (stmt, sqoff++,
810                              vhash,
811                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
812   if ((type != 0) && (ret == SQLITE_OK))
813     ret = sqlite3_bind_int (stmt, sqoff++, type);
814   if (SQLITE_OK != ret)
815     {
816       LOG_SQLITE (plugin, NULL,
817                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
818       sqlite3_finalize (stmt);
819       proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
820       return;
821     }
822   ret = sqlite3_step (stmt);
823   if (ret != SQLITE_ROW)
824     {
825       LOG_SQLITE (plugin, NULL,
826                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
827                   "sqlite_step");
828       sqlite3_finalize (stmt);
829       proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
830       return;
831     }
832   total = sqlite3_column_int (stmt, 0);
833   sqlite3_finalize (stmt);
834   if (0 == total)
835     {
836       proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
837       return;
838     }
839   limit_off = (int) (offset % total);
840   if (limit_off < 0)
841     limit_off += total;
842   GNUNET_snprintf (scratch, sizeof (scratch),
843                    "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ "
844                    "FROM gn090 WHERE hash=?%s%s "
845                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
846                    vhash == NULL ? "" : " AND vhash=?",
847                    type == 0 ? "" : " AND type=?");
848   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
849     {
850       LOG_SQLITE (plugin, NULL,
851                   GNUNET_ERROR_TYPE_ERROR |
852                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
853       proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
854       return;
855     }
856   sqoff = 1;
857   ret = sqlite3_bind_blob (stmt,
858                            sqoff++,
859                            key, 
860                            sizeof (GNUNET_HashCode),
861                            SQLITE_TRANSIENT);
862   if ((vhash != NULL) && (ret == SQLITE_OK))
863     ret = sqlite3_bind_blob (stmt,
864                              sqoff++,
865                              vhash,
866                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
867   if ((type != 0) && (ret == SQLITE_OK))
868     ret = sqlite3_bind_int (stmt, sqoff++, type);
869   if (ret == SQLITE_OK)
870     ret = sqlite3_bind_int64 (stmt, sqoff++, limit_off);
871   if (ret != SQLITE_OK)
872     {
873       LOG_SQLITE (plugin, NULL,
874                   GNUNET_ERROR_TYPE_ERROR |
875                   GNUNET_ERROR_TYPE_BULK, "sqlite_bind");
876       proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
877       return;
878     }
879   execute_get (plugin, stmt, proc, proc_cls);
880   sqlite3_finalize (stmt);
881 }
882
883
884
885 /**
886  * Context for 'repl_proc' function.
887  */
888 struct ReplCtx
889 {
890   
891   /**
892    * Function to call for the result (or the NULL).
893    */
894   PluginDatumProcessor proc;
895   
896   /**
897    * Closure for proc.
898    */
899   void *proc_cls;
900
901   /**
902    * UID to use.
903    */
904   uint64_t uid;
905
906   /**
907    * Yes if UID was set.
908    */
909   int have_uid;
910 };
911
912
913 /**
914  * Wrapper for the processor for 'sqlite_plugin_replication_get'.
915  * Decrements the replication counter and calls the original
916  * processor.
917  *
918  * @param cls closure
919  * @param key key for the content
920  * @param size number of bytes in data
921  * @param data content stored
922  * @param type type of the content
923  * @param priority priority of the content
924  * @param anonymity anonymity-level for the content
925  * @param expiration expiration time for the content
926  * @param uid unique identifier for the datum;
927  *        maybe 0 if no unique identifier is available
928  *
929  * @return GNUNET_OK for normal return,
930  *         GNUNET_NO to delete the item
931  */
932 static int
933 repl_proc (void *cls,
934            const GNUNET_HashCode *key,
935            uint32_t size,
936            const void *data,
937            enum GNUNET_BLOCK_Type type,
938            uint32_t priority,
939            uint32_t anonymity,
940            struct GNUNET_TIME_Absolute expiration, 
941            uint64_t uid)
942 {
943   struct ReplCtx *rc = cls;
944   int ret;
945
946   ret = rc->proc (rc->proc_cls,
947                   key,
948                   size, data, 
949                   type, priority, anonymity, expiration,
950                   uid);
951   if (key != NULL)
952     {
953       rc->uid = uid;
954       rc->have_uid = GNUNET_YES;
955     }
956   return ret;
957 }
958
959
960 /**
961  * Get a random item for replication.  Returns a single random item
962  * from those with the highest replication counters.  The item's 
963  * replication counter is decremented by one IF it was positive before.
964  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
965  *
966  * @param cls closure
967  * @param proc function to call the value (once only).
968  * @param proc_cls closure for proc
969  */
970 static void
971 sqlite_plugin_get_replication (void *cls,
972                                PluginDatumProcessor proc, void *proc_cls)
973 {
974   struct Plugin *plugin = cls;
975   struct ReplCtx rc;
976
977 #if DEBUG_SQLITE
978   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
979                    "sqlite",
980                    "Getting random block based on replication order.\n");
981 #endif
982   rc.have_uid = GNUNET_NO;
983   rc.proc = proc;
984   rc.proc_cls = proc_cls;
985   execute_get (plugin, plugin->selRepl, &repl_proc, &rc); 
986   if (GNUNET_YES == rc.have_uid)
987     {
988       sqlite3_bind_int64 (plugin->updRepl, 1, rc.uid);
989       if (SQLITE_DONE != sqlite3_step (plugin->updRepl))        
990         LOG_SQLITE (plugin, NULL,
991                     GNUNET_ERROR_TYPE_ERROR |
992                     GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
993       if (SQLITE_OK != sqlite3_reset (plugin->updRepl))
994         LOG_SQLITE (plugin, NULL,
995                     GNUNET_ERROR_TYPE_ERROR |
996                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
997     }
998 }
999
1000
1001
1002 /**
1003  * Get a random item that has expired or has low priority.
1004  * Call 'proc' with all values ZERO or NULL if the datastore is empty.
1005  *
1006  * @param cls closure
1007  * @param proc function to call the value (once only).
1008  * @param proc_cls closure for proc
1009  */
1010 static void
1011 sqlite_plugin_get_expiration (void *cls,
1012                               PluginDatumProcessor proc, void *proc_cls)
1013 {
1014   struct Plugin *plugin = cls;
1015   sqlite3_stmt *stmt;
1016   struct GNUNET_TIME_Absolute now;
1017
1018 #if DEBUG_SQLITE
1019   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1020                    "sqlite",
1021                    "Getting random block based on expiration and priority order.\n");
1022 #endif
1023   now = GNUNET_TIME_absolute_get ();
1024   stmt = plugin->selExpi;
1025   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value))
1026     {
1027       LOG_SQLITE (plugin, NULL,           
1028                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
1029       if (SQLITE_OK != sqlite3_reset (stmt))
1030         LOG_SQLITE (plugin, NULL,
1031                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1032       proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 
1033             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1034       return;
1035     }
1036   execute_get (plugin, stmt, proc, proc_cls);
1037 }
1038
1039
1040 /**
1041  * Drop database.
1042  *
1043  * @param cls our plugin context
1044  */
1045 static void 
1046 sqlite_plugin_drop (void *cls)
1047 {
1048   struct Plugin *plugin = cls;
1049   plugin->drop_on_shutdown = GNUNET_YES;
1050 }
1051
1052
1053 /**
1054  * Get an estimate of how much space the database is
1055  * currently using.
1056  *
1057  * @param cls the 'struct Plugin'
1058  * @return the size of the database on disk (estimate)
1059  */
1060 static unsigned long long
1061 sqlite_plugin_estimate_size (void *cls)
1062 {
1063   struct Plugin *plugin = cls;
1064   sqlite3_stmt *stmt;
1065   uint64_t pages;
1066   uint64_t page_size;
1067 #if ENULL_DEFINED
1068   char *e;
1069 #endif
1070
1071   if (SQLITE_VERSION_NUMBER < 3006000)
1072     {
1073       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
1074                        "datastore-sqlite",
1075                        _("sqlite version to old to determine size, assuming zero\n"));
1076       return 0;
1077     }
1078   CHECK (SQLITE_OK ==
1079          sqlite3_exec (plugin->dbh,
1080                        "VACUUM", NULL, NULL, ENULL));
1081   CHECK (SQLITE_OK ==
1082          sqlite3_exec (plugin->dbh,
1083                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
1084   CHECK (SQLITE_OK ==
1085          sq_prepare (plugin->dbh,
1086                      "PRAGMA page_count",
1087                      &stmt));
1088   if (SQLITE_ROW ==
1089       sqlite3_step (stmt))
1090     pages = sqlite3_column_int64 (stmt, 0);
1091   else
1092     pages = 0;
1093   sqlite3_finalize (stmt);
1094   CHECK (SQLITE_OK ==
1095          sq_prepare (plugin->dbh,
1096                      "PRAGMA page_size",
1097                      &stmt));
1098   CHECK (SQLITE_ROW ==
1099          sqlite3_step (stmt));
1100   page_size = sqlite3_column_int64 (stmt, 0);
1101   sqlite3_finalize (stmt);
1102   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1103               _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
1104               (unsigned long long) pages,
1105               (unsigned long long) page_size);
1106   return  pages * page_size;
1107 }
1108                                          
1109
1110 /**
1111  * Entry point for the plugin.
1112  *
1113  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1114  * @return NULL on error, othrewise the plugin context
1115  */
1116 void *
1117 libgnunet_plugin_datastore_sqlite_init (void *cls)
1118 {
1119   static struct Plugin plugin;
1120   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1121   struct GNUNET_DATASTORE_PluginFunctions *api;
1122
1123   if (plugin.env != NULL)
1124     return NULL; /* can only initialize once! */
1125   memset (&plugin, 0, sizeof(struct Plugin));
1126   plugin.env = env;
1127   if (GNUNET_OK !=
1128       database_setup (env->cfg, &plugin))
1129     {
1130       database_shutdown (&plugin);
1131       return NULL;
1132     }
1133   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1134   api->cls = &plugin;
1135   api->estimate_size = &sqlite_plugin_estimate_size;
1136   api->put = &sqlite_plugin_put;
1137   api->update = &sqlite_plugin_update;
1138   api->get_key = &sqlite_plugin_get_key;
1139   api->get_replication = &sqlite_plugin_get_replication;
1140   api->get_expiration = &sqlite_plugin_get_expiration;
1141   api->get_zero_anonymity = &sqlite_plugin_get_zero_anonymity;
1142   api->drop = &sqlite_plugin_drop;
1143   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1144                    "sqlite", _("Sqlite database running\n"));
1145   return api;
1146 }
1147
1148
1149 /**
1150  * Exit point from the plugin.
1151  *
1152  * @param cls the plugin context (as returned by "init")
1153  * @return always NULL
1154  */
1155 void *
1156 libgnunet_plugin_datastore_sqlite_done (void *cls)
1157 {
1158   char *fn;
1159   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1160   struct Plugin *plugin = api->cls;
1161
1162 #if DEBUG_SQLITE
1163   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1164                    "sqlite",
1165                    "sqlite plugin is done\n");
1166 #endif
1167
1168   fn = NULL;
1169   if (plugin->drop_on_shutdown)
1170     fn = GNUNET_strdup (plugin->fn);
1171 #if DEBUG_SQLITE
1172   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1173                    "sqlite",
1174                    "Shutting down database\n");
1175 #endif
1176   database_shutdown (plugin);
1177   plugin->env = NULL; 
1178   GNUNET_free (api);
1179   if (fn != NULL)
1180     {
1181       if (0 != UNLINK(fn))
1182         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1183                                   "unlink",
1184                                   fn);
1185       GNUNET_free (fn);
1186     }
1187 #if DEBUG_SQLITE
1188   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1189                    "sqlite",
1190                    "sqlite plugin is finished\n");
1191 #endif
1192   return NULL;
1193 }
1194
1195 /* end of plugin_datastore_sqlite.c */