From: Christian Grothoff Date: Sun, 3 Apr 2011 20:00:42 +0000 (+0000) Subject: improving datastore API --- not working yet X-Git-Tag: initial-import-from-subversion-38251~18822 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=e8f35bb025c25839a52fb502e452393831e4e6f0;p=oweals%2Fgnunet.git improving datastore API --- not working yet --- diff --git a/src/datastore/Makefile.am b/src/datastore/Makefile.am index 4cd80d090..ec0598819 100644 --- a/src/datastore/Makefile.am +++ b/src/datastore/Makefile.am @@ -37,6 +37,7 @@ gnunet_service_datastore_LDADD = \ $(GN_LIBINTL) if HAVE_MYSQL +if HAVE_EXPERIMENTAL MYSQL_PLUGIN = libgnunet_plugin_datastore_mysql.la MYSQL_TESTS = \ test_datastore_api_mysql \ @@ -44,6 +45,7 @@ if HAVE_MYSQL perf_datastore_api_mysql \ perf_plugin_datastore_mysql endif +endif if HAVE_SQLITE SQLITE_PLUGIN = libgnunet_plugin_datastore_sqlite.la SQLITE_TESTS = \ @@ -53,6 +55,7 @@ if HAVE_SQLITE perf_plugin_datastore_sqlite endif if HAVE_POSTGRES +if HAVE_EXPERIMENTAL POSTGRES_PLUGIN = libgnunet_plugin_datastore_postgres.la POSTGRES_TESTS = \ test_datastore_api_postgres \ @@ -60,6 +63,7 @@ if HAVE_POSTGRES perf_datastore_api_postgres \ perf_plugin_datastore_postgres endif +endif plugin_LTLIBRARIES = \ $(SQLITE_PLUGIN) \ diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 344a70842..dde45f24f 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors) + (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -305,8 +305,9 @@ transmit_drop (void *cls, * @param h handle to the datastore * @param drop set to GNUNET_YES to delete all data in datastore (!) */ -void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, - int drop) +void +GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, + int drop) { struct GNUNET_DATASTORE_QueueEntry *qe; @@ -668,7 +669,7 @@ process_queue (struct GNUNET_DATASTORE_Handle *h) * @param emsg error message */ static void -drop_status_cont (void *cls, int result, const char *emsg) +drop_status_cont (void *cls, int32_t result, const char *emsg) { /* do nothing */ } @@ -806,7 +807,7 @@ process_status_message (void *cls, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, - int rid, + uint32_t rid, const GNUNET_HashCode * key, size_t size, const void *data, @@ -959,7 +960,7 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, - int rid, + uint32_t rid, unsigned int queue_priority, unsigned int max_queue_size, struct GNUNET_TIME_Relative timeout, @@ -1022,7 +1023,7 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, - unsigned long long uid, + uint64_t uid, uint32_t priority, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, @@ -1250,7 +1251,7 @@ process_result_message (void *cls, do_disconnect (h); return; } - GNUNET_DATASTORE_get_next (h); + GNUNET_DATASTORE_iterate_get_next (h); return; } dm = (const struct DataMessage*) msg; @@ -1355,13 +1356,13 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - enum GNUNET_BLOCK_Type type, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls) +GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + enum GNUNET_BLOCK_Type type, + GNUNET_DATASTORE_Iterator iter, + void *iter_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct GetZeroAnonymityMessage *m; @@ -1404,7 +1405,7 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, * in the datastore. The iterator will only be called * once initially; if the first call did contain a * result, further results can be obtained by calling - * "GNUNET_DATASTORE_get_next" with the given argument. + * "GNUNET_DATASTORE_iterate_get_next" with the given argument. * * @param h handle to the datastore * @param key maybe NULL (to match all entries) @@ -1421,14 +1422,14 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, - const GNUNET_HashCode * key, - enum GNUNET_BLOCK_Type type, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls) +GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h, + const GNUNET_HashCode * key, + enum GNUNET_BLOCK_Type type, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_Iterator iter, + void *iter_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct GetMessage *gm; @@ -1482,7 +1483,7 @@ GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, * @param h handle to the datastore */ void -GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h) +GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h) { struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head; diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c index 2538d5ef6..1fa2bbccb 100644 --- a/src/datastore/gnunet-service-datastore.c +++ b/src/datastore/gnunet-service-datastore.c @@ -42,6 +42,13 @@ */ #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15) +/** + * How fast are we allowed to query the database for deleting + * expired content? (1 item per second). + */ +#define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1) + + #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore") /** @@ -348,10 +355,12 @@ expired_processor (void *cls, if (expiration.abs_value > now.abs_value) { /* finished processing */ - plugin->api->next_request (next_cls, GNUNET_YES); + expired_kill_task + = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY, + &delete_expired, + NULL); return GNUNET_SYSERR; } - plugin->api->next_request (next_cls, GNUNET_NO); #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting content `%s' of type %u that expired %llu ms ago\n", @@ -365,7 +374,11 @@ expired_processor (void *cls, GNUNET_YES); GNUNET_CONTAINER_bloomfilter_remove (filter, key); - return GNUNET_NO; /* delete */ + expired_kill_task + = GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY, + &delete_expired, + NULL); + return GNUNET_NO; } @@ -383,15 +396,15 @@ delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { expired_kill_task = GNUNET_SCHEDULER_NO_TASK; - plugin->api->iter_ascending_expiration (plugin->api->cls, - 0, - &expired_processor, - NULL); + plugin->api->expiration_get (plugin->api->cls, + &expired_processor, + NULL); } /** - * An iterator over a set of items stored in the datastore. + * An iterator over a set of items stored in the datastore + * that deletes until we're happy with respect to our quota. * * @param cls closure * @param next_cls closure to pass to the "next" function. @@ -410,31 +423,21 @@ delete_expired (void *cls, * GNUNET_NO to delete the item and continue (if supported) */ static int -manage (void *cls, - void *next_cls, - const GNUNET_HashCode * key, - uint32_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, - uint64_t uid) +quota_processor (void *cls, + void *next_cls, + const GNUNET_HashCode * key, + uint32_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, + uint64_t uid) { unsigned long long *need = cls; if (NULL == key) - { - GNUNET_free (need); - return GNUNET_SYSERR; - } - if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need) - *need = 0; - else - *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD; - plugin->api->next_request (next_cls, - (0 == *need) ? GNUNET_YES : GNUNET_NO); + return GNUNET_SYSERR; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n", @@ -443,6 +446,10 @@ manage (void *cls, type, *need); #endif + if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need) + *need = 0; + else + *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD; GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes purged (low-priority)"), size, @@ -468,19 +475,22 @@ manage (void *cls, static void manage_space (unsigned long long need) { - unsigned long long *n; + unsigned long long last; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to free up %llu bytes of cache space\n", need); #endif - n = GNUNET_malloc (sizeof(unsigned long long)); - *n = need; - plugin->api->iter_low_priority (plugin->api->cls, - 0, - &manage, - n); + last = 0; + while ( (need > 0) && + (last != need) ) + { + last = need; + plugin->api->expiration_get (plugin->api->cls, + "a_processor, + &need); + } } @@ -1250,10 +1260,9 @@ handle_get_random (void *cls, 1, GNUNET_NO); GNUNET_SERVER_client_keep (client); - plugin->api->iter_migration_order (plugin->api->cls, - GNUNET_BLOCK_TYPE_ANY, - &transmit_item, - client); + plugin->api->replication_get (plugin->api->cls, + &transmit_item, + client); } /** diff --git a/src/datastore/perf_datastore_api.c b/src/datastore/perf_datastore_api.c index 00d91b4c7..3b89ad233 100644 --- a/src/datastore/perf_datastore_api.c +++ b/src/datastore/perf_datastore_api.c @@ -223,13 +223,13 @@ delete_value (void *cls, stored_ops++; if (stored_bytes < MAX_SIZE) { - GNUNET_DATASTORE_get_next (datastore); + GNUNET_DATASTORE_iterate_get_next (datastore); return; } crc->key = *key; crc->esize = size; memcpy (crc->data, data, size); - GNUNET_DATASTORE_get_next (datastore); + GNUNET_DATASTORE_iterate_get_next (datastore); } diff --git a/src/datastore/perf_plugin_datastore.c b/src/datastore/perf_plugin_datastore.c index cb25da46b..f7216a5a6 100644 --- a/src/datastore/perf_plugin_datastore.c +++ b/src/datastore/perf_plugin_datastore.c @@ -62,7 +62,6 @@ enum RunPhase RP_LP_GET, RP_AE_GET, RP_ZA_GET, - RP_MO_GET, RP_AN_GET }; @@ -183,8 +182,9 @@ iterateDummy (void *cls, else crc->phase = RP_PUT; } - GNUNET_SCHEDULER_add_after (GNUNET_SCHEDULER_NO_TASK, - &test, crc); + crc->cnt = 0; + crc->start = GNUNET_TIME_absolute_get (); + GNUNET_SCHEDULER_add_now (&test, crc); return GNUNET_OK; } #if VERBOSE @@ -200,6 +200,37 @@ iterateDummy (void *cls, +static int +dummy_get (void *cls, + void *next_cls, + const GNUNET_HashCode * key, + uint32_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute + expiration, + uint64_t uid) +{ + struct CpsRunContext *crc = cls; + + crc->cnt++; + if (1000 == crc->cnt) + { + crc->end = GNUNET_TIME_absolute_get(); + printf (crc->msg, + crc->i, + (unsigned long long) (crc->end.abs_value - crc->start.abs_value), + crc->cnt); + crc->phase++; + crc->cnt = 0; + crc->start = GNUNET_TIME_absolute_get (); + } + GNUNET_SCHEDULER_add_now (&test, crc); + return GNUNET_OK; +} + /** * Function called when the service shuts * down. Unloads our datastore plugin. @@ -265,46 +296,31 @@ test (void *cls, (unsigned long long) (crc->end.abs_value - crc->start.abs_value), (unsigned int) PUT_10); crc->i++; + crc->start = GNUNET_TIME_absolute_get (); crc->phase = RP_LP_GET; GNUNET_SCHEDULER_add_after (GNUNET_SCHEDULER_NO_TASK, &test, crc); break; case RP_LP_GET: - crc->cnt = 0; - crc->start = GNUNET_TIME_absolute_get (); - crc->msg = "%3u low priority iteration took %20llums for %u\n"; - crc->api->iter_low_priority (crc->api->cls, 0, - &iterateDummy, - crc); + crc->msg = "%3u replication iteration took %20llums for %u\n"; + crc->api->replication_get (crc->api->cls, + &dummy_get, + crc); break; case RP_AE_GET: - crc->cnt = 0; - crc->start = GNUNET_TIME_absolute_get (); - crc->msg = "%3u ascending expiration iteration took %20llums for %u\n"; - crc->api->iter_ascending_expiration (crc->api->cls, 0, - &iterateDummy, - crc); + crc->msg = "%3u expiration iteration took %20llums for %u\n"; + crc->api->expiration_get (crc->api->cls, + &dummy_get, + crc); break; case RP_ZA_GET: - crc->cnt = 0; - crc->start = GNUNET_TIME_absolute_get (); - crc->msg = "%3u zero anonymity iteration took %20llums for %u\n"; + crc->msg = "%3u zero anonymity iteration took %20llums for %u\n"; crc->api->iter_zero_anonymity (crc->api->cls, 0, &iterateDummy, crc); break; - case RP_MO_GET: - crc->cnt = 0; - crc->start = GNUNET_TIME_absolute_get (); - crc->msg = "%3u migration order iteration took %20llums for %u\n"; - crc->api->iter_migration_order (crc->api->cls, 0, - &iterateDummy, - crc); - break; case RP_AN_GET: - crc->cnt = 0; - crc->start = GNUNET_TIME_absolute_get (); - crc->msg = "%3u all now iteration took %20llums for %u\n"; + crc->msg = "%3u all now iteration took %20llums for %u\n"; crc->api->iter_all_now (crc->api->cls, 0, &iterateDummy, crc); @@ -312,7 +328,7 @@ test (void *cls, case RP_DONE: crc->api->drop (crc->api->cls); GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE, - &cleaning_task, crc); + &cleaning_task, crc); break; } } diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c index b8661f46d..b05a0a9c1 100644 --- a/src/datastore/plugin_datastore_sqlite.c +++ b/src/datastore/plugin_datastore_sqlite.c @@ -38,43 +38,25 @@ */ #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0) -#define SELECT_IT_LOW_PRIORITY_1 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ? AND hash > ?) "\ - "ORDER BY hash ASC LIMIT 1" - -#define SELECT_IT_LOW_PRIORITY_2 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio > ?) "\ - "ORDER BY prio ASC, hash ASC LIMIT 1" #define SELECT_IT_NON_ANONYMOUS_1 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ?1 AND expire > %llu AND anonLevel = 0 AND hash < ?2) "\ " ORDER BY hash DESC LIMIT 1" #define SELECT_IT_NON_ANONYMOUS_2 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio < ?1 AND expire > %llu AND anonLevel = 0)"\ " ORDER BY prio DESC, hash DESC LIMIT 1" -#define SELECT_IT_EXPIRATION_TIME_1 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire = ? AND hash > ?) "\ - " ORDER BY hash ASC LIMIT 1" - -#define SELECT_IT_EXPIRATION_TIME_2 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?) "\ - " ORDER BY expire ASC, hash ASC LIMIT 1" - -#define SELECT_IT_MIGRATION_ORDER_1 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire = ? AND hash < ?) "\ - " ORDER BY hash DESC LIMIT 1" - -#define SELECT_IT_MIGRATION_ORDER_2 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire < ? AND expire > %llu) "\ - " ORDER BY expire DESC, hash DESC LIMIT 1" - #define SELECT_IT_REPLICATION_ORDER \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?) "\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?1) "\ " ORDER BY repl DESC, Random() LIMIT 1" +#define SELECT_IT_EXPIRATION_ORDER \ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire < ?1) "\ + " OR NOT EXISTS (SELECT 1 from gn090 WHERE (expire < ?1)) "\ + " ORDER BY prio ASC LIMIT 1" + /** * After how many ms "busy" should a DB operation fail for good? @@ -126,10 +108,15 @@ struct Plugin sqlite3_stmt *updRepl; /** - * Precompiled SQL for replication decrement. + * Precompiled SQL for replication selection. */ sqlite3_stmt *selRepl; + /** + * Precompiled SQL for expiration selection. + */ + sqlite3_stmt *selExpi; + /** * Precompiled SQL for insertion. */ @@ -162,18 +149,23 @@ struct Plugin * @return 0 on success */ static int -sq_prepare (sqlite3 * dbh, const char *zSql, +sq_prepare (sqlite3 * dbh, + const char *zSql, sqlite3_stmt ** ppStmt) { char *dummy; int result; result = sqlite3_prepare_v2 (dbh, zSql, - strlen (zSql), ppStmt, (const char **) &dummy); + strlen (zSql), + ppStmt, + (const char **) &dummy); #if DEBUG_SQLITE GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", - "Prepared %p: %d\n", *ppStmt, result); + "Prepared %p: %d\n", + *ppStmt, + result); #endif return result; } @@ -190,21 +182,15 @@ create_indices (sqlite3 * dbh) /* create indices */ sqlite3_exec (dbh, "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL); - sqlite3_exec (dbh, - "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL, - NULL, NULL); sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn090 (prio)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn090 (expire)", NULL, NULL, + sqlite3_exec (dbh, "CREATE INDEX idx_expire_prio ON gn090 (expire,prio)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)", NULL, + sqlite3_exec (dbh, + "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)", + sqlite3_exec (dbh, "CREATE INDEX idx_comb ON gn090 (prio,expire,anonLevel,hash)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)", NULL, - NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_comb8 ON gn090 (expire)", NULL, - NULL, NULL); } @@ -357,6 +343,9 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg, (sq_prepare (plugin->dbh, SELECT_IT_REPLICATION_ORDER, &plugin->selRepl) != SQLITE_OK) || + (sq_prepare (plugin->dbh, + SELECT_IT_EXPIRATION_ORDER, + &plugin->selExpi) != SQLITE_OK) || (sq_prepare (plugin->dbh, "INSERT INTO gn090 (repl, type, prio, " "anonLevel, expire, hash, vhash, value) VALUES " @@ -396,6 +385,8 @@ database_shutdown (struct Plugin *plugin) sqlite3_finalize (plugin->updRepl); if (plugin->selRepl != NULL) sqlite3_finalize (plugin->selRepl); + if (plugin->selExpi != NULL) + sqlite3_finalize (plugin->selExpi); if (plugin->insertContent != NULL) sqlite3_finalize (plugin->insertContent); result = sqlite3_close(plugin->dbh); @@ -457,9 +448,9 @@ delete_by_rowid (struct Plugin* plugin, return GNUNET_SYSERR; } if (SQLITE_OK != sqlite3_reset (plugin->delRow)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | + GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); return GNUNET_OK; } @@ -531,11 +522,6 @@ struct NextContext */ GNUNET_HashCode lastKey; - /** - * Expiration time of the last value visited. - */ - struct GNUNET_TIME_Absolute lastExpiration; - /** * Priority of the last value visited. */ @@ -566,15 +552,14 @@ sqlite_next_request_cont (void *cls, struct NextContext * nc = cls; struct Plugin *plugin; unsigned long long rowid; - sqlite3_stmt *stmtd; int ret; - unsigned int type; unsigned int size; - unsigned int priority; - unsigned int anonymity; - struct GNUNET_TIME_Absolute expiration; + uint32_t anonymity; + uint32_t priority; + enum GNUNET_BLOCK_Type type; const GNUNET_HashCode *key; - const void *data; + struct GNUNET_TIME_Absolute expiration; + char data[GNUNET_SERVER_MAX_MESSAGE_SIZE]; plugin = nc->plugin; plugin->next_task = GNUNET_SCHEDULER_NO_TASK; @@ -592,90 +577,72 @@ sqlite_next_request_cont (void *cls, return; } - rowid = sqlite3_column_int64 (nc->stmt, 6); - nc->last_rowid = rowid; type = sqlite3_column_int (nc->stmt, 0); + priority = sqlite3_column_int (nc->stmt, 1); + anonymity = sqlite3_column_int (nc->stmt, 2); + expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3); + key = sqlite3_column_blob (nc->stmt, 4); size = sqlite3_column_bytes (nc->stmt, 5); + memcpy (data, sqlite3_column_blob (nc->stmt, 5), size); + rowid = sqlite3_column_int64 (nc->stmt, 6); if (sqlite3_column_bytes (nc->stmt, 4) != sizeof (GNUNET_HashCode)) { GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "sqlite", _("Invalid data in database. Trying to fix (by deletion).\n")); if (SQLITE_OK != sqlite3_reset (nc->stmt)) - LOG_SQLITE (nc->plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - if (sq_prepare - (nc->plugin->dbh, - "DELETE FROM gn090 WHERE NOT LENGTH(hash) = ?", - &stmtd) != SQLITE_OK) - { - LOG_SQLITE (nc->plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, - "sq_prepare"); - goto END; - } - - if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode))) - LOG_SQLITE (nc->plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int"); - if (SQLITE_DONE != sqlite3_step (stmtd)) - LOG_SQLITE (nc->plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); - if (SQLITE_OK != sqlite3_finalize (stmtd)) - LOG_SQLITE (nc->plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize"); + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | + GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + if (GNUNET_OK == delete_by_rowid (plugin, rowid)) + plugin->env->duc (plugin->env->cls, + - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); goto END; } - - priority = sqlite3_column_int (nc->stmt, 1); - anonymity = sqlite3_column_int (nc->stmt, 2); - expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3); - key = sqlite3_column_blob (nc->stmt, 4); - nc->lastPriority = priority; - nc->lastExpiration = expiration; - memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode)); - data = sqlite3_column_blob (nc->stmt, 5); nc->count++; - ret = nc->iter (nc->iter_cls, - nc, + nc->last_rowid = rowid; + nc->lastPriority = priority; + nc->lastKey = *key; + if (SQLITE_OK != sqlite3_reset (nc->stmt)) + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | + GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + ret = nc->iter (nc->iter_cls, nc, key, - size, - data, - type, - priority, - anonymity, - expiration, + size, data, + type, priority, + anonymity, expiration, rowid); - if (ret == GNUNET_SYSERR) + switch (ret) { + case GNUNET_SYSERR: nc->end_it = GNUNET_YES; - return; - } -#if DEBUG_SQLITE - if (ret == GNUNET_NO) - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Asked to remove entry %llu (%u bytes)\n", - (unsigned long long) rowid, - size + GNUNET_DATASTORE_ENTRY_OVERHEAD); -#endif - if ( (ret == GNUNET_NO) && - (GNUNET_OK == delete_by_rowid (plugin, rowid)) ) - { - plugin->env->duc (plugin->env->cls, - - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); + break; + case GNUNET_NO: #if DEBUG_SQLITE GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", - "Removed entry %llu (%u bytes)\n", + "Asked to remove entry %llu (%u bytes)\n", (unsigned long long) rowid, size + GNUNET_DATASTORE_ENTRY_OVERHEAD); #endif + if (GNUNET_OK == delete_by_rowid (plugin, rowid)) + { + plugin->env->duc (plugin->env->cls, + - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); +#if DEBUG_SQLITE + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "sqlite", + "Removed entry %llu (%u bytes)\n", + (unsigned long long) rowid, + size + GNUNET_DATASTORE_ENTRY_OVERHEAD); +#endif + } + break; + case GNUNET_YES: + break; + default: + GNUNET_break (0); } } @@ -723,7 +690,7 @@ sqlite_next_request (void *next_cls, */ static int sqlite_plugin_put (void *cls, - const GNUNET_HashCode * key, + const GNUNET_HashCode *key, uint32_t size, const void *data, enum GNUNET_BLOCK_Type type, @@ -774,37 +741,39 @@ sqlite_plugin_put (void *cls, return GNUNET_SYSERR; } n = sqlite3_step (stmt); - if (n != SQLITE_DONE) + switch (n) { - if (n == SQLITE_BUSY) - { - LOG_SQLITE (plugin, msg, - GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); - sqlite3_reset (stmt); - GNUNET_break (0); - return GNUNET_NO; - } + case SQLITE_DONE: + if (SQLITE_OK != sqlite3_reset (stmt)) + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | + GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + plugin->env->duc (plugin->env->cls, + size + GNUNET_DATASTORE_ENTRY_OVERHEAD); +#if DEBUG_SQLITE + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "sqlite", + "Stored new entry (%u bytes)\n", + size + GNUNET_DATASTORE_ENTRY_OVERHEAD); +#endif + return GNUNET_OK; + case SQLITE_BUSY: + GNUNET_break (0); + LOG_SQLITE (plugin, msg, + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + sqlite3_reset (stmt); + return GNUNET_SYSERR; + default: LOG_SQLITE (plugin, msg, - GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); sqlite3_reset (stmt); database_shutdown (plugin); database_setup (plugin->env->cfg, plugin); - return GNUNET_SYSERR; + return GNUNET_SYSERR; } - if (SQLITE_OK != sqlite3_reset (stmt)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - plugin->env->duc (plugin->env->cls, - size + GNUNET_DATASTORE_ENTRY_OVERHEAD); -#if DEBUG_SQLITE - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Stored new entry (%u bytes)\n", - size + GNUNET_DATASTORE_ENTRY_OVERHEAD); -#endif - return GNUNET_OK; } @@ -844,21 +813,27 @@ sqlite_plugin_update (void *cls, sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value); sqlite3_bind_int64 (plugin->updPrio, 3, uid); n = sqlite3_step (plugin->updPrio); - if (n != SQLITE_DONE) - LOG_SQLITE (plugin, msg, - GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, - "sqlite3_step"); + sqlite3_reset (plugin->updPrio); + switch (n) + { + case SQLITE_DONE: #if DEBUG_SQLITE - else - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Block updated\n"); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "sqlite", + "Block updated\n"); #endif - sqlite3_reset (plugin->updPrio); - - if (n == SQLITE_BUSY) - return GNUNET_NO; - return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR; + return GNUNET_OK; + case SQLITE_BUSY: + LOG_SQLITE (plugin, msg, + GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + return GNUNET_NO; + default: + LOG_SQLITE (plugin, msg, + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + return GNUNET_SYSERR; + } } @@ -877,26 +852,6 @@ struct IterContext */ sqlite3_stmt *stmt_2; - /** - * FIXME. - */ - int is_asc; - - /** - * FIXME. - */ - int is_prio; - - /** - * FIXME. - */ - int is_migr; - - /** - * FIXME. - */ - int limit_nonanonymous; - /** * Desired type for blocks returned by this iterator. */ @@ -934,26 +889,13 @@ iter_next_prepare (void *cls, sqlite3_reset (ic->stmt_1); sqlite3_reset (ic->stmt_2); plugin = nc->plugin; - if (ic->is_prio) - { -#if DEBUG_SQLITE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Restricting to results larger than the last priority %u\n", - nc->lastPriority); -#endif - sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority); - sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority); - } - else - { #if DEBUG_SQLITE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Restricting to results larger than the last expiration %llu\n", - (unsigned long long) nc->lastExpiration.abs_value); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Restricting to results larger than the last priority %u\n", + nc->lastPriority); #endif - sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.abs_value); - sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.abs_value); - } + sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority); + sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority); #if DEBUG_SQLITE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Restricting to results larger than the last key `%s'\n", @@ -1016,63 +958,56 @@ iter_next_prepare (void *cls, /** - * Call a method for each key in the database and - * call the callback method on it. + * Select a subset of the items in the datastore and call + * the given iterator for each of them. * - * @param plugin our plugin context + * @param cls our plugin context * @param type entries of which type should be considered? - * @param is_asc are we iterating in ascending order? - * @param is_prio are we iterating by priority (otherwise by expiration) - * @param is_migr are we iterating in migration order? - * @param limit_nonanonymous are we restricting results to those with anonymity - * level zero? - * @param stmt_str_1 first SQL statement to execute - * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration) + * Use 0 for any type. * @param iter function to call on each matching value; * will be called once with a NULL value at the end * @param iter_cls closure for iter */ static void -basic_iter (struct Plugin *plugin, - enum GNUNET_BLOCK_Type type, - int is_asc, - int is_prio, - int is_migr, - int limit_nonanonymous, - const char *stmt_str_1, - const char *stmt_str_2, - PluginIterator iter, - void *iter_cls) +sqlite_plugin_iter_zero_anonymity (void *cls, + enum GNUNET_BLOCK_Type type, + PluginIterator iter, + void *iter_cls) { + struct Plugin *plugin = cls; + struct GNUNET_TIME_Absolute now; struct NextContext *nc; struct IterContext *ic; sqlite3_stmt *stmt_1; sqlite3_stmt *stmt_2; + char *q; -#if DEBUG_SQLITE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "At %llu, using queries `%s' and `%s'\n", - (unsigned long long) GNUNET_TIME_absolute_get ().abs_value, - stmt_str_1, - stmt_str_2); -#endif - if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK) + now = GNUNET_TIME_absolute_get (); + GNUNET_asprintf (&q, SELECT_IT_NON_ANONYMOUS_1, + (unsigned long long) now.abs_value); + if (sq_prepare (plugin->dbh, q, &stmt_1) != SQLITE_OK) { LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2"); iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_free (q); return; } - if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK) + GNUNET_free (q); + GNUNET_asprintf (&q, SELECT_IT_NON_ANONYMOUS_2, + (unsigned long long) now.abs_value); + if (sq_prepare (plugin->dbh, q, &stmt_2) != SQLITE_OK) { LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2"); sqlite3_finalize (stmt_1); iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_free (q); return; } + GNUNET_free (q); nc = GNUNET_malloc (sizeof(struct NextContext) + sizeof(struct IterContext)); nc->plugin = plugin; @@ -1083,165 +1018,14 @@ basic_iter (struct Plugin *plugin, ic->stmt_1 = stmt_1; ic->stmt_2 = stmt_2; ic->type = type; - ic->is_asc = is_asc; - ic->is_prio = is_prio; - ic->is_migr = is_migr; - ic->limit_nonanonymous = limit_nonanonymous; nc->prep = &iter_next_prepare; nc->prep_cls = ic; - if (is_asc) - { - nc->lastPriority = 0; - nc->lastExpiration.abs_value = 0; - memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode)); - } - else - { - nc->lastPriority = 0x7FFFFFFF; - nc->lastExpiration.abs_value = 0x7FFFFFFFFFFFFFFFLL; - memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode)); - } + nc->lastPriority = 0x7FFFFFFF; + memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode)); sqlite_next_request (nc, GNUNET_NO); } -/** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. - * - * @param cls our plugin context - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter - */ -static void -sqlite_plugin_iter_low_priority (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) -{ - basic_iter (cls, - type, - GNUNET_YES, GNUNET_YES, - GNUNET_NO, GNUNET_NO, - SELECT_IT_LOW_PRIORITY_1, - SELECT_IT_LOW_PRIORITY_2, - iter, iter_cls); -} - - -/** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. - * - * @param cls our plugin context - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter - */ -static void -sqlite_plugin_iter_zero_anonymity (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) -{ - struct GNUNET_TIME_Absolute now; - char *q1; - char *q2; - - now = GNUNET_TIME_absolute_get (); - GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1, - (unsigned long long) now.abs_value); - GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2, - (unsigned long long) now.abs_value); - basic_iter (cls, - type, - GNUNET_NO, GNUNET_YES, - GNUNET_NO, GNUNET_YES, - q1, - q2, - iter, iter_cls); - GNUNET_free (q1); - GNUNET_free (q2); -} - - - -/** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. - * - * @param cls our plugin context - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter - */ -static void -sqlite_plugin_iter_ascending_expiration (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) -{ - struct GNUNET_TIME_Absolute now; - char *q1; - char *q2; - - now = GNUNET_TIME_absolute_get (); - GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1, - (unsigned long long) 0*now.abs_value); - GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2, - (unsigned long long) 0*now.abs_value); - basic_iter (cls, - type, - GNUNET_YES, GNUNET_NO, - GNUNET_NO, GNUNET_NO, - q1, q2, - iter, iter_cls); - GNUNET_free (q1); - GNUNET_free (q2); -} - - -/** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. - * - * @param cls our plugin context - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter - */ -static void -sqlite_plugin_iter_migration_order (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) -{ - struct GNUNET_TIME_Absolute now; - char *q; - - now = GNUNET_TIME_absolute_get (); - GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2, - (unsigned long long) now.abs_value); - basic_iter (cls, - type, - GNUNET_NO, GNUNET_NO, - GNUNET_YES, GNUNET_NO, - SELECT_IT_MIGRATION_ORDER_1, - q, - iter, iter_cls); - GNUNET_free (q); -} - - /** * Call sqlite using the already prepared query to get * the next result. @@ -1271,19 +1055,20 @@ all_next_prepare (void *cls, return GNUNET_SYSERR; } plugin = nc->plugin; - if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt))) - { - return GNUNET_OK; - } - if (ret != SQLITE_DONE) + ret = sqlite3_step (nc->stmt); + switch (ret) { + case SQLITE_ROW: + return GNUNET_OK; + case SQLITE_DONE: + return GNUNET_NO; + default: LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); return GNUNET_SYSERR; } - return GNUNET_NO; } @@ -1466,7 +1251,7 @@ sqlite_plugin_get (void *cls, GNUNET_assert (iter != NULL); if (key == NULL) { - sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls); + sqlite_plugin_iter_all_now (cls, type, iter, iter_cls); return; } GNUNET_snprintf (scratch, sizeof (scratch), @@ -1561,46 +1346,30 @@ sqlite_plugin_get (void *cls, /** - * Get a random item for replication. Returns a single, not expired, random item - * from those with the highest replication counters. The item's - * replication counter is decremented by one IF it was positive before. - * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * Execute statement that gets a row and call the iterator + * with the result. Resets the statement afterwards. * - * @param cls closure - * @param iter function to call the value (once only). - * @param iter_cls closure for iter + * @param plugin the plugin + * @param stmt the statement + * @param iter iterator to call + * @param iter_cls closure for 'iter' */ static void -sqlite_plugin_replication_get (void *cls, - PluginIterator iter, void *iter_cls) +execute_get (struct Plugin *plugin, + sqlite3_stmt *stmt, + PluginIterator iter, void *iter_cls) { - struct Plugin *plugin = cls; int n; - sqlite3_stmt *stmt; struct GNUNET_TIME_Absolute expiration; unsigned long long rowid; + unsigned int size; + int ret; -#if DEBUG_SQLITE - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Getting random block based on replication order.\n"); -#endif - stmt = plugin->selRepl; - if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, expiration.abs_value)) - { - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX"); - if (SQLITE_OK != sqlite3_reset (stmt)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - return; - } n = sqlite3_step (stmt); switch (n) { case SQLITE_ROW: + size = sqlite3_column_bytes (stmt, 5); rowid = sqlite3_column_int64 (stmt, 6); if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode)) { @@ -1611,24 +1380,30 @@ sqlite_plugin_replication_get (void *cls, LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - delete_by_rowid (plugin, rowid); + if (GNUNET_OK == delete_by_rowid (plugin, rowid)) + plugin->env->duc (plugin->env->cls, + - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); break; } expiration.abs_value = sqlite3_column_int64 (stmt, 3); - (void) iter (iter_cls, - NULL, - sqlite3_column_blob (stmt, 4) /* key */, - sqlite3_column_bytes (stmt, 5) /* size of data */, - sqlite3_column_blob (stmt, 5) /* data */, - sqlite3_column_int (stmt, 0) /* type */, - sqlite3_column_int (stmt, 1) /* priority */, - sqlite3_column_int (stmt, 2) /* anonymity */, - expiration, - rowid); + ret = iter (iter_cls, + NULL, + sqlite3_column_blob (stmt, 4) /* key */, + size, + sqlite3_column_blob (stmt, 5) /* data */, + sqlite3_column_int (stmt, 0) /* type */, + sqlite3_column_int (stmt, 1) /* priority */, + sqlite3_column_int (stmt, 2) /* anonymity */, + expiration, + rowid); if (SQLITE_OK != sqlite3_reset (stmt)) LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + if ( (GNUNET_NO == ret) && + (GNUNET_OK == delete_by_rowid (plugin, rowid)) ) + plugin->env->duc (plugin->env->cls, + - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); return; case SQLITE_DONE: /* database must be empty */ @@ -1656,6 +1431,85 @@ sqlite_plugin_replication_get (void *cls, } +/** + * Get a random item for replication. Returns a single, not expired, random item + * from those with the highest replication counters. The item's + * replication counter is decremented by one IF it was positive before. + * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * + * @param cls closure + * @param iter function to call the value (once only). + * @param iter_cls closure for iter + */ +static void +sqlite_plugin_replication_get (void *cls, + PluginIterator iter, void *iter_cls) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt; + struct GNUNET_TIME_Absolute now; + +#if DEBUG_SQLITE + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "sqlite", + "Getting random block based on replication order.\n"); +#endif + stmt = plugin->selRepl; + now = GNUNET_TIME_absolute_get (); + if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value)) + { + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX"); + if (SQLITE_OK != sqlite3_reset (stmt)) + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); + return; + } + execute_get (plugin, stmt, iter, iter_cls); +} + + + +/** + * Get a random item that has expired or has low priority. + * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * + * @param cls closure + * @param iter function to call the value (once only). + * @param iter_cls closure for iter + */ +static void +sqlite_plugin_expiration_get (void *cls, + PluginIterator iter, void *iter_cls) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt; + struct GNUNET_TIME_Absolute now; + +#if DEBUG_SQLITE + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "sqlite", + "Getting random block based on expiration and priority order.\n"); +#endif + now = GNUNET_TIME_absolute_get (); + stmt = plugin->selExpi; + if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value)) + { + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX"); + if (SQLITE_OK != sqlite3_reset (stmt)) + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); + return; + } + execute_get (plugin, stmt, iter, iter_cls); +} + + /** * Drop database. * @@ -1669,6 +1523,12 @@ sqlite_plugin_drop (void *cls) } +/** + * FIXME. + * + * @param cls the 'struct Plugin' + * @return the size of the database on disk (estimate) + */ static unsigned long long sqlite_plugin_get_size (void *cls) { @@ -1749,11 +1609,9 @@ libgnunet_plugin_datastore_sqlite_init (void *cls) api->next_request = &sqlite_next_request; api->get = &sqlite_plugin_get; api->replication_get = &sqlite_plugin_replication_get; + api->expiration_get = &sqlite_plugin_expiration_get; api->update = &sqlite_plugin_update; - api->iter_low_priority = &sqlite_plugin_iter_low_priority; api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity; - api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration; - api->iter_migration_order = &sqlite_plugin_iter_migration_order; api->iter_all_now = &sqlite_plugin_iter_all_now; api->drop = &sqlite_plugin_drop; GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, diff --git a/src/datastore/plugin_datastore_template.c b/src/datastore/plugin_datastore_template.c index fc67f600e..41d92a117 100644 --- a/src/datastore/plugin_datastore_template.c +++ b/src/datastore/plugin_datastore_template.c @@ -154,6 +154,22 @@ template_plugin_replication_get (void *cls, } +/** + * Get a random item for expiration. + * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * + * @param cls closure + * @param iter function to call the value (once only). + * @param iter_cls closure for iter + */ +static void +template_plugin_expiration_get (void *cls, + PluginIterator iter, void *iter_cls) +{ + GNUNET_break (0); +} + + /** * Update the priority for a particular key in the datastore. If * the expiration time in value is different than the time found in @@ -189,28 +205,6 @@ template_plugin_update (void *cls, } -/** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. - * - * @param cls our "struct Plugin*" - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter - */ -static void -template_plugin_iter_low_priority (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) -{ - GNUNET_break (0); -} - - - /** * Select a subset of the items in the datastore and call * the given iterator for each of them. @@ -232,51 +226,6 @@ template_plugin_iter_zero_anonymity (void *cls, } - -/** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. - * - * @param cls our "struct Plugin*" - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter - */ -static void -template_plugin_iter_ascending_expiration (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) -{ - GNUNET_break (0); -} - - - -/** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. - * - * @param cls our "struct Plugin*" - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter - */ -static void -template_plugin_iter_migration_order (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) -{ - GNUNET_break (0); -} - - - /** * Select a subset of the items in the datastore and call * the given iterator for each of them. @@ -330,11 +279,9 @@ libgnunet_plugin_datastore_template_init (void *cls) api->next_request = &template_plugin_next_request; api->get = &template_plugin_get; api->replication_get = &template_plugin_replication_get; + api->expiration_get = &template_plugin_expiration_get; api->update = &template_plugin_update; - api->iter_low_priority = &template_plugin_iter_low_priority; api->iter_zero_anonymity = &template_plugin_iter_zero_anonymity; - api->iter_ascending_expiration = &template_plugin_iter_ascending_expiration; - api->iter_migration_order = &template_plugin_iter_migration_order; api->iter_all_now = &template_plugin_iter_all_now; api->drop = &template_plugin_drop; GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, diff --git a/src/datastore/test_datastore_api.c b/src/datastore/test_datastore_api.c index 965f05687..6280907ad 100644 --- a/src/datastore/test_datastore_api.c +++ b/src/datastore/test_datastore_api.c @@ -210,7 +210,7 @@ check_value (void *cls, GNUNET_assert (priority == get_priority (i)); GNUNET_assert (anonymity == get_anonymity(i)); GNUNET_assert (expiration.abs_value == get_expiration(i).abs_value); - GNUNET_DATASTORE_get_next (datastore); + GNUNET_DATASTORE_iterate_get_next (datastore); } @@ -249,7 +249,7 @@ delete_value (void *cls, crc->key = *key; crc->data = GNUNET_malloc (size); memcpy (crc->data, data, size); - GNUNET_DATASTORE_get_next (datastore); + GNUNET_DATASTORE_iterate_get_next (datastore); } @@ -329,7 +329,7 @@ check_multiple (void *cls, #endif if (priority == get_priority (42)) crc->uid = uid; - GNUNET_DATASTORE_get_next (datastore); + GNUNET_DATASTORE_iterate_get_next (datastore); } @@ -370,7 +370,7 @@ check_update (void *cls, } else GNUNET_assert (size == get_size (43)); - GNUNET_DATASTORE_get_next (datastore); + GNUNET_DATASTORE_iterate_get_next (datastore); } @@ -420,12 +420,12 @@ run_continuation (void *cls, crc->i); #endif GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); - GNUNET_DATASTORE_get (datastore, - &crc->key, - get_type (crc->i), - 1, 1, TIMEOUT, - &check_value, - crc); + GNUNET_DATASTORE_iterate_key (datastore, + &crc->key, + get_type (crc->i), + 1, 1, TIMEOUT, + &check_value, + crc); break; case RP_DEL: crc->i--; @@ -437,12 +437,12 @@ run_continuation (void *cls, #endif crc->data = NULL; GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); - GNUNET_DATASTORE_get (datastore, - &crc->key, - get_type (crc->i), - 1, 1, TIMEOUT, - &delete_value, - crc); + GNUNET_DATASTORE_iterate_key (datastore, + &crc->key, + get_type (crc->i), + 1, 1, TIMEOUT, + &delete_value, + crc); break; case RP_DO_DEL: #if VERBOSE @@ -477,12 +477,12 @@ run_continuation (void *cls, crc->i); #endif GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); - GNUNET_DATASTORE_get (datastore, - &crc->key, - get_type (crc->i), - 1, 1, TIMEOUT, - &check_nothing, - crc); + GNUNET_DATASTORE_iterate_key (datastore, + &crc->key, + get_type (crc->i), + 1, 1, TIMEOUT, + &check_nothing, + crc); break; case RP_RESERVE: crc->phase = RP_PUT_MULTIPLE; @@ -526,12 +526,12 @@ run_continuation (void *cls, crc); break; case RP_GET_MULTIPLE: - GNUNET_DATASTORE_get (datastore, - &crc->key, - get_type (42), - 1, 1, TIMEOUT, - &check_multiple, - crc); + GNUNET_DATASTORE_iterate_key (datastore, + &crc->key, + get_type (42), + 1, 1, TIMEOUT, + &check_multiple, + crc); break; case RP_GET_MULTIPLE_NEXT: case RP_GET_MULTIPLE_DONE: @@ -549,12 +549,12 @@ run_continuation (void *cls, crc); break; case RP_UPDATE_VALIDATE: - GNUNET_DATASTORE_get (datastore, - &crc->key, - get_type (42), - 1, 1, TIMEOUT, - &check_update, - crc); + GNUNET_DATASTORE_iterate_key (datastore, + &crc->key, + get_type (42), + 1, 1, TIMEOUT, + &check_update, + crc); break; case RP_UPDATE_DONE: GNUNET_assert (0); diff --git a/src/datastore/test_datastore_api_management.c b/src/datastore/test_datastore_api_management.c index 50a426af6..5dfb5cea7 100644 --- a/src/datastore/test_datastore_api_management.c +++ b/src/datastore/test_datastore_api_management.c @@ -181,7 +181,7 @@ check_value (void *cls, GNUNET_assert (priority == get_priority (i)); GNUNET_assert (anonymity == get_anonymity(i)); GNUNET_assert (expiration.abs_value == get_expiration(i).abs_value); - GNUNET_DATASTORE_get_next (datastore); + GNUNET_DATASTORE_iterate_get_next (datastore); } @@ -254,12 +254,12 @@ run_continuation (void *cls, crc->i); #endif GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); - GNUNET_DATASTORE_get (datastore, - &crc->key, - get_type (crc->i), - 1, 1, TIMEOUT, - &check_value, - crc); + GNUNET_DATASTORE_iterate_key (datastore, + &crc->key, + get_type (crc->i), + 1, 1, TIMEOUT, + &check_value, + crc); break; case RP_GET_FAIL: #if VERBOSE @@ -269,12 +269,12 @@ run_continuation (void *cls, crc->i); #endif GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); - GNUNET_DATASTORE_get (datastore, - &crc->key, - get_type (crc->i), - 1, 1, TIMEOUT, - &check_nothing, - crc); + GNUNET_DATASTORE_iterate_key (datastore, + &crc->key, + get_type (crc->i), + 1, 1, TIMEOUT, + &check_nothing, + crc); break; case RP_DONE: GNUNET_assert (0 == crc->i); diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index caf534140..35d89c50f 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -1297,7 +1297,7 @@ process_migration_content (void *cls, MIN_MIGRATION_CONTENT_LIFETIME.rel_value) { /* content will expire soon, don't bother */ - GNUNET_DATASTORE_get_next (dsh); + GNUNET_DATASTORE_iterate_get_next (dsh); return; } if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) @@ -1309,7 +1309,7 @@ process_migration_content (void *cls, &process_migration_content, NULL)) { - GNUNET_DATASTORE_get_next (dsh); + GNUNET_DATASTORE_iterate_get_next (dsh); } return; } @@ -1333,7 +1333,7 @@ process_migration_content (void *cls, GNUNET_CONTAINER_multihashmap_iterate (connected_peers, &consider_migration, mb); - GNUNET_DATASTORE_get_next (dsh); + GNUNET_DATASTORE_iterate_get_next (dsh); } @@ -1344,7 +1344,7 @@ static void dht_put_continuation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - GNUNET_DATASTORE_get_next (dsh); + GNUNET_DATASTORE_iterate_get_next (dsh); } @@ -1455,10 +1455,10 @@ gather_dht_put_blocks (void *cls, { if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; - dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, - dht_put_type++, - &process_dht_put_content, NULL); + dht_qe = GNUNET_DATASTORE_iterate_zero_anonymity (dsh, 0, UINT_MAX, + GNUNET_TIME_UNIT_FOREVER_REL, + dht_put_type++, + &process_dht_put_content, NULL); GNUNET_assert (dht_qe != NULL); } } @@ -3991,7 +3991,7 @@ process_local_reply (void *cls, pr)) if (pr->qe != NULL) { - GNUNET_DATASTORE_get_next (dsh); + GNUNET_DATASTORE_iterate_get_next (dsh); } return; } @@ -4014,7 +4014,7 @@ process_local_reply (void *cls, -1, -1, GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL); - GNUNET_DATASTORE_get_next (dsh); + GNUNET_DATASTORE_iterate_get_next (dsh); return; } prq.type = type; @@ -4033,7 +4033,7 @@ process_local_reply (void *cls, if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) { pr->local_only = GNUNET_YES; /* do not forward */ - GNUNET_DATASTORE_get_next (dsh); + GNUNET_DATASTORE_iterate_get_next (dsh); return; } if ( (pr->client_request_list == NULL) && @@ -4048,10 +4048,10 @@ process_local_reply (void *cls, gettext_noop ("# processing result set cut short due to load"), 1, GNUNET_NO); - GNUNET_DATASTORE_get_next (dsh); + GNUNET_DATASTORE_iterate_get_next (dsh); return; } - GNUNET_DATASTORE_get_next (dsh); + GNUNET_DATASTORE_iterate_get_next (dsh); } @@ -4412,14 +4412,14 @@ handle_p2p_get (void *cls, "Handing request for `%s' to datastore\n", GNUNET_h2s (&gm->query)); #endif - pr->qe = GNUNET_DATASTORE_get (dsh, - &gm->query, - type, - pr->priority + 1, - MAX_DATASTORE_QUEUE, - timeout, - &process_local_reply, - pr); + pr->qe = GNUNET_DATASTORE_iterate_key (dsh, + &gm->query, + type, + pr->priority + 1, + MAX_DATASTORE_QUEUE, + timeout, + &process_local_reply, + pr); if (NULL == pr->qe) { GNUNET_STATISTICS_update (stats, @@ -4617,13 +4617,13 @@ handle_start_search (void *cls, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK) type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */ - pr->qe = GNUNET_DATASTORE_get (dsh, - &sm->query, - type, - -3, -1, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - &process_local_reply, - pr); + pr->qe = GNUNET_DATASTORE_iterate_key (dsh, + &sm->query, + type, + -3, -1, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + &process_local_reply, + pr); } diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index c44a658df..16389e130 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -1047,7 +1047,7 @@ process_local_reply (void *cls, pr)) { if (pr->qe != NULL) - GNUNET_DATASTORE_get_next (GSF_dsh); + GNUNET_DATASTORE_iterate_get_next (GSF_dsh); } return; } @@ -1070,7 +1070,7 @@ process_local_reply (void *cls, -1, -1, GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL); - GNUNET_DATASTORE_get_next (GSF_dsh); + GNUNET_DATASTORE_iterate_get_next (GSF_dsh); return; } prq.type = type; @@ -1112,7 +1112,7 @@ process_local_reply (void *cls, } return; } - GNUNET_DATASTORE_get_next (GSF_dsh); + GNUNET_DATASTORE_iterate_get_next (GSF_dsh); } @@ -1132,20 +1132,20 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, GNUNET_assert (NULL == pr->llc_cont); pr->llc_cont = cont; pr->llc_cont_cls = cont_cls; - pr->qe = GNUNET_DATASTORE_get (GSF_dsh, - &pr->public_data.query, - pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK - ? GNUNET_BLOCK_TYPE_ANY - : pr->public_data.type, - (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) - ? UINT_MAX - : 1 /* queue priority */, - (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) - ? UINT_MAX - : 1 /* max queue size */, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_local_reply, - pr); + pr->qe = GNUNET_DATASTORE_iterate_key (GSF_dsh, + &pr->public_data.query, + pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK + ? GNUNET_BLOCK_TYPE_ANY + : pr->public_data.type, + (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) + ? UINT_MAX + : 1 /* queue priority */, + (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) + ? UINT_MAX + : 1 /* max queue size */, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_local_reply, + pr); } diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c index 094489273..c08b57020 100644 --- a/src/fs/gnunet-service-fs_push.c +++ b/src/fs/gnunet-service-fs_push.c @@ -507,7 +507,7 @@ process_migration_content (void *cls, MIN_MIGRATION_CONTENT_LIFETIME.rel_value) { /* content will expire soon, don't bother */ - GNUNET_DATASTORE_get_next (GSF_dsh); + GNUNET_DATASTORE_iterate_get_next (GSF_dsh); return; } if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) @@ -519,7 +519,7 @@ process_migration_content (void *cls, &process_migration_content, NULL)) { - GNUNET_DATASTORE_get_next (GSF_dsh); + GNUNET_DATASTORE_iterate_get_next (GSF_dsh); } return; } @@ -556,7 +556,7 @@ process_migration_content (void *cls, } pos = pos->next; } - GNUNET_DATASTORE_get_next (GSF_dsh); + GNUNET_DATASTORE_iterate_get_next (GSF_dsh); } diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c index 5fd2ce81c..121a90bcd 100644 --- a/src/fs/gnunet-service-fs_put.c +++ b/src/fs/gnunet-service-fs_put.c @@ -109,7 +109,7 @@ static void dht_put_continuation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - GNUNET_DATASTORE_get_next (GSF_dsh); + GNUNET_DATASTORE_iterate_get_next (GSF_dsh); } @@ -198,11 +198,11 @@ gather_dht_put_blocks (void *cls, return; if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; - dht_qe = GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, - 0, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, - dht_put_type++, - &process_dht_put_content, NULL); + dht_qe = GNUNET_DATASTORE_iterate_zero_anonymity (GSF_dsh, + 0, UINT_MAX, + GNUNET_TIME_UNIT_FOREVER_REL, + dht_put_type++, + &process_dht_put_content, NULL); GNUNET_assert (dht_qe != NULL); } diff --git a/src/include/gnunet_datastore_plugin.h b/src/include/gnunet_datastore_plugin.h index c981ceb1d..34a659163 100644 --- a/src/include/gnunet_datastore_plugin.h +++ b/src/include/gnunet_datastore_plugin.h @@ -162,7 +162,7 @@ typedef int (*PluginPut) (void *cls, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, - char **msg); + char **msg); /** @@ -187,26 +187,25 @@ typedef int (*PluginPut) (void *cls, * @param iter_cls closure for iter */ typedef void (*PluginGet) (void *cls, - const GNUNET_HashCode * key, - const GNUNET_HashCode * vhash, + const GNUNET_HashCode *key, + const GNUNET_HashCode *vhash, enum GNUNET_BLOCK_Type type, PluginIterator iter, void *iter_cls); /** - * Get a random item for replication. Returns a single, - * not expired, random item - * from those with the highest replication counters. The item's - * replication counter is decremented by one IF it was positive before. - * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * Get a random item (additional constraints may apply depending on + * the specific implementation). Calls 'iter' with all values ZERO or + * NULL if no item applies, otherwise 'iter' is called once and only + * once with an item, with the 'next_cls' argument being NULL. * * @param cls closure * @param iter function to call the value (once only). * @param iter_cls closure for iter */ -typedef void (*PluginReplicationGet) (void *cls, - PluginIterator iter, void *iter_cls); +typedef void (*PluginRandomGet) (void *cls, + PluginIterator iter, void *iter_cls); /** @@ -234,13 +233,16 @@ typedef void (*PluginReplicationGet) (void *cls, */ typedef int (*PluginUpdate) (void *cls, uint64_t uid, - int delta, struct GNUNET_TIME_Absolute expire, + int delta, + struct GNUNET_TIME_Absolute expire, char **msg); /** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. + * Select a subset of the items in the datastore and call the given + * iterator for the first item; then allow getting more items by + * calling the 'next_request' callback with the given 'next_cls' + * argument passed to 'iter'. * * @param cls closure * @param type entries of which type should be considered? @@ -258,6 +260,7 @@ typedef void (*PluginSelector) (void *cls, PluginIterator iter, void *iter_cls); + /** * Drop database. * @@ -307,9 +310,18 @@ struct GNUNET_DATASTORE_PluginFunctions /** * Function to get a random item with high replication score from - * the database, lowering the item's replication score. + * the database, lowering the item's replication score. Returns a + * single, not expired, random item from those with the highest + * replication counters. The item's replication counter is + * decremented by one IF it was positive before. + */ + PluginRandomGet replication_get; + + /** + * Function to get a random expired item or, if none are expired, one + * with a low priority. */ - PluginReplicationGet replication_get; + PluginRandomGet expiration_get; /** * Update the priority for a particular key in the datastore. If @@ -322,30 +334,10 @@ struct GNUNET_DATASTORE_PluginFunctions PluginUpdate update; /** - * Iterate over the items in the datastore in ascending - * order of priority. - */ - PluginSelector iter_low_priority; - - /** - * Iterate over content with anonymity zero. + * Iterate over content with anonymity level zero. */ PluginSelector iter_zero_anonymity; - /** - * Iterate over the items in the datastore in ascending order of - * expiration time. - */ - PluginSelector iter_ascending_expiration; - - /** - * Iterate over the items in the datastore in migration - * order. Call the given function on the next item only - * (and then signal 'end' with a second call). This is - * a significant difference from all the other iterators! - */ - PluginSelector iter_migration_order; - /** * Iterate over all the items in the datastore * as fast as possible in a single transaction diff --git a/src/include/gnunet_datastore_service.h b/src/include/gnunet_datastore_service.h index 1e2e9e050..284d544f5 100644 --- a/src/include/gnunet_datastore_service.h +++ b/src/include/gnunet_datastore_service.h @@ -82,11 +82,11 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, * @param cls closure * @param success GNUNET_SYSERR on failure, * GNUNET_NO on timeout/queue drop - * GNUNET_YES on success + * GNUNET_YES (or other positive value) on success * @param msg NULL on success, otherwise an error message */ typedef void (*GNUNET_DATASTORE_ContinuationWithStatus)(void *cls, - int success, + int32_t success, const char *msg); @@ -148,7 +148,7 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, - int rid, + uint32_t rid, const GNUNET_HashCode * key, size_t size, const void *data, @@ -187,7 +187,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, - int rid, + uint32_t rid, unsigned int queue_priority, unsigned int max_queue_size, struct GNUNET_TIME_Relative timeout, @@ -214,7 +214,7 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, - unsigned long long uid, + uint64_t uid, uint32_t priority, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, @@ -287,7 +287,7 @@ typedef void (*GNUNET_DATASTORE_Iterator) (void *cls, * in the datastore. The iterator will only be called * once initially; if the first call did contain a * result, further results can be obtained by calling - * "GNUNET_DATASTORE_get_next" with the given argument. + * "GNUNET_DATASTORE_iterate_get_next" with the given argument. * * @param h handle to the datastore * @param key maybe NULL (to match all entries) @@ -304,24 +304,54 @@ typedef void (*GNUNET_DATASTORE_Iterator) (void *cls, * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, - const GNUNET_HashCode * key, - enum GNUNET_BLOCK_Type type, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls); +GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h, + const GNUNET_HashCode * key, + enum GNUNET_BLOCK_Type type, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_Iterator iter, + void *iter_cls); + + +/** + * Get all zero-anonymity values from the datastore. + * + * @param h handle to the datastore + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response + * @param type allowed type for the operation (ANY for 'all types') + * @param iter function to call on a random value; it + * will be called once with a value (if available) + * and always once with a value of NULL at the end. + * @param iter_cls closure for iter + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) + */ +struct GNUNET_DATASTORE_QueueEntry * +GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + enum GNUNET_BLOCK_Type type, + GNUNET_DATASTORE_Iterator iter, + void *iter_cls); /** * Function called to trigger obtaining the next result - * from the datastore. + * from the datastore. ONLY applies for 'GNUNET_DATASTORE_iterate_*' + * calls, not for 'get' calls. FIXME: how much mixing of iterate + * calls with other operations can we permit!? Should we pass + * the 'QueueEntry' instead of the datastore handle here instead? * * @param h handle to the datastore */ void -GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h); +GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h); /** @@ -353,32 +383,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, void *iter_cls); -/** - * Get a zero-anonymity value from the datastore. - * - * @param h handle to the datastore - * @param queue_priority ranking of this request in the priority queue - * @param max_queue_size at what queue size should this request be dropped - * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response - * @param type allowed type for the operation - * @param iter function to call on a random value; it - * will be called once with a value (if available) - * and always once with a value of NULL. - * @param iter_cls closure for iter - * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) - */ -struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - enum GNUNET_BLOCK_Type type, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls); - /** * Cancel a datastore operation. The final callback from the