};
+/**
+ * Context for the universal iterator.
+ */
+struct NextRequestClosure;
+
+/**
+ * Type of a function that will prepare
+ * the next iteration.
+ *
+ * @param cls closure
+ * @param nc the next context; NULL for the last
+ * call which gives the callback a chance to
+ * clean up the closure
+ * @return GNUNET_OK on success, GNUNET_NO if there are
+ * no more values, GNUNET_SYSERR on error
+ */
+typedef int (*PrepareFunction)(void *cls,
+ struct NextRequestClosure *nc);
+
+
struct NextRequestClosure
{
struct Plugin *plugin;
+ struct GNUNET_TIME_Absolute now;
+
+ /**
+ * Function to call to prepare the next
+ * iteration.
+ */
+ PrepareFunction prep;
+
+ /**
+ * Closure for prep.
+ */
+ void *prep_cls;
+
+ MYSQL_BIND rbind[7];
+
unsigned int type;
unsigned int iter_select;
*/
char *cnffile;
-
/**
* Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
*/
}
+
+/**
+ * Free a prepared statement.
+ */
+static void
+prepared_statement_destroy (struct Plugin *plugin,
+ struct GNUNET_MysqlStatementHandle
+ *s)
+{
+ GNUNET_CONTAINER_DLL_remove (plugin->shead,
+ plugin->stail,
+ s);
+ if (s->valid)
+ mysql_stmt_close (s->statement);
+ GNUNET_free (s->query);
+ GNUNET_free (s);
+}
+
+
/**
* Close database connection and all prepared statements (we got a DB
* disconnect error).
struct GNUNET_MysqlStatementHandle *spos;
spos = plugin->shead;
- while (spos != NULL)
- {
- if (spos->statement != NULL)
- {
- mysql_stmt_close (spos->statement);
- spos->statement = NULL;
- }
- spos->valid = GNUNET_NO;
- spos = spos->next;
- }
+ while (NULL != plugin->shead)
+ prepared_statement_destroy (plugin,
+ plugin->shead);
if (plugin->dbf != NULL)
{
mysql_close (plugin->dbf);
}
+#if 0
/**
* Run the given MySQL SELECT statement. The statement
* must have only a single result (one column, one row).
mysql_free_result (sql_res);
return ret;
}
+#endif
/**
}
ret->valid = GNUNET_YES;
return GNUNET_OK;
-}
-
-/**
- * Free a prepared statement.
- */
-static void
-prepared_statement_destroy (struct Plugin *plugin,
- struct GNUNET_MysqlStatementHandle
- *s)
-{
- GNUNET_CONTAINER_DLL_remove (plugin->shead,
- plugin->stail,
- s);
- if (s->valid)
- mysql_stmt_close (s->statement);
- GNUNET_free (s->query);
- GNUNET_free (s);
}
}
-/**
- * Continuation of "sqlite_next_request".
- *
- * @param next_cls the next context
- * @param tc the task context (unused)
- */
-static void
-sqlite_next_request_cont (void *next_cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+static int
+iterator_helper_prepare (void *cls,
+ struct NextRequestClosure *nrc)
{
- struct NextRequestClosure *nrc = next_cls;
struct Plugin *plugin;
int ret;
- unsigned int size;
- unsigned int type;
- unsigned int priority;
- unsigned int anonymity;
- unsigned long long exp;
- unsigned long long vkey;
- unsigned long hashSize;
- GNUNET_HashCode key;
- struct GNUNET_TIME_Absolute now;
- struct GNUNET_TIME_Absolute expiration;
- MYSQL_BIND rbind[7];
- unsigned int contentSize;
- unsigned long length;
- MYSQL_BIND dbind[1];
- char datum[GNUNET_SERVER_MAX_MESSAGE_SIZE];
- AGAIN:
+ if (nrc == NULL)
+ return GNUNET_NO;
plugin = nrc->plugin;
- plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
- plugin->next_task_nc = NULL;
- hashSize = sizeof (GNUNET_HashCode);
- memset (rbind, 0, sizeof (rbind));
- rbind[0].buffer_type = MYSQL_TYPE_LONG;
- rbind[0].buffer = &size;
- rbind[0].is_unsigned = 1;
- rbind[1].buffer_type = MYSQL_TYPE_LONG;
- rbind[1].buffer = &type;
- rbind[1].is_unsigned = 1;
- rbind[2].buffer_type = MYSQL_TYPE_LONG;
- rbind[2].buffer = &priority;
- rbind[2].is_unsigned = 1;
- rbind[3].buffer_type = MYSQL_TYPE_LONG;
- rbind[3].buffer = &anonymity;
- rbind[3].is_unsigned = 1;
- rbind[4].buffer_type = MYSQL_TYPE_LONGLONG;
- rbind[4].buffer = &exp;
- rbind[4].is_unsigned = 1;
- rbind[5].buffer_type = MYSQL_TYPE_BLOB;
- rbind[5].buffer = &key;
- rbind[5].buffer_length = hashSize;
- rbind[5].length = &hashSize;
- rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
- rbind[6].buffer = &vkey;
- rbind[6].is_unsigned = GNUNET_YES;
-
- now = GNUNET_TIME_absolute_get ();
+ ret = GNUNET_SYSERR;
switch (nrc->iter_select)
{
case 0:
ret = prepared_statement_run_select (plugin,
plugin->iter[nrc->iter_select],
7,
- rbind,
+ nrc->rbind,
&return_ok,
NULL,
MYSQL_TYPE_LONG,
ret = prepared_statement_run_select (plugin,
plugin->iter[nrc->iter_select],
7,
- rbind,
+ nrc->rbind,
&return_ok,
NULL,
MYSQL_TYPE_LONGLONG,
ret = prepared_statement_run_select (plugin,
plugin->iter[nrc->iter_select],
7,
- rbind,
+ nrc->rbind,
&return_ok,
NULL,
MYSQL_TYPE_LONGLONG,
&nrc->last_vkey,
GNUNET_YES,
MYSQL_TYPE_LONGLONG,
- &now.value,
+ &nrc->now.value,
GNUNET_YES,
MYSQL_TYPE_LONGLONG,
&nrc->last_expire,
&nrc->last_vkey,
GNUNET_YES,
MYSQL_TYPE_LONGLONG,
- &now.value,
+ &nrc->now.value,
GNUNET_YES, -1);
break;
default:
GNUNET_assert (0);
- return;
}
+ return ret;
+}
+
+
+/**
+ * Continuation of "sqlite_next_request".
+ *
+ * @param next_cls the next context
+ * @param tc the task context (unused)
+ */
+static void
+sqlite_next_request_cont (void *next_cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct NextRequestClosure *nrc = next_cls;
+ struct Plugin *plugin;
+ int ret;
+ unsigned int size;
+ unsigned int type;
+ unsigned int priority;
+ unsigned int anonymity;
+ unsigned long long exp;
+ unsigned long long vkey;
+ unsigned long hashSize;
+ GNUNET_HashCode key;
+ struct GNUNET_TIME_Absolute expiration;
+ unsigned int contentSize;
+ unsigned long length;
+ MYSQL_BIND *rbind; /* size 7 */
+ MYSQL_BIND dbind[1];
+ char datum[GNUNET_SERVER_MAX_MESSAGE_SIZE];
+
+ plugin = nrc->plugin;
+ plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
+ plugin->next_task_nc = NULL;
+
+ AGAIN:
+ ret = nrc->prep (nrc->prep_cls,
+ nrc);
if (ret != GNUNET_OK)
goto END_SET;
+ nrc->now = GNUNET_TIME_absolute_get ();
+ hashSize = sizeof (GNUNET_HashCode);
+ memset (nrc->rbind, 0, sizeof (nrc->rbind));
+ rbind = nrc->rbind;
+ rbind[0].buffer_type = MYSQL_TYPE_LONG;
+ rbind[0].buffer = &size;
+ rbind[0].is_unsigned = 1;
+ rbind[1].buffer_type = MYSQL_TYPE_LONG;
+ rbind[1].buffer = &type;
+ rbind[1].is_unsigned = 1;
+ rbind[2].buffer_type = MYSQL_TYPE_LONG;
+ rbind[2].buffer = &priority;
+ rbind[2].is_unsigned = 1;
+ rbind[3].buffer_type = MYSQL_TYPE_LONG;
+ rbind[3].buffer = &anonymity;
+ rbind[3].is_unsigned = 1;
+ rbind[4].buffer_type = MYSQL_TYPE_LONGLONG;
+ rbind[4].buffer = &exp;
+ rbind[4].is_unsigned = 1;
+ rbind[5].buffer_type = MYSQL_TYPE_BLOB;
+ rbind[5].buffer = &key;
+ rbind[5].buffer_length = hashSize;
+ rbind[5].length = &hashSize;
+ rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
+ rbind[6].buffer = &vkey;
+ rbind[6].is_unsigned = GNUNET_YES;
+
nrc->last_vkey = vkey;
nrc->last_prio = priority;
nrc->last_expire = exp;
return;
END_SET:
/* call dviter with "end of set" */
- return;
+ nrc->dviter (nrc->dviter_cls,
+ NULL, NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ nrc->prep (nrc->prep_cls, NULL);
+ GNUNET_free (nrc);
}
nrc->iter_select = iter_select;
nrc->dviter = dviter;
nrc->dviter_cls = dviter_cls;
-
+ nrc->prep = &iterator_helper_prepare;
if (is_asc)
{
nrc->last_prio = 0;
}
+struct GetContext
+{
+ GNUNET_HashCode key;
+ GNUNET_HashCode vhash;
+
+ unsigned int prio;
+ unsigned int anonymity;
+ unsigned int limit_off;
+ unsigned long long expiration;
+ unsigned long long vkey;
+ unsigned long long total;
+ int off;
+ int count;
+ int have_vhash;
+ unsigned long size; /* OBSOLETE! */
+ unsigned long hashSize;
+};
+
+
+static int
+get_statement_prepare (void *cls,
+ struct NextRequestClosure *nrc)
+{
+ struct GetContext *gc = cls;
+ struct Plugin *plugin;
+ MYSQL_BIND *rbind;
+ int ret;
+
+ if (NULL == nrc)
+ {
+ GNUNET_free (gc);
+ return GNUNET_NO;
+ }
+ if (gc->count == gc->total)
+ return GNUNET_NO;
+ plugin = nrc->plugin;
+ memset (nrc->rbind, 0, sizeof (nrc->rbind));
+ gc->hashSize = sizeof (GNUNET_HashCode);
+ rbind = nrc->rbind;
+ rbind[0].buffer_type = MYSQL_TYPE_LONG;
+ rbind[0].buffer = &gc->size;
+ rbind[0].is_unsigned = GNUNET_YES;
+ rbind[1].buffer_type = MYSQL_TYPE_LONG;
+ rbind[1].buffer = &nrc->type;
+ rbind[1].is_unsigned = GNUNET_YES;
+ rbind[2].buffer_type = MYSQL_TYPE_LONG;
+ rbind[2].buffer = &nrc->last_prio;
+ rbind[2].is_unsigned = GNUNET_YES;
+ rbind[3].buffer_type = MYSQL_TYPE_LONG;
+ rbind[3].buffer = &gc->anonymity;
+ rbind[3].is_unsigned = GNUNET_YES;
+ rbind[4].buffer_type = MYSQL_TYPE_LONGLONG;
+ rbind[4].buffer = &gc->expiration;
+ rbind[4].is_unsigned = GNUNET_YES;
+ rbind[5].buffer_type = MYSQL_TYPE_BLOB;
+ rbind[5].buffer = &gc->key;
+ rbind[5].buffer_length = gc->hashSize;
+ rbind[5].length = &gc->hashSize;
+ rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
+ rbind[6].buffer = &gc->vkey;
+ rbind[6].is_unsigned = GNUNET_YES;
+ if (gc->count == 0)
+ gc->limit_off = gc->off;
+ else
+ gc->limit_off = 0;
+
+ if (nrc->type != 0)
+ {
+ if (gc->have_vhash)
+ {
+ ret =
+ prepared_statement_run_select
+ (plugin,
+ plugin->select_entry_by_hash_vhash_and_type, 7, nrc->rbind, &return_ok,
+ NULL, MYSQL_TYPE_BLOB, &gc->key, gc->hashSize, &gc->hashSize,
+ MYSQL_TYPE_BLOB, &gc->vhash, gc->hashSize, &gc->hashSize,
+ MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
+ &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &gc->limit_off, GNUNET_YES,
+ -1);
+ }
+ else
+ {
+ ret =
+ prepared_statement_run_select
+ (plugin,
+ plugin->select_entry_by_hash_and_type, 7, nrc->rbind, &return_ok, NULL,
+ MYSQL_TYPE_BLOB, &gc->key, gc->hashSize, &gc->hashSize,
+ MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
+ &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &gc->limit_off, GNUNET_YES,
+ -1);
+ }
+ }
+ else
+ {
+ if (gc->have_vhash)
+ {
+ ret =
+ prepared_statement_run_select
+ (plugin,
+ plugin->select_entry_by_hash_and_vhash, 7, nrc->rbind, &return_ok, NULL,
+ MYSQL_TYPE_BLOB, &gc->key, gc->hashSize, &gc->hashSize, MYSQL_TYPE_BLOB,
+ &gc->vhash, gc->hashSize, &gc->hashSize, MYSQL_TYPE_LONGLONG,
+ &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &gc->limit_off,
+ GNUNET_YES, -1);
+ }
+ else
+ {
+ ret =
+ prepared_statement_run_select
+ (plugin,
+ plugin->select_entry_by_hash, 7, nrc->rbind, &return_ok, NULL,
+ MYSQL_TYPE_BLOB, &gc->key, gc->hashSize, &gc->hashSize,
+ MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
+ &gc->limit_off, GNUNET_YES, -1);
+ }
+ }
+ if (gc->count + gc->off == gc->total)
+ nrc->last_vkey = 0; /* back to start */
+ return ret;
+}
+
+
/**
* Iterate over the results for a particular key
* in the datastore.
PluginIterator iter, void *iter_cls)
{
struct Plugin *plugin = cls;
- int count;
- unsigned long long total;
- int off;
int ret;
- unsigned int size;
- unsigned int rtype;
- unsigned int prio;
- unsigned int level;
- unsigned int limit_off;
- unsigned long long expiration;
- unsigned long long vkey;
- unsigned long long last_vkey;
+ MYSQL_BIND cbind[1];
+ struct GetContext *gc;
+ struct NextRequestClosure *nrc;
+ unsigned long long total;
unsigned long hashSize;
- unsigned long hashSize2;
- MYSQL_BIND rbind[7];
if (iter == NULL)
return;
return;
}
hashSize = sizeof (GNUNET_HashCode);
- hashSize2 = sizeof (GNUNET_HashCode);
- memset (rbind, 0, sizeof (rbind));
+ memset (cbind, 0, sizeof (cbind));
total = -1;
- rbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
- rbind[0].buffer = &total;
- rbind[0].is_unsigned = GNUNET_YES;
+ cbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
+ cbind[0].buffer = &total;
+ cbind[0].is_unsigned = GNUNET_YES;
if (type != 0)
{
if (vhash != NULL)
ret =
prepared_statement_run_select
(plugin,
- plugin->count_entry_by_hash_vhash_and_type, 1, rbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, key, hashSize2, &hashSize2, MYSQL_TYPE_BLOB,
- vhash, hashSize2, &hashSize2, MYSQL_TYPE_LONG, &type, GNUNET_YES,
+ plugin->count_entry_by_hash_vhash_and_type, 1, cbind, &return_ok, NULL,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
+ vhash, hashSize, &hashSize, MYSQL_TYPE_LONG, &type, GNUNET_YES,
-1);
}
else
ret =
prepared_statement_run_select
(plugin,
- plugin->count_entry_by_hash_and_type, 1, rbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, key, hashSize2, &hashSize2, MYSQL_TYPE_LONG,
+ plugin->count_entry_by_hash_and_type, 1, cbind, &return_ok, NULL,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_LONG,
&type, GNUNET_YES, -1);
}
ret =
prepared_statement_run_select
(plugin,
- plugin->count_entry_by_hash_and_vhash, 1, rbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, key, hashSize2, &hashSize2, MYSQL_TYPE_BLOB,
- vhash, hashSize2, &hashSize2, -1);
+ plugin->count_entry_by_hash_and_vhash, 1, cbind, &return_ok, NULL,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
+ vhash, hashSize, &hashSize, -1);
}
else
ret =
prepared_statement_run_select (plugin,
plugin->count_entry_by_hash,
- 1, rbind, &return_ok,
+ 1, cbind, &return_ok,
NULL, MYSQL_TYPE_BLOB,
- key, hashSize2,
- &hashSize2, -1);
+ key, hashSize,
+ &hashSize, -1);
}
}
- if ((ret != GNUNET_OK) || (-1 == total))
- return;
- if (total == 0)
- return;
-
- last_vkey = 0;
- count = 0;
- off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
-
- memset (rbind, 0, sizeof (rbind));
- rbind[0].buffer_type = MYSQL_TYPE_LONG;
- rbind[0].buffer = &size;
- rbind[0].is_unsigned = GNUNET_YES;
- rbind[1].buffer_type = MYSQL_TYPE_LONG;
- rbind[1].buffer = &rtype;
- rbind[1].is_unsigned = GNUNET_YES;
- rbind[2].buffer_type = MYSQL_TYPE_LONG;
- rbind[2].buffer = &prio;
- rbind[2].is_unsigned = GNUNET_YES;
- rbind[3].buffer_type = MYSQL_TYPE_LONG;
- rbind[3].buffer = &level;
- rbind[3].is_unsigned = GNUNET_YES;
- rbind[4].buffer_type = MYSQL_TYPE_LONGLONG;
- rbind[4].buffer = &expiration;
- rbind[4].is_unsigned = GNUNET_YES;
- rbind[5].buffer_type = MYSQL_TYPE_BLOB;
- rbind[5].buffer = (void*) key;
- rbind[5].buffer_length = hashSize;
- rbind[5].length = &hashSize;
- rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
- rbind[6].buffer = &vkey;
- rbind[6].is_unsigned = GNUNET_YES;
- while (1)
+ if ((ret != GNUNET_OK) || (-1 == total) || (0 == total))
{
- if (count == 0)
- limit_off = off;
- else
- limit_off = 0;
- if (type != 0)
- {
- if (vhash != NULL)
- {
- ret =
- prepared_statement_run_select
- (plugin,
- plugin->select_entry_by_hash_vhash_and_type, 7, rbind, &return_ok,
- NULL, MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
- MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2,
- MYSQL_TYPE_LONGLONG, &last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
- &type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
- -1);
- }
- else
- {
- ret =
- prepared_statement_run_select
- (plugin,
- plugin->select_entry_by_hash_and_type, 7, rbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
- MYSQL_TYPE_LONGLONG, &last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
- &type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES,
- -1);
- }
- }
- else
- {
- if (vhash != NULL)
- {
- ret =
- prepared_statement_run_select
- (plugin,
- plugin->select_entry_by_hash_and_vhash, 7, rbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB,
- vhash, hashSize2, &hashSize2, MYSQL_TYPE_LONGLONG,
- &last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off,
- GNUNET_YES, -1);
- }
- else
- {
- ret =
- prepared_statement_run_select
- (plugin,
- plugin->select_entry_by_hash, 7, rbind, &return_ok, NULL,
- MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
- MYSQL_TYPE_LONGLONG, &last_vkey, GNUNET_YES, MYSQL_TYPE_LONG,
- &limit_off, GNUNET_YES, -1);
- }
- }
- if (ret != GNUNET_OK)
- break;
- last_vkey = vkey;
-#if FIXME
- datum = assembleDatum (rbind);
- if (datum == NULL)
- continue;
- count++;
- ret = iter (iter_cls, key, datum, vkey);
-#endif
- ret = GNUNET_SYSERR;
- if (ret == GNUNET_SYSERR)
- {
- break;
- }
- if (ret == GNUNET_NO)
- {
- do_delete_value (plugin, vkey);
- do_delete_entry_by_vkey (plugin, vkey);
- plugin->content_size -= size;
- }
- if (count + off == total)
- last_vkey = 0; /* back to start */
- if (count == total)
- break;
+ iter (iter_cls,
+ NULL, NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
+ }
+ gc = GNUNET_malloc (sizeof (struct GetContext));
+ gc->key = *key;
+ if (vhash != NULL)
+ {
+ gc->have_vhash = GNUNET_YES;
+ gc->vhash = *vhash;
}
+ gc->total = total;
+ gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
+
+
+ nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+ nrc->plugin = plugin;
+ nrc->type = type;
+ nrc->iter_select = -1;
+ nrc->dviter = iter;
+ nrc->dviter_cls = iter_cls;
+ nrc->prep = &get_statement_prepare;
+ nrc->prep_cls = gc;
+ nrc->last_vkey = 0; // FIXME: used to be 'vkey', why? where did vkey come from?
+ mysql_plugin_next_request (nrc, GNUNET_NO);
}
GNUNET_SCHEDULER_cancel (plugin->env->sched,
plugin->next_task);
plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
+ plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
GNUNET_free (plugin->next_task_nc);
plugin->next_task_nc = NULL;
}