2 This file is part of GNUnet
3 (C) 2009 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file datastore/plugin_datastore_sqlite.c
23 * @brief sqlite-based datastore backend
24 * @author Christian Grothoff
28 #include "gnunet_statistics_service.h"
29 #include "plugin_datastore.h"
32 #define DEBUG_SQLITE GNUNET_NO
35 * After how many payload-changing operations
36 * do we sync our statistics?
38 #define MAX_STAT_SYNC_LAG 50
40 #define QUOTA_STAT_NAME gettext_noop ("file-sharing datastore utilization (in bytes)")
43 * Log an error message at log-level 'level' that indicates
44 * a failure of the command 'cmd' on file 'filename'
45 * with the message given by strerror(errno).
47 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed with error: %s"), cmd, sqlite3_errmsg(db->dbh)); } while(0)
49 #define SELECT_IT_LOW_PRIORITY_1 \
50 "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
51 "ORDER BY hash ASC LIMIT 1"
53 #define SELECT_IT_LOW_PRIORITY_2 \
54 "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\
55 "ORDER BY prio ASC, hash ASC LIMIT 1"
57 #define SELECT_IT_NON_ANONYMOUS_1 \
58 "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
59 " ORDER BY hash DESC LIMIT 1"
61 #define SELECT_IT_NON_ANONYMOUS_2 \
62 "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
63 " ORDER BY prio DESC, hash DESC LIMIT 1"
65 #define SELECT_IT_EXPIRATION_TIME_1 \
66 "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\
67 " ORDER BY hash ASC LIMIT 1"
69 #define SELECT_IT_EXPIRATION_TIME_2 \
70 "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\
71 " ORDER BY expire ASC, hash ASC LIMIT 1"
73 #define SELECT_IT_MIGRATION_ORDER_1 \
74 "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\
75 " ORDER BY hash DESC LIMIT 1"
77 #define SELECT_IT_MIGRATION_ORDER_2 \
78 "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\
79 " ORDER BY expire DESC, hash DESC LIMIT 1"
82 * After how many ms "busy" should a DB operation fail for good?
83 * A low value makes sure that we are more responsive to requests
84 * (especially PUTs). A high value guarantees a higher success
85 * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
87 * The default value of 250ms should ensure that users do not experience
88 * huge latencies while at the same time allowing operations to succeed
89 * with reasonable probability.
91 #define BUSY_TIMEOUT_MS 250
96 * Context for all functions in this plugin.
101 * Our execution environment.
103 struct GNUNET_DATASTORE_PluginEnvironment *env;
111 * Native SQLite database handle.
116 * Precompiled SQL for update.
118 sqlite3_stmt *updPrio;
121 * Precompiled SQL for insertion.
123 sqlite3_stmt *insertContent;
126 * Handle to the statistics service.
128 struct GNUNET_STATISTICS_Handle *statistics;
131 * Handle for pending get request.
133 struct GNUNET_STATISTICS_GetHandle *stat_get;
136 * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
138 struct NextContext *next_task_nc;
141 * Pending task with scheduler for running the next request.
143 GNUNET_SCHEDULER_TaskIdentifier next_task;
146 * How much data are we currently storing
149 unsigned long long payload;
152 * Number of updates that were made to the
153 * payload value since we last synchronized
154 * it with the statistics service.
156 unsigned int lastSync;
159 * Should the database be dropped on shutdown?
161 int drop_on_shutdown;
166 * @brief Prepare a SQL statement
168 * @param dbh handle to the database
169 * @param zSql SQL statement, UTF-8 encoded
170 * @param ppStmt set to the prepared statement
171 * @return 0 on success
174 sq_prepare (sqlite3 * dbh, const char *zSql,
175 sqlite3_stmt ** ppStmt)
178 return sqlite3_prepare (dbh,
180 strlen (zSql), ppStmt, (const char **) &dummy);
185 * Create our database indices.
187 * @param dbh handle to the database
190 create_indices (sqlite3 * dbh)
194 "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
196 "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
198 sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
200 sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
202 sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
204 sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
206 sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
213 #define CHECK(a) GNUNET_break(a)
217 #define ENULL_DEFINED 1
/* Diagnostic CHECK: if the condition does not hold, log the text held in
   'e' and release it with sqlite3_free().  The argument is parenthesized
   so that conditions such as 'SQLITE_OK == rc' are negated as a whole,
   and the body is wrapped in do/while(0) so the macro is safe as a
   single statement. */
