/*
This file is part of GNUnet.
- (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
+ (C) 2004, 2005, 2006, 2007, 2009, 2011 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
#include "gnunet_util_lib.h"
#include "gnunet_protocols.h"
#include "gnunet_datastore_plugin.h"
+#include <gauger.h>
#define VERBOSE GNUNET_NO
*/
#define PUT_10 (MAX_SIZE / 32 / 1024 / ITERATIONS)
+static char category[256];
+
+static unsigned int hits[PUT_10 / 8 + 1];
+
static unsigned long long stored_bytes;
static unsigned long long stored_entries;
enum RunPhase
{
- RP_DONE = 0,
+ RP_ERROR = 0,
RP_PUT,
- RP_LP_GET,
- RP_AE_GET,
- RP_ZA_GET
+ RP_REP_GET,
+ RP_ZA_GET,
+ RP_EXP_GET,
+ RP_DONE
};
struct GNUNET_TIME_Absolute end;
const struct GNUNET_CONFIGURATION_Handle *cfg;
struct GNUNET_DATASTORE_PluginFunctions * api;
- const char *msg;
enum RunPhase phase;
unsigned int cnt;
};
/* most content is 32k */
size = 32 * 1024;
-
if (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 16) == 0) /* but some of it is less! */
- size = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 32 * 1024);
+ size = 8 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 32 * 1024);
size = size - (size & 7); /* always multiple of 8 */
/* generate random key */
if (i > 255)
memset (value, i - 255, size / 2);
value[0] = k;
+ memcpy (&value[4], &i, sizeof (i));
msg = NULL;
prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100);
if (GNUNET_OK != api->put (api->cls,
&key,
size,
value,
- i /* type */,
+ 1 + i % 4 /* type */,
prio,
- i /* anonymity */,
+ i % 4 /* anonymity */,
0 /* replication */,
GNUNET_TIME_relative_to_absolute
(GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
static int
-iterateDummy (void *cls,
+iterate_zeros (void *cls,
void *next_cls,
const GNUNET_HashCode * key,
uint32_t size,
uint64_t uid)
{
struct CpsRunContext *crc = cls;
-
+ int i;
+ const char *cdata = data;
+
if (key == NULL)
{
+ char buf[256];
+ unsigned int bc;
+
+ bc = 0;
+ for (i = 0;i<PUT_10;i++)
+ if (0 != (hits[i/8] & (1 << (i % 8))))
+ bc++;
+
crc->end = GNUNET_TIME_absolute_get();
- printf (crc->msg,
- crc->i,
+ GNUNET_snprintf (buf, sizeof (buf),
+ "Iteration over %u zero-anonymity items",
+ crc->cnt);
+ printf ("%s took %llu ms yielding %u/%u items\n",
+ buf,
(unsigned long long) (crc->end.abs_value - crc->start.abs_value),
+ bc,
crc->cnt);
- if (crc->phase != RP_ZA_GET)
- {
- crc->phase++;
- }
- else
+ GAUGER (category, buf, crc->end.abs_value - crc->start.abs_value, "ms");
+ memset (hits, 0, sizeof (hits));
+ if ( (int) (PUT_10 / 4 - crc->cnt) > 2)
{
- if (crc->i == ITERATIONS)
- crc->phase = RP_DONE;
- else
- crc->phase = RP_PUT;
+ fprintf (stderr,
+ "Got %d items, expected %d\n",
+ (int) crc->cnt, (int) PUT_10 / 4);
+ GNUNET_break (0);
+ crc->phase = RP_ERROR;
}
+ crc->phase++;
crc->cnt = 0;
crc->start = GNUNET_TIME_absolute_get ();
GNUNET_SCHEDULER_add_now (&test, crc);
return GNUNET_OK;
}
-#if VERBOSE
+ GNUNET_assert (size >= 8);
+ memcpy (&i, &cdata[4], sizeof (i));
+ hits[i/8] |= (1 << (i % 8));
+
+#if VERBOSE
fprintf (stderr, "Found result type=%u, priority=%u, size=%u, expire=%llu\n",
type, priority, size,
(unsigned long long) expiration.abs_value);
}
+static int
+expiration_get (void *cls,
+ void *next_cls,
+ const GNUNET_HashCode * key,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration,
+ uint64_t uid)
+{
+ struct CpsRunContext *crc = cls;
+ int i;
+ const char *cdata = data;
+
+ GNUNET_assert (size >= 8);
+ memcpy (&i, &cdata[4], sizeof (i));
+ hits[i/8] |= (1 << (i % 8));
+ crc->cnt++;
+ if (PUT_10 == crc->cnt)
+ {
+ char buf[256];
+ unsigned int bc;
+
+ bc = 0;
+ for (i = 0;i<PUT_10;i++)
+ if (0 != (hits[i/8] & (1 << (i % 8))))
+ bc++;
+
+ crc->end = GNUNET_TIME_absolute_get();
+ GNUNET_snprintf (buf, sizeof (buf),
+ "Execution of %u expiration+deletion-GET requests",
+ PUT_10);
+ printf ("%s took %llu ms yielding %u/%u items\n",
+ buf,
+ (unsigned long long) (crc->end.abs_value - crc->start.abs_value),
+ bc,
+ (unsigned int) PUT_10);
+ GAUGER (category, buf, crc->end.abs_value - crc->start.abs_value, "ms");
+ memset (hits, 0, sizeof (hits));
+ crc->phase++;
+ crc->cnt = 0;
+ crc->start = GNUNET_TIME_absolute_get ();
+ }
+ GNUNET_SCHEDULER_add_now (&test, crc);
+ return GNUNET_NO;
+}
+
static int
-dummy_get (void *cls,
- void *next_cls,
- const GNUNET_HashCode * key,
- uint32_t size,
- const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority,
- uint32_t anonymity,
- struct GNUNET_TIME_Absolute
- expiration,
- uint64_t uid)
+replication_get (void *cls,
+ void *next_cls,
+ const GNUNET_HashCode * key,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration,
+ uint64_t uid)
{
struct CpsRunContext *crc = cls;
+ int i;
+ const char *cdata = data;
+ GNUNET_assert (NULL != key);
+ GNUNET_assert (size >= 8);
+ memcpy (&i, &cdata[4], sizeof (i));
+ hits[i/8] |= (1 << (i % 8));
crc->cnt++;
- if (1000 == crc->cnt)
+ if (PUT_10 == crc->cnt)
{
+ char buf[256];
+ unsigned int bc;
+
+ bc = 0;
+ for (i = 0;i<PUT_10;i++)
+ if (0 != (hits[i/8] & (1 << (i % 8))))
+ bc++;
+
crc->end = GNUNET_TIME_absolute_get();
- printf (crc->msg,
- crc->i,
+ GNUNET_snprintf (buf, sizeof (buf),
+ "Execution of %u replication-GET requests",
+ PUT_10);
+ printf ("%s took %llu ms yielding %u/%u items\n",
+ buf,
(unsigned long long) (crc->end.abs_value - crc->start.abs_value),
- crc->cnt);
+ bc,
+ (unsigned int) PUT_10);
+ GAUGER (category, buf, crc->end.abs_value - crc->start.abs_value, "ms");
+ memset (hits, 0, sizeof (hits));
crc->phase++;
- crc->cnt = 0;
+ crc->cnt = 0;
crc->start = GNUNET_TIME_absolute_get ();
}
+
GNUNET_SCHEDULER_add_now (&test, crc);
return GNUNET_OK;
}
+
/**
* Function called when the service shuts
* down. Unloads our datastore plugin.
int j;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- {
- crc->phase = RP_DONE;
- ok = 1;
- }
+ crc->phase = RP_ERROR;
switch (crc->phase)
{
+ case RP_ERROR:
+ GNUNET_break (0);
+ crc->api->drop (crc->api->cls);
+ ok = 1;
+ GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
+ &cleaning_task, crc);
+ break;
case RP_PUT:
crc->start = GNUNET_TIME_absolute_get ();
for (j=0;j<PUT_10;j++)
putValue (crc->api, j, crc->i);
crc->end = GNUNET_TIME_absolute_get ();
- printf ("%3u insertion took %20llums for %u\n",
- crc->i,
- (unsigned long long) (crc->end.abs_value - crc->start.abs_value),
- (unsigned int) PUT_10);
+ {
+ char buf[256];
+
+ GNUNET_snprintf (buf, sizeof (buf),
+ "Execution of %u PUT requests",
+ PUT_10);
+ printf ("%s took %llu ms\n",
+ buf,
+ (unsigned long long) (crc->end.abs_value - crc->start.abs_value));
+ GAUGER (category,
+ buf, crc->end.abs_value - crc->start.abs_value, "ms");
+ }
crc->i++;
crc->start = GNUNET_TIME_absolute_get ();
- crc->phase = RP_LP_GET;
- GNUNET_SCHEDULER_add_after (GNUNET_SCHEDULER_NO_TASK,
- &test, crc);
+ crc->phase++;
+ GNUNET_SCHEDULER_add_now (&test, crc);
break;
- case RP_LP_GET:
- crc->msg = "%3u replication iteration took %20llums for %u\n";
+ case RP_REP_GET:
crc->api->replication_get (crc->api->cls,
- &dummy_get,
+ &replication_get,
crc);
break;
- case RP_AE_GET:
- crc->msg = "%3u expiration iteration took %20llums for %u\n";
- crc->api->expiration_get (crc->api->cls,
- &dummy_get,
- crc);
- break;
case RP_ZA_GET:
- crc->msg = "%3u zero anonymity iteration took %20llums for %u\n";
- crc->api->iter_zero_anonymity (crc->api->cls, 0,
- &iterateDummy,
+ crc->api->iter_zero_anonymity (crc->api->cls, 1,
+ &iterate_zeros,
crc);
break;
+ case RP_EXP_GET:
+ crc->api->expiration_get (crc->api->cls,
+ &expiration_get,
+ crc);
+ break;
case RP_DONE:
+ exit (0);
crc->api->drop (crc->api->cls);
+ ok = 0;
GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
&cleaning_task, crc);
break;
crc->api = api;
crc->cfg = c;
crc->phase = RP_PUT;
+ ok = 2;
GNUNET_SCHEDULER_add_now (&test, crc);
}
GNUNET_GETOPT_OPTION_END
};
+ GNUNET_snprintf (category, sizeof (category),
+ "DATASTORE-%s",
+ plugin_name);
GNUNET_snprintf (cfg_name,
sizeof (cfg_name),
"perf_plugin_datastore_data_%s.conf",
/*
This file is part of GNUnet
- (C) 2009, 2010 Christian Grothoff (and other contributing authors)
+ (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
#define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0);
-/* warning, slighly crazy mysql statements ahead. Essentially, MySQL does not handle
- "OR" very well, so we need to use UNION instead. And UNION does not
- automatically apply a LIMIT on the outermost clause, so we need to
- repeat ourselves quite a bit. All hail the performance gods (and thanks
- to #mysql on freenode) */
-#define SELECT_IT_LOW_PRIORITY "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio = ? AND vkey > ?) "\
- "ORDER BY prio ASC,vkey ASC LIMIT 1) " \
- "UNION "\
- "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio > ? AND vkey != ?)"\
- "ORDER BY prio ASC,vkey ASC LIMIT 1)"\
- "ORDER BY prio ASC,vkey ASC LIMIT 1"
-
-#define SELECT_IT_NON_ANONYMOUS "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio = ? AND vkey < ?)"\
- " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
- "UNION "\
- "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio < ? AND vkey != ?)"\
- " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
- "ORDER BY prio DESC,vkey DESC LIMIT 1"
-
-#define SELECT_IT_EXPIRATION_TIME "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire = ? AND vkey > ?) "\
- "ORDER BY expire ASC,vkey ASC LIMIT 1) "\
- "UNION "\
- "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire > ? AND vkey != ?) "\
- "ORDER BY expire ASC,vkey ASC LIMIT 1)"\
- "ORDER BY expire ASC,vkey ASC LIMIT 1"
-
-
-#define SELECT_IT_MIGRATION_ORDER "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire = ? AND vkey < ?)"\
- " AND expire > ? AND type!=3"\
- " ORDER BY expire DESC,vkey DESC LIMIT 1) "\
- "UNION "\
- "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire < ? AND vkey != ?)"\
- " AND expire > ? AND type!=3"\
- " ORDER BY expire DESC,vkey DESC LIMIT 1)"\
- "ORDER BY expire DESC,vkey DESC LIMIT 1"
-
-
-
struct GNUNET_MysqlStatementHandle
{
struct GNUNET_MysqlStatementHandle *next;
MYSQL_BIND rbind[6];
- unsigned int type;
-
- unsigned int iter_select;
+ enum GNUNET_BLOCK_Type type;
PluginIterator dviter;
void *dviter_cls;
- unsigned int last_prio;
-
- unsigned long long last_expire;
-
unsigned long long last_vkey;
int end_it;
/**
* Statements dealing with gn090 table
*/
-#define INSERT_ENTRY "INSERT INTO gn090 (type,prio,anonLevel,expire,hash,vhash,vkey) VALUES (?,?,?,?,?,?,?)"
+#define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,vkey) VALUES (?,?,?,?,?,?,?,?)"
struct GNUNET_MysqlStatementHandle *insert_entry;
#define DELETE_ENTRY_BY_VKEY "DELETE FROM gn090 WHERE vkey=?"
#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn072"
struct GNUNET_MysqlStatementHandle *get_size;
- struct GNUNET_MysqlStatementHandle *iter[4];
+/* warning, slighly crazy mysql statements ahead. Essentially, MySQL does not handle
+ "OR" very well, so we need to use UNION instead. And UNION does not
+ automatically apply a LIMIT on the outermost clause, so we need to
+ repeat ourselves quite a bit. All hail the performance gods (and thanks
+ to #mysql on freenode) */
+#define SELECT_IT_NON_ANONYMOUS "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio = ? AND vkey < ?)"\
+ " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
+ "UNION "\
+ "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio < ? AND vkey != ?)"\
+ " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\
+ "ORDER BY prio DESC,vkey DESC LIMIT 1"
+ struct GNUNET_MysqlStatementHandle *zero_iter;
+
+#define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire < ?) "\
+ "ORDER BY prio ASC LIMIT 1) "\
+ "UNION "\
+ "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) "\
+ "ORDER BY prio ASC LIMIT 1) ORDER BY expire ASC LIMIT 1"
+ struct GNUNET_MysqlStatementHandle *select_expiration;
+
+#define SELECT_IT_REPLICATION "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) "\
+ "WHERE expire > ?"\
+ " ORDER BY repl DESC,RAND() LIMIT 1) "
+ struct GNUNET_MysqlStatementHandle *select_replication;
};
*/
typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
unsigned int num_values,
- MYSQL_BIND * values);
+ MYSQL_BIND *values);
/**
*/
static int
prepared_statement_run_select (struct Plugin *plugin,
- struct GNUNET_MysqlStatementHandle
- *s,
+ struct GNUNET_MysqlStatementHandle *s,
unsigned int result_size,
- MYSQL_BIND * results,
- GNUNET_MysqlDataProcessor
- processor, void *processor_cls,
+ MYSQL_BIND *results,
+ GNUNET_MysqlDataProcessor processor, void *processor_cls,
...)
{
va_list ap;
}
-/**
- * Run the prepared statement to get the next data item ready.
- *
- * @param cls not used
- * @param nrc closure for the next request iterator
- * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
- */
-static int
-iterator_helper_prepare (void *cls,
- struct NextRequestClosure *nrc)
-{
- struct Plugin *plugin;
- int ret;
-
- if (nrc == NULL)
- return GNUNET_NO;
- plugin = nrc->plugin;
- ret = GNUNET_SYSERR;
- switch (nrc->iter_select)
- {
- case 0:
- case 1:
- ret = prepared_statement_run_select (plugin,
- plugin->iter[nrc->iter_select],
- 6,
- nrc->rbind,
- &return_ok,
- NULL,
- MYSQL_TYPE_LONG,
- &nrc->last_prio,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES,
- MYSQL_TYPE_LONG,
- &nrc->last_prio,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES, -1);
- break;
- case 2:
- ret = prepared_statement_run_select (plugin,
- plugin->iter[nrc->iter_select],
- 6,
- nrc->rbind,
- &return_ok,
- NULL,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_expire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_expire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES, -1);
- break;
- case 3:
- ret = prepared_statement_run_select (plugin,
- plugin->iter[nrc->iter_select],
- 6,
- nrc->rbind,
- &return_ok,
- NULL,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_expire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->now.abs_value,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_expire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->last_vkey,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &nrc->now.abs_value,
- GNUNET_YES, -1);
- break;
- default:
- GNUNET_assert (0);
- }
- return ret;
-}
-
-
/**
* Continuation of "mysql_next_request".
*
(GNUNET_OK != nrc->prep (nrc->prep_cls,
nrc)))
goto END_SET;
- GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
nrc->last_vkey = vkey;
- nrc->last_prio = priority;
- nrc->last_expire = exp;
+ GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
(hashSize != sizeof (GNUNET_HashCode)) )
{
}
-/**
- * Iterate over the items in the datastore
- * using the given query to select and order
- * the items.
- *
- * @param plugin plugin context
- * @param type entries of which type should be considered?
- * @param iter_select which iterator statement are we using
- * @param is_asc are we using ascending order?
- * @param dviter function to call on each matching item
- * @param dviter_cls closure for dviter
- */
-static void
-iterateHelper (struct Plugin *plugin,
- enum GNUNET_BLOCK_Type type,
- int is_asc,
- unsigned int iter_select,
- PluginIterator dviter,
- void *dviter_cls)
-{
- struct NextRequestClosure *nrc;
-
- nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
- nrc->plugin = plugin;
- nrc->type = type;
- nrc->iter_select = iter_select;
- nrc->dviter = dviter;
- nrc->dviter_cls = dviter_cls;
- nrc->prep = &iterator_helper_prepare;
- if (is_asc)
- {
- nrc->last_prio = 0;
- nrc->last_vkey = 0;
- nrc->last_expire = 0;
- }
- else
- {
- nrc->last_prio = 0x7FFFFFFFL;
- nrc->last_vkey = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */
- nrc->last_expire = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */
- }
- mysql_plugin_next_request (nrc, GNUNET_NO);
-}
-
-
/**
* Get an estimate of how much space the database is
* currently using.
char **msg)
{
struct Plugin *plugin = cls;
+ unsigned int irepl = replication;
unsigned int itype = type;
unsigned int ipriority = priority;
unsigned int ianonymity = anonymity;
plugin->insert_entry,
NULL,
MYSQL_TYPE_LONG,
+ &irepl,
+ GNUNET_YES,
+ MYSQL_TYPE_LONG,
&itype,
GNUNET_YES,
MYSQL_TYPE_LONG,
/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Update the priority for a particular key in the datastore. If
+ * the expiration time in value is different than the time found in
+ * the datastore, the higher value should be kept. For the
+ * anonymity level, the lower value is to be used. The specified
+ * priority should be added to the existing priority, ignoring the
+ * priority in value.
+ *
+ * Note that it is possible for multiple values to match this put.
+ * In that case, all of the respective values are updated.
*
* @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param uid unique identifier of the datum
+ * @param delta by how much should the priority
+ * change? If priority + delta < 0 the
+ * priority should be set to 0 (never go
+ * negative).
+ * @param expire new expiration time should be the
+ * MAX of any existing expiration time and
+ * this value
+ * @param msg set to error message
+ * @return GNUNET_OK on success
*/
-static void
-mysql_plugin_iter_low_priority (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+static int
+mysql_plugin_update (void *cls,
+ uint64_t uid,
+ int delta,
+ struct GNUNET_TIME_Absolute expire,
+ char **msg)
{
struct Plugin *plugin = cls;
- iterateHelper (plugin, type, GNUNET_YES,
- 0, iter, iter_cls);
+ unsigned long long vkey = uid;
+ unsigned long long lexpire = expire.abs_value;
+ int ret;
+
+#if DEBUG_MYSQL
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Updating value %llu adding %d to priority and maxing exp at %llu\n",
+ vkey,
+ delta,
+ lexpire);
+#endif
+ ret = prepared_statement_run (plugin,
+ plugin->update_entry,
+ NULL,
+ MYSQL_TYPE_LONG,
+ &delta,
+ GNUNET_NO,
+ MYSQL_TYPE_LONGLONG,
+ &lexpire,
+ GNUNET_YES,
+ MYSQL_TYPE_LONGLONG,
+ &lexpire,
+ GNUNET_YES,
+ MYSQL_TYPE_LONGLONG,
+ &vkey,
+ GNUNET_YES, -1);
+ if (ret != GNUNET_OK)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to update value %llu\n",
+ vkey);
+ }
+ return ret;
}
*/
static void
mysql_plugin_get (void *cls,
- const GNUNET_HashCode * key,
- const GNUNET_HashCode * vhash,
+ const GNUNET_HashCode *key,
+ const GNUNET_HashCode *vhash,
enum GNUNET_BLOCK_Type type,
PluginIterator iter, void *iter_cls)
{
long long total;
unsigned long hashSize;
+ GNUNET_assert (key != NULL);
if (iter == NULL)
return;
- if (key == NULL)
- {
- mysql_plugin_iter_low_priority (plugin,
- type,
- iter, iter_cls);
- return;
- }
hashSize = sizeof (GNUNET_HashCode);
memset (cbind, 0, sizeof (cbind));
total = -1;
nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
nrc->plugin = plugin;
nrc->type = type;
- nrc->iter_select = -1;
nrc->dviter = iter;
nrc->dviter_cls = iter_cls;
nrc->prep = &get_statement_prepare;
/**
- * Get a random item for replication. Returns a single, not expired, random item
- * from those with the highest replication counters. The item's
- * replication counter is decremented by one IF it was positive before.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Run the prepared statement to get the next data item ready.
+ *
+ * @param cls not used
+ * @param nrc closure for the next request iterator
+ * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
+ */
+static int
+iterator_zero_prepare (void *cls,
+ struct NextRequestClosure *nrc)
+{
+ struct Plugin *plugin;
+
+ if (nrc == NULL)
+ return GNUNET_NO;
+ plugin = nrc->plugin;
+ return prepared_statement_run_select (plugin,
+ plugin->zero_iter,
+ 6,
+ nrc->rbind,
+ &return_ok,
+ NULL,
+ MYSQL_TYPE_LONGLONG,
+ &nrc->now.abs_value,
+ GNUNET_YES,
+ MYSQL_TYPE_LONG,
+ &nrc->type,
+ GNUNET_YES, -1);
+}
+
+
+/**
+ * Select a subset of the items in the datastore and call
+ * the given iterator for each of them.
*
- * @param cls closure
- * @param iter function to call the value (once only).
+ * @param cls our "struct Plugin*"
+ * @param type entries of which type should be considered?
+ * Use 0 for any type.
+ * @param iter function to call on each matching value;
+ * will be called once with a NULL value at the end
* @param iter_cls closure for iter
*/
static void
-mysql_plugin_replication_get (void *cls,
- PluginIterator iter, void *iter_cls)
+mysql_plugin_iter_zero_anonymity (void *cls,
+ enum GNUNET_BLOCK_Type type,
+ PluginIterator iter,
+ void *iter_cls)
{
- /* FIXME: not implemented! */
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ struct Plugin *plugin = cls;
+ struct NextRequestClosure *nrc;
+
+ nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+ nrc->plugin = plugin;
+ nrc->type = type;
+ nrc->dviter = iter;
+ nrc->dviter_cls = iter_cls;
+ nrc->prep = &iterator_zero_prepare;
+ nrc->last_vkey = INT64_MAX; /* MySQL only supports 63 bits, hence signed */
+ mysql_plugin_next_request (nrc, GNUNET_NO);
}
+/**
+ * Run the SELECT statement for the replication function.
+ *
+ * @param cls the 'struct Plugin'
+ * @param nrc the context (not used)
+ */
+static int
+replication_prepare (void *cls,
+ struct NextRequestClosure *nrc)
+{
+ struct Plugin *plugin = cls;
+ long long nt;
+
+ nt = (long long) nrc->now.abs_value;
+ return prepared_statement_run_select
+ (plugin,
+ plugin->select_replication,
+ 6, nrc->rbind,
+ &return_ok, NULL,
+ MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES,
+ -1);
+}
+
/**
- * Get a random item for expiration.
+ * Get a random item for replication. Returns a single, not expired, random item
+ * from those with the highest replication counters. The item's
+ * replication counter is decremented by one IF it was positive before.
* Call 'iter' with all values ZERO or NULL if the datastore is empty.
*
* @param cls closure
* @param iter_cls closure for iter
*/
static void
-mysql_plugin_expiration_get (void *cls,
- PluginIterator iter, void *iter_cls)
+mysql_plugin_replication_get (void *cls,
+ PluginIterator iter, void *iter_cls)
{
- /* FIXME: not implemented! */
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ struct Plugin *plugin = cls;
+ struct NextRequestClosure nrc;
+
+ memset (&nrc, 0, sizeof (nrc));
+ nrc.plugin = plugin;
+ nrc.now = GNUNET_TIME_absolute_get ();
+ nrc.prep = &replication_prepare;
+ nrc.prep_cls = plugin;
+ nrc.type = 0;
+ nrc.dviter = iter;
+ nrc.dviter_cls = iter_cls;
+ nrc.end_it = GNUNET_NO;
+ mysql_next_request_cont (&nrc, NULL);
}
/**
- * Update the priority for a particular key in the datastore. If
- * the expiration time in value is different than the time found in
- * the datastore, the higher value should be kept. For the
- * anonymity level, the lower value is to be used. The specified
- * priority should be added to the existing priority, ignoring the
- * priority in value.
- *
- * Note that it is possible for multiple values to match this put.
- * In that case, all of the respective values are updated.
- *
- * @param cls our "struct Plugin*"
- * @param uid unique identifier of the datum
- * @param delta by how much should the priority
- * change? If priority + delta < 0 the
- * priority should be set to 0 (never go
- * negative).
- * @param expire new expiration time should be the
- * MAX of any existing expiration time and
- * this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * Run the SELECT statement for the expiration function.
+ *
+ * @param cls the 'struct Plugin'
+ * @param nrc the context (not used)
*/
static int
-mysql_plugin_update (void *cls,
- uint64_t uid,
- int delta,
- struct GNUNET_TIME_Absolute expire,
- char **msg)
+expiration_prepare (void *cls,
+ struct NextRequestClosure *nrc)
{
struct Plugin *plugin = cls;
- unsigned long long vkey = uid;
- unsigned long long lexpire = expire.abs_value;
- int ret;
-#if DEBUG_MYSQL
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Updating value %llu adding %d to priority and maxing exp at %llu\n",
- vkey,
- delta,
- lexpire);
-#endif
- ret = prepared_statement_run (plugin,
- plugin->update_entry,
- NULL,
- MYSQL_TYPE_LONG,
- &delta,
- GNUNET_NO,
- MYSQL_TYPE_LONGLONG,
- &lexpire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &lexpire,
- GNUNET_YES,
- MYSQL_TYPE_LONGLONG,
- &vkey,
- GNUNET_YES, -1);
- if (ret != GNUNET_OK)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to update value %llu\n",
- vkey);
- }
- return ret;
+ return prepared_statement_run_select
+ (plugin,
+ plugin->select_expiration,
+ 6, nrc->rbind,
+ &return_ok, NULL,
+ -1);
}
/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Get a random item for expiration.
+ * Call 'iter' with all values ZERO or NULL if the datastore is empty.
*
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
+ * @param cls closure
+ * @param iter function to call the value (once only).
* @param iter_cls closure for iter
*/
static void
-mysql_plugin_iter_zero_anonymity (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+mysql_plugin_expiration_get (void *cls,
+ PluginIterator iter, void *iter_cls)
{
struct Plugin *plugin = cls;
- iterateHelper (plugin, type, GNUNET_NO, 1, iter, iter_cls);
+ struct NextRequestClosure nrc;
+
+ memset (&nrc, 0, sizeof (nrc));
+ nrc.plugin = plugin;
+ nrc.now = GNUNET_TIME_absolute_get ();
+ nrc.prep = &expiration_prepare;
+ nrc.prep_cls = plugin;
+ nrc.type = 0;
+ nrc.dviter = iter;
+ nrc.dviter_cls = iter_cls;
+ nrc.end_it = GNUNET_NO;
+ mysql_next_request_cont (&nrc, NULL);
}
/**
* Drop database.
+ *
+ * @param cls the "struct Plugin*"
*/
static void
mysql_plugin_drop (void *cls)
#define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) )
#define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b)))
if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 ("
+ " repl INT(11) UNSIGNED NOT NULL DEFAULT 0,"
" type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
" prio INT(11) UNSIGNED NOT NULL DEFAULT 0,"
" anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0,"
|| PINIT (plugin->count_entry_by_hash_vhash_and_type,
COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
|| PINIT (plugin->update_entry, UPDATE_ENTRY)
- || PINIT (plugin->iter[0], SELECT_IT_LOW_PRIORITY)
- || PINIT (plugin->iter[1], SELECT_IT_NON_ANONYMOUS)
- || PINIT (plugin->iter[2], SELECT_IT_EXPIRATION_TIME)
- || PINIT (plugin->iter[3], SELECT_IT_MIGRATION_ORDER))
+ || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS)
+ || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION)
+ || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
{
iclose (plugin);
GNUNET_free_non_null (plugin->cnffile);
/**
* Enable or disable logging debug messages.
*/
-#define DEBUG_SQLITE GNUNET_NO
+#define DEBUG_SQLITE GNUNET_YES
/**
* We allocate items on the stack at times. To prevent a stack
/* create indices */
sqlite3_exec (dbh,
"CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL);
- sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn090 (prio)", NULL, NULL,
- NULL);
- sqlite3_exec (dbh, "CREATE INDEX idx_expire_prio ON gn090 (expire,prio)", NULL, NULL,
- NULL);
sqlite3_exec (dbh,
"CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL,
NULL, NULL);
- sqlite3_exec (dbh, "CREATE INDEX idx_comb ON gn090 (prio,expire,anonLevel,hash)",
+ sqlite3_exec (dbh, "CREATE INDEX idx_expire_repl ON gn090 (expire ASC,repl DESC)", NULL, NULL,
+ NULL);
+ sqlite3_exec (dbh, "CREATE INDEX idx_comb ON gn090 (anonLevel ASC,expire ASC,prio,type,hash)",
+ NULL, NULL, NULL);
+ sqlite3_exec (dbh, "CREATE INDEX expire ON gn090 (expire)",
NULL, NULL, NULL);
}
CHECK (SQLITE_OK ==
sqlite3_exec (plugin->dbh,
"PRAGMA synchronous=OFF", NULL, NULL, ENULL));
+ CHECK (SQLITE_OK ==
+ sqlite3_exec (plugin->dbh,
+ "PRAGMA legacy_file_format=OFF", NULL, NULL, ENULL));
CHECK (SQLITE_OK ==
sqlite3_exec (plugin->dbh,
"PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
+ CHECK (SQLITE_OK ==
+ sqlite3_exec (plugin->dbh,
+ "PRAGMA locking_mode=EXCLUSIVE", NULL, NULL, ENULL));
CHECK (SQLITE_OK ==
sqlite3_exec (plugin->dbh,
"PRAGMA count_changes=OFF", NULL, NULL, ENULL));
sqlite3_finalize (stmt);
if ((sq_prepare (plugin->dbh,
- "UPDATE gn090 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
- "_ROWID_ = ?",
+ "UPDATE gn090 SET prio = prio + ?, expire = MAX(expire,?) WHERE _ROWID_ = ?",
&plugin->updPrio) != SQLITE_OK) ||
(sq_prepare (plugin->dbh,
- "UPDATE gn090 SET repl = MAX (0, repl - 1) WHERE "
- "_ROWID_ = ?",
+ "UPDATE gn090 SET repl = MAX (0, repl - 1) WHERE _ROWID_ = ?",
&plugin->updRepl) != SQLITE_OK) ||
(sq_prepare (plugin->dbh,
- "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?1) "
+ "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE expire > ?"
" ORDER BY repl DESC, Random() LIMIT 1",
&plugin->selRepl) != SQLITE_OK) ||
(sq_prepare (plugin->dbh,
- "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire < ?1) "
- " OR NOT EXISTS (SELECT 1 from gn090 WHERE (expire < ?1)) "
+ "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
+ " WHERE NOT EXISTS (SELECT 1 FROM gn090 WHERE expire < ?1 LIMIT 1) OR expire < ?1 "
" ORDER BY prio ASC LIMIT 1",
&plugin->selExpi) != SQLITE_OK) ||
(sq_prepare (plugin->dbh,
"INSERT INTO gn090 (repl, type, prio, "
- "anonLevel, expire, hash, vhash, value) VALUES "
- "(?, ?, ?, ?, ?, ?, ?, ?)",
+ "anonLevel, expire, hash, vhash, value) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
&plugin->insertContent) != SQLITE_OK) ||
(sq_prepare (plugin->dbh,
"DELETE FROM gn090 WHERE _ROWID_ = ?",
now = GNUNET_TIME_absolute_get ();
GNUNET_asprintf (&q,
"SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
- "WHERE (prio = ?1 AND expire > %llu AND anonLevel = 0 AND type=%d AND hash < ?2) "
+ "WHERE (anonLevel = 0 AND expire > %llu AND prio = ?1 AND type=%d AND hash < ?2) "
"ORDER BY hash DESC LIMIT 1",
(unsigned long long) now.abs_value,
type);
GNUNET_free (q);
GNUNET_asprintf (&q,
"SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
- "WHERE (prio < ?1 AND expire > %llu AND anonLevel = 0 AND type=%d) "
+ "WHERE (anonLevel = 0 AND expire > %llu AND prio < ?1 AND type=%d) "
"ORDER BY prio DESC, hash DESC LIMIT 1",
(unsigned long long) now.abs_value,
type);
_("sqlite version to old to determine size, assuming zero\n"));
return 0;
}
- if (SQLITE_OK !=
- sqlite3_exec (plugin->dbh,
- "VACUUM", NULL, NULL, ENULL))
- abort ();
CHECK (SQLITE_OK ==
sqlite3_exec (plugin->dbh,
"VACUUM", NULL, NULL, ENULL));