uint32_t type GNUNET_PACKED;
/**
- * Offset of the result.
+ * UID at which to start the search
*/
- uint64_t offset GNUNET_PACKED;
+ uint64_t next_uid GNUNET_PACKED;
+
+ /**
+ * If true return a random result
+ */
+ uint32_t random GNUNET_PACKED;
/**
* Desired key.
uint32_t type GNUNET_PACKED;
/**
- * Offset of the result.
+ * UID at which to start the search
+ */
+ uint64_t next_uid GNUNET_PACKED;
+
+ /**
+ * If true return a random result
*/
- uint64_t offset GNUNET_PACKED;
+ uint32_t random GNUNET_PACKED;
};
uint32_t type GNUNET_PACKED;
/**
- * Offset of the result.
+ * UID at which to start the search
*/
- uint64_t offset GNUNET_PACKED;
+ uint64_t next_uid GNUNET_PACKED;
};
* Get a single zero-anonymity value from the datastore.
*
* @param h handle to the datastore
- * @param offset offset of the result (modulo num-results); set to
- * a random 64-bit value initially; then increment by
- * one each time; detect that all results have been found by uid
- * being again the first uid ever returned.
+ * @param next_uid return the result with lowest uid >= next_uid
* @param queue_priority ranking of this request in the priority queue
* @param max_queue_size at what queue size should this request be dropped
* (if other requests of higher priority are in the queue)
*/
struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
- uint64_t offset,
+ uint64_t next_uid,
unsigned int queue_priority,
unsigned int max_queue_size,
enum GNUNET_BLOCK_Type type,
GNUNET_assert (NULL != proc);
GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to get %llu-th zero-anonymity entry of type %d\n",
- (unsigned long long) offset,
+ "Asked to get a zero-anonymity entry of type %d\n",
type);
env = GNUNET_MQ_msg (m,
GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
m->type = htonl ((uint32_t) type);
- m->offset = GNUNET_htonll (offset);
+ m->next_uid = GNUNET_htonll (next_uid);
qc.rc.proc = proc;
qc.rc.proc_cls = proc_cls;
qe = make_queue_entry (h,
* will only be called once.
*
* @param h handle to the datastore
- * @param offset offset of the result (modulo num-results); set to
- * a random 64-bit value initially; then increment by
- * one each time; detect that all results have been found by uid
- * being again the first uid ever returned.
+ * @param next_uid return the result with lowest uid >= next_uid
+ * @param random if true, return a random result instead of using next_uid
* @param key maybe NULL (to match all entries)
* @param type desired type, 0 for any
* @param queue_priority ranking of this request in the priority queue
*/
struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
- uint64_t offset,
+ uint64_t next_uid,
+ bool random,
const struct GNUNET_HashCode *key,
enum GNUNET_BLOCK_Type type,
unsigned int queue_priority,
env = GNUNET_MQ_msg (gm,
GNUNET_MESSAGE_TYPE_DATASTORE_GET);
gm->type = htonl (type);
- gm->offset = GNUNET_htonll (offset);
+ gm->next_uid = GNUNET_htonll (next_uid);
+ gm->random = random;
}
else
{
env = GNUNET_MQ_msg (gkm,
GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
gkm->type = htonl (type);
- gkm->offset = GNUNET_htonll (offset);
+ gkm->next_uid = GNUNET_htonll (next_uid);
+ gkm->random = random;
gkm->key = *key;
}
qc.rc.proc = proc;
do_get ()
{
qe = GNUNET_DATASTORE_get_key (db_src,
- offset,
+ 0, false,
NULL, GNUNET_BLOCK_TYPE_ANY,
0, 1,
&do_put, NULL);
size,
&vhash);
plugin->api->get_key (plugin->api->cls,
- 0,
- &dm->key,
- &vhash,
+ 0,
+ false,
+ &dm->key,
+ &vhash,
ntohl (dm->type),
- &check_present,
- pc);
+ &check_present,
+ pc);
GNUNET_SERVICE_client_continue (client);
return;
}
1,
GNUNET_NO);
plugin->api->get_key (plugin->api->cls,
- GNUNET_ntohll (msg->offset),
+ GNUNET_ntohll (msg->next_uid),
+ msg->random,
NULL,
NULL,
ntohl (msg->type),
return;
}
plugin->api->get_key (plugin->api->cls,
- GNUNET_ntohll (msg->offset),
+ GNUNET_ntohll (msg->next_uid),
+ msg->random,
&msg->key,
NULL,
ntohl (msg->type),
1,
GNUNET_NO);
plugin->api->get_zero_anonymity (plugin->api->cls,
- GNUNET_ntohll (msg->offset),
+ GNUNET_ntohll (msg->next_uid),
type,
&transmit_item,
client);
(uint32_t) ntohl (dm->type));
plugin->api->get_key (plugin->api->cls,
0,
+ false,
&dm->key,
&vhash,
(enum GNUNET_BLOCK_Type) ntohl (dm->type),
{
/**
- * Desired result offset / number of results.
+ * Lowest uid to consider.
*/
- uint64_t offset;
+ uint64_t next_uid;
/**
- * The plugin.
+ * Value with lowest uid >= next_uid found so far.
*/
- struct Plugin *plugin;
+ struct Value *value;
/**
* Requested value hash.
*/
- const struct GNUNET_HashCode * vhash;
+ const struct GNUNET_HashCode *vhash;
/**
* Requested type.
enum GNUNET_BLOCK_Type type;
/**
- * Function to call with the result.
+ * If true, return a random value
*/
- PluginDatumProcessor proc;
+ bool random;
- /**
- * Closure for 'proc'.
- */
- void *proc_cls;
};
/**
- * Test if a value matches the specification from the 'get' context
- *
- * @param gc query
- * @param value the value to check against the query
- * @return GNUNET_YES if the value matches
- */
-static int
-match (const struct GetContext *gc,
- struct Value *value)
-{
- struct GNUNET_HashCode vh;
-
- if ( (gc->type != GNUNET_BLOCK_TYPE_ANY) &&
- (gc->type != value->type) )
- return GNUNET_NO;
- if (NULL != gc->vhash)
- {
- GNUNET_CRYPTO_hash (&value[1], value->size, &vh);
- if (0 != memcmp (&vh, gc->vhash, sizeof (struct GNUNET_HashCode)))
- return GNUNET_NO;
- }
- return GNUNET_YES;
-}
-
-
-/**
- * Count number of matching values.
- *
- * @param cls the 'struct GetContext'
- * @param key unused
- * @param val the 'struct Value'
- * @return GNUNET_YES (continue iteration)
- */
-static int
-count_iterator (void *cls,
- const struct GNUNET_HashCode *key,
- void *val)
-{
- struct GetContext *gc = cls;
- struct Value *value = val;
-
- if (GNUNET_NO == match (gc, value))
- return GNUNET_OK;
- gc->offset++;
- return GNUNET_OK;
-}
-
-
-/**
- * Obtain matching value at 'offset'.
+ * Obtain the matching value with the lowest uid >= next_uid.
*
* @param cls the 'struct GetContext'
* @param key unused
{
struct GetContext *gc = cls;
struct Value *value = val;
+ struct GNUNET_HashCode vh;
- if (GNUNET_NO == match (gc, value))
+ if ( (gc->type != GNUNET_BLOCK_TYPE_ANY) &&
+ (gc->type != value->type) )
return GNUNET_OK;
- if (0 != gc->offset--)
+ if (NULL != gc->vhash)
+ {
+ GNUNET_CRYPTO_hash (&value[1], value->size, &vh);
+ if (0 != memcmp (&vh, gc->vhash, sizeof (struct GNUNET_HashCode)))
+ return GNUNET_OK;
+ }
+ if (gc->random)
+ {
+ gc->value = value;
+ return GNUNET_NO;
+ }
+ if ( (uint64_t) (intptr_t) value < gc->next_uid)
return GNUNET_OK;
- if (GNUNET_NO ==
- gc->proc (gc->proc_cls,
- key,
- value->size,
- &value[1],
- value->type,
- value->priority,
- value->anonymity,
- value->expiration,
- (uint64_t) (long) value))
- delete_value (gc->plugin, value);
- return GNUNET_NO;
+ if ( (NULL != gc->value) &&
+ (value > gc->value) )
+ return GNUNET_OK;
+ gc->value = value;
+ return GNUNET_OK;
}
* Get one of the results for a particular key in the datastore.
*
* @param cls closure
- * @param offset offset of the result (modulo num-results);
- * specific ordering does not matter for the offset
+ * @param next_uid return the result with lowest uid >= next_uid
+ * @param random if true, return a random result instead of using next_uid
* @param key maybe NULL (to match all entries)
* @param vhash hash of the value, maybe NULL (to
* match all values that have the right key).
* @param proc_cls closure for proc
*/
static void
-heap_plugin_get_key (void *cls, uint64_t offset,
+heap_plugin_get_key (void *cls, uint64_t next_uid, bool random,
const struct GNUNET_HashCode *key,
const struct GNUNET_HashCode *vhash,
enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
struct Plugin *plugin = cls;
struct GetContext gc;
- gc.plugin = plugin;
- gc.offset = 0;
+ gc.value = NULL;
+ gc.next_uid = next_uid;
+ gc.random = random;
gc.vhash = vhash;
gc.type = type;
- gc.proc = proc;
- gc.proc_cls = proc_cls;
if (NULL == key)
{
- GNUNET_CONTAINER_multihashmap_iterate (plugin->keyvalue,
- &count_iterator,
- &gc);
- if (0 == gc.offset)
- {
- proc (proc_cls,
- NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
- gc.offset = offset % gc.offset;
GNUNET_CONTAINER_multihashmap_iterate (plugin->keyvalue,
&get_iterator,
&gc);
}
else
{
- GNUNET_CONTAINER_multihashmap_get_multiple (plugin->keyvalue,
- key,
- &count_iterator,
- &gc);
- if (0 == gc.offset)
- {
- proc (proc_cls,
- NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
- gc.offset = offset % gc.offset;
GNUNET_CONTAINER_multihashmap_get_multiple (plugin->keyvalue,
key,
&get_iterator,
&gc);
}
+ if (NULL == gc.value)
+ {
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
+ }
+ if (GNUNET_NO ==
+ proc (proc_cls,
+ &gc.value->key,
+ gc.value->size,
+ &gc.value[1],
+ gc.value->type,
+ gc.value->priority,
+ gc.value->anonymity,
+ gc.value->expiration,
+ (uint64_t) (intptr_t) gc.value))
+ {
+ delete_value (plugin, gc.value);
+ }
}
value->priority,
value->anonymity,
value->expiration,
- (uint64_t) (long) value))
+ (uint64_t) (intptr_t) value))
delete_value (plugin, value);
}
value->priority,
value->anonymity,
value->expiration,
- (uint64_t) (long) value))
+ (uint64_t) (intptr_t) value))
delete_value (plugin, value);
}
{
struct Value *value;
- value = (struct Value*) (long) uid;
+ value = (struct Value*) (intptr_t) uid;
GNUNET_assert (NULL != value);
if (value->expiration.abs_value_us != expire.abs_value_us)
{
* Call the given processor on an item with zero anonymity.
*
* @param cls our "struct Plugin*"
- * @param offset offset of the result (modulo num-results);
- * specific ordering does not matter for the offset
+ * @param next_uid return the result with lowest uid >= next_uid
* @param type entries of which type should be considered?
- * Use 0 for any type.
+ * Must not be zero (ANY).
* @param proc function to call on each matching value;
- * will be called with NULL if no value matches
+ * will be called with NULL if no value matches
* @param proc_cls closure for proc
*/
static void
-heap_plugin_get_zero_anonymity (void *cls, uint64_t offset,
+heap_plugin_get_zero_anonymity (void *cls, uint64_t next_uid,
enum GNUNET_BLOCK_Type type,
PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
struct ZeroAnonByType *zabt;
- struct Value *value;
- uint64_t count;
+ struct Value *value = NULL;
- count = 0;
for (zabt = plugin->zero_head; NULL != zabt; zabt = zabt->next)
{
if ( (type != GNUNET_BLOCK_TYPE_ANY) &&
- (type != zabt->type) )
+ (type != zabt->type) )
continue;
- count += zabt->array_pos;
+ for (int i = 0; i < zabt->array_pos; ++i)
+ {
+ if ( (uint64_t) (intptr_t) zabt->array[i] < next_uid)
+ continue;
+ if ( (NULL != value) &&
+ (zabt->array[i] > value) )
+ continue;
+ value = zabt->array[i];
+ }
}
- if (0 == count)
+ if (NULL == value)
{
proc (proc_cls,
- NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
- offset = offset % count;
- for (zabt = plugin->zero_head; NULL != zabt; zabt = zabt->next)
- {
- if ( (type != GNUNET_BLOCK_TYPE_ANY) &&
- (type != zabt->type) )
- continue;
- if (offset >= zabt->array_pos)
- {
- offset -= zabt->array_pos;
- continue;
- }
- break;
- }
- GNUNET_assert (NULL != zabt);
- value = zabt->array[offset];
if (GNUNET_NO ==
proc (proc_cls,
&value->key,
value->priority,
value->anonymity,
value->expiration,
- (uint64_t) (long) value))
+ (uint64_t) (intptr_t) value))
delete_value (plugin, value);
}
#define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
-#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?"
- struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash;
+#define SELECT_ENTRY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE uid >= ? AND (rvalue >= ? OR 0 = ?) ORDER BY uid LIMIT 1"
+ struct GNUNET_MYSQL_StatementHandle *select_entry;
-#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?"
+#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? AND uid >= ? AND (rvalue >= ? OR 0 = ?) ORDER BY uid LIMIT 1"
struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
-#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?"
- struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_vhash;
-
-#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
+#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND uid >= ? AND (rvalue >= ? OR 0 = ?) ORDER BY uid LIMIT 1"
struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash;
-#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?"
- struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_type;
-
-#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
+#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? AND uid >= ? AND (rvalue >= ? OR 0 = ?) ORDER BY uid LIMIT 1"
struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
-#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?"
- struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_vhash_and_type;
-
-#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
+#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? AND uid >= ? AND (rvalue >= ? OR 0 = ?) ORDER BY uid LIMIT 1"
struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type;
#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
"FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
- "WHERE anonLevel=0 AND type=? AND "\
- "(rvalue >= ? OR"\
- " NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
- "ORDER BY rvalue ASC LIMIT 1"
+ "WHERE anonLevel=0 AND type=? AND uid >= ? "\
+ "ORDER BY uid LIMIT 1"
struct GNUNET_MYSQL_StatementHandle *zero_iter;
#define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
* Get one of the results for a particular key in the datastore.
*
* @param cls closure
- * @param offset offset of the result (modulo num-results);
- * specific ordering does not matter for the offset
+ * @param next_uid return the result with lowest uid >= next_uid
+ * @param random if true, return a random result instead of using next_uid
* @param key key to match, never NULL
* @param vhash hash of the value, maybe NULL (to
* match all values that have the right key).
*/
static void
mysql_plugin_get_key (void *cls,
- uint64_t offset,
+ uint64_t next_uid,
+ bool random,
const struct GNUNET_HashCode *key,
const struct GNUNET_HashCode *vhash,
enum GNUNET_BLOCK_Type type,
void *proc_cls)
{
struct Plugin *plugin = cls;
- int ret;
- uint64_t total;
- struct GNUNET_MY_ResultSpec results_get[] = {
- GNUNET_MY_result_spec_uint64 (&total),
- GNUNET_MY_result_spec_end
- };
+ uint64_t rvalue;
- total = UINT64_MAX;
- if (0 != type)
+ if (random)
{
- if (NULL != vhash)
- {
- struct GNUNET_MY_QueryParam params_get[] = {
- GNUNET_MY_query_param_auto_from_type (key),
- GNUNET_MY_query_param_auto_from_type (vhash),
- GNUNET_MY_query_param_uint32 (&type),
- GNUNET_MY_query_param_end
- };
-
- ret =
- GNUNET_MY_exec_prepared (plugin->mc,
- plugin->count_entry_by_hash_vhash_and_type,
- params_get);
- GNUNET_break (GNUNET_OK == ret);
- if (GNUNET_OK == ret)
- ret =
- GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
- results_get);
- if (GNUNET_OK == ret)
- GNUNET_break (GNUNET_NO ==
- GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
- NULL));
- }
- else
- {
- struct GNUNET_MY_QueryParam params_get[] = {
- GNUNET_MY_query_param_auto_from_type (key),
- GNUNET_MY_query_param_uint32 (&type),
- GNUNET_MY_query_param_end
- };
-
- ret =
- GNUNET_MY_exec_prepared (plugin->mc,
- plugin->count_entry_by_hash_and_type,
- params_get);
- GNUNET_break (GNUNET_OK == ret);
- if (GNUNET_OK == ret)
- ret =
- GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
- results_get);
- if (GNUNET_OK == ret)
- GNUNET_break (GNUNET_NO ==
- GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
- NULL));
- }
+ rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
+ next_uid = 0;
}
else
- {
- if (NULL != vhash)
- {
- struct GNUNET_MY_QueryParam params_get[] = {
- GNUNET_MY_query_param_auto_from_type (key),
- GNUNET_MY_query_param_auto_from_type (vhash),
- GNUNET_MY_query_param_end
- };
+ rvalue = 0;
- ret =
- GNUNET_MY_exec_prepared (plugin->mc,
- plugin->count_entry_by_hash_and_vhash,
- params_get);
- GNUNET_break (GNUNET_OK == ret);
- if (GNUNET_OK == ret)
- ret =
- GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
- results_get);
- if (GNUNET_OK == ret)
- GNUNET_break (GNUNET_NO ==
- GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
- NULL));
- }
- else
- {
- struct GNUNET_MY_QueryParam params_get[] = {
- GNUNET_MY_query_param_auto_from_type (key),
- GNUNET_MY_query_param_end
- };
-
- ret =
- GNUNET_MY_exec_prepared (plugin->mc,
- plugin->count_entry_by_hash,
- params_get);
- GNUNET_break (GNUNET_OK == ret);
- if (GNUNET_OK == ret)
- ret =
- GNUNET_MY_extract_result (plugin->count_entry_by_hash,
- results_get);
- if (GNUNET_OK == ret)
- GNUNET_break (GNUNET_NO ==
- GNUNET_MY_extract_result (plugin->count_entry_by_hash,
- NULL));
- }
- }
- if ( (GNUNET_OK != ret) ||
- (0 >= total) )
+ if (NULL == key)
{
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
+ struct GNUNET_MY_QueryParam params_select[] = {
+ GNUNET_MY_query_param_uint64 (&next_uid),
+ GNUNET_MY_query_param_uint64 (&rvalue),
+ GNUNET_MY_query_param_uint64 (&rvalue),
+ GNUNET_MY_query_param_end
+ };
+
+ execute_select (plugin,
+ plugin->select_entry,
+ proc,
+ proc_cls,
+ params_select);
}
- offset = offset % total;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Obtaining %llu/%lld result for GET `%s'\n",
- (unsigned long long) offset,
- (unsigned long long) total,
- GNUNET_h2s (key));
- if (type != GNUNET_BLOCK_TYPE_ANY)
+ else if (type != GNUNET_BLOCK_TYPE_ANY)
{
if (NULL != vhash)
{
GNUNET_MY_query_param_auto_from_type (key),
GNUNET_MY_query_param_auto_from_type (vhash),
GNUNET_MY_query_param_uint32 (&type),
- GNUNET_MY_query_param_uint64 (&offset),
+ GNUNET_MY_query_param_uint64 (&next_uid),
+ GNUNET_MY_query_param_uint64 (&rvalue),
+ GNUNET_MY_query_param_uint64 (&rvalue),
GNUNET_MY_query_param_end
};
struct GNUNET_MY_QueryParam params_select[] = {
GNUNET_MY_query_param_auto_from_type (key),
GNUNET_MY_query_param_uint32 (&type),
- GNUNET_MY_query_param_uint64 (&offset),
+ GNUNET_MY_query_param_uint64 (&next_uid),
+ GNUNET_MY_query_param_uint64 (&rvalue),
+ GNUNET_MY_query_param_uint64 (&rvalue),
GNUNET_MY_query_param_end
};
struct GNUNET_MY_QueryParam params_select[] = {
GNUNET_MY_query_param_auto_from_type (key),
GNUNET_MY_query_param_auto_from_type (vhash),
- GNUNET_MY_query_param_uint64 (&offset),
+ GNUNET_MY_query_param_uint64 (&next_uid),
+ GNUNET_MY_query_param_uint64 (&rvalue),
+ GNUNET_MY_query_param_uint64 (&rvalue),
GNUNET_MY_query_param_end
};
{
struct GNUNET_MY_QueryParam params_select[] = {
GNUNET_MY_query_param_auto_from_type (key),
- GNUNET_MY_query_param_uint64 (&offset),
+ GNUNET_MY_query_param_uint64 (&next_uid),
+ GNUNET_MY_query_param_uint64 (&rvalue),
+ GNUNET_MY_query_param_uint64 (&rvalue),
GNUNET_MY_query_param_end
};
* Get a zero-anonymity datum from the datastore.
*
* @param cls our `struct Plugin *`
- * @param offset offset of the result
+ * @param next_uid return the result with lowest uid >= next_uid
* @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param proc function to call on a matching value or NULL
+ * Must not be zero (ANY).
+ * @param proc function to call on a matching value;
+ * will be called with NULL if no value matches
* @param proc_cls closure for @a proc
*/
static void
mysql_plugin_get_zero_anonymity (void *cls,
- uint64_t offset,
+ uint64_t next_uid,
enum GNUNET_BLOCK_Type type,
PluginDatumProcessor proc,
void *proc_cls)
{
struct Plugin *plugin = cls;
uint32_t typei = (uint32_t) type;
- uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
- UINT64_MAX);
+
struct GNUNET_MY_QueryParam params_zero_iter[] = {
GNUNET_MY_query_param_uint32 (&typei),
- GNUNET_MY_query_param_uint64 (&rvalue),
- GNUNET_MY_query_param_uint32 (&typei),
- GNUNET_MY_query_param_uint64 (&rvalue),
+ GNUNET_MY_query_param_uint64 (&next_uid),
GNUNET_MY_query_param_end
};
") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
PINIT (plugin->insert_entry, INSERT_ENTRY) ||
PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
+ PINIT (plugin->select_entry, SELECT_ENTRY) ||
PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
PINIT (plugin->select_entry_by_hash_and_vhash,
SELECT_ENTRY_BY_HASH_AND_VHASH) ||
SELECT_ENTRY_BY_HASH_AND_TYPE) ||
PINIT (plugin->select_entry_by_hash_vhash_and_type,
SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
- PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
PINIT (plugin->get_size, SELECT_SIZE) ||
- PINIT (plugin->count_entry_by_hash_and_vhash,
- COUNT_ENTRY_BY_HASH_AND_VHASH) ||
- PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
- || PINIT (plugin->count_entry_by_hash_vhash_and_type,
- COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
PINIT (plugin->update_entry, UPDATE_ENTRY) ||
PINIT (plugin->dec_repl, DEC_REPL) ||
PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
* we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
* we do math or inequality tests, so we can't handle the entire range of uint32_t.
* This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
+ * PostgreSQL also recommends against using WITH OIDS.
*/
ret =
PQexec (plugin->dbh,
}
PQclear (ret);
if ((GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
- "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE hash=$1 AND vhash=$2 AND type=$3 "
- "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
- "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE hash=$1 AND type=$2 "
- "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
- "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE hash=$1 AND vhash=$2 "
- "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
- (GNUNET_OK !=
GNUNET_POSTGRES_prepare (plugin->dbh, "get",
"SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "count_getvt",
- "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3", 3)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "count_gett",
- "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2", 2)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "count_getv",
- "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2", 2)) ||
- (GNUNET_OK !=
- GNUNET_POSTGRES_prepare (plugin->dbh, "count_get",
- "SELECT count(*) FROM gn090 WHERE hash=$1", 1)) ||
+ "WHERE oid >= $1::bigint AND "
+ "(rvalue >= $2 OR 0 = $3::smallint) AND "
+ "(hash = $4 OR 0 = $5::smallint) AND "
+ "(vhash = $6 OR 0 = $7::smallint) AND "
+ "(type = $8 OR 0 = $9::smallint) "
+ "ORDER BY oid ASC LIMIT 1", 9)) ||
(GNUNET_OK !=
GNUNET_POSTGRES_prepare (plugin->dbh, "put",
"INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
- "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) ||
+ "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", 9)) ||
(GNUNET_OK !=
GNUNET_POSTGRES_prepare (plugin->dbh, "update",
"UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
(GNUNET_OK !=
GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
"SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
- 1)) ||
+ "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
+ "ORDER BY oid ASC LIMIT 1",
+ 2)) ||
(GNUNET_OK !=
GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
"(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
struct Plugin *plugin = cls;
uint32_t utype = type;
struct GNUNET_HashCode vhash;
+ uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
PGresult *ret;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_uint32 (&replication),
GNUNET_PQ_query_param_uint32 (&priority),
GNUNET_PQ_query_param_uint32 (&anonymity),
GNUNET_PQ_query_param_absolute_time (&expiration),
+ GNUNET_PQ_query_param_uint64 (&rvalue),
GNUNET_PQ_query_param_auto_from_type (key),
GNUNET_PQ_query_param_auto_from_type (&vhash),
GNUNET_PQ_query_param_fixed_size (data, size),
/**
- * Iterate over the results for a particular key
- * in the datastore.
+ * Get one of the results for a particular key in the datastore.
*
* @param cls closure with the 'struct Plugin'
- * @param offset offset of the result (modulo num-results);
- * specific ordering does not matter for the offset
+ * @param next_uid return the result with lowest uid >= next_uid
+ * @param random if true, return a random result instead of using next_uid
* @param key maybe NULL (to match all entries)
* @param vhash hash of the value, maybe NULL (to
* match all values that have the right key).
* @param type entries of which type are relevant?
* Use 0 for any type.
* @param proc function to call on the matching value;
- * will be called once with a NULL if no value matches
- * @param proc_cls closure for iter
+ * will be called with NULL if nothing matches
+ * @param proc_cls closure for @a proc
*/
static void
postgres_plugin_get_key (void *cls,
- uint64_t offset,
+ uint64_t next_uid,
+ bool random,
const struct GNUNET_HashCode *key,
const struct GNUNET_HashCode *vhash,
enum GNUNET_BLOCK_Type type,
- PluginDatumProcessor proc,
+ PluginDatumProcessor proc,
void *proc_cls)
{
struct Plugin *plugin = cls;
uint32_t utype = type;
+ uint16_t use_rvalue = random;
+ uint16_t use_key = NULL != key;
+ uint16_t use_vhash = NULL != vhash;
+ uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
+ uint64_t rvalue;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_uint64 (&next_uid),
+ GNUNET_PQ_query_param_uint64 (&rvalue),
+ GNUNET_PQ_query_param_uint16 (&use_rvalue),
+ GNUNET_PQ_query_param_auto_from_type (key),
+ GNUNET_PQ_query_param_uint16 (&use_key),
+ GNUNET_PQ_query_param_auto_from_type (vhash),
+ GNUNET_PQ_query_param_uint16 (&use_vhash),
+ GNUNET_PQ_query_param_uint32 (&utype),
+ GNUNET_PQ_query_param_uint16 (&use_type),
+ GNUNET_PQ_query_param_end
+ };
PGresult *ret;
- uint64_t total;
- uint64_t limit_off;
- if (0 != type)
+ if (random)
{
- if (NULL != vhash)
- {
- struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_auto_from_type (key),
- GNUNET_PQ_query_param_auto_from_type (vhash),
- GNUNET_PQ_query_param_uint32 (&utype),
- GNUNET_PQ_query_param_end
- };
- ret = GNUNET_PQ_exec_prepared (plugin->dbh,
- "count_getvt",
- params);
- }
- else
- {
- struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_auto_from_type (key),
- GNUNET_PQ_query_param_uint32 (&utype),
- GNUNET_PQ_query_param_end
- };
- ret = GNUNET_PQ_exec_prepared (plugin->dbh,
- "count_gett",
- params);
- }
+ rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
+ next_uid = 0;
}
else
- {
- if (NULL != vhash)
- {
- struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_auto_from_type (key),
- GNUNET_PQ_query_param_auto_from_type (vhash),
- GNUNET_PQ_query_param_end
- };
- ret = GNUNET_PQ_exec_prepared (plugin->dbh,
- "count_getv",
- params);
- }
- else
- {
- struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_auto_from_type (key),
- GNUNET_PQ_query_param_end
- };
- ret = GNUNET_PQ_exec_prepared (plugin->dbh,
- "count_get",
- params);
- }
- }
+ rvalue = 0;
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result (plugin->dbh,
- ret,
- PGRES_TUPLES_OK,
- "PQexecParams",
- "count"))
- {
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
- if ( (PQntuples (ret) != 1) ||
- (PQnfields (ret) != 1) ||
- (PQgetlength (ret, 0, 0) != sizeof (uint64_t)))
- {
- GNUNET_break (0);
- PQclear (ret);
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
- total = GNUNET_ntohll (*(const uint64_t *) PQgetvalue (ret, 0, 0));
- PQclear (ret);
- if (0 == total)
- {
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
- limit_off = offset % total;
-
- if (0 != type)
- {
- if (NULL != vhash)
- {
- struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_auto_from_type (key),
- GNUNET_PQ_query_param_auto_from_type (vhash),
- GNUNET_PQ_query_param_uint32 (&utype),
- GNUNET_PQ_query_param_uint64 (&limit_off),
- GNUNET_PQ_query_param_end
- };
- ret = GNUNET_PQ_exec_prepared (plugin->dbh,
- "getvt",
- params);
- }
- else
- {
- struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_auto_from_type (key),
- GNUNET_PQ_query_param_uint32 (&utype),
- GNUNET_PQ_query_param_uint64 (&limit_off),
- GNUNET_PQ_query_param_end
- };
- ret = GNUNET_PQ_exec_prepared (plugin->dbh,
- "gett",
- params);
- }
- }
- else
- {
- if (NULL != vhash)
- {
- struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_auto_from_type (key),
- GNUNET_PQ_query_param_auto_from_type (vhash),
- GNUNET_PQ_query_param_uint64 (&limit_off),
- GNUNET_PQ_query_param_end
- };
- ret = GNUNET_PQ_exec_prepared (plugin->dbh,
- "getv",
- params);
- }
- else
- {
- struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_auto_from_type (key),
- GNUNET_PQ_query_param_uint64 (&limit_off),
- GNUNET_PQ_query_param_end
- };
- ret = GNUNET_PQ_exec_prepared (plugin->dbh,
- "get",
- params);
- }
- }
+ ret = GNUNET_PQ_exec_prepared (plugin->dbh,
+ "get",
+ params);
process_result (plugin,
proc,
proc_cls,
* the given iterator for each of them.
*
* @param cls our `struct Plugin *`
- * @param offset offset of the result (modulo num-results);
- * specific ordering does not matter for the offset
+ * @param next_uid return the result with lowest uid >= next_uid
* @param type entries of which type should be considered?
- * Use 0 for any type.
+ * Must not be zero (ANY).
* @param proc function to call on the matching value;
- * will be called with a NULL if no value matches
+ * will be called with NULL if no value matches
* @param proc_cls closure for @a proc
*/
static void
postgres_plugin_get_zero_anonymity (void *cls,
- uint64_t offset,
+ uint64_t next_uid,
enum GNUNET_BLOCK_Type type,
PluginDatumProcessor proc,
- void *proc_cls)
+ void *proc_cls)
{
struct Plugin *plugin = cls;
uint32_t utype = type;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_uint32 (&utype),
- GNUNET_PQ_query_param_uint64 (&offset),
+ GNUNET_PQ_query_param_uint64 (&next_uid),
GNUNET_PQ_query_param_end
};
PGresult *ret;
/**
* Precompiled SQL for selection
*/
- sqlite3_stmt *count_key;
-
- /**
- * Precompiled SQL for selection
- */
- sqlite3_stmt *count_key_vhash;
-
- /**
- * Precompiled SQL for selection
- */
- sqlite3_stmt *count_key_type;
-
- /**
- * Precompiled SQL for selection
- */
- sqlite3_stmt *count_key_vhash_type;
-
- /**
- * Precompiled SQL for selection
- */
- sqlite3_stmt *get_key;
-
- /**
- * Precompiled SQL for selection
- */
- sqlite3_stmt *get_key_vhash;
-
- /**
- * Precompiled SQL for selection
- */
- sqlite3_stmt *get_key_type;
-
- /**
- * Precompiled SQL for selection
- */
- sqlite3_stmt *get_key_vhash_type;
+ sqlite3_stmt *get;
/**
* Should the database be dropped on shutdown?
#if SQLITE_VERSION_NUMBER >= 3007000
"INDEXED BY idx_anon_type_hash "
#endif
- "WHERE (anonLevel = 0 AND type=?1) "
- "ORDER BY hash DESC LIMIT 1 OFFSET ?2",
+ "WHERE _ROWID_ >= ? AND "
+ "anonLevel = 0 AND "
+ "type = ? "
+ "ORDER BY _ROWID_ ASC LIMIT 1",
&plugin->selZeroAnon)) ||
(SQLITE_OK !=
sq_prepare (plugin->dbh,
"INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
&plugin->insertContent)) ||
- (SQLITE_OK !=
- sq_prepare (plugin->dbh,
- "SELECT count(*) FROM gn090 WHERE hash=?",
- &plugin->count_key)) ||
- (SQLITE_OK !=
- sq_prepare (plugin->dbh,
- "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=?",
- &plugin->count_key_vhash)) ||
- (SQLITE_OK !=
- sq_prepare (plugin->dbh,
- "SELECT count(*) FROM gn090 WHERE hash=? AND type=?",
- &plugin->count_key_type)) ||
- (SQLITE_OK !=
- sq_prepare (plugin->dbh,
- "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=? AND type=?",
- &plugin->count_key_vhash_type)) ||
- (SQLITE_OK !=
- sq_prepare (plugin->dbh,
- "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ FROM gn090 "
- "WHERE hash=?"
- "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
- &plugin->get_key)) ||
- (SQLITE_OK !=
- sq_prepare (plugin->dbh,
- "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ FROM gn090 "
- "WHERE hash=? AND vhash=?"
- "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
- &plugin->get_key_vhash)) ||
(SQLITE_OK !=
sq_prepare (plugin->dbh,
"SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ FROM gn090 "
- "WHERE hash=? AND type=?"
- "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
- &plugin->get_key_type)) ||
- (SQLITE_OK !=
- sq_prepare (plugin->dbh,
- "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ FROM gn090 "
- "WHERE hash=? AND vhash=? AND type=?"
- "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
- &plugin->get_key_vhash_type)) ||
+ "WHERE _ROWID_ >= ? AND "
+ "(rvalue >= ? OR 0 = ?) AND "
+ "(hash = ? OR 0 = ?) AND "
+ "(vhash = ? OR 0 = ?) AND "
+ "(type = ? OR 0 = ?) "
+ "ORDER BY _ROWID_ ASC LIMIT 1",
+ &plugin->get)) ||
(SQLITE_OK !=
sq_prepare (plugin->dbh,
"DELETE FROM gn090 WHERE _ROWID_ = ?",
sqlite3_finalize (plugin->selZeroAnon);
if (NULL != plugin->insertContent)
sqlite3_finalize (plugin->insertContent);
- if (NULL != plugin->count_key)
- sqlite3_finalize (plugin->count_key);
- if (NULL != plugin->count_key_vhash)
- sqlite3_finalize (plugin->count_key_vhash);
- if (NULL != plugin->count_key_type)
- sqlite3_finalize (plugin->count_key_type);
- if (NULL != plugin->count_key_vhash_type)
- sqlite3_finalize (plugin->count_key_vhash_type);
- if (NULL != plugin->count_key)
- sqlite3_finalize (plugin->get_key);
- if (NULL != plugin->count_key_vhash)
- sqlite3_finalize (plugin->get_key_vhash);
- if (NULL != plugin->count_key_type)
- sqlite3_finalize (plugin->get_key_type);
- if (NULL != plugin->count_key_vhash_type)
- sqlite3_finalize (plugin->get_key_vhash_type);
+ if (NULL != plugin->get)
+ sqlite3_finalize (plugin->get);
result = sqlite3_close (plugin->dbh);
#if SQLITE_VERSION_NUMBER >= 3007000
if (result == SQLITE_BUSY)
* the given processor for the item.
*
* @param cls our plugin context
- * @param offset offset of the result (modulo num-results);
- * specific ordering does not matter for the offset
+ * @param next_uid return the result with lowest uid >= next_uid
* @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param proc function to call on each matching value;
- * will be called once with a NULL value at the end
+ * Must not be zero (ANY).
+ * @param proc function to call on the matching value;
+ * will be called with NULL if no value matches
* @param proc_cls closure for @a proc
*/
static void
sqlite_plugin_get_zero_anonymity (void *cls,
- uint64_t offset,
+ uint64_t next_uid,
enum GNUNET_BLOCK_Type type,
PluginDatumProcessor proc,
void *proc_cls)
{
struct Plugin *plugin = cls;
struct GNUNET_SQ_QueryParam params[] = {
+ GNUNET_SQ_query_param_uint64 (&next_uid),
GNUNET_SQ_query_param_uint32 (&type),
- GNUNET_SQ_query_param_uint64 (&offset),
GNUNET_SQ_query_param_end
};
- sqlite3_stmt *stmt = plugin->selZeroAnon;
GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
if (GNUNET_OK !=
- GNUNET_SQ_bind (stmt,
+ GNUNET_SQ_bind (plugin->selZeroAnon,
params))
{
proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
- execute_get (plugin, stmt, proc, proc_cls);
+ execute_get (plugin, plugin->selZeroAnon, proc, proc_cls);
}
* Get results for a particular key in the datastore.
*
* @param cls closure
- * @param offset offset (mod count).
- * @param key key to match, never NULL
+ * @param next_uid return the result with lowest uid >= next_uid
+ * @param random if true, return a random result instead of using next_uid
+ * @param key maybe NULL (to match all entries)
* @param vhash hash of the value, maybe NULL (to
* match all values that have the right key).
* Note that for DBlocks there is no difference
*/
static void
sqlite_plugin_get_key (void *cls,
- uint64_t offset,
+ uint64_t next_uid,
+ bool random,
const struct GNUNET_HashCode *key,
const struct GNUNET_HashCode *vhash,
enum GNUNET_BLOCK_Type type,
void *proc_cls)
{
struct Plugin *plugin = cls;
+ uint64_t rvalue;
+ uint16_t use_rvalue = random;
uint32_t type32 = (uint32_t) type;
- int ret;
- int total;
- uint32_t limit_off;
- struct GNUNET_SQ_QueryParam count_params_key[] = {
- GNUNET_SQ_query_param_auto_from_type (key),
- GNUNET_SQ_query_param_end
- };
- struct GNUNET_SQ_QueryParam count_params_key_vhash[] = {
- GNUNET_SQ_query_param_auto_from_type (key),
- GNUNET_SQ_query_param_auto_from_type (vhash),
- GNUNET_SQ_query_param_end
- };
- struct GNUNET_SQ_QueryParam count_params_key_type[] = {
- GNUNET_SQ_query_param_auto_from_type (key),
- GNUNET_SQ_query_param_uint32 (&type32),
- GNUNET_SQ_query_param_end
- };
- struct GNUNET_SQ_QueryParam count_params_key_vhash_type[] = {
- GNUNET_SQ_query_param_auto_from_type (key),
- GNUNET_SQ_query_param_auto_from_type (vhash),
- GNUNET_SQ_query_param_uint32 (&type32),
- GNUNET_SQ_query_param_end
- };
- struct GNUNET_SQ_QueryParam get_params_key[] = {
- GNUNET_SQ_query_param_auto_from_type (key),
- GNUNET_SQ_query_param_uint32 (&limit_off),
- GNUNET_SQ_query_param_end
- };
- struct GNUNET_SQ_QueryParam get_params_key_vhash[] = {
- GNUNET_SQ_query_param_auto_from_type (key),
- GNUNET_SQ_query_param_auto_from_type (vhash),
- GNUNET_SQ_query_param_uint32 (&limit_off),
- GNUNET_SQ_query_param_end
- };
- struct GNUNET_SQ_QueryParam get_params_key_type[] = {
- GNUNET_SQ_query_param_auto_from_type (key),
- GNUNET_SQ_query_param_uint32 (&type32),
- GNUNET_SQ_query_param_uint32 (&limit_off),
- GNUNET_SQ_query_param_end
- };
- struct GNUNET_SQ_QueryParam get_params_key_vhash_type[] = {
+ uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
+ uint16_t use_key = NULL != key;
+ uint16_t use_vhash = NULL != vhash;
+ struct GNUNET_SQ_QueryParam params[] = {
+ GNUNET_SQ_query_param_uint64 (&next_uid),
+ GNUNET_SQ_query_param_uint64 (&rvalue),
+ GNUNET_SQ_query_param_uint16 (&use_rvalue),
GNUNET_SQ_query_param_auto_from_type (key),
+ GNUNET_SQ_query_param_uint16 (&use_key),
GNUNET_SQ_query_param_auto_from_type (vhash),
+ GNUNET_SQ_query_param_uint16 (&use_vhash),
GNUNET_SQ_query_param_uint32 (&type32),
- GNUNET_SQ_query_param_uint32 (&limit_off),
+ GNUNET_SQ_query_param_uint16 (&use_type),
GNUNET_SQ_query_param_end
};
- struct GNUNET_SQ_QueryParam *count_params;
- sqlite3_stmt *count_stmt;
- struct GNUNET_SQ_QueryParam *get_params;
- sqlite3_stmt *get_stmt;
- if (NULL == vhash)
+ if (random)
{
- if (GNUNET_BLOCK_TYPE_ANY == type)
- {
- count_params = count_params_key;
- count_stmt = plugin->count_key;
- get_params = get_params_key;
- get_stmt = plugin->get_key;
- }
- else
- {
- count_params = count_params_key_type;
- count_stmt = plugin->count_key_type;
- get_params = get_params_key_type;
- get_stmt = plugin->get_key_type;
- }
+ rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
+ next_uid = 0;
}
else
- {
- if (GNUNET_BLOCK_TYPE_ANY == type)
- {
- count_params = count_params_key_vhash;
- count_stmt = plugin->count_key_vhash;
- get_params = get_params_key_vhash;
- get_stmt = plugin->get_key_vhash;
- }
- else
- {
- count_params = count_params_key_vhash_type;
- count_stmt = plugin->count_key_vhash_type;
- get_params = get_params_key_vhash_type;
- get_stmt = plugin->get_key_vhash_type;
- }
- }
- if (GNUNET_OK !=
- GNUNET_SQ_bind (count_stmt,
- count_params))
- {
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
- ret = sqlite3_step (count_stmt);
- if (ret != SQLITE_ROW)
- {
- LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "sqlite_step");
- GNUNET_SQ_reset (plugin->dbh,
- count_stmt);
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
- total = sqlite3_column_int (count_stmt,
- 0);
- GNUNET_SQ_reset (plugin->dbh,
- count_stmt);
- if (0 == total)
- {
- proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- return;
- }
- limit_off = (uint32_t) (offset % total);
+ rvalue = 0;
+
if (GNUNET_OK !=
- GNUNET_SQ_bind (get_stmt,
- get_params))
+ GNUNET_SQ_bind (plugin->get,
+ params))
{
proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
execute_get (plugin,
- get_stmt,
+ plugin->get,
proc,
proc_cls);
- GNUNET_SQ_reset (plugin->dbh,
- get_stmt);
}
* Get one of the results for a particular key in the datastore.
*
* @param cls closure
- * @param offset offset of the result (modulo num-results);
- * specific ordering does not matter for the offset
+ * @param next_uid return the result with lowest uid >= next_uid
+ * @param random if true, return a random result instead of using next_uid
* @param key maybe NULL (to match all entries)
* @param vhash hash of the value, maybe NULL (to
* match all values that have the right key).
* @param proc_cls closure for proc
*/
static void
-template_plugin_get_key (void *cls, uint64_t offset,
+template_plugin_get_key (void *cls, uint64_t next_uid, bool random,
const struct GNUNET_HashCode * key,
const struct GNUNET_HashCode * vhash,
enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
* Call the given processor on an item with zero anonymity.
*
* @param cls our "struct Plugin*"
- * @param offset offset of the result (modulo num-results);
- * specific ordering does not matter for the offset
+ * @param next_uid return the result with lowest uid >= next_uid
* @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param proc function to call on each matching value;
- * will be called with NULL if no value matches
+ * Must not be zero (ANY).
+ * @param proc function to call on the matching value;
+ * will be called with NULL if no value matches
* @param proc_cls closure for proc
*/
static void
-template_plugin_get_zero_anonymity (void *cls, uint64_t offset,
+template_plugin_get_zero_anonymity (void *cls, uint64_t next_uid,
enum GNUNET_BLOCK_Type type,
PluginDatumProcessor proc, void *proc_cls)
{
void *data;
size_t size;
- uint64_t uid;
- uint64_t offset;
uint64_t first_uid;
};
GNUNET_assert (priority == get_priority (i));
GNUNET_assert (anonymity == get_anonymity (i));
GNUNET_assert (expiration.abs_value_us == get_expiration (i).abs_value_us);
- crc->offset++;
if (crc->i == 0)
{
crc->phase = RP_DEL;
case RP_GET_MULTIPLE:
crc->phase = RP_GET_MULTIPLE_NEXT;
crc->first_uid = uid;
- crc->offset++;
break;
case RP_GET_MULTIPLE_NEXT:
GNUNET_assert (uid != crc->first_uid);
crc->phase = RP_ERROR;
break;
}
- if (priority == get_priority (42))
- crc->uid = uid;
GNUNET_SCHEDULER_add_now (&run_continuation, crc);
}
sizeof (int),
&crc->key);
GNUNET_DATASTORE_get_key (datastore,
- crc->offset,
+ 0,
+ false,
&crc->key,
get_type (crc->i),
1,
GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
GNUNET_assert (NULL !=
GNUNET_DATASTORE_get_key (datastore,
- crc->offset,
+ 0,
+ false,
&crc->key,
get_type (crc->i),
1,
crc->i);
GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
GNUNET_assert (NULL !=
- GNUNET_DATASTORE_get_key (datastore, crc->offset, &crc->key,
- get_type (crc->i), 1, 1,
- &check_nothing, crc));
+ GNUNET_DATASTORE_get_key (datastore,
+ 0,
+ false,
+ &crc->key,
+ get_type (crc->i),
+ 1,
+ 1,
+ &check_nothing,
+ crc));
break;
case RP_RESERVE:
crc->phase = RP_PUT_MULTIPLE;
case RP_GET_MULTIPLE:
GNUNET_assert (NULL !=
GNUNET_DATASTORE_get_key (datastore,
- crc->offset,
+ 0,
+ false,
&crc->key,
- get_type (42), 1, 1,
- &check_multiple, crc));
+ get_type (42),
+ 1,
+ 1,
+ &check_multiple,
+ crc));
break;
case RP_GET_MULTIPLE_NEXT:
GNUNET_assert (NULL !=
GNUNET_DATASTORE_get_key (datastore,
- crc->offset,
+ crc->first_uid + 1,
+ false,
&crc->key,
get_type (42),
- 1, 1,
- &check_multiple, crc));
+ 1,
+ 1,
+ &check_multiple,
+ crc));
break;
case RP_DONE:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
const struct GNUNET_CONFIGURATION_Handle *cfg;
void *data;
enum RunPhase phase;
- uint64_t offset;
};
GNUNET_assert (priority == get_priority (i));
GNUNET_assert (anonymity == get_anonymity (i));
GNUNET_assert (expiration.abs_value_us == get_expiration (i).abs_value_us);
- crc->offset++;
crc->i--;
if (crc->i == 0)
crc->phase = RP_DONE;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing `%s' number %u\n", "GET",
crc->i);
GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
- GNUNET_DATASTORE_get_key (datastore, crc->offset++, &crc->key,
- get_type (crc->i), 1, 1,
+ GNUNET_DATASTORE_get_key (datastore,
+ 0,
+ false,
+ &crc->key,
+ get_type (crc->i),
+ 1,
+ 1,
&check_value,
crc);
break;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing `%s' number %u\n", "GET(f)",
crc->i);
GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
- GNUNET_DATASTORE_get_key (datastore, crc->offset++, &crc->key,
- get_type (crc->i), 1, 1,
+ GNUNET_DATASTORE_get_key (datastore,
+ 0,
+ false,
+ &crc->key,
+ get_type (crc->i),
+ 1,
+ 1,
&check_nothing,
crc);
break;
enum RunPhase phase;
unsigned int cnt;
unsigned int i;
- uint64_t offset;
};
"Looking for %s\n",
GNUNET_h2s (&key));
crc->api->get_key (crc->api->cls,
- crc->offset++,
+ 0,
+ false,
&key,
NULL,
GNUNET_BLOCK_TYPE_ANY,
*/
struct GNUNET_CRYPTO_FileHashContext *fhc;
- /**
- * Which values have we seen already?
- */
- struct GNUNET_CONTAINER_MultiHashMap *seen_dh;
-
/**
* Overall size of the file.
*/
uint64_t file_size;
- /**
- * Random offset given to #GNUNET_DATASTORE_get_key.
- */
- uint64_t roff;
-
/**
* When did we start?
*/
uc->fh = NULL;
GNUNET_DATASTORE_disconnect (uc->dsh, GNUNET_NO);
uc->dsh = NULL;
- GNUNET_CONTAINER_multihashmap_destroy (uc->seen_dh);
- uc->seen_dh = NULL;
uc->state = UNINDEX_STATE_FS_NOTIFY;
GNUNET_FS_unindex_sync_ (uc);
uc->mq = GNUNET_CLIENT_connect (uc->h->cfg,
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_("Failed to remove UBlock: %s\n"),
msg);
- GNUNET_CONTAINER_multihashmap_clear (uc->seen_dh);
uc->ksk_offset++;
GNUNET_FS_unindex_do_remove_kblocks_ (uc);
}
const struct UBlock *ub;
struct GNUNET_FS_Uri *chk_uri;
struct GNUNET_HashCode query;
- struct GNUNET_HashCode dh;
uc->dqe = NULL;
if (NULL == data)
{
/* no result */
- GNUNET_CONTAINER_multihashmap_clear (uc->seen_dh);
uc->ksk_offset++;
GNUNET_FS_unindex_do_remove_kblocks_ (uc);
return;
}
- GNUNET_CRYPTO_hash (data,
- size,
- &dh);
- if (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_contains (uc->seen_dh,
- &dh))
- {
- GNUNET_CONTAINER_multihashmap_clear (uc->seen_dh);
- uc->ksk_offset++;
- GNUNET_FS_unindex_do_remove_kblocks_ (uc);
- return;
- }
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (uc->seen_dh,
- &dh,
- uc,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
GNUNET_assert (GNUNET_BLOCK_TYPE_FS_UBLOCK == type);
if (size < sizeof (struct UBlock))
{
GNUNET_FS_uri_destroy (chk_uri);
/* matches! */
uc->dqe = GNUNET_DATASTORE_remove (uc->dsh,
- key,
+ key,
size,
data,
- 0 /* priority */,
+ 0 /* priority */,
1 /* queue size */,
- &continue_after_remove,
- uc);
+ &continue_after_remove,
+ uc);
return;
get_next:
uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh,
- uc->roff++,
- &uc->uquery,
- GNUNET_BLOCK_TYPE_FS_UBLOCK,
- 0 /* priority */,
+ uid + 1 /* next_uid */,
+ false /* random */,
+ &uc->uquery,
+ GNUNET_BLOCK_TYPE_FS_UBLOCK,
+ 0 /* priority */,
1 /* queue size */,
- &process_kblock_for_unindex,
- uc);
+ &process_kblock_for_unindex,
+ uc);
}
sizeof (dpub),
&uc->uquery);
uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh,
- uc->roff++,
- &uc->uquery,
- GNUNET_BLOCK_TYPE_FS_UBLOCK,
- 0 /* priority */,
+ 0 /* next_uid */,
+ false /* random */,
+ &uc->uquery,
+ GNUNET_BLOCK_TYPE_FS_UBLOCK,
+ 0 /* priority */,
1 /* queue size */,
- &process_kblock_for_unindex,
- uc);
+ &process_kblock_for_unindex,
+ uc);
}
uc->start_time = GNUNET_TIME_absolute_get ();
uc->file_size = size;
uc->client_info = cctx;
- uc->seen_dh = GNUNET_CONTAINER_multihashmap_create (4,
- GNUNET_NO);
GNUNET_FS_unindex_sync_ (uc);
pi.status = GNUNET_FS_STATUS_UNINDEX_START;
pi.value.unindex.eta = GNUNET_TIME_UNIT_FOREVER_REL;
GNUNET_NO);
refresh_timeout_task (sc);
sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
- 0,
- &sqm->query,
- ntohl (sqm->type),
- 0 /* priority */,
- GSF_datastore_queue_size,
- &handle_datastore_reply,
+ 0 /* next_uid */,
+ false /* random */,
+ &sqm->query,
+ ntohl (sqm->type),
+ 0 /* priority */,
+ GSF_datastore_queue_size,
+ &handle_datastore_reply,
sc);
if (NULL == sc->qe)
{
struct GNUNET_SCHEDULER_Task * warn_task;
/**
- * Current offset for querying our local datastore for results.
- * Starts at a random value, incremented until we get the same
- * UID again (detected using 'first_uid'), which is then used
- * to termiante the iteration.
+ * Do we have a first UID yet?
+ */
+ bool have_first_uid;
+
+ /**
+ * Have we seen a NULL result yet?
*/
- uint64_t local_result_offset;
+ bool seen_null;
/**
* Unique ID of the first result from the local datastore;
- * used to detect wrap-around of the offset.
+ * used to terminate the loop.
*/
uint64_t first_uid;
+ /**
+ * Result count.
+ */
+ size_t result_count;
+
/**
* How often have we retried this request via 'cadet'?
* (used to bound overall retries).
*/
unsigned int replies_seen_size;
- /**
- * Do we have a first UID yet?
- */
- unsigned int have_first_uid;
-
};
if (NULL != target)
extra += sizeof (struct GNUNET_PeerIdentity);
pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest) + extra);
- pr->local_result_offset =
- GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
pr->public_data.query = *query;
eptr = (struct GNUNET_HashCode *) &pr[1];
if (NULL != target)
}
+/* Call our continuation (if we have any) */
+static void
+call_continuation (struct GSF_PendingRequest *pr)
+{
+ GSF_LocalLookupContinuation cont = pr->llc_cont;
+
+ GNUNET_assert (NULL == pr->qe);
+ if (NULL != pr->warn_task)
+ {
+ GNUNET_SCHEDULER_cancel (pr->warn_task);
+ pr->warn_task = NULL;
+ }
+ if (NULL == cont)
+ return; /* no continuation */
+ pr->llc_cont = NULL;
+ if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
+ {
+ if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result)
+ {
+ /* Signal that we are done and that there won't be any
+ additional results to allow client to clean up state. */
+ pr->rh (pr->rh_cls,
+ GNUNET_BLOCK_EVALUATION_OK_LAST,
+ pr,
+ UINT32_MAX,
+ GNUNET_TIME_UNIT_ZERO_ABS,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ GNUNET_BLOCK_TYPE_ANY,
+ NULL,
+ 0);
+ }
+ /* Finally, call our continuation to signal that we are
+ done with local processing of this request; i.e. to
+ start reading again from the client. */
+ cont (pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST);
+ return;
+ }
+
+ cont (pr->llc_cont_cls, pr, pr->local_result);
+}
+
+
+/* Update stats and call continuation */
+static void
+no_more_local_results (struct GSF_PendingRequest *pr)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+ "No further local responses available.\n");
+#if INSANE_STATISTICS
+ if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == pr->public_data.type) ||
+ (GNUNET_BLOCK_TYPE_FS_IBLOCK == pr->public_data.type) )
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# requested DBLOCK or IBLOCK not found"),
+ 1,
+ GNUNET_NO);
+#endif
+ call_continuation (pr);
+}
+
+
+/* forward declaration */
+static void
+process_local_reply (void *cls,
+ const struct GNUNET_HashCode *key,
+ size_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute expiration,
+ uint64_t uid);
+
+
+/* Start a local query */
+static void
+start_local_query (struct GSF_PendingRequest *pr,
+ uint64_t next_uid,
+ bool random)
+{
+ pr->qe_start = GNUNET_TIME_absolute_get ();
+ pr->warn_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
+ &warn_delay_task,
+ pr);
+ pr->qe =
+ GNUNET_DATASTORE_get_key (GSF_dsh,
+ next_uid,
+ random,
+ &pr->public_data.query,
+ pr->public_data.type ==
+ GNUNET_BLOCK_TYPE_FS_DBLOCK ?
+ GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
+ (0 !=
+ (GSF_PRO_PRIORITY_UNLIMITED & pr->
+ public_data.options)) ? UINT_MAX : 1
+ /* queue priority */ ,
+ (0 !=
+ (GSF_PRO_PRIORITY_UNLIMITED & pr->
+ public_data.options)) ? UINT_MAX :
+ GSF_datastore_queue_size
+ /* max queue size */ ,
+ &process_local_reply, pr);
+ if (NULL != pr->qe)
+ return;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "ERROR Requesting `%s' of type %d with next_uid %llu from datastore.\n",
+ GNUNET_h2s (&pr->public_data.query),
+ pr->public_data.type,
+ (unsigned long long) next_uid);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# Datastore lookups concluded (error queueing)"),
+ 1,
+ GNUNET_NO);
+ call_continuation (pr);
+}
+
+
/**
* We're processing (local) results for a search request
* from another peer. Pass applicable results to the
uint64_t uid)
{
struct GSF_PendingRequest *pr = cls;
- GSF_LocalLookupContinuation cont;
struct ProcessReplyClosure prq;
struct GNUNET_HashCode query;
unsigned int old_rf;
GNUNET_SCHEDULER_cancel (pr->warn_task);
pr->warn_task = NULL;
- if (NULL != pr->qe)
+ if (NULL == pr->qe)
+ goto called_from_on_demand;
+ pr->qe = NULL;
+ if ( (NULL == key) &&
+ pr->seen_null &&
+ !pr->have_first_uid) /* We have hit the end for the 2nd time with no results */
{
- pr->qe = NULL;
- if (NULL == key)
- {
+ /* No results */
#if INSANE_STATISTICS
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop
- ("# Datastore lookups concluded (no results)"),
- 1, GNUNET_NO);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop
+ ("# Datastore lookups concluded (no results)"),
+ 1, GNUNET_NO);
#endif
- }
- if (GNUNET_NO == pr->have_first_uid)
- {
- pr->first_uid = uid;
- pr->have_first_uid = 1;
- }
- else
- {
- if ((uid == pr->first_uid) && (key != NULL))
- {
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop
- ("# Datastore lookups concluded (seen all)"),
- 1, GNUNET_NO);
- key = NULL; /* all replies seen! */
- }
- pr->have_first_uid++;
- if ((pr->have_first_uid > MAX_RESULTS) && (key != NULL))
- {
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop
- ("# Datastore lookups aborted (more than MAX_RESULTS)"),
- 1, GNUNET_NO);
- key = NULL; /* all replies seen! */
- }
- }
+ no_more_local_results (pr);
+ return;
+ }
+ if ( ( (NULL == key) &&
+ pr->seen_null ) || /* We have hit the end for the 2nd time OR */
+ ( pr->seen_null &&
+ pr->have_first_uid &&
+ (uid >= pr->first_uid) ) ) /* We have hit the end and past first UID */
+ {
+ /* Seen all results */
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop
+ ("# Datastore lookups concluded (seen all)"),
+ 1, GNUNET_NO);
+ no_more_local_results (pr);
+ return;
}
if (NULL == key)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "No further local responses available.\n");
-#if INSANE_STATISTICS
- if ((pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) ||
- (pr->public_data.type == GNUNET_BLOCK_TYPE_FS_IBLOCK))
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop
- ("# requested DBLOCK or IBLOCK not found"), 1,
- GNUNET_NO);
-#endif
- goto check_error_and_continue;
+ GNUNET_assert (!pr->seen_null);
+ pr->seen_null = true;
+ start_local_query (pr,
+ 0 /* next_uid */,
+ false /* random */);
+ return;
+ }
+ if (!pr->have_first_uid)
+ {
+ pr->first_uid = uid;
+ pr->have_first_uid = true;
+ }
+ pr->result_count++;
+ if (pr->result_count > MAX_RESULTS)
+ {
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop
+ ("# Datastore lookups aborted (more than MAX_RESULTS)"),
+ 1, GNUNET_NO);
+ no_more_local_results (pr);
+ return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received reply for `%s' of type %d with UID %llu from datastore.\n",
GNUNET_h2s (key), type, (unsigned long long) uid);
- if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+ if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Found ONDEMAND block, performing on-demand encoding\n");
gettext_noop ("# on-demand lookups failed"), 1,
GNUNET_NO);
GNUNET_SCHEDULER_cancel (pr->warn_task);
- pr->warn_task =
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
- &warn_delay_task, pr);
- pr->qe =
- GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1,
- &pr->public_data.query,
- pr->public_data.type ==
- GNUNET_BLOCK_TYPE_FS_DBLOCK ?
- GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
- (0 !=
- (GSF_PRO_PRIORITY_UNLIMITED &
- pr->public_data.options)) ? UINT_MAX : 1
- /* queue priority */ ,
- (0 !=
- (GSF_PRO_PRIORITY_UNLIMITED &
- pr->public_data.options)) ? UINT_MAX :
- GSF_datastore_queue_size
- /* max queue size */ ,
- &process_local_reply, pr);
- if (NULL != pr->qe)
- return; /* we're done */
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop
- ("# Datastore lookups concluded (error queueing)"),
- 1, GNUNET_NO);
- goto check_error_and_continue;
+ start_local_query (pr,
+ uid + 1 /* next_uid */,
+ false /* random */);
+ return;
}
+called_from_on_demand:
old_rf = pr->public_data.results_found;
memset (&prq, 0, sizeof (prq));
prq.data = data;
GNUNET_break (0);
GNUNET_DATASTORE_remove (GSF_dsh, key, size, data, -1, -1,
NULL, NULL);
- pr->qe_start = GNUNET_TIME_absolute_get ();
- pr->warn_task =
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
- &warn_delay_task, pr);
- pr->qe =
- GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1,
- &pr->public_data.query,
- pr->public_data.type ==
- GNUNET_BLOCK_TYPE_FS_DBLOCK ?
- GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
- (0 !=
- (GSF_PRO_PRIORITY_UNLIMITED &
- pr->public_data.options)) ? UINT_MAX : 1
- /* queue priority */ ,
- (0 !=
- (GSF_PRO_PRIORITY_UNLIMITED &
- pr->public_data.options)) ? UINT_MAX :
- GSF_datastore_queue_size
- /* max queue size */ ,
- &process_local_reply, pr);
- if (NULL == pr->qe)
- {
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop
- ("# Datastore lookups concluded (error queueing)"),
- 1, GNUNET_NO);
- goto check_error_and_continue;
- }
+ start_local_query (pr,
+ uid + 1 /* next_uid */,
+ false /* random */);
return;
}
prq.type = type;
prq.eo = GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO;
process_reply (&prq, key, pr);
pr->local_result = prq.eval;
- if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
+ if (GNUNET_BLOCK_EVALUATION_OK_LAST == prq.eval)
{
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# Datastore lookups concluded (found last result)"),
1,
GNUNET_NO);
- goto check_error_and_continue;
+ call_continuation (pr);
+ return;
}
if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
((GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
gettext_noop ("# Datastore lookups concluded (load too high)"),
1,
GNUNET_NO);
- goto check_error_and_continue;
- }
- pr->qe_start = GNUNET_TIME_absolute_get ();
- pr->warn_task =
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
- &warn_delay_task,
- pr);
- pr->qe =
- GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++,
- &pr->public_data.query,
- pr->public_data.type ==
- GNUNET_BLOCK_TYPE_FS_DBLOCK ?
- GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
- (0 !=
- (GSF_PRO_PRIORITY_UNLIMITED & pr->
- public_data.options)) ? UINT_MAX : 1
- /* queue priority */ ,
- (0 !=
- (GSF_PRO_PRIORITY_UNLIMITED & pr->
- public_data.options)) ? UINT_MAX :
- GSF_datastore_queue_size
- /* max queue size */ ,
- &process_local_reply, pr);
- /* check if we successfully queued another datastore request;
- * if so, return, otherwise call our continuation (if we have
- * any) */
-check_error_and_continue:
- if (NULL != pr->qe)
+ call_continuation (pr);
return;
- if (NULL != pr->warn_task)
- {
- GNUNET_SCHEDULER_cancel (pr->warn_task);
- pr->warn_task = NULL;
}
- if (NULL == (cont = pr->llc_cont))
- return; /* no continuation */
- pr->llc_cont = NULL;
- if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
- {
- if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result)
- {
- /* Signal that we are done and that there won't be any
- additional results to allow client to clean up state. */
- pr->rh (pr->rh_cls,
- GNUNET_BLOCK_EVALUATION_OK_LAST,
- pr,
- UINT32_MAX,
- GNUNET_TIME_UNIT_ZERO_ABS,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- GNUNET_BLOCK_TYPE_ANY,
- NULL, 0);
- }
- /* Finally, call our continuation to signal that we are
- done with local processing of this request; i.e. to
- start reading again from the client. */
- cont (pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST);
- return;
- }
-
- cont (pr->llc_cont_cls, pr, pr->local_result);
+ start_local_query (pr,
+ uid + 1 /* next_uid */,
+ false /* random */);
}
GNUNET_assert (NULL == pr->llc_cont);
pr->llc_cont = cont;
pr->llc_cont_cls = cont_cls;
- pr->qe_start = GNUNET_TIME_absolute_get ();
- pr->warn_task =
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
- &warn_delay_task,
- pr);
#if INSANE_STATISTICS
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# Datastore lookups initiated"), 1,
GNUNET_NO);
#endif
- pr->qe =
- GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++,
- &pr->public_data.query,
- pr->public_data.type ==
- GNUNET_BLOCK_TYPE_FS_DBLOCK ?
- GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
- (0 !=
- (GSF_PRO_PRIORITY_UNLIMITED & pr->
- public_data.options)) ? UINT_MAX : 1
- /* queue priority */ ,
- (0 !=
- (GSF_PRO_PRIORITY_UNLIMITED & pr->
- public_data.options)) ? UINT_MAX :
- GSF_datastore_queue_size
- /* max queue size */ ,
- &process_local_reply, pr);
- if (NULL != pr->qe)
- return;
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop
- ("# Datastore lookups concluded (error queueing)"),
- 1, GNUNET_NO);
- GNUNET_SCHEDULER_cancel (pr->warn_task);
- pr->warn_task = NULL;
- pr->llc_cont = NULL;
- if (NULL != cont)
- cont (cont_cls, pr, pr->local_result);
+ start_local_query(pr,
+ 0 /* next_uid */,
+ true /* random */);
}
uint64_t zero_anonymity_count_estimate;
/**
- * Current offset when iterating the database.
+ * Count of results received from the database.
*/
- uint64_t current_offset;
+ uint64_t result_count;
+
+ /**
+ * Next UID to request when iterating the database.
+ */
+ uint64_t next_uid;
};
*/
static void
process_dht_put_content (void *cls,
- const struct GNUNET_HashCode * key,
- size_t size,
+ const struct GNUNET_HashCode * key,
+ size_t size,
const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority, uint32_t anonymity,
- struct GNUNET_TIME_Absolute expiration, uint64_t uid)
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute expiration,
+ uint64_t uid)
{
struct PutOperator *po = cls;
po->dht_qe = NULL;
if (key == NULL)
{
- po->zero_anonymity_count_estimate = po->current_offset - 1;
- po->current_offset = 0;
+ po->zero_anonymity_count_estimate = po->result_count;
+ po->result_count = 0;
+ po->next_uid = 0;
po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
return;
}
+ po->result_count++;
+ po->next_uid = uid + 1;
po->zero_anonymity_count_estimate =
- GNUNET_MAX (po->current_offset, po->zero_anonymity_count_estimate);
+ GNUNET_MAX (po->result_count, po->zero_anonymity_count_estimate);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key),
type);
po->dht_put = GNUNET_DHT_put (GSF_dht,
key,
DEFAULT_PUT_REPLICATION,
- GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
+ GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
type,
size,
data,
- expiration,
- &delay_dht_put_blocks, po);
+ expiration,
+ &delay_dht_put_blocks,
+ po);
}
po->dht_task = NULL;
po->dht_qe =
- GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0,
+ GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh,
+ po->next_uid,
+ 0,
UINT_MAX,
po->dht_put_type,
- &process_dht_put_content, po);
+ &process_dht_put_content,
+ po);
if (NULL == po->dht_qe)
po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
}
* Get one of the results for a particular key in the datastore.
*
* @param cls closure
- * @param offset offset of the result (modulo num-results);
- * specific ordering does not matter for the offset
- * @param key key to match, never NULL
+ * @param next_uid return the result with lowest uid >= next_uid
+ * @param random if true, return a random result instead of using next_uid
+ * @param key maybe NULL (to match all entries)
* @param vhash hash of the value, maybe NULL (to
* match all values that have the right key).
* Note that for DBlocks there is no difference
* @param type entries of which type are relevant?
* Use 0 for any type.
* @param proc function to call on the matching value;
- * proc should be called with NULL if there is no result
+ * will be called with NULL if nothing matches
* @param proc_cls closure for @a proc
*/
typedef void
(*PluginGetKey) (void *cls,
- uint64_t offset,
- const struct GNUNET_HashCode *key,
- const struct GNUNET_HashCode *vhash,
- enum GNUNET_BLOCK_Type type,
- PluginDatumProcessor proc,
- void *proc_cls);
+ uint64_t next_uid,
+ bool random,
+ const struct GNUNET_HashCode *key,
+ const struct GNUNET_HashCode *vhash,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc,
+ void *proc_cls);
/**
/**
- * Select a single item from the datastore at the specified offset
- * (among those applicable).
+ * Select a single item from the datastore (among those applicable).
*
* @param cls closure
- * @param offset offset of the result (modulo num-results);
- * specific ordering does not matter for the offset
+ * @param next_uid return the result with lowest uid >= next_uid
* @param type entries of which type should be considered?
* Must not be zero (ANY).
- * @param proc function to call on the matching value
+ * @param proc function to call on the matching value;
+ * will be called with NULL if no value matches
* @param proc_cls closure for @a proc
*/
typedef void
(*PluginGetType) (void *cls,
- uint64_t offset,
- enum GNUNET_BLOCK_Type type,
- PluginDatumProcessor proc,
- void *proc_cls);
+ uint64_t next_uid,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc,
+ void *proc_cls);
/**
/**
* Get datum (of the specified type) with anonymity level zero.
- * This function is allowed to ignore the 'offset' argument
- * and instead return a random result (with zero anonymity of
- * the correct type) if implementing an offset is expensive.
*/
PluginGetType get_zero_anonymity;
* will only be called once.
*
* @param h handle to the datastore
- * @param offset offset of the result (modulo num-results); set to
- * a random 64-bit value initially; then increment by
- * one each time; detect that all results have been found by uid
- * being again the first uid ever returned.
+ * @param next_uid return the result with lowest uid >= next_uid
+ * @param random if true, return a random result instead of using next_uid
* @param key maybe NULL (to match all entries)
* @param type desired type, 0 for any
* @param queue_priority ranking of this request in the priority queue
*/
struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
- uint64_t offset,
+ uint64_t next_uid,
+ bool random,
const struct GNUNET_HashCode *key,
enum GNUNET_BLOCK_Type type,
unsigned int queue_priority,
/**
* Get a single zero-anonymity value from the datastore.
- * Note that some implementations can ignore the 'offset' and
- * instead return a random zero-anonymity value. In that case,
- * detecting the wrap-around based on a repeating UID is at best
- * probabilistic.
*
* @param h handle to the datastore
- * @param offset offset of the result (modulo num-results); set to
- * a random 64-bit value initially; then increment by
- * one each time; detect that all results have been found by uid
- * being again the first uid ever returned.
+ * @param next_uid return the result with lowest uid >= next_uid
* @param queue_priority ranking of this request in the priority queue
* @param max_queue_size at what queue size should this request be dropped
* (if other requests of higher priority are in the queue)
*/
struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
- uint64_t offset,
+ uint64_t next_uid,
unsigned int queue_priority,
unsigned int max_queue_size,
enum GNUNET_BLOCK_Type type,
#include <stdlib.h>
#include <stdint.h>
#include <stdarg.h>
+#include <stdbool.h>
#include <errno.h>
#include <signal.h>
#include <libgen.h>