#define PRINT_TABLES GNUNET_NO
-#define REAL_DISTANCE GNUNET_YES
+#define REAL_DISTANCE GNUNET_NO
#define EXTRA_CHECKS GNUNET_NO
/**
*/
#define DHT_DEFAULT_FIND_PEER_IMPORTANCE 8
+/**
+ * Default replication parameter for find peer messages sent by the dht service.
+ */
+#define DHT_DEFAULT_PUT_REPLICATION 4
+
/**
* Default replication parameter for find peer messages sent by the dht service.
*/
GNUNET_SCHEDULER_TaskIdentifier remove_task;
};
+struct RepublishContext
+{
+ /**
+ * Key to republish.
+ */
+ GNUNET_HashCode key;
+
+ /**
+ * Type of the data.
+ */
+ unsigned int type;
+
+};
+
/**
* Which kind of convergence will we be using?
*/
* to closest peer; initially send requests to 3
* peers.
*/
-static int strict_kademlia;
+static unsigned int strict_kademlia;
/**
* Routing option to end routing when closest peer found.
*/
-static int stop_on_closest;
+static unsigned int stop_on_closest;
/**
* Routing option to end routing when data is found.
*/
-static int stop_on_found;
+static unsigned int stop_on_found;
/**
* Whether DHT needs to manage find peer requests, or
* an external force will do it on behalf of the DHT.
*/
-static int do_find_peer;
+static unsigned int do_find_peer;
+
+static unsigned int use_real_distance;
/**
* How many peers have we added since we sent out our last
* @param peer identifies the peer
* @param bpm_in set to the current bandwidth limit (receiving) for this peer
* @param bpm_out set to the current bandwidth limit (sending) for this peer
- * @param latency current latency estimate, "FOREVER" if we have been
- * disconnected
* @param amount set to the amount that was actually reserved or unreserved;
* either the full requested amount or zero (no partial reservations)
* @param preference current traffic preference for the given peer
*/
static void
update_core_preference_finish (void *cls,
- const struct
- GNUNET_PeerIdentity * peer,
+ const struct GNUNET_PeerIdentity * peer,
struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
- int amount,
- uint64_t preference)
+ int amount, uint64_t preference)
{
struct PeerInfo *peer_info = cls;
peer_info->info_ctx = NULL;
{
struct PeerInfo *peer = cls;
uint64_t preference;
-
+ unsigned int matching;
if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
{
return;
}
-
- preference = 2 << matching_bits(&my_identity.hashPubKey, &peer->id.hashPubKey);
+ matching = matching_bits(&my_identity.hashPubKey, &peer->id.hashPubKey);
+ if (matching >= 64)
+ {
+ GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer identifier matches by %u bits, only shifting as much as we can!\n", matching_bits);
+ matching = 63;
+ }
+ preference = 1LL << matching;
peer->info_ctx = GNUNET_CORE_peer_change_preference (sched, cfg,
&peer->id,
GNUNET_TIME_relative_get_forever(),
/**
* Attempt to add a peer to our k-buckets.
*
- * @param peer, the peer identity of the peer being added
+ * @param peer the peer identity of the peer being added
+ * @param bucket the bucket that we want this peer to go in
+ * @param latency transport latency of this peer
+ * @param distance transport distance to this peer
*
* @return NULL if the peer was not added,
* pointer to PeerInfo for new peer otherwise
* Main function that handles whether or not to route a result
* message to other peers, or to send to our local client.
*
+ * @param cls closure (unused, always should be NULL)
* @param msg the result message to be routed
+ * @param message_context context of the message we are routing
+ *
* @return the number of peers the message was routed to,
* GNUNET_SYSERR on failure
*/
*
*/
static void
-handle_dht_find_peer (void *cls,
+handle_dht_find_peer (void *cls,
const struct GNUNET_MessageHeader *find_msg,
struct DHT_MessageContext *message_context)
{
GNUNET_free(find_peer_result);
}
+/**
+ * Task used to republish data.
+ * Forward declaration; function call loop.
+ *
+ * @param cls closure (a struct RepublishContext)
+ * @param tc runtime context for this task
+ */
+static void
+republish_content(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
/**
* Server handler for initiating local dht put requests
struct GNUNET_DHT_PutMessage *put_msg;
size_t put_type;
size_t data_size;
+ int ret;
+ struct RepublishContext *put_context;
GNUNET_assert (ntohs (msg->size) >=
sizeof (struct GNUNET_DHT_PutMessage));
increment_stats(STAT_PUTS_INSERTED);
if (datacache != NULL)
- GNUNET_DATACACHE_put (datacache, &message_context->key, data_size,
- (char *) &put_msg[1], put_type,
- GNUNET_TIME_absolute_ntoh(put_msg->expiration));
+ {
+ ret = GNUNET_DATACACHE_put (datacache, &message_context->key, data_size,
+ (char *) &put_msg[1], put_type,
+ GNUNET_TIME_absolute_ntoh(put_msg->expiration));
+
+ if (ret == GNUNET_YES)
+ {
+ put_context = GNUNET_malloc(sizeof(struct RepublishContext));
+ memcpy(&put_context->key, &message_context->key, sizeof(GNUNET_HashCode));
+ put_context->type = put_type;
+ GNUNET_SCHEDULER_add_delayed (sched, DHT_REPUBLISH_FREQUENCY, &republish_content, put_context);
+ }
+ }
else
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s:%s': %s request received, but have no datacache!\n",
* to a closer peer (if closer peers exist) or to choose
* from the whole set of peers.
*
+ * @param target the key of the request
+ * @param bloom bloomfilter of peers this request has already traversed
* @param hops number of hops this message has already traveled
+ *
+ * @return GNUNET_YES if we should try to route to a closer peer
+ * than ourselves (and one exists), GNUNET_NO if we should
+ * choose from the set of all known peers
+ *
*/
int
-route_closer (const GNUNET_HashCode *target, struct GNUNET_CONTAINER_BloomFilter *bloom,
+route_closer (const GNUNET_HashCode *target,
+ struct GNUNET_CONTAINER_BloomFilter *bloom,
unsigned int hops)
{
unsigned int my_matching_bits;
*
* @param target the key we are selecting a peer to route to
* @param bloom a bloomfilter containing entries this request has seen already
+ * @param hops the number of hops this message has already traversed
*
* @return Peer to route to, or NULL on error
*/
static struct PeerInfo *
select_peer (const GNUNET_HashCode * target,
- struct GNUNET_CONTAINER_BloomFilter *bloom, unsigned int hops)
+ struct GNUNET_CONTAINER_BloomFilter *bloom,
+ unsigned int hops)
{
unsigned int distance;
unsigned int bc;
unsigned int count;
unsigned int my_matching_bits;
unsigned long long largest_distance;
-#if REAL_DISTANCE
- unsigned long long total_distance;
- unsigned long long selected;
-#else
+ unsigned long long total_real_distance;
+ unsigned long long real_selected;
unsigned int total_distance;
unsigned int selected;
-#endif
-
+ unsigned int match_num;
int only_closer;
struct PeerInfo *pos;
struct PeerInfo *chosen;
char *temp_stat;
+#if DEBUG_DHT_ROUTING
+ double sum;
+#endif
my_matching_bits = matching_bits(target, &my_identity.hashPubKey);
only_closer = route_closer(target, bloom, hops);
if ((GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) &&
((only_closer == GNUNET_NO) || (matching_bits(target, &pos->id.hashPubKey) >= my_matching_bits)))
{
-#if REAL_DISTANCE /* Use the "real" distance as computed by the inverse_distance function */
- /** The "real" distance is best for routing to the closest peer, but in practice
- * (with our routing algorithm) it is usually better to use the squared bit distance.
- * This gives us a higher probability of routing towards close peers.
- */
- total_distance += (unsigned long long)inverse_distance (target, &pos->id.hashPubKey);
-#else
- total_distance += matching_bits(target, &pos->id.hashPubKey) * matching_bits(target ,&pos->id.hashPubKey);
-#endif
+ if (GNUNET_YES == use_real_distance)
+ total_real_distance += (unsigned long long)inverse_distance (target, &pos->id.hashPubKey);
+ else
+ {
+ /* Always add 1, in case 0 bits match! */
+ match_num = 1 + (matching_bits(target, &pos->id.hashPubKey) * matching_bits(target ,&pos->id.hashPubKey));
+ total_distance += match_num;
+ }
}
#if DEBUG_DHT > 1
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
count++;
}
}
- if (total_distance == 0)
+
+ if (((GNUNET_YES == use_real_distance) && (total_real_distance == 0)) || (total_distance == 0))
{
increment_stats("# select_peer, total_distance == 0");
return NULL;
}
- selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total_distance);
+#if DEBUG_DHT_ROUTING
for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
{
pos = k_buckets[bc].head;
if ((GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) &&
((only_closer == GNUNET_NO) || (matching_bits(target, &pos->id.hashPubKey) >= my_matching_bits)))
{
-#if REAL_DISTANCE
- distance = inverse_distance (target, &pos->id.hashPubKey);
-#else
- distance = matching_bits(target, &pos->id.hashPubKey) * matching_bits(target, &pos->id.hashPubKey);
+ if (GNUNET_YES == use_real_distance)
+ {
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "REAL: Choose peer with %d matching bits (%.2f percent)\n", matching_bits(&pos->id.hashPubKey, target), (inverse_distance (target, &pos->id.hashPubKey) / (double)total_real_distance) * 100);
+ sum += inverse_distance (target, &pos->id.hashPubKey) / (double)total_real_distance;
+ }
+ else
+ {
+ match_num = 1 + (matching_bits(&pos->id.hashPubKey, target) * matching_bits(&pos->id.hashPubKey, target));
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Choose peer with %d matching bits (%.2f percent)\n", matching_bits(&pos->id.hashPubKey, target), (match_num / (double)total_distance) * 100);
+ sum += match_num / (double)total_distance;
+ }
+ }
+ pos = pos->next;
+ count++;
+ }
+ }
+ fprintf(stdout, "Sum is %f\n", sum);
#endif
- if (distance > selected)
+
+ real_selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total_real_distance);
+ selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total_distance);
+
+ for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
+ {
+ pos = k_buckets[bc].head;
+ count = 0;
+ while ((pos != NULL) && (count < bucket_size))
+ {
+ if ((GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) &&
+ ((only_closer == GNUNET_NO) || (matching_bits(target, &pos->id.hashPubKey) >= my_matching_bits)))
+ {
+ if (GNUNET_YES == use_real_distance)
+ {
+ distance = inverse_distance (target, &pos->id.hashPubKey);
+ if (distance > real_selected)
+ {
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Selected peer with %u matching bits to route to\n", distance);
+ return pos;
+ }
+ real_selected -= distance;
+ }
+ else
{
- GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Selected peer with %u matching bits to route to\n", distance);
- return pos;
+ distance = 1 + (matching_bits(target, &pos->id.hashPubKey) * matching_bits(target, &pos->id.hashPubKey));
+ if (distance > selected)
+ {
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Selected peer with %u matching bits to route to\n", distance);
+ return pos;
+ }
+ selected -= distance;
}
- selected -= distance;
}
else
{
return forward_count;
}
+/**
+ * Iterator for local get request results,
+ *
+ * @param cls closure for iterator, NULL
+ * @param exp when does this value expire?
+ * @param key the key this data is stored under
+ * @param size the size of the data identified by key
+ * @param data the actual data
+ * @param type the type of the data
+ *
+ * @return GNUNET_OK to continue iteration, anything else
+ * to stop iteration.
+ */
+static int
+republish_content_iterator (void *cls,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key,
+ uint32_t size, const char *data, uint32_t type)
+{
+
+ struct DHT_MessageContext *new_msg_ctx;
+ struct GNUNET_DHT_PutMessage *put_msg;
+#if DEBUG_DHT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s:%s': Received `%s' response from datacache\n", my_short_id, "DHT", "GET");
+#endif
+ new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
+
+ put_msg =
+ GNUNET_malloc (sizeof (struct GNUNET_DHT_PutMessage) + size);
+ put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
+ put_msg->header.size = htons (sizeof (struct GNUNET_DHT_PutMessage) + size);
+ put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
+ put_msg->type = htons (type);
+ memcpy (&put_msg[1], data, size);
+ new_msg_ctx->unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
+ new_msg_ctx->replication = ntohl (DHT_DEFAULT_PUT_REPLICATION);
+ new_msg_ctx->msg_options = ntohl (0);
+ new_msg_ctx->network_size = estimate_diameter();
+ new_msg_ctx->peer = &my_identity;
+ new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
+ new_msg_ctx->hop_count = 0;
+ new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
+ new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
+ increment_stats(STAT_PUT_START);
+ route_message(cls, &put_msg->header, new_msg_ctx);
+
+ GNUNET_free(new_msg_ctx);
+ GNUNET_free (put_msg);
+ return GNUNET_OK;
+}
+
+/**
+ * Task used to republish data.
+ *
+ * @param cls closure (a struct RepublishContext)
+ * @param tc runtime context for this task
+ */
+static void
+republish_content(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct RepublishContext *put_context = cls;
+
+ unsigned int results;
+
+ if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
+ {
+ GNUNET_free(put_context);
+ return;
+ }
+
+ GNUNET_assert (datacache != NULL); /* If we have no datacache we never should have scheduled this! */
+ results = GNUNET_DATACACHE_get(datacache, &put_context->key, put_context->type, &republish_content_iterator, NULL);
+ if (results == 0) /* Data must have expired */
+ GNUNET_free(put_context);
+ else /* Reschedule task for next time period */
+ GNUNET_SCHEDULER_add_delayed(sched, DHT_REPUBLISH_FREQUENCY, &republish_content, put_context);
+
+}
+
/**
* Find a client if it exists, add it otherwise.
*
else
do_find_peer = GNUNET_YES;
+ if (GNUNET_YES ==
+ GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
+ "use_real_distance"))
+ use_real_distance = GNUNET_YES;
+
if (GNUNET_YES ==
GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing",
"mysql_logging_extended"))
#include "gnunet_protocols.h"
#include "dhtlog.h"
-#define VERBOSE GNUNET_YES
+#define VERBOSE GNUNET_NO
static int ok;
struct GNUNET_PeerIdentity p2;
struct GNUNET_PeerIdentity p3;
struct GNUNET_PeerIdentity p4;
-
+ struct GNUNET_DHTLOG_TrialInfo trial_info;
GNUNET_HashCode k1;
GNUNET_HashCode k2;
-
int ret;
unsigned int i = 42;
- unsigned long long trialuid;
unsigned long long sqlqueryuid;
unsigned long long sqlrouteuid = 0;
unsigned long long nodeuid = 0;
memset (&k1, 0, sizeof (GNUNET_HashCode));
memset (&k2, 1, sizeof (GNUNET_HashCode));
-
+ memset(&trial_info, 0, sizeof(struct GNUNET_DHTLOG_TrialInfo));
+ trial_info.other_identifier = 777;
+ trial_info.num_nodes = i;
+ trial_info.topology = 5;
+ trial_info.blacklist_topology = 4;
+ trial_info.connect_topology = 3;
+ trial_info.connect_topology_option = 2;
+ trial_info.connect_topology_option_modifier = .75;
+ trial_info.topology_percentage = .25;
+ trial_info.topology_probability = .5;
+ trial_info.puts = 42;
+ trial_info.gets = 14;
+ trial_info.concurrent = 5;
+ trial_info.settle_time = 1;
+ trial_info.num_rounds = 12;
+ trial_info.malicious_getters = 0;
+ trial_info.malicious_putters = 0;
+ trial_info.malicious_droppers = 0;
+ trial_info.malicious_get_frequency = 1;
+ trial_info.malicious_put_frequency = 0;
+ trial_info.stop_closest = 1;
+ trial_info.stop_found = 0;
+ trial_info.strict_kademlia = 1;
+ trial_info.message = GNUNET_strdup("TEST INSERT_TRIAL");
ret =
- api->insert_trial (&trialuid, i, 5, 4, 3, 2,
- .75, .25, .5, 42, 14,
- 5, 1, 12, 0, 0, 0, 1, 0, 1,
- 0, 1, 0, "TEST INSERT TRIAL");
+ api->insert_trial (&trial_info);
+ GNUNET_free(trial_info.message);
CHECK(ret);
+#if VERBOSE
+ fprintf(stderr, "Insert trial succeeded!\n");
+#endif
ret = api->insert_topology(500);
CHECK(ret);
+#if VERBOSE
+ fprintf(stderr, "Insert topology succeeded!\n");
+#endif
ret = api->insert_node (&nodeuid, &p1);
CHECK(ret);
ret = api->insert_node (&nodeuid, &p2);
CHECK(ret);
ret = api->insert_node (&nodeuid, &p4);
CHECK(ret);
+#if VERBOSE
+ fprintf(stderr, "Insert node succeeded!\n");
+#endif
ret = api->set_malicious(&p1);
CHECK(ret);
+#if VERBOSE
+ fprintf(stderr, "Set malicious succeeded!\n");
+#endif
ret = api->insert_topology(0);
CHECK(ret);
+#if VERBOSE
+ fprintf(stderr, "Insert topology succeeded!\n");
+#endif
ret = api->insert_extended_topology(&p1, &p2);
CHECK(ret);
ret = api->insert_extended_topology(&p3, &p4);
CHECK(ret);
+#if VERBOSE
+ fprintf(stderr, "Insert extended topology succeeded!\n");
+#endif
ret = api->update_topology(101);
CHECK(ret);
+#if VERBOSE
+ fprintf(stderr, "Update topology succeeded!\n");
+#endif
ret = api->insert_dhtkey (&dhtkeyuid, &k1);
CHECK(ret);
ret = api->insert_dhtkey (&dhtkeyuid, &k2);
CHECK(ret);
+#if VERBOSE
+ fprintf(stderr, "Insert dhtkey succeeded!\n");
+#endif
ret = api->insert_query (&sqlqueryuid, internaluid, 2, 4, 0, &p2, &k1);
CHECK(ret);
+#if VERBOSE
+ fprintf(stderr, "Insert query succeeded!\n");
+#endif
ret =
api->insert_route (&sqlrouteuid, sqlqueryuid, 1, 1, DHTLOG_GET, &p1, &k2,
&p4, &p3);
api->insert_route (&sqlrouteuid, sqlqueryuid, 4, 7, DHTLOG_ROUTE, &p3, &k2,
NULL, NULL);
CHECK(ret);
+#if VERBOSE
+ fprintf(stderr, "Insert route succeeded!\n");
+#endif
sleep (1);
ret = api->insert_stat(&p1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17);
CHECK(ret);
ret = api->insert_stat(&p2, 12, 23, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
CHECK(ret);
- ret = api->update_trial (trialuid, 787);
+#if VERBOSE
+ fprintf(stderr, "Insert stat succeeded!\n");
+#endif
+ ret = api->update_trial (trial_info.trialuid, 787);
CHECK(ret);
+#if VERBOSE
+ fprintf(stderr, "Update trial succeeded!\n");
+#endif
ret = api->add_generic_stat (&p2, "nonsense", "section", 77765);
CHECK(ret);
+#if VERBOSE
+ fprintf(stderr, "Insert generic stat succeeded!\n");
+#endif
return 0;
}