* DV: [Nate]
- proper bandwidth allocation
- performance tests
-* PEERINFO:
+* PEERINFO:
- merge multiple HELLOs of the same peer in the transmission queue
(theoretically reduces overhead; bounds message queue size)
- merge multiple iteration requests over "all" peers in the queue
*/
#define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
+#define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
+
+/**
+ * After how many payload-changing operations
+ * do we sync our statistics?
+ */
+#define MAX_STAT_SYNC_LAG 50
/**
};
+
/**
* Our datastore plugin (NULL if not available).
*/
* How much space have we currently reserved?
*/
static unsigned long long reserved;
+
+/**
+ * How much data are we currently storing
+ * in the database?
+ */
+static unsigned long long payload;
+
+/**
+ * Number of updates that were made to the
+ * payload value since we last synchronized
+ * it with the statistics service.
+ */
+static unsigned int lastSync;
+
+/**
+ * Did we get an answer from statistics?
+ */
+static int stats_worked;
/**
* Identity of the task that is used to delete
static struct GNUNET_STATISTICS_Handle *stats;
+/**
+ * Synchronize our utilization statistics with the
+ * statistics service.
+ */
+static void
+sync_stats ()
+{
+ GNUNET_STATISTICS_set (stats,
+ QUOTA_STAT_NAME,
+ payload,
+ GNUNET_YES);
+ lastSync = 0;
+}
+
+
+
+
/**
* Function called once the transmit operation has
* either failed or succeeded.
*/
static int cleaning_done;
+/**
+ * Handle for pending get request.
+ */
+static struct GNUNET_STATISTICS_GetHandle *stat_get;
+
+
/**
* Task that is used to remove expired entries from
* the datastore. This task will schedule itself
#endif
amount = GNUNET_ntohll(msg->amount);
entries = ntohl(msg->entries);
- used = plugin->api->get_size (plugin->api->cls) + reserved;
+ used = payload + reserved;
req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
if (used + req > quota)
{
(GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK,
msg);
GNUNET_free_non_null (msg);
- if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
+ if (quota - reserved - cache_size < payload)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_("Need %llu bytes more space (%llu allowed, using %llu)\n"),
(unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
(unsigned long long) (quota - reserved - cache_size),
- (unsigned long long) plugin->api->get_size (plugin->api->cls));
+ (unsigned long long) payload);
manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
}
}
}
+/**
+ * Function called by plugins to notify us about a
+ * change in their disk utilization.
+ *
+ * @param cls closure (NULL)
+ * @param delta change in disk utilization,
+ * 0 for "reset to empty"
+ */
+static void
+disk_utilization_change_cb (void *cls,
+ int delta)
+{
+ if ( (delta < 0) &&
+ (payload < -delta) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Datastore payload inaccurate (%lld < %lld). Trying to fix.\n"),
+ (long long) payload,
+ (long long) -delta);
+ payload = plugin->api->get_size (plugin->api->cls);
+ sync_stats ();
+ return;
+ }
+ payload += delta;
+ lastSync++;
+ if (lastSync >= MAX_STAT_SYNC_LAG)
+ sync_stats ();
+}
+
+
+/**
+ * Callback function to process statistic values.
+ *
+ * @param cls closure (struct Plugin*)
+ * @param subsystem name of subsystem that created the statistic
+ * @param name the name of the datum
+ * @param value the current value
+ * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
+ * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
+ */
+static int
+process_stat_in (void *cls,
+ const char *subsystem,
+ const char *name,
+ uint64_t value,
+ int is_persistent)
+{
+ GNUNET_assert (stats_worked == GNUNET_NO);
+ stats_worked = GNUNET_YES;
+ payload += value;
+#if DEBUG_SQLITE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Notification from statistics about existing payload (%llu), new payload is %llu\n",
+ value,
+ payload);
+#endif
+ return GNUNET_OK;
+}
+
+
+static void
+process_stat_done (void *cls,
+ int success)
+{
+ struct DatastorePlugin *plugin = cls;
+
+ stat_get = NULL;
+ if (stats_worked == GNUNET_NO)
+ payload = plugin->api->get_size (plugin->api->cls);
+}
+
+
/**
* Load the datastore plugin.
*/
ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
ret->env.cfg = cfg;
ret->env.sched = sched;
+ ret->env.duc = &disk_utilization_change_cb;
+ ret->env.cls = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_("Loading `%s' datastore plugin\n"), name);
GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
GNUNET_CONTAINER_bloomfilter_free (filter);
filter = NULL;
}
+ if (lastSync > 0)
+ sync_stats ();
+ if (stat_get != NULL)
+ {
+ GNUNET_STATISTICS_get_cancel (stat_get);
+ stat_get = NULL;
+ }
if (stats != NULL)
{
GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
}
return;
}
+ stat_get = GNUNET_STATISTICS_get (stats,
+ "datastore",
+ QUOTA_STAT_NAME,
+ GNUNET_TIME_UNIT_SECONDS,
+ &process_stat_done,
+ &process_stat_in,
+ plugin);
GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
GNUNET_SERVER_add_handlers (server, handlers);
expired_kill_task
};
+/**
+ * Function called by plugins to notify us about a
+ * change in their disk utilization.
+ *
+ * @param cls closure (NULL)
+ * @param delta change in disk utilization,
+ * 0 for "reset to empty"
+ */
+static void
+disk_utilization_change_cb (void *cls,
+ int delta)
+{
+}
+
static void
putValue (struct GNUNET_DATASTORE_PluginFunctions * api, int i, int k)
}
env.cfg = cfg;
env.sched = sched;
+ env.duc = &disk_utilization_change_cb;
+ env.cls = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_("Loading `%s' datastore plugin\n"), name);
GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
* @file datastore/plugin_datastore.h
* @brief API for the database backend plugins.
* @author Christian Grothoff
- *
- * TODO:
- * - consider defining enumeration or at least typedef
- * for the type of "type" (instead of using uint32_t)
*/
#ifndef PLUGIN_DATASTORE_H
#define PLUGIN_DATASTORE_H
/**
* How many bytes of overhead will we assume per entry
- * in the SQlite DB?
+ * in any DB (for reservations)?
*/
#define GNUNET_DATASTORE_ENTRY_OVERHEAD 256
+/**
+ * Function invoked to notify service of disk utilization
+ * changes.
+ *
+ * @param cls closure
+ * @param delta change in disk utilization,
+ * 0 for "reset to empty"
+ */
+typedef void (*DiskUtilizationChange)(void *cls,
+ int delta);
+
+
/**
* The datastore service will pass a pointer to a struct
* of this type as the first and only argument to the
*/
struct GNUNET_SCHEDULER_Handle *sched;
+ /**
+ * Function to call on disk utilization change.
+ */
+ DiskUtilizationChange duc;
+
+ /**
+ * Closure.
+ */
+ void *cls;
+
};
" ORDER BY expire DESC,vkey DESC LIMIT 1)"\
"ORDER BY expire DESC,vkey DESC LIMIT 1"
-// #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn072"
struct GNUNET_MysqlStatementHandle
#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE vkey=?"
struct GNUNET_MysqlStatementHandle *update_entry;
- struct GNUNET_MysqlStatementHandle *iter[4];
+#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn072"
+ struct GNUNET_MysqlStatementHandle *get_size;
- /**
- * Size of the mysql database on disk.
- */
- unsigned long long content_size;
+ struct GNUNET_MysqlStatementHandle *iter[4];
};
return ret;
}
+
static int
-return_ok (void *cls, unsigned int num_values, MYSQL_BIND * values)
+return_ok (void *cls,
+ unsigned int num_values,
+ MYSQL_BIND * values)
{
return GNUNET_OK;
}
{
do_delete_value (plugin, vkey);
do_delete_entry_by_vkey (plugin, vkey);
- plugin->content_size -= length;
+ if (length != 0)
+ plugin->env->duc (plugin->env->cls,
+ - length);
}
return;
END_SET:
* Get an estimate of how much space the database is
* currently using.
*
- * @param cls our "struct Plugin*"
+ * @param cls our "struct Plugin *"
* @return number of bytes used on disk
*/
static unsigned long long
mysql_plugin_get_size (void *cls)
{
struct Plugin *plugin = cls;
- return plugin->content_size;
+ MYSQL_BIND cbind[1];
+ long long total;
+
+ memset (cbind, 0, sizeof (cbind));
+ total = 0;
+ cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
+ cbind[0].buffer = &total;
+ cbind[0].is_unsigned = GNUNET_NO;
+ if (GNUNET_OK !=
+ prepared_statement_run_select (plugin,
+ plugin->get_size,
+ 1, cbind,
+ &return_ok, NULL,
+ -1))
+ return 0;
+ return total;
}
vkey,
(unsigned int) size);
#endif
- plugin->content_size += size;
+ if (size > 0)
+ plugin->env->duc (plugin->env->cls,
+ size);
return GNUNET_OK;
}
"DROP TABLE gn090")) ||
(GNUNET_OK != run_statement (plugin,
"DROP TABLE gn072")))
- return; /* error */
- plugin->content_size = 0;
+ return; /* error */
+ plugin->env->duc (plugin->env->cls, 0);
}
|| PINIT (plugin->select_entry_by_hash_vhash_and_type,
SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE)
|| PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH)
+ || PINIT (plugin->get_size, SELECT_SIZE)
|| PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH)
|| PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
|| PINIT (plugin->count_entry_by_hash_vhash_and_type,
#define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
+/**
+ * Closure for 'postgres_next_request_cont'.
+ */
struct NextRequestClosure
{
+ /**
+ * Global plugin data.
+ */
struct Plugin *plugin;
+
+ /**
+ * Function to call for each matching entry.
+ */
PluginIterator iter;
+
+ /**
+ * Closure for 'iter'.
+ */
void *iter_cls;
+
+ /**
+ * Parameters for the prepared statement.
+ */
const char *paramValues[5];
+
+ /**
+ * Name of the prepared statement to run.
+ */
const char *pname;
+
+ /**
+ * Size of values pointed to by paramValues.
+ */
int paramLengths[5];
+
+ /**
+ * Number of paramters in paramValues/paramLengths.
+ */
int nparams;
+
+ /**
+ * Current time (possible parameter), big-endian.
+ */
uint64_t bnow;
+
+ /**
+ * Key (possible parameter)
+ */
GNUNET_HashCode key;
+
+ /**
+ * Hash of value (possible parameter)
+ */
GNUNET_HashCode vhash;
+
+ /**
+ * Number of entries found so far
+ */
long long count;
+
+ /**
+ * Offset this iteration starts at.
+ */
uint64_t off;
+
+ /**
+ * Current offset to use in query, big-endian.
+ */
uint64_t blimit_off;
+
+ /**
+ * Overall number of matching entries.
+ */
unsigned long long total;
+
+ /**
+ * Expiration value of previous result (possible parameter), big-endian.
+ */
uint64_t blast_expire;
+
+ /**
+ * Row ID of last result (possible paramter), big-endian.
+ */
uint32_t blast_rowid;
+
+ /**
+ * Priority of last result (possible parameter), big-endian.
+ */
uint32_t blast_prio;
+
+ /**
+ * Type of block (possible paramter), big-endian.
+ */
uint32_t btype;
+
+ /**
+ * Flag set to GNUNET_YES to stop iteration.
+ */
int end_it;
};
*/
GNUNET_SCHEDULER_TaskIdentifier next_task;
- unsigned long long payload;
-
- unsigned int lastSync;
-
};
* the desired status code. If not, log an error, clear the
* result and return GNUNET_SYSERR.
*
+ * @param plugin global context
+ * @param ret result to check
+ * @param expected_status expected return value
+ * @param command name of SQL command that was run
+ * @param args arguments to SQL command
+ * @param line line number for error reporting
* @return GNUNET_OK if the result is acceptable
*/
static int
/**
* Run simple SQL statement (without results).
+ *
+ * @param plugin global context
+ * @param sql statement to run
+ * @param line code line for error reporting
*/
static int
pq_exec (struct Plugin *plugin,
/**
* Prepare SQL statement.
+ *
+ * @param plugin global context
+ * @param sql SQL code to prepare
+ * @param nparams number of parameters in sql
+ * @param line code line for error reporting
+ * @return GNUNET_OK on success
*/
static int
pq_prepare (struct Plugin *plugin,
/**
* @brief Get a database handle
+ *
+ * @param plugin global context
* @return GNUNET_OK on success, GNUNET_SYSERR on error
*/
static int
* Delete the row identified by the given rowid (qid
* in postgres).
*
+ * @param plugin global context
+ * @param rowid which row to delete
* @return GNUNET_OK on success
*/
static int
postgres_plugin_get_size (void *cls)
{
struct Plugin *plugin = cls;
- double ret;
+ unsigned long long total;
+ PGresult *ret;
- ret = plugin->payload;
- return (unsigned long long) (ret * 1.00);
- /* benchmarking shows XX% overhead */
+ ret = PQexecParams (plugin->dbh,
+ "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090",
+ 0, NULL, NULL, NULL, NULL, 1);
+ if (GNUNET_OK != check_result (plugin,
+ ret,
+ PGRES_TUPLES_OK,
+ "PQexecParams",
+ "get_size",
+ __LINE__))
+ {
+ return 0;
+ }
+ if ((PQntuples (ret) != 1) ||
+ (PQnfields (ret) != 1) ||
+ (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
+ {
+ GNUNET_break (0);
+ PQclear (ret);
+ return 0;
+ }
+ total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
+ PQclear (ret);
+ return total;
}
"PQexecPrepared", "put", __LINE__))
return GNUNET_SYSERR;
PQclear (ret);
- plugin->payload += size;
+ plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
#if DEBUG_POSTGRES
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
"datastore-postgres",
- "Stored %u bytes in database, new payload is %llu\n",
- (unsigned int) size,
- (unsigned long long) plugin->payload);
+ "Stored %u bytes in database\n",
+ (unsigned int) size);
#endif
return GNUNET_OK;
}
#if DEBUG_POSTGRES
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
"datastore-postgres",
- "Deleting %u bytes from database, current payload is %llu\n",
- (unsigned int) size,
- (unsigned long long) plugin->payload);
+ "Deleting %u bytes from database\n",
+ (unsigned int) size);
#endif
- GNUNET_assert (plugin->payload >= size);
- plugin->payload -= size;
+ plugin->env->duc (plugin->env->cls,
+ - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
#if DEBUG_POSTGRES
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
"datastore-postgres",
- "Deleted %u bytes from database, new payload is %llu\n",
- (unsigned int) size,
- (unsigned long long) plugin->payload);
+ "Deleted %u bytes from database\n",
+ (unsigned int) size);
#endif
}
}
* Call a method for each key in the database and
* call the callback method on it.
*
+ * @param plugin global context
* @param type entries of which type should be considered?
+ * @param is_asc ascending or descending iteration?
+ * @param iter_select which SELECT method should be used?
* @param iter maybe NULL (to just count); iter
* should return GNUNET_SYSERR to abort the
* iteration, GNUNET_NO to delete the entry and
* continue and GNUNET_OK to continue iterating
+ * @param iter_cls closure for 'iter'
*/
static void
postgres_iterate (struct Plugin *plugin,
}
-
/**
* Select a subset of the items in the datastore and call
* the given iterator for each of them.
}
-
/**
* Select a subset of the items in the datastore and call
* the given iterator for each of them.
*/
#include "platform.h"
-#include "gnunet_statistics_service.h"
#include "plugin_datastore.h"
#include <sqlite3.h>
#define DEBUG_SQLITE GNUNET_NO
-/**
- * After how many payload-changing operations
- * do we sync our statistics?
- */
-#define MAX_STAT_SYNC_LAG 50
-
-#define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
/**
* Log an error message at log-level 'level' that indicates
*/
sqlite3_stmt *insertContent;
- /**
- * Handle to the statistics service.
- */
- struct GNUNET_STATISTICS_Handle *statistics;
-
- /**
- * Handle for pending get request.
- */
- struct GNUNET_STATISTICS_GetHandle *stat_get;
-
/**
* Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
*/
* Pending task with scheduler for running the next request.
*/
GNUNET_SCHEDULER_TaskIdentifier next_task;
-
- /**
- * How much data are we currently storing
- * in the database?
- */
- unsigned long long payload;
-
- /**
- * Number of updates that were made to the
- * payload value since we last synchronized
- * it with the statistics service.
- */
- unsigned int lastSync;
/**
* Should the database be dropped on shutdown?
*/
int drop_on_shutdown;
- /**
- * Did we get an answer from statistics?
- */
- int stats_worked;
};
return GNUNET_SYSERR;
}
/* database is new or got deleted, reset payload to zero! */
- if (plugin->stat_get != NULL)
- {
- GNUNET_STATISTICS_get_cancel (plugin->stat_get);
- plugin->stat_get = NULL;
- }
- plugin->payload = 0;
+ plugin->env->duc (plugin->env->cls, 0);
}
plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
#ifdef ENABLE_NLS
}
-/**
- * Synchronize our utilization statistics with the
- * statistics service.
- * @param plugin the plugin context (state for this module)
- */
-static void
-sync_stats (struct Plugin *plugin)
-{
- GNUNET_STATISTICS_set (plugin->statistics,
- QUOTA_STAT_NAME,
- plugin->payload,
- GNUNET_YES);
- plugin->lastSync = 0;
-}
-
-
/**
* Shutdown database connection and associate data
* structures.
static void
database_shutdown (struct Plugin *plugin)
{
- if (plugin->lastSync > 0)
- sync_stats (plugin);
if (plugin->updPrio != NULL)
sqlite3_finalize (plugin->updPrio);
if (plugin->insertContent != NULL)
}
-/**
- * Get an estimate of how much space the database is
- * currently using.
- *
- * @param cls our plugin context
- * @return number of bytes used on disk
- */
-static unsigned long long sqlite_plugin_get_size (void *cls)
-{
- struct Plugin *plugin = cls;
- return plugin->payload;
-}
-
-
/**
* Delete the database entry with the given
* row identifier.
if ( (ret == GNUNET_NO) &&
(GNUNET_OK == delete_by_rowid (plugin, rowid)) )
{
- if (plugin->payload >= size + GNUNET_DATASTORE_ENTRY_OVERHEAD)
- plugin->payload -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
- else
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Datastore payload inaccurate, please fix and restart!\n"));
- plugin->lastSync++;
+ plugin->env->duc (plugin->env->cls,
+ - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
#if DEBUG_SQLITE
- if (ret == GNUNET_NO)
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "sqlite",
- "Removed entry %llu (%u bytes), new payload is %llu\n",
- (unsigned long long) rowid,
- size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
- plugin->payload);
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ "sqlite",
+ "Removed entry %llu (%u bytes)\n",
+ (unsigned long long) rowid,
+ size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
#endif
- if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
- sync_stats (plugin);
}
}
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR |
GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
- plugin->lastSync++;
- plugin->payload += size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
+ plugin->env->duc (plugin->env->cls,
+ size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
#if DEBUG_SQLITE
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
"sqlite",
- "Stored new entry (%u bytes), new payload is %llu\n",
- size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
- plugin->payload);
+ "Stored new entry (%u bytes)\n",
+ size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
#endif
- if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
- sync_stats (plugin);
return GNUNET_OK;
}
}
-/**
- * Callback function to process statistic values.
- *
- * @param cls closure
- * @param subsystem name of subsystem that created the statistic
- * @param name the name of the datum
- * @param value the current value
- * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
- * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
- */
-static int
-process_stat_in (void *cls,
- const char *subsystem,
- const char *name,
- uint64_t value,
- int is_persistent)
-{
- struct Plugin *plugin = cls;
-
- plugin->stats_worked = GNUNET_YES;
- plugin->payload += value;
-#if DEBUG_SQLITE
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "sqlite",
- "Notification from statistics about existing payload (%llu), new payload is %llu\n",
- value,
- plugin->payload);
-#endif
- return GNUNET_OK;
-}
-
-
-static void
-process_stat_done (void *cls,
- int success)
+static unsigned long long
+sqlite_plugin_get_size (void *cls)
{
struct Plugin *plugin = cls;
sqlite3_stmt *stmt;
uint64_t pages;
uint64_t page_size;
- plugin->stat_get = NULL;
- if ( (plugin->stats_worked == GNUNET_NO) &&
- (SQLITE_VERSION_NUMBER >= 3006000) )
- {
- CHECK (SQLITE_OK ==
- sqlite3_exec (plugin->dbh,
- "VACUUM", NULL, NULL, ENULL));
- CHECK (SQLITE_OK ==
- sqlite3_exec (plugin->dbh,
- "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
- CHECK (SQLITE_OK ==
- sq_prepare (plugin->dbh,
- "PRAGMA page_count",
- &stmt));
- if (SQLITE_ROW ==
- sqlite3_step (stmt))
- pages = sqlite3_column_int64 (stmt, 0);
- else
- pages = 0;
- sqlite3_finalize (stmt);
- CHECK (SQLITE_OK ==
- sq_prepare (plugin->dbh,
- "PRAGMA page_size",
- &stmt));
- CHECK (SQLITE_ROW ==
- sqlite3_step (stmt));
- page_size = sqlite3_column_int64 (stmt, 0);
- sqlite3_finalize (stmt);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
- (unsigned long long) pages,
- (unsigned long long) page_size);
- plugin->payload = pages * page_size;
+ if (SQLITE_VERSION_NUMBER < 3006000)
+ {
+ GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
+ "datastore-sqlite",
+ _("sqlite version to old to determine size, assuming zero\n"));
+ return 0;
}
+ CHECK (SQLITE_OK ==
+ sqlite3_exec (plugin->dbh,
+ "VACUUM", NULL, NULL, ENULL));
+ CHECK (SQLITE_OK ==
+ sqlite3_exec (plugin->dbh,
+ "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
+ CHECK (SQLITE_OK ==
+ sq_prepare (plugin->dbh,
+ "PRAGMA page_count",
+ &stmt));
+ if (SQLITE_ROW ==
+ sqlite3_step (stmt))
+ pages = sqlite3_column_int64 (stmt, 0);
+ else
+ pages = 0;
+ sqlite3_finalize (stmt);
+ CHECK (SQLITE_OK ==
+ sq_prepare (plugin->dbh,
+ "PRAGMA page_size",
+ &stmt));
+ CHECK (SQLITE_ROW ==
+ sqlite3_step (stmt));
+ page_size = sqlite3_column_int64 (stmt, 0);
+ sqlite3_finalize (stmt);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
+ (unsigned long long) pages,
+ (unsigned long long) page_size);
+ return pages * page_size;
}
return NULL; /* can only initialize once! */
memset (&plugin, 0, sizeof(struct Plugin));
plugin.env = env;
- plugin.statistics = GNUNET_STATISTICS_create (env->sched,
- "ds-sqlite",
- env->cfg);
- plugin.stat_get = GNUNET_STATISTICS_get (plugin.statistics,
- "ds-sqlite",
- QUOTA_STAT_NAME,
- GNUNET_TIME_UNIT_SECONDS,
- &process_stat_done,
- &process_stat_in,
- &plugin);
if (GNUNET_OK !=
database_setup (env->cfg, &plugin))
{
struct GNUNET_DATASTORE_PluginFunctions *api = cls;
struct Plugin *plugin = api->cls;
- if (plugin->stat_get != NULL)
- {
- GNUNET_STATISTICS_get_cancel (plugin->stat_get);
- plugin->stat_get = NULL;
- }
if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (plugin->env->sched,
if (plugin->drop_on_shutdown)
fn = GNUNET_strdup (plugin->fn);
database_shutdown (plugin);
- GNUNET_STATISTICS_destroy (plugin->statistics,
- GNUNET_NO);
plugin->env = NULL;
- plugin->payload = 0;
GNUNET_free (api);
if (fn != NULL)
{
* && (delay-ready
* || any-rs-ready
* || any-ws-ready
- * || (shutdown-active && run-on-shutdown) )
+ * || shutdown-active)
* </code>
*
* @param sched scheduler to use
check_PROGRAMS = \
- test_peerinfo_api
+ test_peerinfo_api \
+ perf_peerinfo_api
if !DISABLE_TEST_RUN
TESTS = $(check_PROGRAMS)
$(top_builddir)/src/peerinfo/libgnunetpeerinfo.la \
$(top_builddir)/src/util/libgnunetutil.la
+perf_peerinfo_api_SOURCES = \
+ perf_peerinfo_api.c
+perf_peerinfo_api_LDADD = \
+ $(top_builddir)/src/hello/libgnunethello.la \
+ $(top_builddir)/src/peerinfo/libgnunetpeerinfo.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+
EXTRA_DIST = \
test_peerinfo_api_data.conf
#include "platform.h"
#include "gnunet_crypto_lib.h"
+#include "gnunet_container_lib.h"
#include "gnunet_disk_lib.h"
#include "gnunet_hello_lib.h"
#include "gnunet_protocols.h"
struct HostEntry
{
- /**
- * This is a linked list.
- */
- struct HostEntry *next;
-
/**
* Identity of the peer.
*/
/**
- * The in-memory list of known hosts.
+ * The in-memory list of known hosts, mapping of
+ * host IDs to 'struct HostEntry*' values.
*/
-static struct HostEntry *hosts;
+static struct GNUNET_CONTAINER_MultiHashMap *hostmap;
/**
* Clients to immediately notify about all changes.
}
-/**
- * Find the host entry for the given peer. FIXME: replace by hash map!
- * @return NULL if not found
- */
-static struct HostEntry *
-lookup_host_entry (const struct GNUNET_PeerIdentity *id)
-{
- struct HostEntry *pos;
-
- pos = hosts;
- while ((pos != NULL) &&
- (0 !=
- memcmp (id, &pos->identity, sizeof (struct GNUNET_PeerIdentity))))
- pos = pos->next;
- return pos;
-}
-
-
/**
* Broadcast information about the given entry to all
* clients that care.
struct GNUNET_TIME_Absolute now;
char *fn;
- entry = lookup_host_entry (identity);
+ entry = GNUNET_CONTAINER_multihashmap_get (hostmap,
+ &identity->hashPubKey);
if (entry != NULL)
return;
GNUNET_STATISTICS_update (stats,
}
}
GNUNET_free (fn);
- entry->next = hosts;
- hosts = entry;
+ GNUNET_CONTAINER_multihashmap_put (hostmap,
+ &identity->hashPubKey,
+ entry,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
notify_all (entry);
}
static int
-hosts_directory_scan_callback (void *cls, const char *fullname)
+hosts_directory_scan_callback (void *cls,
+ const char *fullname)
{
unsigned int *matched = cls;
struct GNUNET_PeerIdentity identity;
struct GNUNET_TIME_Absolute delta;
add_host_to_known_hosts (peer);
- host = lookup_host_entry (peer);
+ host = GNUNET_CONTAINER_multihashmap_get (hostmap,
+ &peer->hashPubKey);
GNUNET_assert (host != NULL);
if (host->hello == NULL)
{
}
+
/**
- * Do transmit info either for only the host matching the given
- * argument or for all known hosts.
+ * Do transmit info about peer to given host.
*
- * @param only NULL to hit all hosts, otherwise specifies a particular target
- * @param client who is making the request (and will thus receive our confirmation)
+ * @param cls NULL to hit all hosts, otherwise specifies a particular target
+ * @param key hostID
+ * @param value information to transmit
+ * @return GNUNET_YES (continue to iterate)
*/
-static void
-send_to_each_host (const struct GNUNET_PeerIdentity *only,
- struct GNUNET_SERVER_Client *client)
+static int
+add_to_tc (void *cls,
+ const GNUNET_HashCode *key,
+ void *value)
{
- struct HostEntry *pos;
+ struct GNUNET_SERVER_TransmitContext *tc = cls;
+ struct HostEntry *pos = value;
struct InfoMessage *im;
uint16_t hs;
char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
- struct GNUNET_SERVER_TransmitContext *tc;
- int match;
- tc = GNUNET_SERVER_transmit_context_create (client);
- match = GNUNET_NO;
- pos = hosts;
- while (pos != NULL)
+ hs = 0;
+ im = (struct InfoMessage *) buf;
+ if (pos->hello != NULL)
{
- if ((only == NULL) ||
- (0 ==
- memcmp (only, &pos->identity,
- sizeof (struct GNUNET_PeerIdentity))))
- {
- hs = 0;
- im = (struct InfoMessage *) buf;
- if (pos->hello != NULL)
- {
- hs = GNUNET_HELLO_size (pos->hello);
- GNUNET_assert (hs <
- GNUNET_SERVER_MAX_MESSAGE_SIZE -
- sizeof (struct InfoMessage));
- memcpy (&im[1], pos->hello, hs);
- match = GNUNET_YES;
- }
- im->header.type = htons (GNUNET_MESSAGE_TYPE_PEERINFO_INFO);
- im->header.size = htons (sizeof (struct InfoMessage) + hs);
- im->reserved = htonl (0);
- im->peer = pos->identity;
- GNUNET_SERVER_transmit_context_append_message (tc,
- &im->header);
- }
- pos = pos->next;
+ hs = GNUNET_HELLO_size (pos->hello);
+ GNUNET_assert (hs <
+ GNUNET_SERVER_MAX_MESSAGE_SIZE -
+ sizeof (struct InfoMessage));
+ memcpy (&im[1], pos->hello, hs);
}
- if ( (only != NULL) &&
- (match == GNUNET_NO) )
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "No `%s' message was found for peer `%4s'\n",
- "HELLO",
- GNUNET_i2s (only));
- GNUNET_SERVER_transmit_context_append_data (tc, NULL, 0,
- GNUNET_MESSAGE_TYPE_PEERINFO_INFO_END);
- GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
+ im->header.type = htons (GNUNET_MESSAGE_TYPE_PEERINFO_INFO);
+ im->header.size = htons (sizeof (struct InfoMessage) + hs);
+ im->reserved = htonl (0);
+ im->peer = pos->identity;
+ GNUNET_SERVER_transmit_context_append_message (tc,
+ &im->header);
+ return GNUNET_YES;
}
now = GNUNET_TIME_absolute_get ();
GNUNET_DISK_directory_scan (networkIdDirectory,
&discard_hosts_helper, &now);
-
GNUNET_SCHEDULER_add_delayed (tc->sched,
DATA_HOST_CLEAN_FREQ,
&cron_clean_data_hosts, NULL);
const struct GNUNET_MessageHeader *message)
{
const struct ListPeerMessage *lpm;
+ struct GNUNET_SERVER_TransmitContext *tc;
lpm = (const struct ListPeerMessage *) message;
#if DEBUG_PEERINFO
"GET",
GNUNET_i2s (&lpm->peer));
#endif
- send_to_each_host (&lpm->peer, client);
+ tc = GNUNET_SERVER_transmit_context_create (client);
+ GNUNET_CONTAINER_multihashmap_get_multiple (hostmap,
+ &lpm->peer.hashPubKey,
+ &add_to_tc,
+ tc);
+ GNUNET_SERVER_transmit_context_append_data (tc, NULL, 0,
+ GNUNET_MESSAGE_TYPE_PEERINFO_INFO_END);
+ GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
}
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message)
{
+ struct GNUNET_SERVER_TransmitContext *tc;
+
#if DEBUG_PEERINFO
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s' message received\n",
"GET_ALL");
#endif
- send_to_each_host (NULL, client);
+ tc = GNUNET_SERVER_transmit_context_create (client);
+ GNUNET_CONTAINER_multihashmap_iterate (hostmap,
+ &add_to_tc,
+ tc);
+ GNUNET_SERVER_transmit_context_append_data (tc, NULL, 0,
+ GNUNET_MESSAGE_TYPE_PEERINFO_INFO_END);
+ GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
+}
+
+
+static int
+do_notify_entry (void *cls,
+ const GNUNET_HashCode *key,
+ void *value)
+{
+ struct GNUNET_SERVER_Client *client = cls;
+ struct HostEntry *he = value;
+ struct InfoMessage *msg;
+
+ msg = make_info_message (he);
+ GNUNET_SERVER_notification_context_unicast (notify_list,
+ client,
+ &msg->header,
+ GNUNET_NO);
+ GNUNET_free (msg);
+ return GNUNET_YES;
}
*/
static void
handle_notify (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message)
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
{
- struct InfoMessage *msg;
- struct HostEntry *pos;
-
#if DEBUG_PEERINFO
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s' message received\n",
#endif
GNUNET_SERVER_notification_context_add (notify_list,
client);
- pos = hosts;
- while (NULL != pos)
- {
- msg = make_info_message (pos);
- GNUNET_SERVER_notification_context_unicast (notify_list,
- client,
- &msg->header,
- GNUNET_NO);
- GNUNET_free (msg);
- pos = pos->next;
- }
+ GNUNET_CONTAINER_multihashmap_iterate (hostmap,
+ &do_notify_entry,
+ client);
}
+static int
+free_host_entry (void *cls,
+ const GNUNET_HashCode *key,
+ void *value)
+{
+ struct HostEntry *he = value;
+ GNUNET_free (he);
+ return GNUNET_YES;
+}
+
/**
* Clean up our state. Called during shutdown.
*
{
GNUNET_SERVER_notification_context_destroy (notify_list);
notify_list = NULL;
+ GNUNET_CONTAINER_multihashmap_iterate (hostmap,
+ &free_host_entry,
+ NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (hostmap);
if (stats != NULL)
{
GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
sizeof (struct GNUNET_MessageHeader)},
{NULL, NULL, 0, 0}
};
+
+ hostmap = GNUNET_CONTAINER_multihashmap_create (1024);
stats = GNUNET_STATISTICS_create (sched, "peerinfo", cfg);
notify_list = GNUNET_SERVER_notification_context_create (server, 0);
GNUNET_assert (GNUNET_OK ==
--- /dev/null
+/*
+ This file is part of GNUnet.
+ (C) 2004, 2009 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file peerinfo/test_peerinfo_hammer.c
+ * @brief testcase for peerinfo_api.c, hopefully hammer the peerinfo service
+ * @author Nathan Evans
+ */
+
+#include "platform.h"
+#include "gnunet_hello_lib.h"
+#include "gnunet_getopt_lib.h"
+#include "gnunet_os_lib.h"
+#include "gnunet_peerinfo_service.h"
+#include "gnunet_program_lib.h"
+#include "gnunet_time_lib.h"
+#include "peerinfo.h"
+
+#define START_SERVICE 1
+
+#define NUM_REQUESTS 5000
+
+static struct GNUNET_SCHEDULER_Handle *sched;
+
+static const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+static struct GNUNET_PEERINFO_IteratorContext *ic[NUM_REQUESTS];
+
+static struct GNUNET_PEERINFO_Handle *h;
+
+static unsigned int numpeers;
+
+static int
+check_it (void *cls,
+ const char *tname,
+ struct GNUNET_TIME_Absolute expiration,
+ const void *addr, uint16_t addrlen)
+{
+ if (addrlen > 0)
+ {
+#if DEBUG
+ fprintf (stderr,
+ "name: %s, addr: %s\n",
+ tname,
+ (const char*) addr);
+#endif
+ }
+ return GNUNET_OK;
+}
+
+
+static size_t
+address_generator (void *cls, size_t max, void *buf)
+{
+ size_t *agc = cls;
+ size_t ret;
+ char *address;
+
+ if (*agc == 0)
+ return 0;
+
+ GNUNET_asprintf(&address, "Address%d", *agc);
+
+ ret = GNUNET_HELLO_add_address ("peerinfotest",
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_TIME_UNIT_HOURS), address, strlen(address) + 1,
+ buf, max);
+ *agc = 0;
+ return ret;
+}
+
+
+static void
+add_peer (size_t i)
+{
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pkey;
+ struct GNUNET_PeerIdentity pid;
+ struct GNUNET_HELLO_Message *h2;
+ size_t agc;
+
+ agc = 2;
+ memset (&pkey, i, sizeof (pkey));
+ GNUNET_CRYPTO_hash (&pkey, sizeof (pkey), &pid.hashPubKey);
+ h2 = GNUNET_HELLO_create (&pkey, &address_generator, &i);
+ GNUNET_PEERINFO_add_peer (h, h2);
+ GNUNET_free (h2);
+}
+
+
+static void
+process (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_HELLO_Message *hello)
+{
+ if (peer == NULL)
+ {
+#if DEBUG
+ fprintf(stderr, "Process received NULL response\n");
+#endif
+ }
+ else
+ {
+#if DEBUG
+ fprintf(stderr, "Processed a peer\n");
+#endif
+ numpeers++;
+ if (0 && (hello != NULL))
+ GNUNET_HELLO_iterate_addresses (hello, GNUNET_NO, &check_it, NULL);
+
+ }
+}
+
+
+static void
+run (void *cls,
+ struct GNUNET_SCHEDULER_Handle *s,
+ char *const *args,
+ const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *c)
+{
+ size_t i;
+ sched = s;
+ cfg = c;
+ h = GNUNET_PEERINFO_connect (sched, cfg);
+
+ for (i = 0; i < NUM_REQUESTS; i++)
+ {
+ add_peer (i);
+ ic[i] = GNUNET_PEERINFO_iterate (h,
+ NULL,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 30),
+ &process, cls);
+ }
+ fprintf (stderr,
+ "Issued %u requests\n",
+ NUM_REQUESTS);
+}
+
+static int
+check ()
+{
+ int ok = 3;
+ char *const argv[] = { "test-peerinfo-hammer",
+ "-c",
+ "test_peerinfo_api_data.conf",
+#if DEBUG_PEERINFO
+ "-L", "DEBUG",
+#endif
+ NULL
+ };
+#if START_SERVICE
+ pid_t pid;
+ struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_OPTION_END
+ };
+ pid = GNUNET_OS_start_process (NULL, NULL, "gnunet-service-peerinfo",
+ "gnunet-service-peerinfo",
+#if DEBUG_PEERINFO
+ "-L", "DEBUG",
+#endif
+ "-c", "test_peerinfo_api_data.conf", NULL);
+#endif
+ GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1,
+ argv, "test-peerinfo-api", "nohelp",
+ options, &run, &ok);
+ fprintf (stderr,
+ "Processed %u/%u peers\n",
+ numpeers,
+ NUM_REQUESTS);
+#if START_SERVICE
+ if (0 != PLIBC_KILL (pid, SIGTERM))
+ {
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
+ ok = 1;
+ }
+ GNUNET_OS_process_wait(pid);
+#endif
+ return ok;
+}
+
+
+int
+main (int argc, char *argv[])
+{
+ int ret = 0;
+
+ GNUNET_log_setup ("test_peerinfo_api",
+#if DEBUG_PEERINFO
+ "DEBUG",
+#else
+ "WARNING",
+#endif
+ NULL);
+ ret = check ();
+ GNUNET_DISK_directory_remove ("/tmp/test-gnunet-peerinfo");
+ return ret;
+}
+
+/* end of test_peerinfo_hammer.c */
#endif
NULL);
ret = check ();
- GNUNET_DISK_directory_remove ("/tmp/test-gnunetd-peerinfo");
+ GNUNET_DISK_directory_remove ("/tmp/test-gnunet-peerinfo");
return ret;
}
[PATHS]
-SERVICEHOME = /tmp/test-gnunetd-peerinfo/
+SERVICEHOME = /tmp/test-gnunet-peerinfo/
[peerinfo]
PORT = 22354
+DEBUG = NO
GNUNET_NETWORK_fdset_handle_set (struct GNUNET_NETWORK_FDSet *fds,
const struct GNUNET_DISK_FileHandle *h)
{
-
#ifdef MINGW
HANDLE hw;
GNUNET_DISK_internal_file_handle_ (h, &hw, sizeof (HANDLE));
*/
struct Task *pending;
+ /**
+ * List of tasks waiting ONLY for a timeout event.
+ * Sorted by timeout (earliest first). Used so that
+ * we do not traverse the list of these tasks when
+ * building select sets (we just look at the head
+ * to determine the respective timeout ONCE).
+ */
+ struct Task *pending_timeout;
+
+ /**
+ * Last inserted task waiting ONLY for a timeout event.
+ * Used to (heuristically) speed up insertion.
+ */
+ struct Task *pending_timeout_last;
+
/**
* ID of the task that is running right now.
*/
return GNUNET_NO;
min = -1; /* maximum value */
pos = sched->pending;
+ while (pos != NULL)
+ {
+ if (pos->id == id)
+ return GNUNET_YES;
+ if (pos->id < min)
+ min = pos->id;
+ pos = pos->next;
+ }
+ pos = sched->pending_timeout;
while (pos != NULL)
{
if (pos->id == id)
struct GNUNET_TIME_Relative *timeout)
{
struct Task *pos;
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_TIME_Relative to;
+ now = GNUNET_TIME_absolute_get ();
+ pos = sched->pending_timeout;
+ if (pos != NULL)
+ {
+ to = GNUNET_TIME_absolute_get_difference (now, pos->timeout);
+ if (timeout->value > to.value)
+ *timeout = to;
+ if (pos->reason != 0)
+ *timeout = GNUNET_TIME_UNIT_ZERO;
+ }
pos = sched->pending;
while (pos != NULL)
{
pos = pos->next;
continue;
}
-
if (pos->timeout.value != GNUNET_TIME_UNIT_FOREVER_ABS.value)
{
- struct GNUNET_TIME_Relative to;
-
- to = GNUNET_TIME_absolute_get_remaining (pos->timeout);
+ to = GNUNET_TIME_absolute_get_difference (now, pos->timeout);
if (timeout->value > to.value)
*timeout = to;
}
const struct GNUNET_NETWORK_FDSet *rs,
const struct GNUNET_NETWORK_FDSet *ws)
{
+ enum GNUNET_SCHEDULER_Reason reason;
+
+ reason = task->reason;
if (now.value >= task->timeout.value)
- task->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT;
- if ( (0 == (task->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
- ( (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (rs, task->read_fd)) ||
+ reason |= GNUNET_SCHEDULER_REASON_TIMEOUT;
+ if ( (0 == (reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
+ ( ( (task->read_fd != -1) &&
+ (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (rs, task->read_fd)) ) ||
(set_overlaps (rs, task->read_set) ) ) )
- task->reason |= GNUNET_SCHEDULER_REASON_READ_READY;
- if ((0 == (task->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
- ( (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (ws, task->write_fd)) ||
+ reason |= GNUNET_SCHEDULER_REASON_READ_READY;
+ if ((0 == (reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
+ ( ( (task->write_fd != -1) &&
+ (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (ws, task->write_fd)) ) ||
(set_overlaps (ws, task->write_set) ) ) )
- task->reason |= GNUNET_SCHEDULER_REASON_WRITE_READY;
- if (task->reason == 0)
+ reason |= GNUNET_SCHEDULER_REASON_WRITE_READY;
+ if (reason == 0)
return GNUNET_NO; /* not ready */
if (task->prereq_id != GNUNET_SCHEDULER_NO_TASK)
{
if (GNUNET_YES == is_pending (sched, task->prereq_id))
- return GNUNET_NO; /* prereq waiting */
- task->reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE;
+ {
+ task->reason = reason;
+ return GNUNET_NO; /* prereq waiting */
+ }
+ reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE;
}
+ task->reason = reason;
return GNUNET_YES;
}
* @param task task ready for execution
*/
static void
-queue_ready_task (struct GNUNET_SCHEDULER_Handle *handle, struct Task *task)
+queue_ready_task (struct GNUNET_SCHEDULER_Handle *handle,
+ struct Task *task)
{
enum GNUNET_SCHEDULER_Priority p = task->priority;
if (0 != (task->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
now = GNUNET_TIME_absolute_get ();
prev = NULL;
+ pos = handle->pending_timeout;
+ while (pos != NULL)
+ {
+ next = pos->next;
+ if (now.value >= pos->timeout.value)
+ pos->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT;
+ if (0 == pos->reason)
+ break;
+ handle->pending_timeout = next;
+ if (handle->pending_timeout_last == pos)
+ handle->pending_timeout_last = NULL;
+ queue_ready_task (handle, pos);
+ pos = next;
+ }
pos = handle->pending;
while (pos != NULL)
{
struct Task *pos;
int i;
+ pos = sched->pending_timeout;
+ while (pos != NULL)
+ {
+ pos->reason |= GNUNET_SCHEDULER_REASON_SHUTDOWN;
+ /* we don't move the task into the ready queue yet; check_ready
+ will do that later, possibly adding additional
+ readiness-factors */
+ pos = pos->next;
+ }
pos = sched->pending;
while (pos != NULL)
{
destroy_task (pos);
sched->tasks_run++;
}
- while ((sched->pending == NULL) || (p >= sched->max_priority_added));
+ while ( (sched->pending == NULL) || (p >= sched->max_priority_added) );
}
/**
GNUNET_SCHEDULER_REASON_STARTUP);
last_tr = 0;
busy_wait_warning = 0;
- while ((sched.pending != NULL) || (sched.ready_count > 0))
+ while ((sched.pending != NULL) ||
+ (sched.pending_timeout != NULL) ||
+ (sched.ready_count > 0))
{
GNUNET_NETWORK_fdset_zero (rs);
GNUNET_NETWORK_fdset_zero (ws);
struct Task *t;
struct Task *prev;
enum GNUNET_SCHEDULER_Priority p;
+ int to;
void *ret;
+ to = 0;
prev = NULL;
t = sched->pending;
while (t != NULL)
prev = t;
t = t->next;
}
+ if (t == NULL)
+ {
+ prev = NULL;
+ to = 1;
+ t = sched->pending_timeout;
+ while (t != NULL)
+ {
+ if (t->id == task)
+ break;
+ prev = t;
+ t = t->next;
+ }
+ if (sched->pending_timeout_last == t)
+ sched->pending_timeout_last = NULL;
+ }
p = 0;
while (t == NULL)
{
if (prev == NULL)
{
if (p == 0)
- sched->pending = t->next;
+ {
+ if (to == 0)
+ {
+ sched->pending = t->next;
+ }
+ else
+ {
+ sched->pending_timeout = t->next;
+ }
+ }
else
- sched->ready[p] = t->next;
+ {
+ sched->ready[p] = t->next;
+ }
}
else
- prev->next = t->next;
+ {
+ prev->next = t->next;
+ }
ret = t->callback_cls;
#if DEBUG_TASKS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
struct GNUNET_TIME_Relative delay,
GNUNET_SCHEDULER_Task task, void *task_cls)
{
+#if 1
+ /* new, optimized version */
+ struct Task *t;
+ struct Task *pos;
+ struct Task *prev;
+#if EXECINFO
+ void *backtrace_array[MAX_TRACE_DEPTH];
+#endif
+
+ GNUNET_assert (NULL != task);
+ t = GNUNET_malloc (sizeof (struct Task));
+ t->callback = task;
+ t->callback_cls = task_cls;
+#if EXECINFO
+ t->num_backtrace_strings = backtrace(backtrace_array, MAX_TRACE_DEPTH);
+ t->backtrace_strings = backtrace_symbols(backtrace_array, t->num_backtrace_strings);
+#endif
+ t->read_fd = -1;
+ t->write_fd = -1;
+ t->id = ++sched->last_id;
+#if PROFILE_DELAYS
+ t->start_time = GNUNET_TIME_absolute_get ();
+#endif
+ t->timeout = GNUNET_TIME_relative_to_absolute (delay);
+ t->priority = sched->current_priority;
+ /* try tail first (optimization in case we are
+ appending to a long list of tasks with timeouts) */
+ prev = sched->pending_timeout_last;
+ if (prev != NULL)
+ {
+ if (prev->timeout.value > t->timeout.value)
+ prev = NULL;
+ else
+ pos = prev->next; /* heuristic success! */
+ }
+ if (prev == NULL)
+ {
+ /* heuristic failed, do traversal of timeout list */
+ pos = sched->pending_timeout;
+ }
+ while ( (pos != NULL) &&
+ ( (pos->timeout.value <= t->timeout.value) ||
+ (pos->reason != 0) ) )
+ {
+ prev = pos;
+ pos = pos->next;
+ }
+ if (prev == NULL)
+ sched->pending_timeout = t;
+ else
+ prev->next = t;
+ t->next = pos;
+ /* hyper-optimization... */
+ sched->pending_timeout_last = t;
+
+#if DEBUG_TASKS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Adding task: %llu / %p\n", t->id, t->callback_cls);
+#endif
+#if EXECINFO
+ int i;
+
+ for (i=0;i<t->num_backtrace_strings;i++)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Task %u trace %d: %s\n",
+ t->id,
+ i,
+ t->backtrace_strings[i]);
+#endif
+ return t->id;
+
+#else
+ /* unoptimized version */
return GNUNET_SCHEDULER_add_select (sched,
GNUNET_SCHEDULER_PRIORITY_KEEP,
GNUNET_SCHEDULER_NO_TASK, delay,
NULL, NULL, task, task_cls);
+#endif
}
* && (delay-ready
* || any-rs-ready
* || any-ws-ready
- * || (shutdown-active && run-on-shutdown) )
+ * || shutdown-active )
* </code>
*
* @param sched scheduler to use