From 57924abe125fa52dce4dc2e84cb0a7c4dcd04579 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 28 Aug 2010 21:34:36 +0000 Subject: [PATCH] train hacks --- TODO | 2 +- src/datastore/gnunet-service-datastore.c | 143 +++++++++++++- src/datastore/perf_plugin_datastore.c | 16 ++ src/datastore/plugin_datastore.h | 28 ++- src/datastore/plugin_datastore_mysql.c | 46 +++-- src/datastore/plugin_datastore_postgres.c | 158 ++++++++++++--- src/datastore/plugin_datastore_sqlite.c | 230 +++++----------------- src/include/gnunet_scheduler_lib.h | 2 +- src/peerinfo/Makefile.am | 10 +- src/peerinfo/gnunet-service-peerinfo.c | 195 +++++++++--------- src/peerinfo/perf_peerinfo_api.c | 217 ++++++++++++++++++++ src/peerinfo/test_peerinfo_api.c | 2 +- src/peerinfo/test_peerinfo_api_data.conf | 3 +- src/util/network.c | 1 - src/util/scheduler.c | 214 ++++++++++++++++++-- 15 files changed, 920 insertions(+), 347 deletions(-) create mode 100755 src/peerinfo/perf_peerinfo_api.c diff --git a/TODO b/TODO index c51b4a6c3..8ac9a592c 100644 --- a/TODO +++ b/TODO @@ -115,7 +115,7 @@ * DV: [Nate] - proper bandwidth allocation - performance tests -* PEERINFO: +* PEERINFO: - merge multiple HELLOs of the same peer in the transmission queue (theoretically reduces overhead; bounds message queue size) - merge multiple iteration requests over "all" peers in the queue diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c index 40ea153de..01778b1b7 100644 --- a/src/datastore/gnunet-service-datastore.c +++ b/src/datastore/gnunet-service-datastore.c @@ -42,6 +42,13 @@ */ #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15) +#define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore") + +/** + * After how many payload-changing operations + * do we sync our statistics? + */ +#define MAX_STAT_SYNC_LAG 50 /** @@ -109,6 +116,7 @@ struct ReservationList }; + /** * Our datastore plugin (NULL if not available). */ @@ -141,6 +149,24 @@ static unsigned long long cache_size; * How much space have we currently reserved? */ static unsigned long long reserved; + +/** + * How much data are we currently storing + * in the database? + */ +static unsigned long long payload; + +/** + * Number of updates that were made to the + * payload value since we last synchronized + * it with the statistics service. + */ +static unsigned int lastSync; + +/** + * Did we get an answer from statistics? + */ +static int stats_worked; /** * Identity of the task that is used to delete @@ -164,6 +190,23 @@ struct GNUNET_SCHEDULER_Handle *sched; static struct GNUNET_STATISTICS_Handle *stats; +/** + * Synchronize our utilization statistics with the + * statistics service. + */ +static void +sync_stats () +{ + GNUNET_STATISTICS_set (stats, + QUOTA_STAT_NAME, + payload, + GNUNET_YES); + lastSync = 0; +} + + + + /** * Function called once the transmit operation has * either failed or succeeded. @@ -241,6 +284,12 @@ static struct TransmitCallbackContext *tcc_tail; */ static int cleaning_done; +/** + * Handle for pending get request. + */ +static struct GNUNET_STATISTICS_GetHandle *stat_get; + + /** * Task that is used to remove expired entries from * the datastore. This task will schedule itself @@ -731,7 +780,7 @@ handle_reserve (void *cls, #endif amount = GNUNET_ntohll(msg->amount); entries = ntohl(msg->entries); - used = plugin->api->get_size (plugin->api->cls) + reserved; + used = payload + reserved; req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries; if (used + req > quota) { @@ -931,13 +980,13 @@ execute_put (struct GNUNET_SERVER_Client *client, (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, msg); GNUNET_free_non_null (msg); - if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls)) + if (quota - reserved - cache_size < payload) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Need %llu bytes more space (%llu allowed, using %llu)\n"), (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD, (unsigned long long) (quota - reserved - cache_size), - (unsigned long long) plugin->api->get_size (plugin->api->cls)); + (unsigned long long) payload); manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD); } } @@ -1350,6 +1399,78 @@ handle_drop (void *cls, } +/** + * Function called by plugins to notify us about a + * change in their disk utilization. + * + * @param cls closure (NULL) + * @param delta change in disk utilization, + * 0 for "reset to empty" + */ +static void +disk_utilization_change_cb (void *cls, + int delta) +{ + if ( (delta < 0) && + (payload < -delta) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Datastore payload inaccurate (%lld < %lld). Trying to fix.\n"), + (long long) payload, + (long long) -delta); + payload = plugin->api->get_size (plugin->api->cls); + sync_stats (); + return; + } + payload += delta; + lastSync++; + if (lastSync >= MAX_STAT_SYNC_LAG) + sync_stats (); +} + + +/** + * Callback function to process statistic values. + * + * @param cls closure (struct Plugin*) + * @param subsystem name of subsystem that created the statistic + * @param name the name of the datum + * @param value the current value + * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not + * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration + */ +static int +process_stat_in (void *cls, + const char *subsystem, + const char *name, + uint64_t value, + int is_persistent) +{ + GNUNET_assert (stats_worked == GNUNET_NO); + stats_worked = GNUNET_YES; + payload += value; +#if DEBUG_SQLITE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Notification from statistics about existing payload (%llu), new payload is %llu\n", + value, + payload); +#endif + return GNUNET_OK; +} + + +static void +process_stat_done (void *cls, + int success) +{ + struct DatastorePlugin *plugin = cls; + + stat_get = NULL; + if (stats_worked == GNUNET_NO) + payload = plugin->api->get_size (plugin->api->cls); +} + + /** * Load the datastore plugin. */ @@ -1373,6 +1494,8 @@ load_plugin () ret = GNUNET_malloc (sizeof(struct DatastorePlugin)); ret->env.cfg = cfg; ret->env.sched = sched; + ret->env.duc = &disk_utilization_change_cb; + ret->env.cls = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Loading `%s' datastore plugin\n"), name); GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name); @@ -1426,6 +1549,13 @@ unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_CONTAINER_bloomfilter_free (filter); filter = NULL; } + if (lastSync > 0) + sync_stats (); + if (stat_get != NULL) + { + GNUNET_STATISTICS_get_cancel (stat_get); + stat_get = NULL; + } if (stats != NULL) { GNUNET_STATISTICS_destroy (stats, GNUNET_YES); @@ -1614,6 +1744,13 @@ run (void *cls, } return; } + stat_get = GNUNET_STATISTICS_get (stats, + "datastore", + QUOTA_STAT_NAME, + GNUNET_TIME_UNIT_SECONDS, + &process_stat_done, + &process_stat_in, + plugin); GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL); GNUNET_SERVER_add_handlers (server, handlers); expired_kill_task diff --git a/src/datastore/perf_plugin_datastore.c b/src/datastore/perf_plugin_datastore.c index 17cd009bc..4d39d85ac 100644 --- a/src/datastore/perf_plugin_datastore.c +++ b/src/datastore/perf_plugin_datastore.c @@ -81,6 +81,20 @@ struct CpsRunContext }; +/** + * Function called by plugins to notify us about a + * change in their disk utilization. + * + * @param cls closure (NULL) + * @param delta change in disk utilization, + * 0 for "reset to empty" + */ +static void +disk_utilization_change_cb (void *cls, + int delta) +{ +} + static void putValue (struct GNUNET_DATASTORE_PluginFunctions * api, int i, int k) @@ -331,6 +345,8 @@ load_plugin (const struct GNUNET_CONFIGURATION_Handle *cfg, } env.cfg = cfg; env.sched = sched; + env.duc = &disk_utilization_change_cb; + env.cls = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Loading `%s' datastore plugin\n"), name); GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name); diff --git a/src/datastore/plugin_datastore.h b/src/datastore/plugin_datastore.h index e8f433671..fa08501cc 100644 --- a/src/datastore/plugin_datastore.h +++ b/src/datastore/plugin_datastore.h @@ -22,10 +22,6 @@ * @file datastore/plugin_datastore.h * @brief API for the database backend plugins. * @author Christian Grothoff - * - * TODO: - * - consider defining enumeration or at least typedef - * for the type of "type" (instead of using uint32_t) */ #ifndef PLUGIN_DATASTORE_H #define PLUGIN_DATASTORE_H @@ -39,11 +35,23 @@ /** * How many bytes of overhead will we assume per entry - * in the SQlite DB? + * in any DB (for reservations)? */ #define GNUNET_DATASTORE_ENTRY_OVERHEAD 256 +/** + * Function invoked to notify service of disk utilization + * changes. + * + * @param cls closure + * @param delta change in disk utilization, + * 0 for "reset to empty" + */ +typedef void (*DiskUtilizationChange)(void *cls, + int delta); + + /** * The datastore service will pass a pointer to a struct * of this type as the first and only argument to the @@ -61,6 +69,16 @@ struct GNUNET_DATASTORE_PluginEnvironment */ struct GNUNET_SCHEDULER_Handle *sched; + /** + * Function to call on disk utilization change. + */ + DiskUtilizationChange duc; + + /** + * Closure. + */ + void *cls; + }; diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c index c216e989c..ea6cc2322 100644 --- a/src/datastore/plugin_datastore_mysql.c +++ b/src/datastore/plugin_datastore_mysql.c @@ -191,7 +191,6 @@ " ORDER BY expire DESC,vkey DESC LIMIT 1)"\ "ORDER BY expire DESC,vkey DESC LIMIT 1" -// #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn072" struct GNUNET_MysqlStatementHandle @@ -344,12 +343,10 @@ struct Plugin #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE vkey=?" struct GNUNET_MysqlStatementHandle *update_entry; - struct GNUNET_MysqlStatementHandle *iter[4]; +#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn072" + struct GNUNET_MysqlStatementHandle *get_size; - /** - * Size of the mysql database on disk. - */ - unsigned long long content_size; + struct GNUNET_MysqlStatementHandle *iter[4]; }; @@ -957,8 +954,11 @@ do_delete_entry_by_vkey (struct Plugin *plugin, return ret; } + static int -return_ok (void *cls, unsigned int num_values, MYSQL_BIND * values) +return_ok (void *cls, + unsigned int num_values, + MYSQL_BIND * values) { return GNUNET_OK; } @@ -1189,7 +1189,9 @@ mysql_next_request_cont (void *next_cls, { do_delete_value (plugin, vkey); do_delete_entry_by_vkey (plugin, vkey); - plugin->content_size -= length; + if (length != 0) + plugin->env->duc (plugin->env->cls, + - length); } return; END_SET: @@ -1279,14 +1281,29 @@ iterateHelper (struct Plugin *plugin, * Get an estimate of how much space the database is * currently using. * - * @param cls our "struct Plugin*" + * @param cls our "struct Plugin *" * @return number of bytes used on disk */ static unsigned long long mysql_plugin_get_size (void *cls) { struct Plugin *plugin = cls; - return plugin->content_size; + MYSQL_BIND cbind[1]; + long long total; + + memset (cbind, 0, sizeof (cbind)); + total = 0; + cbind[0].buffer_type = MYSQL_TYPE_LONGLONG; + cbind[0].buffer = &total; + cbind[0].is_unsigned = GNUNET_NO; + if (GNUNET_OK != + prepared_statement_run_select (plugin, + plugin->get_size, + 1, cbind, + &return_ok, NULL, + -1)) + return 0; + return total; } @@ -1373,7 +1390,9 @@ mysql_plugin_put (void *cls, vkey, (unsigned int) size); #endif - plugin->content_size += size; + if (size > 0) + plugin->env->duc (plugin->env->cls, + size); return GNUNET_OK; } @@ -1804,8 +1823,8 @@ mysql_plugin_drop (void *cls) "DROP TABLE gn090")) || (GNUNET_OK != run_statement (plugin, "DROP TABLE gn072"))) - return; /* error */ - plugin->content_size = 0; + return; /* error */ + plugin->env->duc (plugin->env->cls, 0); } @@ -1865,6 +1884,7 @@ libgnunet_plugin_datastore_mysql_init (void *cls) || PINIT (plugin->select_entry_by_hash_vhash_and_type, SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) + || PINIT (plugin->get_size, SELECT_SIZE) || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH) || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE) || PINIT (plugin->count_entry_by_hash_vhash_and_type, diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c index 889309784..54bdde542 100644 --- a/src/datastore/plugin_datastore_postgres.c +++ b/src/datastore/plugin_datastore_postgres.c @@ -82,26 +82,104 @@ #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS +/** + * Closure for 'postgres_next_request_cont'. + */ struct NextRequestClosure { + /** + * Global plugin data. + */ struct Plugin *plugin; + + /** + * Function to call for each matching entry. + */ PluginIterator iter; + + /** + * Closure for 'iter'. + */ void *iter_cls; + + /** + * Parameters for the prepared statement. + */ const char *paramValues[5]; + + /** + * Name of the prepared statement to run. + */ const char *pname; + + /** + * Size of values pointed to by paramValues. + */ int paramLengths[5]; + + /** + * Number of paramters in paramValues/paramLengths. + */ int nparams; + + /** + * Current time (possible parameter), big-endian. + */ uint64_t bnow; + + /** + * Key (possible parameter) + */ GNUNET_HashCode key; + + /** + * Hash of value (possible parameter) + */ GNUNET_HashCode vhash; + + /** + * Number of entries found so far + */ long long count; + + /** + * Offset this iteration starts at. + */ uint64_t off; + + /** + * Current offset to use in query, big-endian. + */ uint64_t blimit_off; + + /** + * Overall number of matching entries. + */ unsigned long long total; + + /** + * Expiration value of previous result (possible parameter), big-endian. + */ uint64_t blast_expire; + + /** + * Row ID of last result (possible paramter), big-endian. + */ uint32_t blast_rowid; + + /** + * Priority of last result (possible parameter), big-endian. + */ uint32_t blast_prio; + + /** + * Type of block (possible paramter), big-endian. + */ uint32_t btype; + + /** + * Flag set to GNUNET_YES to stop iteration. + */ int end_it; }; @@ -131,10 +209,6 @@ struct Plugin */ GNUNET_SCHEDULER_TaskIdentifier next_task; - unsigned long long payload; - - unsigned int lastSync; - }; @@ -143,6 +217,12 @@ struct Plugin * the desired status code. If not, log an error, clear the * result and return GNUNET_SYSERR. * + * @param plugin global context + * @param ret result to check + * @param expected_status expected return value + * @param command name of SQL command that was run + * @param args arguments to SQL command + * @param line line number for error reporting * @return GNUNET_OK if the result is acceptable */ static int @@ -173,6 +253,10 @@ check_result (struct Plugin *plugin, /** * Run simple SQL statement (without results). + * + * @param plugin global context + * @param sql statement to run + * @param line code line for error reporting */ static int pq_exec (struct Plugin *plugin, @@ -190,6 +274,12 @@ pq_exec (struct Plugin *plugin, /** * Prepare SQL statement. + * + * @param plugin global context + * @param sql SQL code to prepare + * @param nparams number of parameters in sql + * @param line code line for error reporting + * @return GNUNET_OK on success */ static int pq_prepare (struct Plugin *plugin, @@ -207,6 +297,8 @@ pq_prepare (struct Plugin *plugin, /** * @brief Get a database handle + * + * @param plugin global context * @return GNUNET_OK on success, GNUNET_SYSERR on error */ static int @@ -413,6 +505,8 @@ init_connection (struct Plugin *plugin) * Delete the row identified by the given rowid (qid * in postgres). * + * @param plugin global context + * @param rowid which row to delete * @return GNUNET_OK on success */ static int @@ -450,11 +544,32 @@ static unsigned long long postgres_plugin_get_size (void *cls) { struct Plugin *plugin = cls; - double ret; + unsigned long long total; + PGresult *ret; - ret = plugin->payload; - return (unsigned long long) (ret * 1.00); - /* benchmarking shows XX% overhead */ + ret = PQexecParams (plugin->dbh, + "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", + 0, NULL, NULL, NULL, NULL, 1); + if (GNUNET_OK != check_result (plugin, + ret, + PGRES_TUPLES_OK, + "PQexecParams", + "get_size", + __LINE__)) + { + return 0; + } + if ((PQntuples (ret) != 1) || + (PQnfields (ret) != 1) || + (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))) + { + GNUNET_break (0); + PQclear (ret); + return 0; + } + total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0)); + PQclear (ret); + return total; } @@ -518,13 +633,12 @@ postgres_plugin_put (void *cls, "PQexecPrepared", "put", __LINE__)) return GNUNET_SYSERR; PQclear (ret); - plugin->payload += size; + plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD); #if DEBUG_POSTGRES GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres", - "Stored %u bytes in database, new payload is %llu\n", - (unsigned int) size, - (unsigned long long) plugin->payload); + "Stored %u bytes in database\n", + (unsigned int) size); #endif return GNUNET_OK; } @@ -695,18 +809,16 @@ postgres_next_request_cont (void *next_cls, #if DEBUG_POSTGRES GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres", - "Deleting %u bytes from database, current payload is %llu\n", - (unsigned int) size, - (unsigned long long) plugin->payload); + "Deleting %u bytes from database\n", + (unsigned int) size); #endif - GNUNET_assert (plugin->payload >= size); - plugin->payload -= size; + plugin->env->duc (plugin->env->cls, + - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); #if DEBUG_POSTGRES GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres", - "Deleted %u bytes from database, new payload is %llu\n", - (unsigned int) size, - (unsigned long long) plugin->payload); + "Deleted %u bytes from database\n", + (unsigned int) size); #endif } } @@ -803,11 +915,15 @@ postgres_plugin_update (void *cls, * Call a method for each key in the database and * call the callback method on it. * + * @param plugin global context * @param type entries of which type should be considered? + * @param is_asc ascending or descending iteration? + * @param iter_select which SELECT method should be used? * @param iter maybe NULL (to just count); iter * should return GNUNET_SYSERR to abort the * iteration, GNUNET_NO to delete the entry and * continue and GNUNET_OK to continue iterating + * @param iter_cls closure for 'iter' */ static void postgres_iterate (struct Plugin *plugin, @@ -1123,7 +1239,6 @@ postgres_plugin_iter_ascending_expiration (void *cls, } - /** * Select a subset of the items in the datastore and call * the given iterator for each of them. @@ -1148,7 +1263,6 @@ postgres_plugin_iter_migration_order (void *cls, } - /** * Select a subset of the items in the datastore and call * the given iterator for each of them. diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c index ca1f4e4ae..076d468ee 100644 --- a/src/datastore/plugin_datastore_sqlite.c +++ b/src/datastore/plugin_datastore_sqlite.c @@ -25,19 +25,11 @@ */ #include "platform.h" -#include "gnunet_statistics_service.h" #include "plugin_datastore.h" #include #define DEBUG_SQLITE GNUNET_NO -/** - * After how many payload-changing operations - * do we sync our statistics? - */ -#define MAX_STAT_SYNC_LAG 50 - -#define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore") /** * Log an error message at log-level 'level' that indicates @@ -122,16 +114,6 @@ struct Plugin */ sqlite3_stmt *insertContent; - /** - * Handle to the statistics service. - */ - struct GNUNET_STATISTICS_Handle *statistics; - - /** - * Handle for pending get request. - */ - struct GNUNET_STATISTICS_GetHandle *stat_get; - /** * Closure of the 'next_task' (must be freed if 'next_task' is cancelled). */ @@ -141,29 +123,12 @@ struct Plugin * Pending task with scheduler for running the next request. */ GNUNET_SCHEDULER_TaskIdentifier next_task; - - /** - * How much data are we currently storing - * in the database? - */ - unsigned long long payload; - - /** - * Number of updates that were made to the - * payload value since we last synchronized - * it with the statistics service. - */ - unsigned int lastSync; /** * Should the database be dropped on shutdown? */ int drop_on_shutdown; - /** - * Did we get an answer from statistics? - */ - int stats_worked; }; @@ -267,12 +232,7 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg, return GNUNET_SYSERR; } /* database is new or got deleted, reset payload to zero! */ - if (plugin->stat_get != NULL) - { - GNUNET_STATISTICS_get_cancel (plugin->stat_get); - plugin->stat_get = NULL; - } - plugin->payload = 0; + plugin->env->duc (plugin->env->cls, 0); } plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir), #ifdef ENABLE_NLS @@ -374,22 +334,6 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg, } -/** - * Synchronize our utilization statistics with the - * statistics service. - * @param plugin the plugin context (state for this module) - */ -static void -sync_stats (struct Plugin *plugin) -{ - GNUNET_STATISTICS_set (plugin->statistics, - QUOTA_STAT_NAME, - plugin->payload, - GNUNET_YES); - plugin->lastSync = 0; -} - - /** * Shutdown database connection and associate data * structures. @@ -398,8 +342,6 @@ sync_stats (struct Plugin *plugin) static void database_shutdown (struct Plugin *plugin) { - if (plugin->lastSync > 0) - sync_stats (plugin); if (plugin->updPrio != NULL) sqlite3_finalize (plugin->updPrio); if (plugin->insertContent != NULL) @@ -409,20 +351,6 @@ database_shutdown (struct Plugin *plugin) } -/** - * Get an estimate of how much space the database is - * currently using. - * - * @param cls our plugin context - * @return number of bytes used on disk - */ -static unsigned long long sqlite_plugin_get_size (void *cls) -{ - struct Plugin *plugin = cls; - return plugin->payload; -} - - /** * Delete the database entry with the given * row identifier. @@ -661,23 +589,15 @@ sqlite_next_request_cont (void *cls, if ( (ret == GNUNET_NO) && (GNUNET_OK == delete_by_rowid (plugin, rowid)) ) { - if (plugin->payload >= size + GNUNET_DATASTORE_ENTRY_OVERHEAD) - plugin->payload -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD); - else - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Datastore payload inaccurate, please fix and restart!\n")); - plugin->lastSync++; + plugin->env->duc (plugin->env->cls, + - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); #if DEBUG_SQLITE - if (ret == GNUNET_NO) - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Removed entry %llu (%u bytes), new payload is %llu\n", - (unsigned long long) rowid, - size + GNUNET_DATASTORE_ENTRY_OVERHEAD, - plugin->payload); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "sqlite", + "Removed entry %llu (%u bytes)\n", + (unsigned long long) rowid, + size + GNUNET_DATASTORE_ENTRY_OVERHEAD); #endif - if (plugin->lastSync >= MAX_STAT_SYNC_LAG) - sync_stats (plugin); } } @@ -798,17 +718,14 @@ sqlite_plugin_put (void *cls, LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - plugin->lastSync++; - plugin->payload += size + GNUNET_DATASTORE_ENTRY_OVERHEAD; + plugin->env->duc (plugin->env->cls, + size + GNUNET_DATASTORE_ENTRY_OVERHEAD); #if DEBUG_SQLITE GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", - "Stored new entry (%u bytes), new payload is %llu\n", - size + GNUNET_DATASTORE_ENTRY_OVERHEAD, - plugin->payload); + "Stored new entry (%u bytes)\n", + size + GNUNET_DATASTORE_ENTRY_OVERHEAD); #endif - if (plugin->lastSync >= MAX_STAT_SYNC_LAG) - sync_stats (plugin); return GNUNET_OK; } @@ -1574,81 +1491,50 @@ sqlite_plugin_drop (void *cls) } -/** - * Callback function to process statistic values. - * - * @param cls closure - * @param subsystem name of subsystem that created the statistic - * @param name the name of the datum - * @param value the current value - * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not - * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration - */ -static int -process_stat_in (void *cls, - const char *subsystem, - const char *name, - uint64_t value, - int is_persistent) -{ - struct Plugin *plugin = cls; - - plugin->stats_worked = GNUNET_YES; - plugin->payload += value; -#if DEBUG_SQLITE - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Notification from statistics about existing payload (%llu), new payload is %llu\n", - value, - plugin->payload); -#endif - return GNUNET_OK; -} - - -static void -process_stat_done (void *cls, - int success) +static unsigned long long +sqlite_plugin_get_size (void *cls) { struct Plugin *plugin = cls; sqlite3_stmt *stmt; uint64_t pages; uint64_t page_size; - plugin->stat_get = NULL; - if ( (plugin->stats_worked == GNUNET_NO) && - (SQLITE_VERSION_NUMBER >= 3006000) ) - { - CHECK (SQLITE_OK == - sqlite3_exec (plugin->dbh, - "VACUUM", NULL, NULL, ENULL)); - CHECK (SQLITE_OK == - sqlite3_exec (plugin->dbh, - "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL)); - CHECK (SQLITE_OK == - sq_prepare (plugin->dbh, - "PRAGMA page_count", - &stmt)); - if (SQLITE_ROW == - sqlite3_step (stmt)) - pages = sqlite3_column_int64 (stmt, 0); - else - pages = 0; - sqlite3_finalize (stmt); - CHECK (SQLITE_OK == - sq_prepare (plugin->dbh, - "PRAGMA page_size", - &stmt)); - CHECK (SQLITE_ROW == - sqlite3_step (stmt)); - page_size = sqlite3_column_int64 (stmt, 0); - sqlite3_finalize (stmt); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"), - (unsigned long long) pages, - (unsigned long long) page_size); - plugin->payload = pages * page_size; + if (SQLITE_VERSION_NUMBER < 3006000) + { + GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, + "datastore-sqlite", + _("sqlite version to old to determine size, assuming zero\n")); + return 0; } + CHECK (SQLITE_OK == + sqlite3_exec (plugin->dbh, + "VACUUM", NULL, NULL, ENULL)); + CHECK (SQLITE_OK == + sqlite3_exec (plugin->dbh, + "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL)); + CHECK (SQLITE_OK == + sq_prepare (plugin->dbh, + "PRAGMA page_count", + &stmt)); + if (SQLITE_ROW == + sqlite3_step (stmt)) + pages = sqlite3_column_int64 (stmt, 0); + else + pages = 0; + sqlite3_finalize (stmt); + CHECK (SQLITE_OK == + sq_prepare (plugin->dbh, + "PRAGMA page_size", + &stmt)); + CHECK (SQLITE_ROW == + sqlite3_step (stmt)); + page_size = sqlite3_column_int64 (stmt, 0); + sqlite3_finalize (stmt); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"), + (unsigned long long) pages, + (unsigned long long) page_size); + return pages * page_size; } @@ -1669,16 +1555,6 @@ libgnunet_plugin_datastore_sqlite_init (void *cls) return NULL; /* can only initialize once! */ memset (&plugin, 0, sizeof(struct Plugin)); plugin.env = env; - plugin.statistics = GNUNET_STATISTICS_create (env->sched, - "ds-sqlite", - env->cfg); - plugin.stat_get = GNUNET_STATISTICS_get (plugin.statistics, - "ds-sqlite", - QUOTA_STAT_NAME, - GNUNET_TIME_UNIT_SECONDS, - &process_stat_done, - &process_stat_in, - &plugin); if (GNUNET_OK != database_setup (env->cfg, &plugin)) { @@ -1717,11 +1593,6 @@ libgnunet_plugin_datastore_sqlite_done (void *cls) struct GNUNET_DATASTORE_PluginFunctions *api = cls; struct Plugin *plugin = api->cls; - if (plugin->stat_get != NULL) - { - GNUNET_STATISTICS_get_cancel (plugin->stat_get); - plugin->stat_get = NULL; - } if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (plugin->env->sched, @@ -1735,10 +1606,7 @@ libgnunet_plugin_datastore_sqlite_done (void *cls) if (plugin->drop_on_shutdown) fn = GNUNET_strdup (plugin->fn); database_shutdown (plugin); - GNUNET_STATISTICS_destroy (plugin->statistics, - GNUNET_NO); plugin->env = NULL; - plugin->payload = 0; GNUNET_free (api); if (fn != NULL) { diff --git a/src/include/gnunet_scheduler_lib.h b/src/include/gnunet_scheduler_lib.h index 79739ef16..b39feb926 100644 --- a/src/include/gnunet_scheduler_lib.h +++ b/src/include/gnunet_scheduler_lib.h @@ -483,7 +483,7 @@ GNUNET_SCHEDULER_add_write_file (struct GNUNET_SCHEDULER_Handle *sched, * && (delay-ready * || any-rs-ready * || any-ws-ready - * || (shutdown-active && run-on-shutdown) ) + * || shutdown-active) * * * @param sched scheduler to use diff --git a/src/peerinfo/Makefile.am b/src/peerinfo/Makefile.am index 31c7ab92d..116095f2a 100644 --- a/src/peerinfo/Makefile.am +++ b/src/peerinfo/Makefile.am @@ -35,7 +35,8 @@ gnunet_service_peerinfo_LDADD = \ check_PROGRAMS = \ - test_peerinfo_api + test_peerinfo_api \ + perf_peerinfo_api if !DISABLE_TEST_RUN TESTS = $(check_PROGRAMS) @@ -48,5 +49,12 @@ test_peerinfo_api_LDADD = \ $(top_builddir)/src/peerinfo/libgnunetpeerinfo.la \ $(top_builddir)/src/util/libgnunetutil.la +perf_peerinfo_api_SOURCES = \ + perf_peerinfo_api.c +perf_peerinfo_api_LDADD = \ + $(top_builddir)/src/hello/libgnunethello.la \ + $(top_builddir)/src/peerinfo/libgnunetpeerinfo.la \ + $(top_builddir)/src/util/libgnunetutil.la + EXTRA_DIST = \ test_peerinfo_api_data.conf diff --git a/src/peerinfo/gnunet-service-peerinfo.c b/src/peerinfo/gnunet-service-peerinfo.c index ee8749955..4879eea34 100644 --- a/src/peerinfo/gnunet-service-peerinfo.c +++ b/src/peerinfo/gnunet-service-peerinfo.c @@ -33,6 +33,7 @@ #include "platform.h" #include "gnunet_crypto_lib.h" +#include "gnunet_container_lib.h" #include "gnunet_disk_lib.h" #include "gnunet_hello_lib.h" #include "gnunet_protocols.h" @@ -56,11 +57,6 @@ struct HostEntry { - /** - * This is a linked list. - */ - struct HostEntry *next; - /** * Identity of the peer. */ @@ -75,9 +71,10 @@ struct HostEntry /** - * The in-memory list of known hosts. + * The in-memory list of known hosts, mapping of + * host IDs to 'struct HostEntry*' values. */ -static struct HostEntry *hosts; +static struct GNUNET_CONTAINER_MultiHashMap *hostmap; /** * Clients to immediately notify about all changes. @@ -162,24 +159,6 @@ get_host_filename (const struct GNUNET_PeerIdentity *id) } -/** - * Find the host entry for the given peer. FIXME: replace by hash map! - * @return NULL if not found - */ -static struct HostEntry * -lookup_host_entry (const struct GNUNET_PeerIdentity *id) -{ - struct HostEntry *pos; - - pos = hosts; - while ((pos != NULL) && - (0 != - memcmp (id, &pos->identity, sizeof (struct GNUNET_PeerIdentity)))) - pos = pos->next; - return pos; -} - - /** * Broadcast information about the given entry to all * clients that care. @@ -215,7 +194,8 @@ add_host_to_known_hosts (const struct GNUNET_PeerIdentity *identity) struct GNUNET_TIME_Absolute now; char *fn; - entry = lookup_host_entry (identity); + entry = GNUNET_CONTAINER_multihashmap_get (hostmap, + &identity->hashPubKey); if (entry != NULL) return; GNUNET_STATISTICS_update (stats, @@ -250,8 +230,10 @@ add_host_to_known_hosts (const struct GNUNET_PeerIdentity *identity) } } GNUNET_free (fn); - entry->next = hosts; - hosts = entry; + GNUNET_CONTAINER_multihashmap_put (hostmap, + &identity->hashPubKey, + entry, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); notify_all (entry); } @@ -275,7 +257,8 @@ remove_garbage (const char *fullname) static int -hosts_directory_scan_callback (void *cls, const char *fullname) +hosts_directory_scan_callback (void *cls, + const char *fullname) { unsigned int *matched = cls; struct GNUNET_PeerIdentity identity; @@ -350,7 +333,8 @@ bind_address (const struct GNUNET_PeerIdentity *peer, struct GNUNET_TIME_Absolute delta; add_host_to_known_hosts (peer); - host = lookup_host_entry (peer); + host = GNUNET_CONTAINER_multihashmap_get (hostmap, + &peer->hashPubKey); GNUNET_assert (host != NULL); if (host->hello == NULL) { @@ -383,63 +367,43 @@ bind_address (const struct GNUNET_PeerIdentity *peer, } + /** - * Do transmit info either for only the host matching the given - * argument or for all known hosts. + * Do transmit info about peer to given host. * - * @param only NULL to hit all hosts, otherwise specifies a particular target - * @param client who is making the request (and will thus receive our confirmation) + * @param cls NULL to hit all hosts, otherwise specifies a particular target + * @param key hostID + * @param value information to transmit + * @return GNUNET_YES (continue to iterate) */ -static void -send_to_each_host (const struct GNUNET_PeerIdentity *only, - struct GNUNET_SERVER_Client *client) +static int +add_to_tc (void *cls, + const GNUNET_HashCode *key, + void *value) { - struct HostEntry *pos; + struct GNUNET_SERVER_TransmitContext *tc = cls; + struct HostEntry *pos = value; struct InfoMessage *im; uint16_t hs; char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1]; - struct GNUNET_SERVER_TransmitContext *tc; - int match; - tc = GNUNET_SERVER_transmit_context_create (client); - match = GNUNET_NO; - pos = hosts; - while (pos != NULL) + hs = 0; + im = (struct InfoMessage *) buf; + if (pos->hello != NULL) { - if ((only == NULL) || - (0 == - memcmp (only, &pos->identity, - sizeof (struct GNUNET_PeerIdentity)))) - { - hs = 0; - im = (struct InfoMessage *) buf; - if (pos->hello != NULL) - { - hs = GNUNET_HELLO_size (pos->hello); - GNUNET_assert (hs < - GNUNET_SERVER_MAX_MESSAGE_SIZE - - sizeof (struct InfoMessage)); - memcpy (&im[1], pos->hello, hs); - match = GNUNET_YES; - } - im->header.type = htons (GNUNET_MESSAGE_TYPE_PEERINFO_INFO); - im->header.size = htons (sizeof (struct InfoMessage) + hs); - im->reserved = htonl (0); - im->peer = pos->identity; - GNUNET_SERVER_transmit_context_append_message (tc, - &im->header); - } - pos = pos->next; + hs = GNUNET_HELLO_size (pos->hello); + GNUNET_assert (hs < + GNUNET_SERVER_MAX_MESSAGE_SIZE - + sizeof (struct InfoMessage)); + memcpy (&im[1], pos->hello, hs); } - if ( (only != NULL) && - (match == GNUNET_NO) ) - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "No `%s' message was found for peer `%4s'\n", - "HELLO", - GNUNET_i2s (only)); - GNUNET_SERVER_transmit_context_append_data (tc, NULL, 0, - GNUNET_MESSAGE_TYPE_PEERINFO_INFO_END); - GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); + im->header.type = htons (GNUNET_MESSAGE_TYPE_PEERINFO_INFO); + im->header.size = htons (sizeof (struct InfoMessage) + hs); + im->reserved = htonl (0); + im->peer = pos->identity; + GNUNET_SERVER_transmit_context_append_message (tc, + &im->header); + return GNUNET_YES; } @@ -500,7 +464,6 @@ cron_clean_data_hosts (void *cls, now = GNUNET_TIME_absolute_get (); GNUNET_DISK_directory_scan (networkIdDirectory, &discard_hosts_helper, &now); - GNUNET_SCHEDULER_add_delayed (tc->sched, DATA_HOST_CLEAN_FREQ, &cron_clean_data_hosts, NULL); @@ -553,6 +516,7 @@ handle_get (void *cls, const struct GNUNET_MessageHeader *message) { const struct ListPeerMessage *lpm; + struct GNUNET_SERVER_TransmitContext *tc; lpm = (const struct ListPeerMessage *) message; #if DEBUG_PEERINFO @@ -561,7 +525,14 @@ handle_get (void *cls, "GET", GNUNET_i2s (&lpm->peer)); #endif - send_to_each_host (&lpm->peer, client); + tc = GNUNET_SERVER_transmit_context_create (client); + GNUNET_CONTAINER_multihashmap_get_multiple (hostmap, + &lpm->peer.hashPubKey, + &add_to_tc, + tc); + GNUNET_SERVER_transmit_context_append_data (tc, NULL, 0, + GNUNET_MESSAGE_TYPE_PEERINFO_INFO_END); + GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); } @@ -577,12 +548,39 @@ handle_get_all (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { + struct GNUNET_SERVER_TransmitContext *tc; + #if DEBUG_PEERINFO GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s' message received\n", "GET_ALL"); #endif - send_to_each_host (NULL, client); + tc = GNUNET_SERVER_transmit_context_create (client); + GNUNET_CONTAINER_multihashmap_iterate (hostmap, + &add_to_tc, + tc); + GNUNET_SERVER_transmit_context_append_data (tc, NULL, 0, + GNUNET_MESSAGE_TYPE_PEERINFO_INFO_END); + GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); +} + + +static int +do_notify_entry (void *cls, + const GNUNET_HashCode *key, + void *value) +{ + struct GNUNET_SERVER_Client *client = cls; + struct HostEntry *he = value; + struct InfoMessage *msg; + + msg = make_info_message (he); + GNUNET_SERVER_notification_context_unicast (notify_list, + client, + &msg->header, + GNUNET_NO); + GNUNET_free (msg); + return GNUNET_YES; } @@ -595,12 +593,9 @@ handle_get_all (void *cls, */ static void handle_notify (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { - struct InfoMessage *msg; - struct HostEntry *pos; - #if DEBUG_PEERINFO GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s' message received\n", @@ -608,20 +603,22 @@ handle_notify (void *cls, #endif GNUNET_SERVER_notification_context_add (notify_list, client); - pos = hosts; - while (NULL != pos) - { - msg = make_info_message (pos); - GNUNET_SERVER_notification_context_unicast (notify_list, - client, - &msg->header, - GNUNET_NO); - GNUNET_free (msg); - pos = pos->next; - } + GNUNET_CONTAINER_multihashmap_iterate (hostmap, + &do_notify_entry, + client); } +static int +free_host_entry (void *cls, + const GNUNET_HashCode *key, + void *value) +{ + struct HostEntry *he = value; + GNUNET_free (he); + return GNUNET_YES; +} + /** * Clean up our state. Called during shutdown. * @@ -634,6 +631,10 @@ shutdown_task (void *cls, { GNUNET_SERVER_notification_context_destroy (notify_list); notify_list = NULL; + GNUNET_CONTAINER_multihashmap_iterate (hostmap, + &free_host_entry, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (hostmap); if (stats != NULL) { GNUNET_STATISTICS_destroy (stats, GNUNET_NO); @@ -666,6 +667,8 @@ run (void *cls, sizeof (struct GNUNET_MessageHeader)}, {NULL, NULL, 0, 0} }; + + hostmap = GNUNET_CONTAINER_multihashmap_create (1024); stats = GNUNET_STATISTICS_create (sched, "peerinfo", cfg); notify_list = GNUNET_SERVER_notification_context_create (server, 0); GNUNET_assert (GNUNET_OK == diff --git a/src/peerinfo/perf_peerinfo_api.c b/src/peerinfo/perf_peerinfo_api.c new file mode 100755 index 000000000..1c0df0332 --- /dev/null +++ b/src/peerinfo/perf_peerinfo_api.c @@ -0,0 +1,217 @@ +/* + This file is part of GNUnet. + (C) 2004, 2009 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file peerinfo/test_peerinfo_hammer.c + * @brief testcase for peerinfo_api.c, hopefully hammer the peerinfo service + * @author Nathan Evans + */ + +#include "platform.h" +#include "gnunet_hello_lib.h" +#include "gnunet_getopt_lib.h" +#include "gnunet_os_lib.h" +#include "gnunet_peerinfo_service.h" +#include "gnunet_program_lib.h" +#include "gnunet_time_lib.h" +#include "peerinfo.h" + +#define START_SERVICE 1 + +#define NUM_REQUESTS 5000 + +static struct GNUNET_SCHEDULER_Handle *sched; + +static const struct GNUNET_CONFIGURATION_Handle *cfg; + +static struct GNUNET_PEERINFO_IteratorContext *ic[NUM_REQUESTS]; + +static struct GNUNET_PEERINFO_Handle *h; + +static unsigned int numpeers; + +static int +check_it (void *cls, + const char *tname, + struct GNUNET_TIME_Absolute expiration, + const void *addr, uint16_t addrlen) +{ + if (addrlen > 0) + { +#if DEBUG + fprintf (stderr, + "name: %s, addr: %s\n", + tname, + (const char*) addr); +#endif + } + return GNUNET_OK; +} + + +static size_t +address_generator (void *cls, size_t max, void *buf) +{ + size_t *agc = cls; + size_t ret; + char *address; + + if (*agc == 0) + return 0; + + GNUNET_asprintf(&address, "Address%d", *agc); + + ret = GNUNET_HELLO_add_address ("peerinfotest", + GNUNET_TIME_relative_to_absolute + (GNUNET_TIME_UNIT_HOURS), address, strlen(address) + 1, + buf, max); + *agc = 0; + return ret; +} + + +static void +add_peer (size_t i) +{ + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pkey; + struct GNUNET_PeerIdentity pid; + struct GNUNET_HELLO_Message *h2; + size_t agc; + + agc = 2; + memset (&pkey, i, sizeof (pkey)); + GNUNET_CRYPTO_hash (&pkey, sizeof (pkey), &pid.hashPubKey); + h2 = GNUNET_HELLO_create (&pkey, &address_generator, &i); + GNUNET_PEERINFO_add_peer (h, h2); + GNUNET_free (h2); +} + + +static void +process (void *cls, + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_HELLO_Message *hello) +{ + if (peer == NULL) + { +#if DEBUG + fprintf(stderr, "Process received NULL response\n"); +#endif + } + else + { +#if DEBUG + fprintf(stderr, "Processed a peer\n"); +#endif + numpeers++; + if (0 && (hello != NULL)) + GNUNET_HELLO_iterate_addresses (hello, GNUNET_NO, &check_it, NULL); + + } +} + + +static void +run (void *cls, + struct GNUNET_SCHEDULER_Handle *s, + char *const *args, + const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *c) +{ + size_t i; + sched = s; + cfg = c; + h = GNUNET_PEERINFO_connect (sched, cfg); + + for (i = 0; i < NUM_REQUESTS; i++) + { + add_peer (i); + ic[i] = GNUNET_PEERINFO_iterate (h, + NULL, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 30), + &process, cls); + } + fprintf (stderr, + "Issued %u requests\n", + NUM_REQUESTS); +} + +static int +check () +{ + int ok = 3; + char *const argv[] = { "test-peerinfo-hammer", + "-c", + "test_peerinfo_api_data.conf", +#if DEBUG_PEERINFO + "-L", "DEBUG", +#endif + NULL + }; +#if START_SERVICE + pid_t pid; + struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_OPTION_END + }; + pid = GNUNET_OS_start_process (NULL, NULL, "gnunet-service-peerinfo", + "gnunet-service-peerinfo", +#if DEBUG_PEERINFO + "-L", "DEBUG", +#endif + "-c", "test_peerinfo_api_data.conf", NULL); +#endif + GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, + argv, "test-peerinfo-api", "nohelp", + options, &run, &ok); + fprintf (stderr, + "Processed %u/%u peers\n", + numpeers, + NUM_REQUESTS); +#if START_SERVICE + if (0 != PLIBC_KILL (pid, SIGTERM)) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); + ok = 1; + } + GNUNET_OS_process_wait(pid); +#endif + return ok; +} + + +int +main (int argc, char *argv[]) +{ + int ret = 0; + + GNUNET_log_setup ("test_peerinfo_api", +#if DEBUG_PEERINFO + "DEBUG", +#else + "WARNING", +#endif + NULL); + ret = check (); + GNUNET_DISK_directory_remove ("/tmp/test-gnunet-peerinfo"); + return ret; +} + +/* end of test_peerinfo_hammer.c */ diff --git a/src/peerinfo/test_peerinfo_api.c b/src/peerinfo/test_peerinfo_api.c index 47304d549..a3c2c99eb 100644 --- a/src/peerinfo/test_peerinfo_api.c +++ b/src/peerinfo/test_peerinfo_api.c @@ -208,7 +208,7 @@ main (int argc, char *argv[]) #endif NULL); ret = check (); - GNUNET_DISK_directory_remove ("/tmp/test-gnunetd-peerinfo"); + GNUNET_DISK_directory_remove ("/tmp/test-gnunet-peerinfo"); return ret; } diff --git a/src/peerinfo/test_peerinfo_api_data.conf b/src/peerinfo/test_peerinfo_api_data.conf index a81ffccb9..998df571a 100644 --- a/src/peerinfo/test_peerinfo_api_data.conf +++ b/src/peerinfo/test_peerinfo_api_data.conf @@ -1,5 +1,6 @@ [PATHS] -SERVICEHOME = /tmp/test-gnunetd-peerinfo/ +SERVICEHOME = /tmp/test-gnunet-peerinfo/ [peerinfo] PORT = 22354 +DEBUG = NO diff --git a/src/util/network.c b/src/util/network.c index 886813426..9b96436db 100644 --- a/src/util/network.c +++ b/src/util/network.c @@ -836,7 +836,6 @@ void GNUNET_NETWORK_fdset_handle_set (struct GNUNET_NETWORK_FDSet *fds, const struct GNUNET_DISK_FileHandle *h) { - #ifdef MINGW HANDLE hw; GNUNET_DISK_internal_file_handle_ (h, &hw, sizeof (HANDLE)); diff --git a/src/util/scheduler.c b/src/util/scheduler.c index 1352fe0d8..0ff6f9612 100644 --- a/src/util/scheduler.c +++ b/src/util/scheduler.c @@ -173,6 +173,21 @@ struct GNUNET_SCHEDULER_Handle */ struct Task *pending; + /** + * List of tasks waiting ONLY for a timeout event. + * Sorted by timeout (earliest first). Used so that + * we do not traverse the list of these tasks when + * building select sets (we just look at the head + * to determine the respective timeout ONCE). + */ + struct Task *pending_timeout; + + /** + * Last inserted task waiting ONLY for a timeout event. + * Used to (heuristically) speed up insertion. + */ + struct Task *pending_timeout_last; + /** * ID of the task that is running right now. */ @@ -266,6 +281,15 @@ is_pending (struct GNUNET_SCHEDULER_Handle *sched, return GNUNET_NO; min = -1; /* maximum value */ pos = sched->pending; + while (pos != NULL) + { + if (pos->id == id) + return GNUNET_YES; + if (pos->id < min) + min = pos->id; + pos = pos->next; + } + pos = sched->pending_timeout; while (pos != NULL) { if (pos->id == id) @@ -306,7 +330,19 @@ update_sets (struct GNUNET_SCHEDULER_Handle *sched, struct GNUNET_TIME_Relative *timeout) { struct Task *pos; + struct GNUNET_TIME_Absolute now; + struct GNUNET_TIME_Relative to; + now = GNUNET_TIME_absolute_get (); + pos = sched->pending_timeout; + if (pos != NULL) + { + to = GNUNET_TIME_absolute_get_difference (now, pos->timeout); + if (timeout->value > to.value) + *timeout = to; + if (pos->reason != 0) + *timeout = GNUNET_TIME_UNIT_ZERO; + } pos = sched->pending; while (pos != NULL) { @@ -316,12 +352,9 @@ update_sets (struct GNUNET_SCHEDULER_Handle *sched, pos = pos->next; continue; } - if (pos->timeout.value != GNUNET_TIME_UNIT_FOREVER_ABS.value) { - struct GNUNET_TIME_Relative to; - - to = GNUNET_TIME_absolute_get_remaining (pos->timeout); + to = GNUNET_TIME_absolute_get_difference (now, pos->timeout); if (timeout->value > to.value) *timeout = to; } @@ -384,24 +417,33 @@ is_ready (struct GNUNET_SCHEDULER_Handle *sched, const struct GNUNET_NETWORK_FDSet *rs, const struct GNUNET_NETWORK_FDSet *ws) { + enum GNUNET_SCHEDULER_Reason reason; + + reason = task->reason; if (now.value >= task->timeout.value) - task->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT; - if ( (0 == (task->reason & GNUNET_SCHEDULER_REASON_READ_READY)) && - ( (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (rs, task->read_fd)) || + reason |= GNUNET_SCHEDULER_REASON_TIMEOUT; + if ( (0 == (reason & GNUNET_SCHEDULER_REASON_READ_READY)) && + ( ( (task->read_fd != -1) && + (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (rs, task->read_fd)) ) || (set_overlaps (rs, task->read_set) ) ) ) - task->reason |= GNUNET_SCHEDULER_REASON_READ_READY; - if ((0 == (task->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) && - ( (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (ws, task->write_fd)) || + reason |= GNUNET_SCHEDULER_REASON_READ_READY; + if ((0 == (reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) && + ( ( (task->write_fd != -1) && + (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (ws, task->write_fd)) ) || (set_overlaps (ws, task->write_set) ) ) ) - task->reason |= GNUNET_SCHEDULER_REASON_WRITE_READY; - if (task->reason == 0) + reason |= GNUNET_SCHEDULER_REASON_WRITE_READY; + if (reason == 0) return GNUNET_NO; /* not ready */ if (task->prereq_id != GNUNET_SCHEDULER_NO_TASK) { if (GNUNET_YES == is_pending (sched, task->prereq_id)) - return GNUNET_NO; /* prereq waiting */ - task->reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE; + { + task->reason = reason; + return GNUNET_NO; /* prereq waiting */ + } + reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE; } + task->reason = reason; return GNUNET_YES; } @@ -413,7 +455,8 @@ is_ready (struct GNUNET_SCHEDULER_Handle *sched, * @param task task ready for execution */ static void -queue_ready_task (struct GNUNET_SCHEDULER_Handle *handle, struct Task *task) +queue_ready_task (struct GNUNET_SCHEDULER_Handle *handle, + struct Task *task) { enum GNUNET_SCHEDULER_Priority p = task->priority; if (0 != (task->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) @@ -444,6 +487,20 @@ check_ready (struct GNUNET_SCHEDULER_Handle *handle, now = GNUNET_TIME_absolute_get (); prev = NULL; + pos = handle->pending_timeout; + while (pos != NULL) + { + next = pos->next; + if (now.value >= pos->timeout.value) + pos->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT; + if (0 == pos->reason) + break; + handle->pending_timeout = next; + if (handle->pending_timeout_last == pos) + handle->pending_timeout_last = NULL; + queue_ready_task (handle, pos); + pos = next; + } pos = handle->pending; while (pos != NULL) { @@ -484,6 +541,15 @@ GNUNET_SCHEDULER_shutdown (struct GNUNET_SCHEDULER_Handle *sched) struct Task *pos; int i; + pos = sched->pending_timeout; + while (pos != NULL) + { + pos->reason |= GNUNET_SCHEDULER_REASON_SHUTDOWN; + /* we don't move the task into the ready queue yet; check_ready + will do that later, possibly adding additional + readiness-factors */ + pos = pos->next; + } pos = sched->pending; while (pos != NULL) { @@ -615,7 +681,7 @@ run_ready (struct GNUNET_SCHEDULER_Handle *sched, destroy_task (pos); sched->tasks_run++; } - while ((sched->pending == NULL) || (p >= sched->max_priority_added)); + while ( (sched->pending == NULL) || (p >= sched->max_priority_added) ); } /** @@ -700,7 +766,9 @@ GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_Task task, void *task_cls) GNUNET_SCHEDULER_REASON_STARTUP); last_tr = 0; busy_wait_warning = 0; - while ((sched.pending != NULL) || (sched.ready_count > 0)) + while ((sched.pending != NULL) || + (sched.pending_timeout != NULL) || + (sched.ready_count > 0)) { GNUNET_NETWORK_fdset_zero (rs); GNUNET_NETWORK_fdset_zero (ws); @@ -832,8 +900,10 @@ GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Handle *sched, struct Task *t; struct Task *prev; enum GNUNET_SCHEDULER_Priority p; + int to; void *ret; + to = 0; prev = NULL; t = sched->pending; while (t != NULL) @@ -843,6 +913,21 @@ GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Handle *sched, prev = t; t = t->next; } + if (t == NULL) + { + prev = NULL; + to = 1; + t = sched->pending_timeout; + while (t != NULL) + { + if (t->id == task) + break; + prev = t; + t = t->next; + } + if (sched->pending_timeout_last == t) + sched->pending_timeout_last = NULL; + } p = 0; while (t == NULL) { @@ -864,12 +949,25 @@ GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Handle *sched, if (prev == NULL) { if (p == 0) - sched->pending = t->next; + { + if (to == 0) + { + sched->pending = t->next; + } + else + { + sched->pending_timeout = t->next; + } + } else - sched->ready[p] = t->next; + { + sched->ready[p] = t->next; + } } else - prev->next = t->next; + { + prev->next = t->next; + } ret = t->callback_cls; #if DEBUG_TASKS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -998,10 +1096,84 @@ GNUNET_SCHEDULER_add_delayed (struct GNUNET_SCHEDULER_Handle * sched, struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_Task task, void *task_cls) { +#if 1 + /* new, optimized version */ + struct Task *t; + struct Task *pos; + struct Task *prev; +#if EXECINFO + void *backtrace_array[MAX_TRACE_DEPTH]; +#endif + + GNUNET_assert (NULL != task); + t = GNUNET_malloc (sizeof (struct Task)); + t->callback = task; + t->callback_cls = task_cls; +#if EXECINFO + t->num_backtrace_strings = backtrace(backtrace_array, MAX_TRACE_DEPTH); + t->backtrace_strings = backtrace_symbols(backtrace_array, t->num_backtrace_strings); +#endif + t->read_fd = -1; + t->write_fd = -1; + t->id = ++sched->last_id; +#if PROFILE_DELAYS + t->start_time = GNUNET_TIME_absolute_get (); +#endif + t->timeout = GNUNET_TIME_relative_to_absolute (delay); + t->priority = sched->current_priority; + /* try tail first (optimization in case we are + appending to a long list of tasks with timeouts) */ + prev = sched->pending_timeout_last; + if (prev != NULL) + { + if (prev->timeout.value > t->timeout.value) + prev = NULL; + else + pos = prev->next; /* heuristic success! */ + } + if (prev == NULL) + { + /* heuristic failed, do traversal of timeout list */ + pos = sched->pending_timeout; + } + while ( (pos != NULL) && + ( (pos->timeout.value <= t->timeout.value) || + (pos->reason != 0) ) ) + { + prev = pos; + pos = pos->next; + } + if (prev == NULL) + sched->pending_timeout = t; + else + prev->next = t; + t->next = pos; + /* hyper-optimization... */ + sched->pending_timeout_last = t; + +#if DEBUG_TASKS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding task: %llu / %p\n", t->id, t->callback_cls); +#endif +#if EXECINFO + int i; + + for (i=0;inum_backtrace_strings;i++) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Task %u trace %d: %s\n", + t->id, + i, + t->backtrace_strings[i]); +#endif + return t->id; + +#else + /* unoptimized version */ return GNUNET_SCHEDULER_add_select (sched, GNUNET_SCHEDULER_PRIORITY_KEEP, GNUNET_SCHEDULER_NO_TASK, delay, NULL, NULL, task, task_cls); +#endif } @@ -1045,7 +1217,7 @@ GNUNET_SCHEDULER_add_now (struct GNUNET_SCHEDULER_Handle *sched, * && (delay-ready * || any-rs-ready * || any-ws-ready - * || (shutdown-active && run-on-shutdown) ) + * || shutdown-active ) * * * @param sched scheduler to use -- 2.25.1