From 797a58626759227befd9b055f01d2505216e2ccb Mon Sep 17 00:00:00 2001 From: "Nathan S. Evans" Date: Thu, 16 Sep 2010 14:13:57 +0000 Subject: [PATCH] many useless changes to the dht testing driver, mostly for churn (which may not even be useful) --- src/dht/gnunet-dht-driver.c | 1222 +++++++++++++++++++++++++++-------- 1 file changed, 937 insertions(+), 285 deletions(-) diff --git a/src/dht/gnunet-dht-driver.c b/src/dht/gnunet-dht-driver.c index 07f1fbb04..cdd5e7c16 100644 --- a/src/dht/gnunet-dht-driver.c +++ b/src/dht/gnunet-dht-driver.c @@ -59,7 +59,7 @@ #define DEFAULT_BUCKET_SIZE 4 -#define FIND_PEER_THRESHOLD DEFAULT_BUCKET_SIZE * 2 +#define FIND_PEER_THRESHOLD 1 /* If more than this many peers are added, slow down sending */ #define MAX_FIND_PEER_CUTOFF 2500 @@ -79,6 +79,8 @@ #define DEFAULT_TOPOLOGY_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8) +#define DEFAULT_RECONNECT_ATTEMPTS 8 + /* * Default frequency for sending malicious get messages */ @@ -259,6 +261,104 @@ struct TopologyIteratorContext struct GNUNET_TIME_Relative timeout; }; + +struct PeerCount +{ + /** Node in the heap */ + struct GNUNET_CONTAINER_HeapNode *heap_node; + + /** Peer the count refers to */ + struct GNUNET_PeerIdentity peer_id; + + /** Count of connections this peer has */ + unsigned int count; +}; + +/** + * Context for sending out find peer requests. + */ +struct FindPeerContext +{ + /** + * How long to send find peer requests, once the settle time + * is over don't send any more out! + * + * TODO: Add option for settle time and find peer sending time? + */ + struct GNUNET_TIME_Absolute endtime; + + /** + * Number of connections in the current topology + * (after this round of find peer requests has ended). + */ + unsigned int current_peers; + + /** + * Number of connections in the current topology + * (before this round of find peer requests started). + */ + unsigned int previous_peers; + + /** + * Number of find peer requests we have currently + * outstanding. + */ + unsigned int outstanding; + + /** + * Number of find peer requests to send in this round. + */ + unsigned int total; + + /** + * Number of find peer requests sent last time around. + */ + unsigned int last_sent; + + /** + * Hashmap of peers in the current topology, value + * is a PeerCount, with the number of connections + * this peer has. + */ + struct GNUNET_CONTAINER_MultiHashMap *peer_hash; + + /** + * Min heap which orders values in the peer_hash for + * easy lookup. + */ + struct GNUNET_CONTAINER_Heap *peer_min_heap; + + /** + * Callback for counting the peers in the current topology. + */ + GNUNET_TESTING_NotifyTopology count_peers_cb; +}; + +enum DHT_ROUND_TYPES +{ + /** + * Next full round (puts + gets). + */ + DHT_ROUND_NORMAL, + + /** + * Next round of gets. + */ + DHT_ROUND_GET, + + /** + * Next round of puts. + */ + DHT_ROUND_PUT, + + /** + * Next round of churn. + */ + DHT_ROUND_CHURN +}; + + + /* Globals */ /** @@ -281,7 +381,9 @@ static struct GNUNET_TIME_Relative find_peer_offset; static struct GNUNET_TIME_Relative seconds_per_peer_start; -static int do_find_peer; +static unsigned int do_find_peer; + +static unsigned int in_dht_replication; static unsigned long long test_data_size = DEFAULT_TEST_DATA_SIZE; @@ -295,6 +397,8 @@ static unsigned long long max_outstanding_find_peers; static unsigned long long malicious_putters; +static unsigned long long round_delay; + static unsigned long long malicious_droppers; static unsigned long long malicious_get_frequency; @@ -309,6 +413,42 @@ static struct GNUNET_DHTLOG_Handle *dhtlog_handle; static unsigned long long trialuid; +/** + * If GNUNET_YES, insert data at the same peers every time. + * Otherwise, choose a new random peer to insert at each time. + */ +static unsigned int replicate_same; + +/** + * Number of rounds for testing (PUTS + GETS) + */ +static unsigned long long total_rounds; + +/** + * Number of rounds already run + */ +static unsigned int rounds_finished; + +/** + * Number of rounds of churn to read from the file (first line, should be a single number). + */ +static unsigned int churn_rounds; + +/** + * Current round we are in for churn, tells us how many peers to connect/disconnect. + */ +static unsigned int current_churn_round; + +/** + * Number of times to churn per round + */ +static unsigned long long churns_per_round; + +/** + * Array of churn values. + */ +static unsigned int *churn_array; + /** * Hash map of stats contexts. */ @@ -449,6 +589,8 @@ static struct ProgressMeter *put_meter; static struct ProgressMeter *get_meter; +static GNUNET_HashCode *known_keys; + /* Global return value (0 for success, anything else for failure) */ static int ok; @@ -515,6 +657,24 @@ update_meter(struct ProgressMeter *meter) return GNUNET_NO; } +/** + * Reset progress meter. + * + * @param meter the meter to reset + * + * @return GNUNET_YES if meter reset, + * GNUNET_SYSERR on error + */ +static int +reset_meter(struct ProgressMeter *meter) +{ + if (meter == NULL) + return GNUNET_SYSERR; + + meter->completed = 0; + return GNUNET_YES; +} + /** * Release resources for meter * @@ -550,6 +710,7 @@ put_disconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) test_put->disconnect_task = GNUNET_SCHEDULER_NO_TASK; GNUNET_DHT_disconnect(test_put->dht_handle); test_put->dht_handle = NULL; + test_put->daemon = GNUNET_TESTING_daemon_get(pg, GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers)); } /** @@ -825,6 +986,475 @@ end_badly (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) ok = 1; } +/** + * Forward declaration. + */ +static void +do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc); + +/** + * Forward declaration. + */ +static void +do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc); + +/** + * Iterator over hash map entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return GNUNET_YES if we should continue to + * iterate, + * GNUNET_NO if not. + */ +static int remove_peer_count (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct FindPeerContext *find_peer_ctx = cls; + struct PeerCount *peer_count = value; + GNUNET_CONTAINER_heap_remove_node(find_peer_ctx->peer_min_heap, peer_count->heap_node); + GNUNET_free(peer_count); + + return GNUNET_YES; +} + +/** + * Connect to all peers in the peer group and iterate over their + * connections. + */ +static void +count_new_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) +{ + struct FindPeerContext *find_peer_context = cls; + find_peer_context->previous_peers = find_peer_context->current_peers; + find_peer_context->current_peers = 0; + GNUNET_TESTING_get_topology (pg, find_peer_context->count_peers_cb, find_peer_context); +} + +static void +decrement_find_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) +{ + struct TestFindPeer *test_find_peer = cls; + GNUNET_assert(test_find_peer->find_peer_context->outstanding > 0); + test_find_peer->find_peer_context->outstanding--; + test_find_peer->find_peer_context->total--; + if ((0 == test_find_peer->find_peer_context->total) && + (GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value > 0)) + { + GNUNET_SCHEDULER_add_now(sched, &count_new_peers, test_find_peer->find_peer_context); + } + GNUNET_free(test_find_peer); +} + +/** + * A find peer request has been sent to the server, now we will schedule a task + * to wait the appropriate time to allow the request to go out and back. + * + * @param cls closure - a TestFindPeer struct + * @param tc context the task is being called with + */ +static void +handle_find_peer_sent (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) +{ + struct TestFindPeer *test_find_peer = cls; + + GNUNET_DHT_disconnect(test_find_peer->dht_handle); + GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_divide(find_peer_delay, 2), &decrement_find_peers, test_find_peer); +} + + +static void +send_find_peer_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) +{ + struct TestFindPeer *test_find_peer = cls; + + if (test_find_peer->find_peer_context->outstanding > max_outstanding_find_peers) + { + GNUNET_SCHEDULER_add_delayed(sched, find_peer_offset, &send_find_peer_request, test_find_peer); + return; + } + + test_find_peer->find_peer_context->outstanding++; + if (GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value == 0) + { + GNUNET_SCHEDULER_add_now(sched, &decrement_find_peers, test_find_peer); + return; + } + + test_find_peer->dht_handle = GNUNET_DHT_connect(sched, test_find_peer->daemon->cfg, 1); + GNUNET_assert(test_find_peer->dht_handle != NULL); + GNUNET_DHT_find_peers (test_find_peer->dht_handle, + &handle_find_peer_sent, test_find_peer); +} + +/** + * Set up a single find peer request for each peer in the topology. Do this + * until the settle time is over, limited by the number of outstanding requests + * and the time allowed for each one! + */ +static void +schedule_churn_find_peer_requests (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) +{ + struct FindPeerContext *find_peer_ctx = cls; + struct TestFindPeer *test_find_peer; + struct PeerCount *peer_count; + uint32_t i; + + if (find_peer_ctx->previous_peers == 0) /* First time, go slowly */ + find_peer_ctx->total = 1; + else if (find_peer_ctx->current_peers - find_peer_ctx->previous_peers > MAX_FIND_PEER_CUTOFF) /* Found LOTS of peers, still go slowly */ + find_peer_ctx->total = find_peer_ctx->last_sent - (find_peer_ctx->last_sent / 8); + else + find_peer_ctx->total = find_peer_ctx->last_sent * 2; + + if (find_peer_ctx->total > max_outstanding_find_peers) + find_peer_ctx->total = max_outstanding_find_peers; + + find_peer_ctx->last_sent = find_peer_ctx->total; + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending %u find peer messages (after churn)\n", find_peer_ctx->total); + + find_peer_offset = GNUNET_TIME_relative_divide(find_peer_delay, find_peer_ctx->total); + for (i = 0; i < find_peer_ctx->total; i++) + { + test_find_peer = GNUNET_malloc(sizeof(struct TestFindPeer)); + /* If we have sent requests, choose peers with a low number of connections to send requests from */ + peer_count = GNUNET_CONTAINER_heap_remove_root(find_peer_ctx->peer_min_heap); + GNUNET_CONTAINER_multihashmap_remove(find_peer_ctx->peer_hash, &peer_count->peer_id.hashPubKey, peer_count); + test_find_peer->daemon = GNUNET_TESTING_daemon_get_by_id(pg, &peer_count->peer_id); + GNUNET_assert(test_find_peer->daemon != NULL); + test_find_peer->find_peer_context = find_peer_ctx; + GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(find_peer_offset, i), &send_find_peer_request, test_find_peer); + } + + if ((find_peer_ctx->peer_hash == NULL) && (find_peer_ctx->peer_min_heap == NULL)) + { + find_peer_ctx->peer_hash = GNUNET_CONTAINER_multihashmap_create(num_peers); + find_peer_ctx->peer_min_heap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN); + } + else + { + GNUNET_CONTAINER_multihashmap_iterate(find_peer_ctx->peer_hash, &remove_peer_count, find_peer_ctx); + GNUNET_CONTAINER_multihashmap_destroy(find_peer_ctx->peer_hash); + find_peer_ctx->peer_hash = GNUNET_CONTAINER_multihashmap_create(num_peers); + } + + GNUNET_assert(0 == GNUNET_CONTAINER_multihashmap_size(find_peer_ctx->peer_hash)); + GNUNET_assert(0 == GNUNET_CONTAINER_heap_get_size(find_peer_ctx->peer_min_heap)); +} + +/** + * Add a connection to the find_peer_context given. This may + * be complete overkill, but allows us to choose the peers with + * the least connections to initiate find peer requests from. + */ +static void add_new_connection(struct FindPeerContext *find_peer_context, + const struct GNUNET_PeerIdentity *first, + const struct GNUNET_PeerIdentity *second) +{ + struct PeerCount *first_count; + struct PeerCount *second_count; + + if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, &first->hashPubKey)) + { + first_count = GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, &first->hashPubKey); + first_count->count++; + GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, first_count->heap_node, first_count->count); + } + else + { + first_count = GNUNET_malloc(sizeof(struct PeerCount)); + first_count->count = 1; + memcpy(&first_count->peer_id, first, sizeof(struct GNUNET_PeerIdentity)); + first_count->heap_node = GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, first_count, first_count->count); + GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, &first->hashPubKey, first_count, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + } + + if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, &second->hashPubKey)) + { + second_count = GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, &second->hashPubKey); + second_count->count++; + GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, second_count->heap_node, second_count->count); + } + else + { + second_count = GNUNET_malloc(sizeof(struct PeerCount)); + second_count->count = 1; + memcpy(&second_count->peer_id, second, sizeof(struct GNUNET_PeerIdentity)); + second_count->heap_node = GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, second_count, second_count->count); + GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, &second->hashPubKey, second_count, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + } +} + + +/** + * Iterate over min heap of connections per peer. For any + * peer that has 0 connections, attempt to connect them to + * some random peer. + * + * @param cls closure a struct FindPeerContext + * @param node internal node of the heap + * @param element value stored, a struct PeerCount + * @param cost cost associated with the node + * @return GNUNET_YES if we should continue to iterate, + * GNUNET_NO if not. + */ +static int iterate_min_heap_peers (void *cls, + struct GNUNET_CONTAINER_HeapNode *node, + void *element, + GNUNET_CONTAINER_HeapCostType cost) +{ + struct FindPeerContext *find_peer_context = cls; + struct PeerCount *peer_count = element; + struct GNUNET_TESTING_Daemon *d1; + struct GNUNET_TESTING_Daemon *d2; + struct GNUNET_TIME_Relative timeout; + if (cost == 0) + { + d1 = GNUNET_TESTING_daemon_get_by_id (pg, &peer_count->peer_id); + d2 = GNUNET_TESTING_daemon_get(pg, GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers)); + /** Just try to connect the peers, don't worry about callbacks, etc. **/ + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer %s has 0 connections. Trying to connect to %s...\n", GNUNET_i2s(&peer_count->peer_id), d2->shortname); + timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, DEFAULT_CONNECT_TIMEOUT); + if (GNUNET_TIME_relative_to_absolute(timeout).value > find_peer_context->endtime.value) + { + timeout = GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime); + } + GNUNET_TESTING_daemons_connect(d1, d2, timeout, DEFAULT_RECONNECT_ATTEMPTS, NULL, NULL); + } + if (GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0) + return GNUNET_YES; + else + return GNUNET_NO; +} + +/** + * Callback for iterating over all the peer connections of a peer group. + * Used after we have churned on some peers to find which ones have zero + * connections so we can make them issue find peer requests. + */ +void count_peers_churn_cb (void *cls, + const struct GNUNET_PeerIdentity *first, + const struct GNUNET_PeerIdentity *second, + struct GNUNET_TIME_Relative latency, + uint32_t distance, + const char *emsg) +{ + struct FindPeerContext *find_peer_context = cls; + struct TopologyIteratorContext *topo_ctx; + struct PeerCount *peer_count; + + if ((first != NULL) && (second != NULL)) + { + add_new_connection(find_peer_context, first, second); + find_peer_context->current_peers++; + } + else + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer count finished (%u connections)\n", + find_peer_context->current_peers); + peer_count = GNUNET_CONTAINER_heap_peek(find_peer_context->peer_min_heap); + + /* WAIT. When peers are churned they will come back with their peers (at least in peerinfo), because the HOSTS file doesn't likely get removed. CRAP. */ + /* NO they won't, because we have disabled peerinfo writing to disk (remember?) so we WILL have to give them new connections */ + /* Best course of action: have DHT automatically try to add peers from peerinfo on startup. This way IF peerinfo writes to file + * then some peers will end up connected. + * + * Also, find any peers that have zero connections here and set up a task to choose at random another peer in the network to + * connect to. Of course, if they are blacklisted from that peer they won't be able to connect, so we will have to keep trying + * until they get a peer. + */ + /* However, they won't automatically be connected to any of their previous peers... How can we handle that? */ + /* So now we have choices: do we want them to come back with all their connections? Probably not, but it solves this mess. */ + + /* Second problem, which is still a problem, is that a FIND_PEER request won't work when a peer has no connections */ + + /** + * Okay, so here's how this *should* work now. + * + * 1. We check the min heap for any peers that have 0 connections. + * a. If any are found, we iterate over the heap and just randomly + * choose another peer and ask testing to please connect the two. + * This takes care of the case that a peer just randomly joins the + * network. However, if there are strict topology restrictions + * (imagine a ring) choosing randomly most likely won't help. + * We make sure the connection attempt doesn't take longer than + * the total timeout, but don't care too much about the result. + * b. After that, we still schedule the find peer requests (concurrently + * with the connect attempts most likely). This handles the case + * that the DHT iterates over peerinfo and just needs to try to send + * a message to get connected. This should handle the case that the + * topology is very strict. + * + * 2. If all peers have > 0 connections, we still send find peer requests + * as long as possible (until timeout is reached) to help out those + * peers that were newly churned and need more connections. This is because + * once all new peers have established a single connection, they won't be + * well connected. + * + * 3. Once we reach the timeout, we can do no more. We must schedule the + * next iteration of get requests regardless of connections that peers + * may or may not have. + * + * Caveat: it would be nice to get peers to take data offline with them and + * come back with it (or not) based on the testing framework. The + * same goes for remembering previous connections, but putting either + * into the general testing churn options seems like overkill because + * these are very specialized cases. + */ + if ((peer_count->count == 0) && (GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0)) + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Found peer with no connections, will choose some peers at random to connect to!\n"); + GNUNET_CONTAINER_heap_iterate (find_peer_context->peer_min_heap, &iterate_min_heap_peers, find_peer_context); + GNUNET_SCHEDULER_add_now(sched, &schedule_churn_find_peer_requests, find_peer_context); + } + else if (GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0) + { + GNUNET_SCHEDULER_add_now(sched, &schedule_churn_find_peer_requests, find_peer_context); + } + else + { + GNUNET_CONTAINER_multihashmap_iterate(find_peer_context->peer_hash, &remove_peer_count, find_peer_context); + GNUNET_CONTAINER_multihashmap_destroy(find_peer_context->peer_hash); + GNUNET_CONTAINER_heap_destroy(find_peer_context->peer_min_heap); + GNUNET_free(find_peer_context); + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Churn round %u of %llu finished, scheduling next GET round.\n", current_churn_round, churn_rounds); + if (dhtlog_handle != NULL) + { + topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext)); + topo_ctx->cont = &do_get; + topo_ctx->cls = all_gets; + topo_ctx->timeout = DEFAULT_GET_TIMEOUT; + die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT), + &end_badly, "from do gets (count_peers_churn_cb)"); + GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, topo_ctx); + } + else + { + die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT), + &end_badly, "from do gets (count_peers_churn_cb)"); + GNUNET_SCHEDULER_add_now(sched, &do_get, all_gets); + } + } + } +} + +/** + * Called when churning of the topology has finished. + * + * @param cls closure unused + * @param emsg NULL on success, or a printable error on failure + */ +static void churn_complete (void *cls, const char *emsg) +{ + struct FindPeerContext *find_peer_context = cls; + struct PeerCount *peer_count; + unsigned int i; + struct GNUNET_TESTING_Daemon *temp_daemon; + struct TopologyIteratorContext *topo_ctx; + + if (emsg != NULL) + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Ending test, churning of peers failed with error `%s'", emsg); + GNUNET_SCHEDULER_add_now(sched, &end_badly, (void *)emsg); + return; + } + + /** + * If we switched any peers on, we have to somehow force connect the new peer to + * SOME bootstrap peer in the network. First schedule a task to find all peers + * with no connections, then choose a random peer for each and connect them. + */ + if (find_peer_context != NULL) + { + for (i = 0; i < num_peers; i ++) + { + temp_daemon = GNUNET_TESTING_daemon_get(pg, i); + peer_count = GNUNET_malloc(sizeof(struct PeerCount)); + memcpy(&peer_count->peer_id, &temp_daemon->id, sizeof(struct GNUNET_PeerIdentity)); + peer_count->heap_node = GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, peer_count, peer_count->count); + GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, &temp_daemon->id.hashPubKey, peer_count, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + } + GNUNET_TESTING_get_topology (pg, &count_peers_churn_cb, find_peer_context); + } + else + { + if (dhtlog_handle != NULL) + { + topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext)); + topo_ctx->cont = &do_get; + topo_ctx->cls = all_gets; + topo_ctx->timeout = DEFAULT_GET_TIMEOUT; + die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT), + &end_badly, "from do gets (churn_complete)"); + GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, topo_ctx); + } + else + { + die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT), + &end_badly, "from do gets (churn_complete)"); + if (dhtlog_handle != NULL) + dhtlog_handle->insert_round(DHT_ROUND_GET, rounds_finished); + GNUNET_SCHEDULER_add_now(sched, &do_get, all_gets); + } + } +} + +/** + * Decide how many peers to turn on or off in this round, make sure the + * numbers actually make sense, then do so. This function sets in motion + * churn, find peer requests for newly joined peers, and issuing get + * requests once the new peers have done so. + * + * @param cls closure (unused) + * @param cls task context (unused) + */ +static void +churn_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) +{ + unsigned int count_running; + unsigned int churn_up; + unsigned int churn_down; + struct GNUNET_TIME_Relative timeout; + struct FindPeerContext *find_peer_context; + + churn_up = churn_down = 0; + count_running = GNUNET_TESTING_daemons_running(pg); + if (count_running > churn_array[current_churn_round]) + churn_down = count_running - churn_array[current_churn_round]; + else if (count_running < churn_array[current_churn_round]) + churn_up = churn_array[current_churn_round] - count_running; + else + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Not churning any peers, topology unchanged.\n"); + + if (churn_up > num_peers - count_running) + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Churn file specified %u peers (up); only have %u!", churn_array[current_churn_round], num_peers); + churn_up = num_peers - count_running; + } + else if (churn_down > count_running) + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Churn file specified %u peers (down); only have %u!", churn_array[current_churn_round], count_running); + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "This will leave NO peers running (mistake in churn configuration?)!"); + churn_down = count_running; + } + timeout = GNUNET_TIME_relative_multiply(seconds_per_peer_start, churn_up > 0 ? churn_up : churn_down); + + find_peer_context = NULL; + if (churn_up > 0) /* Only need to do find peer requests if we turned new peers on */ + { + find_peer_context = GNUNET_malloc(sizeof(struct FindPeerContext)); + find_peer_context->count_peers_cb = &count_peers_churn_cb; + find_peer_context->previous_peers = 0; + find_peer_context->current_peers = 0; + find_peer_context->endtime = GNUNET_TIME_relative_to_absolute(timeout); + } + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "churn_peers: want %u total, %u running, starting %u, stopping %u\n", + churn_array[current_churn_round], count_running, churn_up, churn_down); + GNUNET_TESTING_daemons_churn(pg, churn_down, churn_up, timeout, &churn_complete, find_peer_context); +} + /** * Task to release DHT handle associated with GET request. */ @@ -837,22 +1467,78 @@ get_stop_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) GNUNET_DHT_disconnect(test_get->dht_handle); test_get->dht_handle = NULL; + /* Reset the uid (which item to search for) and the daemon (which peer to search from) for later get request iterations */ + test_get->uid = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_puts); + test_get->daemon = GNUNET_TESTING_daemon_get(pg, GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers)); + #if VERBOSE > 1 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d gets succeeded, %d gets failed!\n", gets_completed, gets_failed); #endif update_meter(get_meter); if ((gets_completed + gets_failed == num_gets) && (outstanding_gets == 0)) { + fprintf(stderr, "Canceling die task (get_stop_finished) %llu gets completed, %llu gets failed\n", gets_completed, gets_failed); GNUNET_SCHEDULER_cancel(sched, die_task); - //GNUNET_SCHEDULER_add_now(sched, &finish_testing, NULL); - if (dhtlog_handle != NULL) + reset_meter(put_meter); + reset_meter(get_meter); + /** + * Handle all cases: + * 1) Testing is completely finished, call the topology iteration dealy and die + * 2) Testing is not finished, churn the network and do gets again (current_churn_round < churn_rounds) + * 3) Testing is not finished, reschedule all the PUTS *and* GETS again (num_rounds > 1) + */ + if (rounds_finished == total_rounds - 1) /* Everything is finished, end testing */ { - topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext)); - topo_ctx->cont = &log_dht_statistics; - GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, topo_ctx); + if (dhtlog_handle != NULL) + { + topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext)); + topo_ctx->cont = &log_dht_statistics; + GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, topo_ctx); + } + else + GNUNET_SCHEDULER_add_now (sched, &finish_testing, NULL); + } + else if (current_churn_round < churns_per_round * (rounds_finished + 1)) /* Do next round of churn */ + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Current churn round %u, real round %u, scheduling next round of churn.\n", current_churn_round, rounds_finished + 1); + gets_completed = 0; + gets_failed = 0; + current_churn_round++; + + if (dhtlog_handle != NULL) + dhtlog_handle->insert_round(DHT_ROUND_CHURN, rounds_finished); + + GNUNET_SCHEDULER_add_now(sched, &churn_peers, NULL); + } + else if (rounds_finished < total_rounds - 1) /* Start a new complete round */ + { + rounds_finished++; + gets_completed = 0; + gets_failed = 0; + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Round %u of %llu finished, scheduling next round.\n", rounds_finished, total_rounds); + /** Make sure we only get here after churning appropriately */ + GNUNET_assert(current_churn_round == churn_rounds); + current_churn_round = 0; + + /** We reset the peer daemon for puts and gets on each disconnect, so all we need to do is start another round! */ + if (GNUNET_YES == in_dht_replication) /* Replication done in DHT, don't redo puts! */ + { + if (dhtlog_handle != NULL) + dhtlog_handle->insert_round(DHT_ROUND_GET, rounds_finished); + + die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, round_delay), all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT), + &end_badly, "from do gets (next round)"); + GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, round_delay), &do_get, all_gets); + } + else + { + if (dhtlog_handle != NULL) + dhtlog_handle->insert_round(DHT_ROUND_NORMAL, rounds_finished); + die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, round_delay), GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, num_puts * 2)), + &end_badly, "from do puts"); + GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, round_delay), &do_put, all_puts); + } } - else - GNUNET_SCHEDULER_add_now (sched, &finish_testing, NULL); } } @@ -890,18 +1576,14 @@ void get_result_iterator (void *cls, const void *data) { struct TestGetContext *test_get = cls; - GNUNET_HashCode search_key; /* Key stored under */ - char original_data[test_data_size]; /* Made up data to store */ - - memset(original_data, test_get->uid, sizeof(original_data)); - GNUNET_CRYPTO_hash(original_data, test_data_size, &search_key); if (test_get->succeeded == GNUNET_YES) return; /* Get has already been successful, probably ending now */ - if ((0 != memcmp(&search_key, key, sizeof (GNUNET_HashCode))) || (0 != memcmp(original_data, data, sizeof(original_data)))) + if (0 != memcmp(&known_keys[test_get->uid], key, sizeof (GNUNET_HashCode))) /* || (0 != memcmp(original_data, data, sizeof(original_data))))*/ { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Key or data is not the same as was inserted!\n"); + gets_completed++; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Key or data is not the same as was inserted!\n"); } else { @@ -933,8 +1615,6 @@ static void do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) { struct TestGetContext *test_get = cls; - GNUNET_HashCode key; /* Made up key to store data under */ - char data[test_data_size]; /* Made up data to store */ if (num_gets == 0) { @@ -944,23 +1624,26 @@ do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) if (test_get == NULL) return; /* End of the list */ - memset(data, test_get->uid, sizeof(data)); - GNUNET_CRYPTO_hash(data, test_data_size, &key); + /* Set this here in case we are re-running gets */ + test_get->succeeded = GNUNET_NO; + /* Check if more gets are outstanding than should be */ if (outstanding_gets > max_outstanding_gets) { - GNUNET_SCHEDULER_add_delayed (sched, get_delay, &do_get, test_get); + GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 200), &do_get, test_get); return; } + /* Connect to the first peer's DHT */ test_get->dht_handle = GNUNET_DHT_connect(sched, test_get->daemon->cfg, 10); - /* Insert the data at the first peer */ GNUNET_assert(test_get->dht_handle != NULL); outstanding_gets++; + + /* Insert the data at the first peer */ test_get->get_handle = GNUNET_DHT_get_start(test_get->dht_handle, - GNUNET_TIME_relative_get_forever(), + get_delay, 1, - &key, + &known_keys[test_get->uid], &get_result_iterator, test_get, &get_continuation, @@ -971,6 +1654,8 @@ do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) test_get->daemon->shortname); #endif test_get->disconnect_task = GNUNET_SCHEDULER_add_delayed(sched, get_timeout, &get_stop_task, test_get); + + /* Schedule the next request in the linked list of get requests */ GNUNET_SCHEDULER_add_now (sched, &do_get, test_get->next); } @@ -989,6 +1674,10 @@ put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT) fprintf(stderr, "PUT Request failed!\n"); + /* Reset the daemon (which peer to insert at) for later put request iterations */ + if (replicate_same == GNUNET_NO) + test_put->daemon = GNUNET_TESTING_daemon_get(pg, GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers)); + GNUNET_SCHEDULER_cancel(sched, test_put->disconnect_task); test_put->disconnect_task = GNUNET_SCHEDULER_add_now(sched, &put_disconnect_task, test_put); if (GNUNET_YES == update_meter(put_meter)) @@ -1002,15 +1691,15 @@ put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) topo_ctx->cls = all_gets; topo_ctx->timeout = DEFAULT_GET_TIMEOUT; die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT), - &end_badly, "from do gets"); + &end_badly, "from do gets (put finished)"); GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, topo_ctx); } else { + fprintf(stderr, "Scheduling die task (put finished)\n"); die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, all_get_timeout), - &end_badly, "from do gets"); + &end_badly, "from do gets (put finished)"); GNUNET_SCHEDULER_add_delayed(sched, DEFAULT_GET_TIMEOUT, &do_get, all_gets); - GNUNET_SCHEDULER_add_now (sched, &finish_testing, NULL); } return; } @@ -1023,97 +1712,44 @@ static void do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) { struct TestPutContext *test_put = cls; - GNUNET_HashCode key; /* Made up key to store data under */ char data[test_data_size]; /* Made up data to store */ uint32_t rand; + int i; if (test_put == NULL) return; /* End of list */ - memset(data, test_put->uid, sizeof(data)); - GNUNET_CRYPTO_hash(data, test_data_size, &key); - - if (outstanding_puts > max_outstanding_puts) - { - GNUNET_SCHEDULER_add_delayed (sched, put_delay, &do_put, test_put); - return; - } - -#if VERBOSE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting put for uid %u from peer %s\n", - test_put->uid, - test_put->daemon->shortname); -#endif - test_put->dht_handle = GNUNET_DHT_connect(sched, test_put->daemon->cfg, 10); - - GNUNET_assert(test_put->dht_handle != NULL); - outstanding_puts++; - GNUNET_DHT_put(test_put->dht_handle, - &key, - 1, - sizeof(data), data, - GNUNET_TIME_absolute_get_forever(), - GNUNET_TIME_relative_get_forever(), - &put_finished, test_put); - test_put->disconnect_task = GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_get_forever(), &put_disconnect_task, test_put); - rand = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 2); - GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, rand), &do_put, test_put->next); -} - -/** - * Context for sending out find peer requests. - */ -struct FindPeerContext -{ - /** - * How long to send find peer requests, once the settle time - * is over don't send any more out! - * - * TODO: Add option for settle time and find peer sending time? - */ - struct GNUNET_TIME_Absolute endtime; - - /** - * Number of connections in the current topology - * (after this round of find peer requests has ended). - */ - unsigned int current_peers; - - /** - * Number of connections in the current topology - * (before this round of find peer requests started). - */ - unsigned int previous_peers; - - /** - * Number of find peer requests we have currently - * outstanding. - */ - unsigned int outstanding; - - /** - * Number of find peer requests to send in this round. - */ - unsigned int total; - - /** - * Number of find peer requests sent last time around. - */ - unsigned int last_sent; + for (i = 0; i < sizeof(data); i++) + { + memset(&data[i], GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t)-1), 1); + } - /** - * Hashmap of peers in the current topology, value - * is a PeerCount, with the number of connections - * this peer has. - */ - struct GNUNET_CONTAINER_MultiHashMap *peer_hash; + if (outstanding_puts > max_outstanding_puts) + { + GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 200), &do_put, test_put); + return; + } - /** - * Min heap which orders values in the peer_hash for - * easy lookup. - */ - struct GNUNET_CONTAINER_Heap *peer_min_heap; -}; +#if VERBOSE > 1 + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting put for uid %u from peer %s\n", + test_put->uid, + test_put->daemon->shortname); +#endif + test_put->dht_handle = GNUNET_DHT_connect(sched, test_put->daemon->cfg, 10); + + GNUNET_assert(test_put->dht_handle != NULL); + outstanding_puts++; + GNUNET_DHT_put(test_put->dht_handle, + &known_keys[test_put->uid], + 1, + sizeof(data), data, + GNUNET_TIME_absolute_get_forever(), + put_delay, + &put_finished, test_put); + test_put->disconnect_task = GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_get_forever(), &put_disconnect_task, test_put); + rand = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 2); + GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, rand), &do_put, test_put->next); +} static void schedule_find_peer_requests (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc); @@ -1139,61 +1775,6 @@ static unsigned int connection_estimate(unsigned int peer_count, unsigned int bu } -struct PeerCount -{ - /** Node in the heap */ - struct GNUNET_CONTAINER_HeapNode *heap_node; - - /** Peer the count refers to */ - struct GNUNET_PeerIdentity peer_id; - - /** Count of connections this peer has */ - unsigned int count; -}; - - -/** - * Add a connection to the find_peer_context given. This may - * be complete overkill, but allows us to choose the peers with - * the least connections to initiate find peer requests from. - */ -static void add_new_connection(struct FindPeerContext *find_peer_context, - const struct GNUNET_PeerIdentity *first, - const struct GNUNET_PeerIdentity *second) -{ - struct PeerCount *first_count; - struct PeerCount *second_count; - - if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, &first->hashPubKey)) - { - first_count = GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, &first->hashPubKey); - first_count->count++; - GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, first_count->heap_node, first_count->count); - } - else - { - first_count = GNUNET_malloc(sizeof(struct PeerCount)); - first_count->count = 1; - memcpy(&first_count->peer_id, first, sizeof(struct GNUNET_PeerIdentity)); - first_count->heap_node = GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, first_count, first_count->count); - GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, &first->hashPubKey, first_count, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); - } - - if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, &second->hashPubKey)) - { - second_count = GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, &second->hashPubKey); - second_count->count++; - GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, second_count->heap_node, second_count->count); - } - else - { - second_count = GNUNET_malloc(sizeof(struct PeerCount)); - second_count->count = 1; - memcpy(&second_count->peer_id, second, sizeof(struct GNUNET_PeerIdentity)); - second_count->heap_node = GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, second_count, second_count->count); - GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, &second->hashPubKey, second_count, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); - } -} /** * Callback for iterating over all the peer connections of a peer group. @@ -1213,112 +1794,29 @@ void count_peers_cb (void *cls, } else { - GNUNET_assert(dhtlog_handle != NULL); - /*GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer count finished (%u connections), %u new peers, connection estimate %u\n", find_peer_context->current_peers, find_peer_context->current_peers - find_peer_context->previous_peers, connection_estimate(num_peers, DEFAULT_BUCKET_SIZE));*/ + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer count finished (%u connections), %u new peers, connection estimate %u (double %u)\n", + find_peer_context->current_peers, + find_peer_context->current_peers - find_peer_context->previous_peers, + connection_estimate(num_peers, DEFAULT_BUCKET_SIZE), + 2 * connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)); + if ((find_peer_context->current_peers - find_peer_context->previous_peers > FIND_PEER_THRESHOLD) && - (find_peer_context->current_peers < connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)) && + (find_peer_context->current_peers < 2 * connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)) && (GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0)) { GNUNET_SCHEDULER_add_now(sched, &schedule_find_peer_requests, find_peer_context); } else { + GNUNET_CONTAINER_multihashmap_iterate(find_peer_context->peer_hash, &remove_peer_count, find_peer_context); + GNUNET_CONTAINER_multihashmap_destroy(find_peer_context->peer_hash); + GNUNET_CONTAINER_heap_destroy(find_peer_context->peer_min_heap); + GNUNET_free(find_peer_context); fprintf(stderr, "Not sending any more find peer requests.\n"); } } } -/** - * Connect to all peers in the peer group and iterate over their - * connections. - */ -static void -count_new_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) -{ - struct FindPeerContext *find_peer_context = cls; - find_peer_context->previous_peers = find_peer_context->current_peers; - find_peer_context->current_peers = 0; - GNUNET_TESTING_get_topology (pg, &count_peers_cb, find_peer_context); -} - - -static void -decrement_find_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) -{ - struct TestFindPeer *test_find_peer = cls; - GNUNET_assert(test_find_peer->find_peer_context->outstanding > 0); - test_find_peer->find_peer_context->outstanding--; - test_find_peer->find_peer_context->total--; - if ((0 == test_find_peer->find_peer_context->total) && - (GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value > 0)) - { - GNUNET_SCHEDULER_add_now(sched, &count_new_peers, test_find_peer->find_peer_context); - } - GNUNET_free(test_find_peer); -} - -/** - * A find peer request has been sent to the server, now we will schedule a task - * to wait the appropriate time to allow the request to go out and back. - * - * @param cls closure - a TestFindPeer struct - * @param tc context the task is being called with - */ -static void -handle_find_peer_sent (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) -{ - struct TestFindPeer *test_find_peer = cls; - - GNUNET_DHT_disconnect(test_find_peer->dht_handle); - GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_divide(find_peer_delay, 2), &decrement_find_peers, test_find_peer); -} - -static void -send_find_peer_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) -{ - struct TestFindPeer *test_find_peer = cls; - - if (test_find_peer->find_peer_context->outstanding > max_outstanding_find_peers) - { - GNUNET_SCHEDULER_add_delayed(sched, find_peer_offset, &send_find_peer_request, test_find_peer); - return; - } - - test_find_peer->find_peer_context->outstanding++; - if (GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value == 0) - { - GNUNET_SCHEDULER_add_now(sched, &decrement_find_peers, test_find_peer); - return; - } - - test_find_peer->dht_handle = GNUNET_DHT_connect(sched, test_find_peer->daemon->cfg, 1); - GNUNET_assert(test_find_peer->dht_handle != NULL); - GNUNET_DHT_find_peers (test_find_peer->dht_handle, - &handle_find_peer_sent, test_find_peer); -} - -/** - * Iterator over hash map entries. - * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. - */ -static int remove_peer_count (void *cls, - const GNUNET_HashCode * key, - void *value) -{ - struct FindPeerContext *find_peer_ctx = cls; - struct PeerCount *peer_count = value; - GNUNET_CONTAINER_heap_remove_node(find_peer_ctx->peer_min_heap, peer_count->heap_node); - GNUNET_free(peer_count); - - return GNUNET_YES; -} - /** * Set up a single find peer request for each peer in the topology. Do this @@ -1352,7 +1850,7 @@ schedule_find_peer_requests (void *cls, const struct GNUNET_SCHEDULER_TaskContex find_peer_ctx->total = max_outstanding_find_peers; find_peer_ctx->last_sent = find_peer_ctx->total; - GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending %u find peer messages (goal %u connections)\n", find_peer_ctx->total, connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)); + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending %u find peer messages (goal at least %u connections)\n", find_peer_ctx->total, connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)); find_peer_offset = GNUNET_TIME_relative_divide(find_peer_delay, find_peer_ctx->total); for (i = 0; i < find_peer_ctx->total; i++) @@ -1420,13 +1918,16 @@ setup_puts_and_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) uint32_t temp_daemon; struct TestPutContext *test_put; struct TestGetContext *test_get; +#if REMEMBER int remember[num_puts][num_peers]; - memset(&remember, 0, sizeof(int) * num_puts * num_peers); +#endif + known_keys = GNUNET_malloc(sizeof(GNUNET_HashCode) * num_puts); for (i = 0; i < num_puts; i++) { test_put = GNUNET_malloc(sizeof(struct TestPutContext)); test_put->uid = i; + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &known_keys[i]); temp_daemon = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers); test_put->daemon = GNUNET_TESTING_daemon_get(pg, temp_daemon); test_put->next = all_puts; @@ -1437,11 +1938,12 @@ setup_puts_and_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) { test_get = GNUNET_malloc(sizeof(struct TestGetContext)); test_get->uid = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_puts); - temp_daemon = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers); +#if REMEMBER while (remember[test_get->uid][temp_daemon] == 1) temp_daemon = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers); - test_get->daemon = GNUNET_TESTING_daemon_get(pg, temp_daemon); remember[test_get->uid][temp_daemon] = 1; +#endif + test_get->daemon = GNUNET_TESTING_daemon_get(pg, GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers)); test_get->next = all_gets; all_gets = test_get; } @@ -1488,6 +1990,7 @@ continue_puts_and_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext * t if (GNUNET_YES == do_find_peer) { find_peer_context = GNUNET_malloc(sizeof(struct FindPeerContext)); + find_peer_context->count_peers_cb = &count_peers_cb; find_peer_context->endtime = GNUNET_TIME_relative_to_absolute(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time)); GNUNET_SCHEDULER_add_now(sched, &schedule_find_peer_requests, find_peer_context); } @@ -1627,20 +2130,22 @@ setup_malicious_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc GNUNET_SCHEDULER_add_now (sched, &set_malicious, ctx); } + /** + * If we have any malicious peers to set up, + * the malicious callback should call continue_gets_and_puts + */ if (malicious_getters + malicious_putters + malicious_droppers > 0) - die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, (malicious_getters + malicious_putters + malicious_droppers) * 2), - &end_badly, "from set malicious"); - else { - if (dhtlog_handle != NULL) - GNUNET_SCHEDULER_add_now (sched, - &continue_puts_and_gets, NULL); - else - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time), - &continue_puts_and_gets, NULL); + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Giving malicious set tasks some time before starting testing!\n"); + die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, (malicious_getters + malicious_putters + malicious_droppers) * 2), + &end_badly, "from set malicious"); + } + else /* Otherwise, continue testing */ + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Scheduling continue_puts_and_gets now!\n"); + GNUNET_SCHEDULER_add_now (sched, + &continue_puts_and_gets, NULL); } - } /** @@ -1674,15 +2179,16 @@ topology_callback (void *cls, distance); #endif } -#if VERBOSE else { failed_connections++; +#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to connect peer %s to peer %s with error :\n%s\n", first_daemon->shortname, second_daemon->shortname, emsg); - } #endif + } + GNUNET_assert(peer_connect_meter != NULL); if (GNUNET_YES == update_meter(peer_connect_meter)) { @@ -1855,6 +2361,7 @@ run (void *cls, struct GNUNET_DHTLOG_TrialInfo trial_info; struct GNUNET_TESTING_Host *hosts; struct GNUNET_TESTING_Host *temphost; + struct GNUNET_TESTING_Host *tempnext; char *topology_str; char *connect_topology_str; char *blacklist_topology_str; @@ -1872,10 +2379,15 @@ run (void *cls, int strict_kademlia; char *buf; char *data; + char *churn_data; + char *churn_filename; int count; + int ret; + unsigned int line_number; sched = s; config = cfg; + rounds_finished = 0; memset(&trial_info, 0, sizeof(struct GNUNET_DHTLOG_TrialInfo)); /* Get path from configuration file */ if (GNUNET_YES != GNUNET_CONFIGURATION_get_value_string(cfg, "paths", "servicehome", &test_directory)) @@ -1917,6 +2429,100 @@ run (void *cls, &trialmessage)) trialmessage = NULL; + churn_data = NULL; + /** Check for a churn file to do churny simulation */ + if (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_string(cfg, "testing", "churn_file", + &churn_filename)) + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Reading churn data from %s\n", churn_filename); + if (GNUNET_OK != GNUNET_DISK_file_test (churn_filename)) + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Error reading churn file!\n"); + return; + } + if ((0 != STAT (churn_filename, &frstat)) || (frstat.st_size == 0)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Could not open file specified for churn data, ending test!"); + ok = 1119; + GNUNET_free_non_null(trialmessage); + GNUNET_free(churn_filename); + return; + } + + churn_data = GNUNET_malloc_large (frstat.st_size); + GNUNET_assert(churn_data != NULL); + if (frstat.st_size != + GNUNET_DISK_fn_read (churn_filename, churn_data, frstat.st_size)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Could not read file %s specified for churn, ending test!", churn_filename); + GNUNET_free (churn_filename); + GNUNET_free (churn_data); + GNUNET_free_non_null(trialmessage); + return; + } + + GNUNET_free_non_null(churn_filename); + + buf = churn_data; + count = 0; + /* Read the first line */ + while (count < frstat.st_size) + { + count++; + if (((churn_data[count] == '\n')) && (buf != &churn_data[count])) + { + churn_data[count] = '\0'; + if (1 != sscanf(buf, "%u", &churn_rounds)) + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to read number of rounds from %s, ending test!\n", churn_filename); + GNUNET_free_non_null(trialmessage); + GNUNET_free(churn_filename); + ret = 4200; + return; + } + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Read %u rounds from churn file\n", churn_rounds); + buf = &churn_data[count + 1]; + churn_array = GNUNET_malloc(sizeof(unsigned int) * churn_rounds); + } + } + + if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number(cfg, "dht_testing", "churns_per_round", &churns_per_round)) + { + churns_per_round = (unsigned long long)churn_rounds; + } + + line_number = 0; + while ((count < frstat.st_size) && (line_number < churn_rounds)) + { + count++; + if (((churn_data[count] == '\n')) && (buf != &churn_data[count])) + { + churn_data[count] = '\0'; + + ret = sscanf(buf, "%u", &churn_array[line_number]); + if (1 == ret) + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Read %u peers in round %u\n", churn_array[line_number], line_number); + line_number++; + } + else + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Error reading line `%s' in hostfile\n", buf); + buf = &churn_data[count + 1]; + continue; + } + buf = &churn_data[count + 1]; + } + else if (churn_data[count] == '\n') /* Blank line */ + buf = &churn_data[count + 1]; + } + } + GNUNET_free_non_null(churn_data); + + /** Check for a hostfile containing user@host:port triples */ if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "testing", "hostfile", &hostfile)) @@ -1959,11 +2565,24 @@ run (void *cls, while (count < frstat.st_size) { count++; - if (((data[count] == '\n') || (data[count] == '\0')) && (buf != &data[count])) + /* if (((data[count] == '\n') || (data[count] == '\0')) && (buf != &data[count]))*/ + if (((data[count] == '\n')) && (buf != &data[count])) { data[count] = '\0'; temphost = GNUNET_malloc(sizeof(struct GNUNET_TESTING_Host)); - temphost->hostname = buf; + ret = sscanf(buf, "%a[a-zA-Z0-9]@%a[a-zA-Z0-9.]:%hd", &temphost->username, &temphost->hostname, &temphost->port); + if (3 == ret) + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Successfully read host %s, port %d and user %s from file\n", temphost->hostname, temphost->port, temphost->username); + } + else + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Error reading line `%s' in hostfile\n", buf); + GNUNET_free(temphost); + buf = &data[count + 1]; + continue; + } + /* temphost->hostname = buf; */ temphost->next = hosts; hosts = temphost; buf = &data[count + 1]; @@ -2043,7 +2662,7 @@ run (void *cls, &temp_config_number)) all_get_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, temp_config_number); else - all_get_timeout.value = get_timeout.value * ((num_gets / max_outstanding_gets) + 1); + all_get_timeout.value = get_timeout.value * num_gets; if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", "get_delay", @@ -2076,6 +2695,10 @@ run (void *cls, /** * Get testing related options. */ + if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno(cfg, "DHT_TESTING", "REPLICATE_SAME")) + { + replicate_same = GNUNET_YES; + } if (GNUNET_NO == GNUNET_CONFIGURATION_get_value_number (cfg, "DHT_TESTING", "MALICIOUS_GET_FREQUENCY", @@ -2088,14 +2711,25 @@ run (void *cls, &malicious_put_frequency)) malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY; + + /* The normal behavior of the DHT is to do find peer requests + * on its own. Only if this is explicitly turned off should + * the testing driver issue find peer requests (even though + * this is likely the default when testing). + */ if (GNUNET_NO == GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht", - "find_peers")) + "do_find_peer")) { - do_find_peer = GNUNET_NO; + do_find_peer = GNUNET_YES; + } + + if (GNUNET_YES == + GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht", + "republish")) + { + in_dht_replication = GNUNET_YES; } - else - do_find_peer = GNUNET_YES; if (GNUNET_YES != GNUNET_CONFIGURATION_get_value_number (cfg, "DHT_TESTING", "TRIAL_TO_RUN", @@ -2113,6 +2747,9 @@ run (void *cls, else find_peer_delay = DEFAULT_FIND_PEER_DELAY; + if (GNUNET_YES != GNUNET_CONFIGURATION_get_value_number(cfg, "DHT_TESTING", "ROUND_DELAY", &round_delay)) + round_delay = 0; + if (GNUNET_NO == GNUNET_CONFIGURATION_get_value_number (cfg, "DHT_TESTING", "OUTSTANDING_FIND_PEERS", &max_outstanding_find_peers)) @@ -2123,6 +2760,13 @@ run (void *cls, find_peer_offset = GNUNET_TIME_relative_divide (find_peer_delay, max_outstanding_find_peers); + if (GNUNET_SYSERR == + GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", "num_rounds", + &total_rounds)) + { + total_rounds = 1; + } + topology_str = NULL; if ((GNUNET_YES == GNUNET_CONFIGURATION_get_value_string(cfg, "testing", "topology", @@ -2214,6 +2858,7 @@ run (void *cls, /* Set peers_left so we know when all peers started */ peers_left = num_peers; + /* Set up a task to end testing if peer start fails */ die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(seconds_per_peer_start, num_peers), @@ -2268,8 +2913,15 @@ run (void *cls, &peers_started_callback, NULL, &topology_callback, NULL, hosts); - - GNUNET_free_non_null(temphost); + temphost = hosts; + while (temphost != NULL) + { + tempnext = temphost->next; + GNUNET_free (temphost->username); + GNUNET_free (temphost->hostname); + GNUNET_free (temphost); + temphost = tempnext; + } } -- 2.25.1