*
* @param h handle to the datacache
* @param key key to store data under
+ * @param am_closest are we the closest peer?
* @param data_size number of bytes in @a data
* @param data data to store
* @param type type of the value
int
GNUNET_DATACACHE_put (struct GNUNET_DATACACHE_Handle *h,
const struct GNUNET_HashCode *key,
+ int am_closest,
size_t data_size,
const char *data,
enum GNUNET_BLOCK_Type type,
used = h->api->put (h->api->cls,
key,
+ am_closest,
data_size,
data,
type,
*/
struct GNUNET_CONTAINER_Heap *heap;
+ /**
+ * Heap from the plugin for "closest" values.
+ */
+ struct GNUNET_CONTAINER_Heap *cheap;
+
};
*/
unsigned int path_info_len;
+ /**
+ * Am I the closest peer? Determines which heap we are in!
+ */
+ int am_closest;
+
/**
* Type of the block.
*/
*/
const char *data;
+ /**
+ * Heap from the plugin for "closest" values.
+ */
+ struct GNUNET_CONTAINER_Heap *cheap;
+
/**
* Heap from the plugin.
*/
if ( (val->size == put_ctx->size) &&
(val->type == put_ctx->type) &&
- (0 == memcmp (&val[1], put_ctx->data, put_ctx->size)) )
+ (0 == memcmp (&val[1],
+ put_ctx->data,
+ put_ctx->size)) )
{
put_ctx->found = GNUNET_YES;
val->discard_time = GNUNET_TIME_absolute_max (val->discard_time,
*
* @param cls closure (our `struct Plugin`)
* @param key key to store data under
+ * @param am_closest are we the closest peer?
* @param size number of bytes in @a data
* @param data data to store
* @param type type of the value
static ssize_t
heap_plugin_put (void *cls,
const struct GNUNET_HashCode *key,
+ int am_closest,
size_t size,
const char *data,
enum GNUNET_BLOCK_Type type,
put_ctx.found = GNUNET_NO;
put_ctx.heap = plugin->heap;
+ put_ctx.cheap = plugin->cheap;
put_ctx.data = data;
put_ctx.size = size;
put_ctx.path_info = path_info;
val->type = type;
val->discard_time = discard_time;
val->size = size;
+ val->am_closest = am_closest;
GNUNET_array_grow (val->path_info,
val->path_info_len,
path_info_len);
GNUNET_memcpy (val->path_info,
- path_info,
- path_info_len * sizeof (struct GNUNET_PeerIdentity));
+ path_info,
+ path_info_len * sizeof (struct GNUNET_PeerIdentity));
(void) GNUNET_CONTAINER_multihashmap_put (plugin->map,
&val->key,
val,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- val->hn = GNUNET_CONTAINER_heap_insert (plugin->heap,
+ val->hn = GNUNET_CONTAINER_heap_insert (am_closest
+ ? plugin->cheap
+ : plugin->heap,
val,
val->discard_time.abs_value_us);
return size + OVERHEAD;
struct Value *val;
val = GNUNET_CONTAINER_heap_remove_root (plugin->heap);
+ if (NULL == val)
+ val = GNUNET_CONTAINER_heap_remove_root (plugin->cheap);
if (NULL == val)
return GNUNET_SYSERR;
GNUNET_assert (GNUNET_YES ==
}
+/**
+ * Closure for #find_closest().
+ */
+struct GetClosestContext
+{
+ struct Value **values;
+
+ unsigned int num_results;
+
+ const struct GNUNET_HashCode *key;
+};
+
+
+static int
+find_closest (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GetClosestContext *gcc = cls;
+ struct Value *val = value;
+ unsigned int j;
+
+ if (1 != GNUNET_CRYPTO_hash_cmp (key,
+ gcc->key))
+ return GNUNET_OK; /* useless */
+ j = gcc->num_results;
+ for (unsigned int i=0;i<gcc->num_results;i++)
+ {
+ if (NULL == gcc->values[i])
+ {
+ j = i;
+ break;
+ }
+ if (1 == GNUNET_CRYPTO_hash_cmp (&gcc->values[i]->key,
+ key))
+ {
+ j = i;
+ break;
+ }
+ }
+ if (j == gcc->num_results)
+ return GNUNET_OK;
+ gcc->values[j] = val;
+ return GNUNET_OK;
+}
+
+
/**
* Iterate over the results that are "close" to a particular key in
* the datacache. "close" is defined as numerically larger than @a
GNUNET_DATACACHE_Iterator iter,
void *iter_cls)
{
- GNUNET_break (0); // not implemented!
- return 0;
+ struct Plugin *plugin = cls;
+ struct Value *values[num_results];
+ struct GetClosestContext gcc = {
+ .values = values,
+ .num_results = num_results,
+ .key = key
+ };
+ GNUNET_CONTAINER_multihashmap_iterate (plugin->map,
+ &find_closest,
+ &gcc);
+ for (unsigned int i=0;i<num_results;i++)
+ {
+ if (NULL == values[i])
+ return i;
+ iter (iter_cls,
+ &values[i]->key,
+ values[i]->size,
+ (void *) &values[i][1],
+ values[i]->type,
+ values[i]->discard_time,
+ values[i]->path_info_len,
+ values[i]->path_info);
+ }
+ return num_results;
}
plugin->map = GNUNET_CONTAINER_multihashmap_create (1024, /* FIXME: base on quota! */
GNUNET_YES);
plugin->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ plugin->cheap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
plugin->env = env;
api = GNUNET_new (struct GNUNET_DATACACHE_PluginFunctions);
api->cls = plugin;
GNUNET_free_non_null (val->path_info);
GNUNET_free (val);
}
+ while (NULL != (val = GNUNET_CONTAINER_heap_remove_root (plugin->cheap)))
+ {
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (plugin->map,
+ &val->key,
+ val));
+ GNUNET_free_non_null (val->path_info);
+ GNUNET_free (val);
+ }
GNUNET_CONTAINER_heap_destroy (plugin->heap);
+ GNUNET_CONTAINER_heap_destroy (plugin->cheap);
GNUNET_CONTAINER_multihashmap_destroy (plugin->map);
GNUNET_free (plugin);
GNUNET_free (api);
*
* @param cls closure (our `struct Plugin`)
* @param key key to store @a data under
+ * @param am_closest are we the closest peer?
* @param data_size number of bytes in @a data
* @param data data to store
* @param type type of the value
static ssize_t
postgres_plugin_put (void *cls,
const struct GNUNET_HashCode *key,
+ int am_closest,
size_t data_size,
const char *data,
enum GNUNET_BLOCK_Type type,
* How much overhead do we assume per entry in the
* datacache?
*/
-#define OVERHEAD (sizeof(struct GNUNET_HashCode) + 32)
+#define OVERHEAD (sizeof(struct GNUNET_HashCode) + 36)
/**
* Context for all functions in this plugin.
*
* @param cls closure (our `struct Plugin`)
* @param key key to store @a data under
+ * @param am_closest are we the closest peer?
* @param size number of bytes in @a data
* @param data data to store
* @param type type of the value
static ssize_t
sqlite_plugin_put (void *cls,
const struct GNUNET_HashCode *key,
+ int am_closest,
size_t size,
const char *data,
enum GNUNET_BLOCK_Type type,
{
struct Plugin *plugin = cls;
uint32_t type32 = type;
+ uint32_t prox = am_closest;
struct GNUNET_SQ_QueryParam params[] = {
GNUNET_SQ_query_param_uint32 (&type32),
GNUNET_SQ_query_param_absolute_time (&discard_time),
GNUNET_SQ_query_param_auto_from_type (key),
+ GNUNET_SQ_query_param_uint32 (&prox),
GNUNET_SQ_query_param_fixed_size (data, size),
GNUNET_SQ_query_param_fixed_size (path_info,
path_info_len * sizeof (struct GNUNET_PeerIdentity)),
uint64_t rowid;
void *data;
size_t dsize;
+ uint32_t prox;
struct GNUNET_HashCode hc;
struct GNUNET_SQ_ResultSpec rs[] = {
GNUNET_SQ_result_spec_uint64 (&rowid),
GNUNET_SQ_query_param_uint64 (&rowid),
GNUNET_SQ_query_param_end
};
+ struct GNUNET_SQ_QueryParam prox_params[] = {
+ GNUNET_SQ_query_param_uint32 (&prox),
+ GNUNET_SQ_query_param_end
+ };
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Processing DEL\n");
+ prox = GNUNET_NO;
+ again:
+ if (GNUNET_OK !=
+ GNUNET_SQ_bind (plugin->del_select_stmt,
+ prox_params))
+ {
+ LOG_SQLITE (plugin->dbh,
+ GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_bind");
+ GNUNET_SQ_reset (plugin->dbh,
+ plugin->del_stmt);
+ return GNUNET_SYSERR;
+ }
if (SQLITE_ROW !=
sqlite3_step (plugin->del_select_stmt))
{
"sqlite3_step");
GNUNET_SQ_reset (plugin->dbh,
plugin->del_select_stmt);
+ if (GNUNET_NO == prox)
+ {
+ prox = GNUNET_YES;
+ goto again;
+ }
return GNUNET_SYSERR;
}
if (GNUNET_OK !=
GNUNET_SQ_extract_result (plugin->del_select_stmt,
rs))
{
- GNUNET_break (0);
GNUNET_SQ_reset (plugin->dbh,
plugin->del_select_stmt);
+ if (GNUNET_NO == prox)
+ {
+ prox = GNUNET_YES;
+ goto again;
+ }
+ GNUNET_break (0);
return GNUNET_SYSERR;
}
GNUNET_SQ_cleanup_result (rs);
SQLITE3_EXEC (dbh, "PRAGMA sqlite_temp_store=3");
SQLITE3_EXEC (dbh,
- "CREATE TABLE ds090 (" " type INTEGER NOT NULL DEFAULT 0,"
- " expire INTEGER NOT NULL DEFAULT 0,"
+ "CREATE TABLE ds091 (" " type INTEGER NOT NULL DEFAULT 0,"
+ " expire INTEGER NOT NULL,"
" key BLOB NOT NULL DEFAULT '',"
- " value BLOB NOT NULL DEFAULT '',"
+ " prox INTEGER NOT NULL,"
+ " value BLOB NOT NULL,"
" path BLOB DEFAULT '')");
SQLITE3_EXEC (dbh, "CREATE INDEX idx_hashidx ON ds090 (key,type,expire)");
- SQLITE3_EXEC (dbh, "CREATE INDEX idx_expire ON ds090 (expire)");
+ SQLITE3_EXEC (dbh, "CREATE INDEX idx_expire ON ds090 (prox,expire)");
plugin = GNUNET_new (struct Plugin);
plugin->env = env;
plugin->dbh = dbh;
if ( (SQLITE_OK !=
sq_prepare (plugin->dbh,
- "INSERT INTO ds090 (type, expire, key, value, path) "
- "VALUES (?, ?, ?, ?, ?)",
+ "INSERT INTO ds091 (type, expire, key, prox, value, path) "
+ "VALUES (?, ?, ?, ?, ?, ?)",
&plugin->insert_stmt)) ||
(SQLITE_OK !=
sq_prepare (plugin->dbh,
- "SELECT count(*) FROM ds090 "
+ "SELECT count(*) FROM ds091 "
"WHERE key=? AND type=? AND expire >= ?",
&plugin->get_count_stmt)) ||
(SQLITE_OK !=
sq_prepare (plugin->dbh,
- "SELECT value,expire,path FROM ds090 "
+ "SELECT value,expire,path FROM ds091 "
"WHERE key=? AND type=? AND expire >= ? LIMIT 1 OFFSET ?",
&plugin->get_stmt)) ||
(SQLITE_OK !=
sq_prepare (plugin->dbh,
- "SELECT _ROWID_,key,value FROM ds090 ORDER BY expire ASC LIMIT 1",
+ "SELECT _ROWID_,key,value FROM ds091 WHERE prox=? ORDER BY expire ASC LIMIT 1",
&plugin->del_select_stmt)) ||
(SQLITE_OK !=
sq_prepare (plugin->dbh,
- "DELETE FROM ds090 WHERE _ROWID_=?",
+ "DELETE FROM ds091 WHERE _ROWID_=?",
&plugin->del_stmt)) ||
(SQLITE_OK !=
sq_prepare (plugin->dbh,
- "SELECT value,expire,path,key,type FROM ds090 "
+ "SELECT value,expire,path,key,type FROM ds091 "
"ORDER BY key LIMIT 1 OFFSET ?",
&plugin->get_random_stmt)) ||
(SQLITE_OK !=
sq_prepare (plugin->dbh,
- "SELECT value,expire,path,type,key FROM ds090 "
+ "SELECT value,expire,path,type,key FROM ds091 "
"WHERE key>=? AND expire >= ? ORDER BY KEY ASC LIMIT ?",
&plugin->get_closest_stmt))
)
*
* @param cls closure (our `struct Plugin`)
* @param key key to store @a data under
+ * @param am_closest are we the closest peer?
* @param size number of bytes in @a data
* @param data data to store
* @param type type of the value
static ssize_t
template_plugin_put (void *cls,
const struct GNUNET_HashCode *key,
+ int am_closest,
size_t size,
const char *data,
enum GNUNET_BLOCK_Type type,
{
GNUNET_CRYPTO_hash (&k, sizeof (struct GNUNET_HashCode), &n);
ASSERT (GNUNET_OK ==
- GNUNET_DATACACHE_put (h, &k, sizeof (struct GNUNET_HashCode),
+ GNUNET_DATACACHE_put (h,
+ &k,
+ GNUNET_YES,
+ sizeof (struct GNUNET_HashCode),
(const char *) &n, 1 + i % 16, exp,
0, NULL));
k = n;
memset (&k, 42, sizeof (struct GNUNET_HashCode));
GNUNET_CRYPTO_hash (&k, sizeof (struct GNUNET_HashCode), &n);
ASSERT (GNUNET_OK ==
- GNUNET_DATACACHE_put (h, &k, sizeof (struct GNUNET_HashCode),
+ GNUNET_DATACACHE_put (h,
+ &k,
+ GNUNET_YES,
+ sizeof (struct GNUNET_HashCode),
(const char *) &n, 792,
GNUNET_TIME_UNIT_FOREVER_ABS,
0, NULL));
{
exp.abs_value_us++;
buf[j] = i;
- ASSERT (GNUNET_OK == GNUNET_DATACACHE_put (h, &k, j, buf, 1 + i, exp, 0, NULL));
+ ASSERT (GNUNET_OK == GNUNET_DATACACHE_put (h,
+ &k,
+ GNUNET_YES,
+ j,
+ buf,
+ 1 + i,
+ exp,
+ 0,
+ NULL));
ASSERT (0 < GNUNET_DATACACHE_get (h, &k, 1 + i, NULL, NULL));
}
k = n;
const void *data)
{
FPRINTF (stdout,
- _("Result %d, type %d:\n%.*s\n"),
+ (GNUNET_BLOCK_TYPE_TEST == type)
+ ? _("Result %d, type %d:\n%.*s\n")
+ : _("Result %d, type %d:\n"),
result_count,
type,
(unsigned int) size,
{
struct GNUNET_HashCode key;
-
-
cfg = c;
if (NULL == query_key)
{
query_type = GNUNET_BLOCK_TYPE_TEST;
GNUNET_CRYPTO_hash (query_key, strlen (query_key), &key);
if (verbose)
- FPRINTF (stderr, "%s `%s' \n", _("Issueing DHT GET with key"), GNUNET_h2s_full (&key));
+ FPRINTF (stderr, "%s `%s' \n",
+ _("Issueing DHT GET with key"),
+ GNUNET_h2s_full (&key));
GNUNET_SCHEDULER_add_shutdown (&cleanup_task, NULL);
tt = GNUNET_SCHEDULER_add_delayed (timeout_request,
- &timeout_task, NULL);
+ &timeout_task,
+ NULL);
get_handle =
GNUNET_DHT_get_start (dht_handle, query_type, &key, replication,
(demultixplex_everywhere) ? GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE : GNUNET_DHT_RO_NONE,
- NULL, 0, &get_result_iterator, NULL);
-
+ NULL, 0,
+ &get_result_iterator,
+ NULL);
}
+
/**
* Entry point for gnunet-dht-get
*
int
main (int argc, char *const *argv)
{
-
struct GNUNET_GETOPT_CommandLineOption options[] = {
-
GNUNET_GETOPT_option_string ('k',
"key",
"KEY",
gettext_noop ("the query key"),
&query_key),
-
GNUNET_GETOPT_option_uint ('r',
"replication",
"LEVEL",
const struct GNUNET_PeerIdentity *path,
const struct GNUNET_HashCode * key)
{
- FPRINTF (stdout, "GET #%u: type %d, key `%s'\n",
+ FPRINTF (stdout,
+ "GET #%u: type %d, key `%s'\n",
result_count,
(int) type,
GNUNET_h2s_full(key));
size_t size)
{
FPRINTF (stdout,
- "RESPONSE #%u: type %d, key `%s', data `%.*s'\n",
+ (GNUNET_BLOCK_TYPE_TEST == type)
+ ? "RESPONSE #%u (%s): type %d, key `%s', data `%.*s'\n"
+ : "RESPONSE #%u (%s): type %d, key `%s'\n",
result_count,
+ GNUNET_STRINGS_absolute_time_to_string (exp),
(int) type,
GNUNET_h2s_full (key),
(unsigned int) size,
size_t size)
{
FPRINTF (stdout,
- "PUT %u: type %d, key `%s', data `%.*s'\n",
+ (GNUNET_BLOCK_TYPE_TEST == type)
+ ? "PUT %u (%s): type %d, key `%s', data `%.*s'\n"
+ : "PUT %u (%s): type %d, key `%s'\n",
result_count,
+ GNUNET_STRINGS_absolute_time_to_string (exp),
(int) type,
GNUNET_h2s_full(key),
(unsigned int) size,
* @param c configuration
*/
static void
-run (void *cls, char *const *args, const char *cfgfile,
+run (void *cls,
+ char *const *args,
+ const char *cfgfile,
const struct GNUNET_CONFIGURATION_Handle *c)
{
struct GNUNET_HashCode *key;
main (int argc, char *const *argv)
{
struct GNUNET_GETOPT_CommandLineOption options[] = {
-
+
GNUNET_GETOPT_option_string ('k',
"key",
"KEY",
gettext_noop ("the query key"),
&query_key),
-
+
GNUNET_GETOPT_option_uint ('t',
"type",
"TYPE",
gettext_noop ("the type of data to look for"),
&block_type),
-
+
GNUNET_GETOPT_option_relative_time ('T',
"timeout",
"TIMEOUT",
gettext_noop ("how long should the monitor command run"),
&timeout_request),
-
+
GNUNET_GETOPT_option_flag ('V',
"verbose",
gettext_noop ("be verbose (print progress information)"),
&verbose),
-
+
GNUNET_GETOPT_OPTION_END
};
#include "platform.h"
#include "gnunet_datacache_lib.h"
#include "gnunet-service-dht_datacache.h"
+#include "gnunet-service-dht_neighbours.h"
#include "gnunet-service-dht_routing.h"
#include "gnunet-service-dht.h"
}
/* Put size is actual data size plus struct overhead plus path length (if any) */
GNUNET_STATISTICS_update (GDS_stats,
- gettext_noop ("# ITEMS stored in datacache"), 1,
+ gettext_noop ("# ITEMS stored in datacache"),
+ 1,
GNUNET_NO);
r = GNUNET_DATACACHE_put (datacache,
key,
+ GDS_am_closest_peer (key,
+ NULL),
data_size,
data,
type,
* Find the optimal bucket for this key.
*
* @param hc the hashcode to compare our identity to
- * @return the proper bucket index, or GNUNET_SYSERR
+ * @return the proper bucket index, or #GNUNET_SYSERR
* on error (same hashcode)
*/
static int
* @return #GNUNET_YES if node location is closest,
* #GNUNET_NO otherwise.
*/
-static int
-am_closest_peer (const struct GNUNET_HashCode *key,
- const struct GNUNET_CONTAINER_BloomFilter *bloom)
+int
+GDS_am_closest_peer (const struct GNUNET_HashCode *key,
+ const struct GNUNET_CONTAINER_BloomFilter *bloom)
{
int bits;
int other_bits;
payload);
/* store locally */
if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
- (am_closest_peer (&put->key, bf)))
+ (GDS_am_closest_peer (&put->key, bf)))
GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
&put->key,
putlen,
(unsigned int) ntohl (get->hop_count));
/* local lookup (this may update the reply_bf) */
if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
- (am_closest_peer (&get->key,
+ (GDS_am_closest_peer (&get->key,
peer_bf)))
{
if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
size_t data_size);
+/**
+ * Check whether my identity is closer than any known peers. If a
+ * non-null bloomfilter is given, check if this is the closest peer
+ * that hasn't already been routed to.
+ *
+ * @param key hash code to check closeness to
+ * @param bloom bloomfilter, exclude these entries from the decision
+ * @return #GNUNET_YES if node location is closest,
+ * #GNUNET_NO otherwise.
+ */
+int
+GDS_am_closest_peer (const struct GNUNET_HashCode *key,
+ const struct GNUNET_CONTAINER_BloomFilter *bloom);
+
+
+
/**
* Initialize neighbours subsystem.
*
active_cnt);
req->lr = GNUNET_GNS_lookup_with_tld (gns,
req->hostname,
- GNUNET_GNSRECORD_TYPE_ANY,
+ GNUNET_GNSRECORD_TYPE_GNS2DNS,
GNUNET_GNS_LO_DEFAULT,
&process_result,
req);
/**
* DHT replication level
*/
-#define DHT_GNS_REPLICATION_LEVEL 5
+#define DHT_GNS_REPLICATION_LEVEL 10
/**
* How deep do we allow recursions to go before we abort?
*
* @param h handle to the datacache
* @param key key to store data under
+ * @param am_closest am I the closest peer?
* @param data_size number of bytes in @a data
* @param data data to store
* @param type type of the value
int
GNUNET_DATACACHE_put (struct GNUNET_DATACACHE_Handle *h,
const struct GNUNET_HashCode *key,
+ int am_closest,
size_t data_size,
const char *data,
enum GNUNET_BLOCK_Type type,
*
* @param cls closure (internal context for the plugin)
* @param key key to store the value under
+ * @param am_closest are we the closest peer?
* @param size number of bytes in @a data
* @param data data to store
* @param type type of the value
*/
ssize_t (*put) (void *cls,
const struct GNUNET_HashCode *key,
+ int am_closest,
size_t size,
const char *data,
enum GNUNET_BLOCK_Type type,