#define CHECK(a) do { if (! (a)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%s\n", e); sqlite3_free (e); } } while (0)
225 * Initialize the database connections and associated
226 * data structures (create tables and indices
227 * as needed as well).
229 * @param cfg our configuration
230 * @param plugin the plugin context (state for this module)
231 * @return GNUNET_OK on success
234 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
235 struct Plugin *plugin)
244 GNUNET_CONFIGURATION_get_value_filename (cfg,
249 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
251 _("Option `%s' in section `%s' missing in configuration!\n"),
254 return GNUNET_SYSERR;
256 if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
259 GNUNET_free (afsdir);
260 return GNUNET_SYSERR;
262 plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
264 nl_langinfo (CODESET)
266 "UTF-8" /* good luck */
269 GNUNET_free (afsdir);
271 /* Open database and precompile statements */
272 if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
274 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
276 _("Unable to initialize SQLite: %s.\n"),
277 sqlite3_errmsg (plugin->dbh));
278 return GNUNET_SYSERR;
281 sqlite3_exec (plugin->dbh,
282 "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
284 sqlite3_exec (plugin->dbh,
285 "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
287 sqlite3_exec (plugin->dbh,
288 "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
290 sqlite3_exec (plugin->dbh, "PRAGMA page_size=4092", NULL, NULL, ENULL));
292 CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
295 /* We have to do it here, because otherwise precompiling SQL might fail */
297 sq_prepare (plugin->dbh,
298 "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
300 if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
301 (sqlite3_exec (plugin->dbh,
302 "CREATE TABLE gn080 ("
303 " size INT4 NOT NULL DEFAULT 0,"
304 " type INT4 NOT NULL DEFAULT 0,"
305 " prio INT4 NOT NULL DEFAULT 0,"
306 " anonLevel INT4 NOT NULL DEFAULT 0,"
307 " expire INT8 NOT NULL DEFAULT 0,"
308 " hash TEXT NOT NULL DEFAULT '',"
309 " vhash TEXT NOT NULL DEFAULT '',"
310 " value BLOB NOT NULL DEFAULT '')", NULL, NULL,
311 NULL) != SQLITE_OK) )
313 LOG_SQLITE (plugin, NULL,
314 GNUNET_ERROR_TYPE_ERROR,
316 sqlite3_finalize (stmt);
317 return GNUNET_SYSERR;
319 sqlite3_finalize (stmt);
320 create_indices (plugin->dbh);
323 sq_prepare (plugin->dbh,
324 "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
326 if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
327 (sqlite3_exec (plugin->dbh,
328 "CREATE TABLE gn071 ("
329 " key TEXT NOT NULL DEFAULT '',"
330 " value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
331 NULL) != SQLITE_OK) )
333 LOG_SQLITE (plugin, NULL,
334 GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
335 sqlite3_finalize (stmt);
336 return GNUNET_SYSERR;
338 sqlite3_finalize (stmt);
340 if ((sq_prepare (plugin->dbh,
341 "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
343 &plugin->updPrio) != SQLITE_OK) ||
344 (sq_prepare (plugin->dbh,
345 "INSERT INTO gn080 (size, type, prio, "
346 "anonLevel, expire, hash, vhash, value) VALUES "
347 "(?, ?, ?, ?, ?, ?, ?, ?)",
348 &plugin->insertContent) != SQLITE_OK))
350 LOG_SQLITE (plugin, NULL,
351 GNUNET_ERROR_TYPE_ERROR, "precompiling");
352 return GNUNET_SYSERR;
359 * Synchronize our utilization statistics with the
360 * statistics service.
361 * @param plugin the plugin context (state for this module)
364 sync_stats (struct Plugin *plugin)
366 GNUNET_STATISTICS_set (plugin->statistics,
370 plugin->lastSync = 0;
375 * Shutdown database connection and associate data
377 * @param plugin the plugin context (state for this module)
380 database_shutdown (struct Plugin *plugin)
382 if (plugin->lastSync > 0)
384 if (plugin->updPrio != NULL)
385 sqlite3_finalize (plugin->updPrio);
386 if (plugin->insertContent != NULL)
387 sqlite3_finalize (plugin->insertContent);
388 sqlite3_close (plugin->dbh);
389 GNUNET_free_non_null (plugin->fn);
394 * Get an estimate of how much space the database is
397 * @param cls our plugin context
398 * @return number of bytes used on disk
400 static unsigned long long sqlite_plugin_get_size (void *cls)
402 struct Plugin *plugin = cls;
403 return plugin->payload;
408 * Delete the database entry with the given
411 * @param plugin the plugin context (state for this module)
412 * @param rid the ID of the row to delete
415 delete_by_rowid (struct Plugin* plugin,
416 unsigned long long rid)
420 if (sq_prepare (plugin->dbh,
421 "DELETE FROM gn080 WHERE _ROWID_ = ?", &stmt) != SQLITE_OK)
423 LOG_SQLITE (plugin, NULL,
424 GNUNET_ERROR_TYPE_ERROR |
425 GNUNET_ERROR_TYPE_BULK, "sq_prepare");
426 return GNUNET_SYSERR;
428 sqlite3_bind_int64 (stmt, 1, rid);
429 if (SQLITE_DONE != sqlite3_step (stmt))
431 LOG_SQLITE (plugin, NULL,
432 GNUNET_ERROR_TYPE_ERROR |
433 GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
434 sqlite3_finalize (stmt);
435 return GNUNET_SYSERR;
437 sqlite3_finalize (stmt);
443 * Context for the universal iterator.
448 * Type of a function that will prepare
449 * the next iteration.
452 * @param nc the next context; NULL for the last
453 * call which gives the callback a chance to
454 * clean up the closure
455 * @return GNUNET_OK on success, GNUNET_NO if there are
456 * no more values, GNUNET_SYSERR on error
458 typedef int (*PrepareFunction)(void *cls,
459 struct NextContext *nc);
463 * Context we keep for the "next request" callback.
470 struct Plugin *plugin;
473 * Function to call on the next value.
483 * Function to call to prepare the next
486 PrepareFunction prep;
494 * Statement that the iterator will get the data
495 * from (updated or set by prep).
500 * Row ID of the last result.
502 unsigned long long last_rowid;
505 * Key of the last result.
507 GNUNET_HashCode lastKey;
510 * Expiration time of the last value visited.
512 struct GNUNET_TIME_Absolute lastExpiration;
515 * Priority of the last value visited.
517 unsigned int lastPriority;
520 * Number of results processed so far.
525 * Set to GNUNET_YES if we must stop now.
532 * Continuation of "sqlite_next_request".
534 * @param cls the next context
535 * @param tc the task context (unused)
538 sqlite_next_request_cont (void *cls,
539 const struct GNUNET_SCHEDULER_TaskContext *tc)
541 struct NextContext * nc = cls;
542 struct Plugin *plugin;
543 unsigned long long rowid;
548 unsigned int priority;
549 unsigned int anonymity;
550 struct GNUNET_TIME_Absolute expiration;
551 const GNUNET_HashCode *key;
555 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
556 plugin->next_task_nc = NULL;
557 if ( (GNUNET_YES == nc->end_it) ||
558 (GNUNET_OK != (nc->prep(nc->prep_cls,
562 nc->iter (nc->iter_cls,
563 NULL, NULL, 0, NULL, 0, 0, 0,
564 GNUNET_TIME_UNIT_ZERO_ABS, 0);
565 nc->prep (nc->prep_cls, NULL);
570 rowid = sqlite3_column_int64 (nc->stmt, 7);
571 nc->last_rowid = rowid;
572 type = sqlite3_column_int (nc->stmt, 1);
573 size = sqlite3_column_bytes (nc->stmt, 6);
574 if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
576 GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
578 _("Invalid data in database. Trying to fix (by deletion).\n"));
579 if (SQLITE_OK != sqlite3_reset (nc->stmt))
580 LOG_SQLITE (nc->plugin, NULL,
581 GNUNET_ERROR_TYPE_ERROR |
582 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
585 "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
586 &stmtd) != SQLITE_OK)
588 LOG_SQLITE (nc->plugin, NULL,
589 GNUNET_ERROR_TYPE_ERROR |
590 GNUNET_ERROR_TYPE_BULK,
595 if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
596 LOG_SQLITE (nc->plugin, NULL,
597 GNUNET_ERROR_TYPE_ERROR |
598 GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
599 if (SQLITE_DONE != sqlite3_step (stmtd))
600 LOG_SQLITE (nc->plugin, NULL,
601 GNUNET_ERROR_TYPE_ERROR |
602 GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
603 if (SQLITE_OK != sqlite3_finalize (stmtd))
604 LOG_SQLITE (nc->plugin, NULL,
605 GNUNET_ERROR_TYPE_ERROR |
606 GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
610 priority = sqlite3_column_int (nc->stmt, 2);
611 anonymity = sqlite3_column_int (nc->stmt, 3);
612 expiration.value = sqlite3_column_int64 (nc->stmt, 4);
613 key = sqlite3_column_blob (nc->stmt, 5);
614 nc->lastPriority = priority;
615 nc->lastExpiration = expiration;
616 memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
617 data = sqlite3_column_blob (nc->stmt, 6);
619 ret = nc->iter (nc->iter_cls,
629 if (ret == GNUNET_SYSERR)
631 nc->end_it = GNUNET_YES;
635 if (ret == GNUNET_NO)
636 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
638 "Asked to remove entry %llu (%u bytes)\n",
639 (unsigned long long) rowid,
640 size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
642 if ( (ret == GNUNET_NO) &&
643 (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
645 plugin->payload -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
648 if (ret == GNUNET_NO)
649 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
651 "Removed entry %llu (%u bytes), new payload is %llu\n",
652 (unsigned long long) rowid,
653 size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
654 (unsigned long long) plugin->payload);
656 if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
663 * Function invoked on behalf of a "PluginIterator"
664 * asking the database plugin to call the iterator
665 * with the next item.
667 * @param next_cls whatever argument was given
668 * to the PluginIterator as "next_cls".
669 * @param end_it set to GNUNET_YES if we
670 * should terminate the iteration early
671 * (iterator should be still called once more
672 * to signal the end of the iteration).
675 sqlite_next_request (void *next_cls,
678 struct NextContext * nc= next_cls;
680 if (GNUNET_YES == end_it)
681 nc->end_it = GNUNET_YES;
682 nc->plugin->next_task_nc = nc;
683 nc->plugin->next_task = GNUNET_SCHEDULER_add_now (nc->plugin->env->sched,
684 &sqlite_next_request_cont,
691 * Store an item in the datastore.
694 * @param key key for the item
695 * @param size number of bytes in data
696 * @param data content stored
697 * @param type type of the content
698 * @param priority priority of the content
699 * @param anonymity anonymity-level for the content
700 * @param expiration expiration time for the content
701 * @param msg set to an error message
702 * @return GNUNET_OK on success
705 sqlite_plugin_put (void *cls,
706 const GNUNET_HashCode * key,
712 struct GNUNET_TIME_Absolute expiration,
715 struct Plugin *plugin = cls;
718 GNUNET_HashCode vhash;
721 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
723 "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
727 (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).value,
728 (long long) expiration.value);
730 GNUNET_CRYPTO_hash (data, size, &vhash);
731 stmt = plugin->insertContent;
732 if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
733 (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
734 (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
735 (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
736 (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, (sqlite3_int64) expiration.value)) ||
738 sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
739 SQLITE_TRANSIENT)) ||
741 sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
744 sqlite3_bind_blob (stmt, 8, data, size,
749 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
750 if (SQLITE_OK != sqlite3_reset (stmt))
751 LOG_SQLITE (plugin, NULL,
752 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
753 return GNUNET_SYSERR;
755 n = sqlite3_step (stmt);
756 if (n != SQLITE_DONE)
758 if (n == SQLITE_BUSY)
760 LOG_SQLITE (plugin, msg,
761 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
762 sqlite3_reset (stmt);
766 LOG_SQLITE (plugin, msg,
767 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
768 sqlite3_reset (stmt);
769 return GNUNET_SYSERR;
771 if (SQLITE_OK != sqlite3_reset (stmt))
772 LOG_SQLITE (plugin, NULL,
773 GNUNET_ERROR_TYPE_ERROR |
774 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
776 plugin->payload += size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
778 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
780 "Stored new entry (%u bytes), new payload is %llu\n",
781 size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
782 (unsigned long long) plugin->payload);
784 if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
791 * Update the priority for a particular key in the datastore. If
792 * the expiration time in value is different than the time found in
793 * the datastore, the higher value should be kept. For the
794 * anonymity level, the lower value is to be used. The specified
795 * priority should be added to the existing priority, ignoring the
798 * Note that it is possible for multiple values to match this put.
799 * In that case, all of the respective values are updated.
801 * @param cls the plugin context (state for this module)
802 * @param uid unique identifier of the datum
803 * @param delta by how much should the priority
804 * change? If priority + delta < 0 the
805 * priority should be set to 0 (never go
807 * @param expire new expiration time should be the
808 * MAX of any existing expiration time and
810 * @param msg set to an error message
811 * @return GNUNET_OK on success
814 sqlite_plugin_update (void *cls,
816 int delta, struct GNUNET_TIME_Absolute expire,
819 struct Plugin *plugin = cls;
822 sqlite3_bind_int (plugin->updPrio, 1, delta);
823 sqlite3_bind_int64 (plugin->updPrio, 2, expire.value);
824 sqlite3_bind_int64 (plugin->updPrio, 3, uid);
825 n = sqlite3_step (plugin->updPrio);
826 if (n != SQLITE_DONE)
827 LOG_SQLITE (plugin, msg,
828 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
832 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
836 sqlite3_reset (plugin->updPrio);
838 if (n == SQLITE_BUSY)
840 return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
845 * Internal context for an iteration.
852 sqlite3_stmt *stmt_1;
857 sqlite3_stmt *stmt_2;
877 int limit_nonanonymous;
880 * Desired type for blocks returned by this iterator.
887 * Prepare our SQL query to obtain the next record from the database.
889 * @param cls our "struct IterContext"
890 * @param nc NULL to terminate the iteration, otherwise our context for
891 * getting the next result.
892 * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
893 * GNUNET_SYSERR on error (or end of iteration)
896 iter_next_prepare (void *cls,
897 struct NextContext *nc)
899 struct IterContext *ic = cls;
900 struct Plugin *plugin;
906 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
907 "Asked to clean up iterator state.\n");
909 sqlite3_finalize (ic->stmt_1);
910 sqlite3_finalize (ic->stmt_2);
911 return GNUNET_SYSERR;
913 sqlite3_reset (ic->stmt_1);
914 sqlite3_reset (ic->stmt_2);
919 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
920 "Restricting to results larger than the last priority %u\n",
923 sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
924 sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
929 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930 "Restricting to results larger than the last expiration %llu\n",
931 (unsigned long long) nc->lastExpiration.value);
933 sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.value);
934 sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.value);
937 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938 "Restricting to results larger than the last key `%s'\n",
939 GNUNET_h2s(&nc->lastKey));
941 sqlite3_bind_blob (ic->stmt_1, 2,
943 sizeof (GNUNET_HashCode),
945 if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
948 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
949 "Result found using iterator 1\n");
951 nc->stmt = ic->stmt_1;
954 if (ret != SQLITE_DONE)
956 LOG_SQLITE (plugin, NULL,
957 GNUNET_ERROR_TYPE_ERROR |
958 GNUNET_ERROR_TYPE_BULK,
960 return GNUNET_SYSERR;
962 if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
963 LOG_SQLITE (plugin, NULL,
964 GNUNET_ERROR_TYPE_ERROR |
965 GNUNET_ERROR_TYPE_BULK,
967 if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2)))
970 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
971 "Result found using iterator 2\n");
973 nc->stmt = ic->stmt_2;
976 if (ret != SQLITE_DONE)
978 LOG_SQLITE (plugin, NULL,
979 GNUNET_ERROR_TYPE_ERROR |
980 GNUNET_ERROR_TYPE_BULK,
982 return GNUNET_SYSERR;
984 if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
985 LOG_SQLITE (plugin, NULL,
986 GNUNET_ERROR_TYPE_ERROR |
987 GNUNET_ERROR_TYPE_BULK,
990 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991 "No result found using either iterator\n");
998 * Call a method for each key in the database and
999 * call the callback method on it.
1001 * @param plugin our plugin context
1002 * @param type entries of which type should be considered?
1003 * @param is_asc are we iterating in ascending order?
1004 * @param is_prio are we iterating by priority (otherwise by expiration)
1005 * @param is_migr are we iterating in migration order?
1006 * @param limit_nonanonymous are we restricting results to those with anonymity
1008 * @param stmt_str_1 first SQL statement to execute
1009 * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
1010 * @param iter function to call on each matching value;
1011 * will be called once with a NULL value at the end
1012 * @param iter_cls closure for iter
1015 basic_iter (struct Plugin *plugin,
1020 int limit_nonanonymous,
1021 const char *stmt_str_1,
1022 const char *stmt_str_2,
1023 PluginIterator iter,
1026 struct NextContext *nc;
1027 struct IterContext *ic;
1028 sqlite3_stmt *stmt_1;
1029 sqlite3_stmt *stmt_2;
1032 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033 "At %llu, using queries `%s' and `%s'\n",
1034 (unsigned long long) GNUNET_TIME_absolute_get ().value,
1038 if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
1040 LOG_SQLITE (plugin, NULL,
1041 GNUNET_ERROR_TYPE_ERROR |
1042 GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1043 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1046 if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
1048 LOG_SQLITE (plugin, NULL,
1049 GNUNET_ERROR_TYPE_ERROR |
1050 GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1051 sqlite3_finalize (stmt_1);
1052 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1055 nc = GNUNET_malloc (sizeof(struct NextContext) +
1056 sizeof(struct IterContext));
1057 nc->plugin = plugin;
1059 nc->iter_cls = iter_cls;
1061 ic = (struct IterContext*) &nc[1];
1062 ic->stmt_1 = stmt_1;
1063 ic->stmt_2 = stmt_2;
1065 ic->is_asc = is_asc;
1066 ic->is_prio = is_prio;
1067 ic->is_migr = is_migr;
1068 ic->limit_nonanonymous = limit_nonanonymous;
1069 nc->prep = &iter_next_prepare;
1073 nc->lastPriority = 0;
1074 nc->lastExpiration.value = 0;
1075 memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
1079 nc->lastPriority = 0x7FFFFFFF;
1080 nc->lastExpiration.value = 0x7FFFFFFFFFFFFFFFLL;
1081 memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1083 sqlite_next_request (nc, GNUNET_NO);
1088 * Select a subset of the items in the datastore and call
1089 * the given iterator for each of them.
1091 * @param cls our plugin context
1092 * @param type entries of which type should be considered?
1093 * Use 0 for any type.
1094 * @param iter function to call on each matching value;
1095 * will be called once with a NULL value at the end
1096 * @param iter_cls closure for iter
1099 sqlite_plugin_iter_low_priority (void *cls,
1101 PluginIterator iter,
1106 GNUNET_YES, GNUNET_YES,
1107 GNUNET_NO, GNUNET_NO,
1108 SELECT_IT_LOW_PRIORITY_1,
1109 SELECT_IT_LOW_PRIORITY_2,
1115 * Select a subset of the items in the datastore and call
1116 * the given iterator for each of them.
1118 * @param cls our plugin context
1119 * @param type entries of which type should be considered?
1120 * Use 0 for any type.
1121 * @param iter function to call on each matching value;
1122 * will be called once with a NULL value at the end
1123 * @param iter_cls closure for iter
1126 sqlite_plugin_iter_zero_anonymity (void *cls,
1128 PluginIterator iter,
1131 struct GNUNET_TIME_Absolute now;
1135 now = GNUNET_TIME_absolute_get ();
1136 GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1137 (unsigned long long) now.value);
1138 GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1139 (unsigned long long) now.value);
1142 GNUNET_NO, GNUNET_YES,
1143 GNUNET_NO, GNUNET_YES,
1154 * Select a subset of the items in the datastore and call
1155 * the given iterator for each of them.
1157 * @param cls our plugin context
1158 * @param type entries of which type should be considered?
1159 * Use 0 for any type.
1160 * @param iter function to call on each matching value;
1161 * will be called once with a NULL value at the end
1162 * @param iter_cls closure for iter
1165 sqlite_plugin_iter_ascending_expiration (void *cls,
1167 PluginIterator iter,
1170 struct GNUNET_TIME_Absolute now;
1174 now = GNUNET_TIME_absolute_get ();
1175 GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1176 (unsigned long long) 0*now.value);
1177 GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1178 (unsigned long long) 0*now.value);
1181 GNUNET_YES, GNUNET_NO,
1182 GNUNET_NO, GNUNET_NO,
1191 * Select a subset of the items in the datastore and call
1192 * the given iterator for each of them.
1194 * @param cls our plugin context
1195 * @param type entries of which type should be considered?
1196 * Use 0 for any type.
1197 * @param iter function to call on each matching value;
1198 * will be called once with a NULL value at the end
1199 * @param iter_cls closure for iter
1202 sqlite_plugin_iter_migration_order (void *cls,
1204 PluginIterator iter,
1207 struct GNUNET_TIME_Absolute now;
1210 now = GNUNET_TIME_absolute_get ();
1211 GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1212 (unsigned long long) now.value);
1215 GNUNET_NO, GNUNET_NO,
1216 GNUNET_YES, GNUNET_NO,
1217 SELECT_IT_MIGRATION_ORDER_1,
1225 * Call sqlite using the already prepared query to get
1228 * @param cls not used
1229 * @param nc context with the prepared query
1230 * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1231 * there are no more results
1234 all_next_prepare (void *cls,
1235 struct NextContext *nc)
1237 struct Plugin *plugin;
1243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1244 "Asked to clean up iterator state.\n");
1246 return GNUNET_SYSERR;
1248 plugin = nc->plugin;
1249 if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
1253 if (ret != SQLITE_DONE)
1255 LOG_SQLITE (plugin, NULL,
1256 GNUNET_ERROR_TYPE_ERROR |
1257 GNUNET_ERROR_TYPE_BULK,
1259 return GNUNET_SYSERR;
1266 * Select a subset of the items in the datastore and call
1267 * the given iterator for each of them.
1269 * @param cls our plugin context
1270 * @param type entries of which type should be considered?
1271 * Use 0 for any type.
1272 * @param iter function to call on each matching value;
1273 * will be called once with a NULL value at the end
1274 * @param iter_cls closure for iter
1277 sqlite_plugin_iter_all_now (void *cls,
1279 PluginIterator iter,
1282 struct Plugin *plugin = cls;
1283 struct NextContext *nc;
1286 if (sq_prepare (plugin->dbh,
1287 "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
1288 &stmt) != SQLITE_OK)
1290 LOG_SQLITE (plugin, NULL,
1291 GNUNET_ERROR_TYPE_ERROR |
1292 GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1293 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1296 nc = GNUNET_malloc (sizeof(struct NextContext));
1297 nc->plugin = plugin;
1299 nc->iter_cls = iter_cls;
1301 nc->prep = &all_next_prepare;
1302 nc->prep_cls = NULL;
1303 sqlite_next_request (nc, GNUNET_NO);
1310 struct GetNextContext
1341 GNUNET_HashCode key;
1346 GNUNET_HashCode vhash;
1354 * @param cls our "struct GetNextContext*"
1356 * @return GNUNET_YES if there are more results,
1357 * GNUNET_NO if there are no more results,
1358 * GNUNET_SYSERR on internal error
1361 get_next_prepare (void *cls,
1362 struct NextContext *nc)
1364 struct GetNextContext *gnc = cls;
1371 sqlite3_finalize (gnc->stmt);
1372 return GNUNET_SYSERR;
1374 if (nc->count == gnc->total)
1376 if (nc->count + gnc->off == gnc->total)
1379 limit_off = gnc->off;
1383 sqlite3_reset (nc->stmt);
1384 ret = sqlite3_bind_blob (nc->stmt,
1387 sizeof (GNUNET_HashCode),
1389 if ((gnc->have_vhash) && (ret == SQLITE_OK))
1390 ret = sqlite3_bind_blob (nc->stmt,
1393 sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1394 if ((gnc->type != 0) && (ret == SQLITE_OK))
1395 ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1396 if (ret == SQLITE_OK)
1397 ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1398 if (ret == SQLITE_OK)
1399 ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1400 if (ret != SQLITE_OK)
1401 return GNUNET_SYSERR;
1402 if (SQLITE_ROW != sqlite3_step (nc->stmt))
1409 * Iterate over the results for a particular key
1412 * @param cls closure
1413 * @param key maybe NULL (to match all entries)
1414 * @param vhash hash of the value, maybe NULL (to
1415 * match all values that have the right key).
1416 * Note that for DBlocks there is no difference
1417 * between key and vhash, but for other blocks
1419 * @param type entries of which type are relevant?
1420 * Use 0 for any type.
1421 * @param iter function to call on each matching value;
1422 * will be called once with a NULL value at the end
1423 * @param iter_cls closure for iter
1426 sqlite_plugin_get (void *cls,
1427 const GNUNET_HashCode * key,
1428 const GNUNET_HashCode * vhash,
1430 PluginIterator iter, void *iter_cls)
/* Overview: counts the rows matching key (and optionally vhash / type),
   then prepares a "one row per step" SELECT over the same rows and hands
   the resulting iteration context to sqlite_next_request. */
1432 struct Plugin *plugin = cls;
1433 struct GetNextContext *gpc;
1434 struct NextContext *nc;
1441 GNUNET_assert (iter != NULL);
/* NOTE(review): the condition guarding this call is not visible in this
   excerpt; presumably this is the "key == NULL, match all entries"
   fallback described in the parameter docs -- confirm. */
1444 sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
/* Phase 1: build the counting query.  The vhash and type clauses are
   optional; the type placeholder is labelled :2 or :3 depending on whether
   the vhash clause is present, so the labels stay consecutive. */
1447 GNUNET_snprintf (scratch, sizeof (scratch),
1448 "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
1449 vhash == NULL ? "" : " AND vhash=:2",
1450 type == 0 ? "" : (vhash ==
1451 NULL) ? " AND type=:2" : " AND type=:3");
1452 if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1454 LOG_SQLITE (plugin, NULL,
1455 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
/* On failure the iterator is still invoked once, with NULL/zero arguments. */
1456 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* Bind the key, then the optional vhash and type, stopping at the first
   binding that does not return SQLITE_OK. */
1460 ret = sqlite3_bind_blob (stmt,
1462 key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1463 if ((vhash != NULL) && (ret == SQLITE_OK))
1464 ret = sqlite3_bind_blob (stmt,
1467 sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1468 if ((type != 0) && (ret == SQLITE_OK))
1469 ret = sqlite3_bind_int (stmt, sqoff++, type);
1470 if (SQLITE_OK != ret)
1472 LOG_SQLITE (plugin, NULL,
1473 GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
/* Binding failed: release the statement before notifying the iterator. */
1474 sqlite3_reset (stmt);
1475 sqlite3_finalize (stmt);
1476 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* count(*) is expected to yield a row; anything other than SQLITE_ROW is
   handled as an error. */
1479 ret = sqlite3_step (stmt);
1480 if (ret != SQLITE_ROW)
1482 LOG_SQLITE (plugin, NULL,
1483 GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK,
1485 sqlite3_reset (stmt);
1486 sqlite3_finalize (stmt);
1487 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* Remember the number of matching rows and dispose of the counting
   statement. */
1490 total = sqlite3_column_int (stmt, 0);
1491 sqlite3_reset (stmt);
1492 sqlite3_finalize (stmt);
/* NOTE(review): the guard for this call is not visible here; presumably it
   fires when total is 0 (nothing to iterate over) -- confirm. */
1495 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* Phase 2: build the row-fetching query (same filters, plus a lower bound
   on _ROWID_ and a LIMIT 1 / OFFSET).
   NOTE(review): the OFFSET placeholder is spelled ":d" while the _ROWID_
   one is ":%d"; this looks like a missing '%'.  The format argument list
   is not visible in this excerpt, so verify before changing it. */
1499 GNUNET_snprintf (scratch, sizeof (scratch),
1500 "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
1501 "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1502 "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1503 vhash == NULL ? "" : " AND vhash=:2",
1504 type == 0 ? "" : (vhash ==
1505 NULL) ? " AND type=:2" : " AND type=:3",
1507 if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1509 LOG_SQLITE (plugin, NULL,
1510 GNUNET_ERROR_TYPE_ERROR |
1511 GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1512 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* A single allocation holds the generic NextContext followed directly by
   the get-specific GetNextContext (gpc points just past nc). */
1515 nc = GNUNET_malloc (sizeof(struct NextContext) +
1516 sizeof(struct GetNextContext));
1517 nc->plugin = plugin;
1519 nc->iter_cls = iter_cls;
1521 gpc = (struct GetNextContext*) &nc[1];
1525 gpc->stmt = stmt; /* alias used for freeing at the end! */
/* NOTE(review): the next two lines dereference vhash; presumably they sit
   under a "vhash != NULL" guard that is not visible here -- confirm. */
1528 gpc->have_vhash = GNUNET_YES;
1529 gpc->vhash = *vhash;
/* Starting offset is a weak-quality random number derived from the match
   count (presumably in [0, total) -- confirm against the crypto API). */
1531 gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1532 nc->prep = &get_next_prepare;
/* Hand the context over to the shared iteration machinery. */
1534 sqlite_next_request (nc, GNUNET_NO);
1541 * @param cls our plugin context
1544 sqlite_plugin_drop (void *cls)
/* Only records the request: the flag set here is read by the plugin's
   "done" function, which copies plugin->fn when it is set and later
   calls UNLINK on that copy. */
1546 struct Plugin *plugin = cls;
1547 plugin->drop_on_shutdown = GNUNET_YES;
1552 * Callback function to process statistic values.
1554 * @param cls closure
1555 * @param subsystem name of subsystem that created the statistic
1556 * @param name the name of the datum
1557 * @param value the current value
1558 * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1559 * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1562 process_stat_in (void *cls,
1563 const char *subsystem,
/* cls is the plugin; the reported value is added to its payload counter. */
1568 struct Plugin *plugin = cls;
1569 plugin->payload += value;
/* NOTE(review): some arguments of this log call (and any surrounding
   preprocessor guard) are not visible in this excerpt. */
1571 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1573 "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1575 (unsigned long long) plugin->payload);
1582 process_stat_done (void *cls,
/* Clears the stored statistics-get handle; the plugin's "done" function
   only cancels the request while this handle is non-NULL. */
1585 struct Plugin *plugin = cls;
1586 plugin->stat_get = NULL;
1591 * Entry point for the plugin.
1593 * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1594 * @return NULL on error, otherwise the plugin context
1597 libgnunet_plugin_datastore_sqlite_init (void *cls)
/* The plugin state is a single function-local static, so only one
   instance can exist at a time. */
1599 static struct Plugin plugin;
1600 struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1601 struct GNUNET_DATASTORE_PluginFunctions *api;
/* A non-NULL env is treated as "already initialized". */
1603 if (plugin.env != NULL)
1604 return NULL; /* can only initialize once! */
1605 memset (&plugin, 0, sizeof(struct Plugin));
/* Open a statistics handle and start a statistics lookup; the remaining
   arguments of both calls are not visible in this excerpt. */
1607 plugin.statistics = GNUNET_STATISTICS_create (env->sched,
1610 plugin.stat_get = GNUNET_STATISTICS_get (plugin.statistics,
1613 GNUNET_TIME_UNIT_MINUTES,
/* NOTE(review): the comparison wrapping database_setup is cut off here;
   presumably database_shutdown below is the failure path -- confirm. */
1618 database_setup (env->cfg, &plugin))
1620 database_shutdown (&plugin);
/* Allocate the function table and fill it with this backend's
   implementations. */
1623 api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1625 api->get_size = &sqlite_plugin_get_size;
1626 api->put = &sqlite_plugin_put;
1627 api->next_request = &sqlite_next_request;
1628 api->get = &sqlite_plugin_get;
1629 api->update = &sqlite_plugin_update;
1630 api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1631 api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1632 api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1633 api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1634 api->iter_all_now = &sqlite_plugin_iter_all_now;
1635 api->drop = &sqlite_plugin_drop;
1636 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1637 "sqlite", _("Sqlite database running\n"));
1643 * Exit point from the plugin.
1645 * @param cls the plugin context (as returned by "init")
1646 * @return always NULL
1649 libgnunet_plugin_datastore_sqlite_done (void *cls)
/* cls is the function table; the plugin state is reached through api->cls. */
1652 struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1653 struct Plugin *plugin = api->cls;
/* Cancel a statistics lookup that is still outstanding. */
1655 if (plugin->stat_get != NULL)
1657 GNUNET_STATISTICS_get_cancel (plugin->stat_get);
1658 plugin->stat_get = NULL;
/* If an iteration step is still scheduled, cancel it, call the context's
   prep callback once with NULL, and free the context. */
1660 if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1662 GNUNET_SCHEDULER_cancel (plugin->env->sched,
1664 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1665 plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1666 GNUNET_free (plugin->next_task_nc);
1667 plugin->next_task_nc = NULL;
/* Copy the file name before database_shutdown runs (presumably because
   shutdown releases plugin->fn -- confirm). */
1670 if (plugin->drop_on_shutdown)
1671 fn = GNUNET_strdup (plugin->fn);
1672 database_shutdown (plugin);
1673 GNUNET_STATISTICS_destroy (plugin->statistics,
1676 plugin->payload = 0;
/* NOTE(review): the declaration of fn and the guard around this UNLINK
   are not visible in this excerpt; fn is only assigned above when
   drop_on_shutdown is set. */
1680 if (0 != UNLINK(fn))
1681 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1689 /* end of plugin_datastore_sqlite.c */