many useless changes to the dht testing driver, mostly for churn (which may not even...
authorNathan S. Evans <evans@in.tum.de>
Thu, 16 Sep 2010 14:13:57 +0000 (14:13 +0000)
committerNathan S. Evans <evans@in.tum.de>
Thu, 16 Sep 2010 14:13:57 +0000 (14:13 +0000)
src/dht/gnunet-dht-driver.c

index 07f1fbb04b0581ab254e05d9e444c3c48a86780a..cdd5e7c169ec33c93c35a45f7cd466d569a079d3 100644 (file)
@@ -59,7 +59,7 @@
 
 #define DEFAULT_BUCKET_SIZE 4
 
-#define FIND_PEER_THRESHOLD DEFAULT_BUCKET_SIZE * 2
+#define FIND_PEER_THRESHOLD 1
 
 /* If more than this many peers are added, slow down sending */
 #define MAX_FIND_PEER_CUTOFF 2500
@@ -79,6 +79,8 @@
 
 #define DEFAULT_TOPOLOGY_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
 
+#define DEFAULT_RECONNECT_ATTEMPTS 8
+
 /*
  * Default frequency for sending malicious get messages
  */
@@ -259,6 +261,104 @@ struct TopologyIteratorContext
   struct GNUNET_TIME_Relative timeout;
 };
 
+
+struct PeerCount
+{
+  /** Node in the heap */
+  struct GNUNET_CONTAINER_HeapNode *heap_node;
+
+  /** Peer the count refers to */
+  struct GNUNET_PeerIdentity peer_id;
+
+  /** Count of connections this peer has */
+  unsigned int count;
+};
+
+/**
+ * Context for sending out find peer requests.
+ */
+struct FindPeerContext
+{
+  /**
+   * How long to send find peer requests, once the settle time
+   * is over don't send any more out!
+   *
+   * TODO: Add option for settle time and find peer sending time?
+   */
+  struct GNUNET_TIME_Absolute endtime;
+
+  /**
+   * Number of connections in the current topology
+   * (after this round of find peer requests has ended).
+   */
+  unsigned int current_peers;
+
+  /**
+   * Number of connections in the current topology
+   * (before this round of find peer requests started).
+   */
+  unsigned int previous_peers;
+
+  /**
+   * Number of find peer requests we have currently
+   * outstanding.
+   */
+  unsigned int outstanding;
+
+  /**
+   * Number of find peer requests to send in this round.
+   */
+  unsigned int total;
+
+  /**
+   * Number of find peer requests sent last time around.
+   */
+  unsigned int last_sent;
+
+  /**
+   * Hashmap of peers in the current topology, value
+   * is a PeerCount, with the number of connections
+   * this peer has.
+   */
+  struct GNUNET_CONTAINER_MultiHashMap *peer_hash;
+
+  /**
+   * Min heap which orders values in the peer_hash for
+   * easy lookup.
+   */
+  struct GNUNET_CONTAINER_Heap *peer_min_heap;
+
+  /**
+   * Callback for counting the peers in the current topology.
+   */
+  GNUNET_TESTING_NotifyTopology count_peers_cb;
+};
+
+enum DHT_ROUND_TYPES
+{
+  /**
+   * Next full round (puts + gets).
+   */
+  DHT_ROUND_NORMAL,
+
+  /**
+   * Next round of gets.
+   */
+  DHT_ROUND_GET,
+
+  /**
+   * Next round of puts.
+   */
+  DHT_ROUND_PUT,
+
+  /**
+   * Next round of churn.
+   */
+  DHT_ROUND_CHURN
+};
+
+
+
 /* Globals */
 
 /**
@@ -281,7 +381,9 @@ static struct GNUNET_TIME_Relative find_peer_offset;
 
 static struct GNUNET_TIME_Relative seconds_per_peer_start;
 
-static int do_find_peer;
+static unsigned int do_find_peer;
+
+static unsigned int in_dht_replication;
 
 static unsigned long long test_data_size = DEFAULT_TEST_DATA_SIZE;
 
@@ -295,6 +397,8 @@ static unsigned long long max_outstanding_find_peers;
 
 static unsigned long long malicious_putters;
 
+static unsigned long long round_delay;
+
 static unsigned long long malicious_droppers;
 
 static unsigned long long malicious_get_frequency;
@@ -309,6 +413,42 @@ static struct GNUNET_DHTLOG_Handle *dhtlog_handle;
 
 static unsigned long long trialuid;
 
+/**
+ * If GNUNET_YES, insert data at the same peers every time.
+ * Otherwise, choose a new random peer to insert at each time.
+ */
+static unsigned int replicate_same;
+
+/**
+ * Number of rounds for testing (PUTS + GETS)
+ */
+static unsigned long long total_rounds;
+
+/**
+ * Number of rounds already run
+ */
+static unsigned int rounds_finished;
+
+/**
+ * Number of rounds of churn to read from the file (first line, should be a single number).
+ */
+static unsigned int churn_rounds;
+
+/**
+ * Current round we are in for churn, tells us how many peers to connect/disconnect.
+ */
+static unsigned int current_churn_round;
+
+/**
+ * Number of times to churn per round
+ */
+static unsigned long long churns_per_round;
+
+/**
+ * Array of churn values.
+ */
+static unsigned int *churn_array;
+
 /**
  * Hash map of stats contexts.
  */
@@ -449,6 +589,8 @@ static struct ProgressMeter *put_meter;
 
 static struct ProgressMeter *get_meter;
 
+static GNUNET_HashCode *known_keys;
+
 /* Global return value (0 for success, anything else for failure) */
 static int ok;
 
@@ -515,6 +657,24 @@ update_meter(struct ProgressMeter *meter)
   return GNUNET_NO;
 }
 
+/**
+ * Reset progress meter.
+ *
+ * @param meter the meter to reset
+ *
+ * @return GNUNET_YES if meter reset,
+ *         GNUNET_SYSERR on error
+ */
+static int
+reset_meter(struct ProgressMeter *meter)
+{
+  if (meter == NULL)
+    return GNUNET_SYSERR;
+
+  meter->completed = 0;
+  return GNUNET_YES;
+}
+
 /**
  * Release resources for meter
  *
@@ -550,6 +710,7 @@ put_disconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
   test_put->disconnect_task = GNUNET_SCHEDULER_NO_TASK;
   GNUNET_DHT_disconnect(test_put->dht_handle);
   test_put->dht_handle = NULL;
+  test_put->daemon = GNUNET_TESTING_daemon_get(pg, GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers));
 }
 
 /**
@@ -825,6 +986,475 @@ end_badly (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
   ok = 1;
 }
 
+/**
+ * Forward declaration.
+ */
+static void
+do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc);
+
+/**
+ * Forward declaration.
+ */
+static void
+do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc);
+
+/**
+ * Iterator over hash map entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return GNUNET_YES if we should continue to
+ *         iterate,
+ *         GNUNET_NO if not.
+ */
+static int remove_peer_count (void *cls,
+                              const GNUNET_HashCode * key,
+                              void *value)
+{
+  struct FindPeerContext *find_peer_ctx = cls;
+  struct PeerCount *peer_count = value;
+  GNUNET_CONTAINER_heap_remove_node(find_peer_ctx->peer_min_heap, peer_count->heap_node);
+  GNUNET_free(peer_count);
+
+  return GNUNET_YES;
+}
+
+/**
+ * Connect to all peers in the peer group and iterate over their
+ * connections.
+ */
+static void
+count_new_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct FindPeerContext *find_peer_context = cls;
+  find_peer_context->previous_peers = find_peer_context->current_peers;
+  find_peer_context->current_peers = 0;
+  GNUNET_TESTING_get_topology (pg, find_peer_context->count_peers_cb, find_peer_context);
+}
+
+static void
+decrement_find_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct TestFindPeer *test_find_peer = cls;
+  GNUNET_assert(test_find_peer->find_peer_context->outstanding > 0);
+  test_find_peer->find_peer_context->outstanding--;
+  test_find_peer->find_peer_context->total--;
+  if ((0 == test_find_peer->find_peer_context->total) &&
+      (GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value > 0))
+  {
+    GNUNET_SCHEDULER_add_now(sched, &count_new_peers, test_find_peer->find_peer_context);
+  }
+  GNUNET_free(test_find_peer);
+}
+
+/**
+ * A find peer request has been sent to the server, now we will schedule a task
+ * to wait the appropriate time to allow the request to go out and back.
+ *
+ * @param cls closure - a TestFindPeer struct
+ * @param tc context the task is being called with
+ */
+static void
+handle_find_peer_sent (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct TestFindPeer *test_find_peer = cls;
+
+  GNUNET_DHT_disconnect(test_find_peer->dht_handle);
+  GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_divide(find_peer_delay, 2), &decrement_find_peers, test_find_peer);
+}
+
+
+static void
+send_find_peer_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct TestFindPeer *test_find_peer = cls;
+
+  if (test_find_peer->find_peer_context->outstanding > max_outstanding_find_peers)
+  {
+    GNUNET_SCHEDULER_add_delayed(sched, find_peer_offset, &send_find_peer_request, test_find_peer);
+    return;
+  }
+
+  test_find_peer->find_peer_context->outstanding++;
+  if (GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value == 0)
+  {
+    GNUNET_SCHEDULER_add_now(sched, &decrement_find_peers, test_find_peer);
+    return;
+  }
+
+  test_find_peer->dht_handle = GNUNET_DHT_connect(sched, test_find_peer->daemon->cfg, 1);
+  GNUNET_assert(test_find_peer->dht_handle != NULL);
+  GNUNET_DHT_find_peers (test_find_peer->dht_handle,
+                         &handle_find_peer_sent, test_find_peer);
+}
+
+/**
+ * Set up a single find peer request for each peer in the topology.  Do this
+ * until the settle time is over, limited by the number of outstanding requests
+ * and the time allowed for each one!
+ */
+static void
+schedule_churn_find_peer_requests (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct FindPeerContext *find_peer_ctx = cls;
+  struct TestFindPeer *test_find_peer;
+  struct PeerCount *peer_count;
+  uint32_t i;
+
+  if (find_peer_ctx->previous_peers == 0) /* First time, go slowly */
+    find_peer_ctx->total = 1;
+  else if (find_peer_ctx->current_peers - find_peer_ctx->previous_peers > MAX_FIND_PEER_CUTOFF) /* Found LOTS of peers, still go slowly */
+    find_peer_ctx->total = find_peer_ctx->last_sent - (find_peer_ctx->last_sent / 8);
+  else
+    find_peer_ctx->total = find_peer_ctx->last_sent * 2;
+
+  if (find_peer_ctx->total > max_outstanding_find_peers)
+    find_peer_ctx->total = max_outstanding_find_peers;
+
+  find_peer_ctx->last_sent = find_peer_ctx->total;
+  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending %u find peer messages (after churn)\n", find_peer_ctx->total);
+
+  find_peer_offset = GNUNET_TIME_relative_divide(find_peer_delay, find_peer_ctx->total);
+  for (i = 0; i < find_peer_ctx->total; i++)
+    {
+      test_find_peer = GNUNET_malloc(sizeof(struct TestFindPeer));
+      /* If we have sent requests, choose peers with a low number of connections to send requests from */
+      peer_count = GNUNET_CONTAINER_heap_remove_root(find_peer_ctx->peer_min_heap);
+      GNUNET_CONTAINER_multihashmap_remove(find_peer_ctx->peer_hash, &peer_count->peer_id.hashPubKey, peer_count);
+      test_find_peer->daemon = GNUNET_TESTING_daemon_get_by_id(pg, &peer_count->peer_id);
+      GNUNET_assert(test_find_peer->daemon != NULL);
+      test_find_peer->find_peer_context = find_peer_ctx;
+      GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(find_peer_offset, i), &send_find_peer_request, test_find_peer);
+    }
+
+  if ((find_peer_ctx->peer_hash == NULL) && (find_peer_ctx->peer_min_heap == NULL))
+    {
+      find_peer_ctx->peer_hash = GNUNET_CONTAINER_multihashmap_create(num_peers);
+      find_peer_ctx->peer_min_heap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
+    }
+  else
+    {
+      GNUNET_CONTAINER_multihashmap_iterate(find_peer_ctx->peer_hash, &remove_peer_count, find_peer_ctx);
+      GNUNET_CONTAINER_multihashmap_destroy(find_peer_ctx->peer_hash);
+      find_peer_ctx->peer_hash = GNUNET_CONTAINER_multihashmap_create(num_peers);
+    }
+
+  GNUNET_assert(0 == GNUNET_CONTAINER_multihashmap_size(find_peer_ctx->peer_hash));
+  GNUNET_assert(0 == GNUNET_CONTAINER_heap_get_size(find_peer_ctx->peer_min_heap));
+}
+
+/**
+ * Add a connection to the find_peer_context given.  This may
+ * be complete overkill, but allows us to choose the peers with
+ * the least connections to initiate find peer requests from.
+ */
+static void add_new_connection(struct FindPeerContext *find_peer_context,
+                               const struct GNUNET_PeerIdentity *first,
+                               const struct GNUNET_PeerIdentity *second)
+{
+  struct PeerCount *first_count;
+  struct PeerCount *second_count;
+
+  if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, &first->hashPubKey))
+  {
+    first_count = GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, &first->hashPubKey);
+    first_count->count++;
+    GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, first_count->heap_node, first_count->count);
+  }
+  else
+  {
+    first_count = GNUNET_malloc(sizeof(struct PeerCount));
+    first_count->count = 1;
+    memcpy(&first_count->peer_id, first, sizeof(struct GNUNET_PeerIdentity));
+    first_count->heap_node = GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, first_count, first_count->count);
+    GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, &first->hashPubKey, first_count, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+  }
+
+  if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, &second->hashPubKey))
+  {
+    second_count = GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, &second->hashPubKey);
+    second_count->count++;
+    GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, second_count->heap_node, second_count->count);
+  }
+  else
+  {
+    second_count = GNUNET_malloc(sizeof(struct PeerCount));
+    second_count->count = 1;
+    memcpy(&second_count->peer_id, second, sizeof(struct GNUNET_PeerIdentity));
+    second_count->heap_node = GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, second_count, second_count->count);
+    GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, &second->hashPubKey, second_count, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+  }
+}
+
+
+/**
+ * Iterate over min heap of connections per peer.  For any
+ * peer that has 0 connections, attempt to connect them to
+ * some random peer.
+ *
+ * @param cls closure a struct FindPeerContext
+ * @param node internal node of the heap
+ * @param element value stored, a struct PeerCount
+ * @param cost cost associated with the node
+ * @return GNUNET_YES if we should continue to iterate,
+ *         GNUNET_NO if not.
+ */
+static int iterate_min_heap_peers (void *cls,
+                                   struct GNUNET_CONTAINER_HeapNode *node,
+                                   void *element,
+                                   GNUNET_CONTAINER_HeapCostType cost)
+{
+  struct FindPeerContext *find_peer_context = cls;
+  struct PeerCount *peer_count = element;
+  struct GNUNET_TESTING_Daemon *d1;
+  struct GNUNET_TESTING_Daemon *d2;
+  struct GNUNET_TIME_Relative timeout;
+  if (cost == 0)
+    {
+      d1 = GNUNET_TESTING_daemon_get_by_id (pg, &peer_count->peer_id);
+      d2 = GNUNET_TESTING_daemon_get(pg, GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers));
+      /** Just try to connect the peers, don't worry about callbacks, etc. **/
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer %s has 0 connections.  Trying to connect to %s...\n", GNUNET_i2s(&peer_count->peer_id), d2->shortname);
+      timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, DEFAULT_CONNECT_TIMEOUT);
+      if (GNUNET_TIME_relative_to_absolute(timeout).value > find_peer_context->endtime.value)
+        {
+          timeout = GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime);
+        }
+      GNUNET_TESTING_daemons_connect(d1, d2, timeout, DEFAULT_RECONNECT_ATTEMPTS, NULL, NULL);
+    }
+  if (GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0)
+    return GNUNET_YES;
+  else
+    return GNUNET_NO;
+}
+
+/**
+ * Callback for iterating over all the peer connections of a peer group.
+ * Used after we have churned on some peers to find which ones have zero
+ * connections so we can make them issue find peer requests.
+ */
+void count_peers_churn_cb (void *cls,
+                           const struct GNUNET_PeerIdentity *first,
+                           const struct GNUNET_PeerIdentity *second,
+                           struct GNUNET_TIME_Relative latency,
+                           uint32_t distance,
+                           const char *emsg)
+{
+  struct FindPeerContext *find_peer_context = cls;
+  struct TopologyIteratorContext *topo_ctx;
+  struct PeerCount *peer_count;
+
+  if ((first != NULL) && (second != NULL))
+    {
+      add_new_connection(find_peer_context, first, second);
+      find_peer_context->current_peers++;
+    }
+  else
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer count finished (%u connections)\n",
+                                            find_peer_context->current_peers);
+      peer_count = GNUNET_CONTAINER_heap_peek(find_peer_context->peer_min_heap);
+
+      /* WAIT. When peers are churned they will come back with their peers (at least in peerinfo), because the HOSTS file doesn't likely get removed. CRAP. */
+      /* NO they won't, because we have disabled peerinfo writing to disk (remember?) so we WILL have to give them new connections */
+      /* Best course of action: have DHT automatically try to add peers from peerinfo on startup. This way IF peerinfo writes to file
+       * then some peers will end up connected.
+       *
+       * Also, find any peers that have zero connections here and set up a task to choose at random another peer in the network to
+       * connect to.  Of course, if they are blacklisted from that peer they won't be able to connect, so we will have to keep trying
+       * until they get a peer.
+       */
+      /* However, they won't automatically be connected to any of their previous peers... How can we handle that? */
+      /* So now we have choices: do we want them to come back with all their connections?  Probably not, but it solves this mess. */
+
+      /* Second problem, which is still a problem, is that a FIND_PEER request won't work when a peer has no connections */
+
+      /**
+       * Okay, so here's how this *should* work now.
+       *
+       * 1. We check the min heap for any peers that have 0 connections.
+       *    a. If any are found, we iterate over the heap and just randomly
+       *       choose another peer and ask testing to please connect the two.
+       *       This takes care of the case that a peer just randomly joins the
+       *       network.  However, if there are strict topology restrictions
+       *       (imagine a ring) choosing randomly most likely won't help.
+       *       We make sure the connection attempt doesn't take longer than
+       *       the total timeout, but don't care too much about the result.
+       *    b. After that, we still schedule the find peer requests (concurrently
+       *       with the connect attempts most likely).  This handles the case
+       *       that the DHT iterates over peerinfo and just needs to try to send
+       *       a message to get connected.  This should handle the case that the
+       *       topology is very strict.
+       *
+       * 2. If all peers have > 0 connections, we still send find peer requests
+       *    as long as possible (until timeout is reached) to help out those
+       *    peers that were newly churned and need more connections.  This is because
+       *    once all new peers have established a single connection, they won't be
+       *    well connected.
+       *
+       * 3. Once we reach the timeout, we can do no more.  We must schedule the
+       *    next iteration of get requests regardless of connections that peers
+       *    may or may not have.
+       *
+       * Caveat: it would be nice to get peers to take data offline with them and
+       *         come back with it (or not) based on the testing framework.  The
+       *         same goes for remembering previous connections, but putting either
+       *         into the general testing churn options seems like overkill because
+       *         these are very specialized cases.
+       */
+      if ((peer_count->count == 0) && (GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0))
+        {
+          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Found peer with no connections, will choose some peers at random to connect to!\n");
+          GNUNET_CONTAINER_heap_iterate (find_peer_context->peer_min_heap, &iterate_min_heap_peers, find_peer_context);
+          GNUNET_SCHEDULER_add_now(sched, &schedule_churn_find_peer_requests, find_peer_context);
+        }
+      else if (GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0)
+        {
+          GNUNET_SCHEDULER_add_now(sched, &schedule_churn_find_peer_requests, find_peer_context);
+        }
+      else
+        {
+          GNUNET_CONTAINER_multihashmap_iterate(find_peer_context->peer_hash, &remove_peer_count, find_peer_context);
+          GNUNET_CONTAINER_multihashmap_destroy(find_peer_context->peer_hash);
+          GNUNET_CONTAINER_heap_destroy(find_peer_context->peer_min_heap);
+          GNUNET_free(find_peer_context);
+          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Churn round %u of %llu finished, scheduling next GET round.\n", current_churn_round, churn_rounds);
+          if (dhtlog_handle != NULL)
+            {
+              topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext));
+              topo_ctx->cont = &do_get;
+              topo_ctx->cls = all_gets;
+              topo_ctx->timeout = DEFAULT_GET_TIMEOUT;
+              die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT),
+                                                       &end_badly, "from do gets (count_peers_churn_cb)");
+              GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, topo_ctx);
+            }
+          else
+            {
+              die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT),
+                                                       &end_badly, "from do gets (count_peers_churn_cb)");
+              GNUNET_SCHEDULER_add_now(sched, &do_get, all_gets);
+            }
+        }
+    }
+}
+
+/**
+ * Called when churning of the topology has finished.
+ *
+ * @param cls closure unused
+ * @param emsg NULL on success, or a printable error on failure
+ */
+static void churn_complete (void *cls, const char *emsg)
+{
+  struct FindPeerContext *find_peer_context = cls;
+  struct PeerCount *peer_count;
+  unsigned int i;
+  struct GNUNET_TESTING_Daemon *temp_daemon;
+  struct TopologyIteratorContext *topo_ctx;
+
+  if (emsg != NULL)
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Ending test, churning of peers failed with error `%s'", emsg);
+      GNUNET_SCHEDULER_add_now(sched, &end_badly, (void *)emsg);
+      return;
+    }
+
+  /**
+   * If we switched any peers on, we have to somehow force connect the new peer to
+   * SOME bootstrap peer in the network.  First schedule a task to find all peers
+   * with no connections, then choose a random peer for each and connect them.
+   */
+  if (find_peer_context != NULL)
+    {
+      for (i = 0; i < num_peers; i ++)
+        {
+          temp_daemon = GNUNET_TESTING_daemon_get(pg, i);
+          peer_count = GNUNET_malloc(sizeof(struct PeerCount));
+          memcpy(&peer_count->peer_id, &temp_daemon->id, sizeof(struct GNUNET_PeerIdentity));
+          peer_count->heap_node = GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, peer_count, peer_count->count);
+          GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, &temp_daemon->id.hashPubKey, peer_count, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+        }
+      GNUNET_TESTING_get_topology (pg, &count_peers_churn_cb, find_peer_context);
+    }
+  else
+    {
+      if (dhtlog_handle != NULL)
+        {
+          topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext));
+          topo_ctx->cont = &do_get;
+          topo_ctx->cls = all_gets;
+          topo_ctx->timeout = DEFAULT_GET_TIMEOUT;
+          die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT),
+                                                   &end_badly, "from do gets (churn_complete)");
+          GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, topo_ctx);
+        }
+      else
+        {
+          die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT),
+                                                   &end_badly, "from do gets (churn_complete)");
+          if (dhtlog_handle != NULL)
+            dhtlog_handle->insert_round(DHT_ROUND_GET, rounds_finished);
+          GNUNET_SCHEDULER_add_now(sched, &do_get, all_gets);
+        }
+    }
+}
+
+/**
+ * Decide how many peers to turn on or off in this round, make sure the
+ * numbers actually make sense, then do so.  This function sets in motion
+ * churn, find peer requests for newly joined peers, and issuing get
+ * requests once the new peers have done so.
+ *
+ * @param cls closure (unused)
+ * @param cls task context (unused)
+ */
+static void
+churn_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  unsigned int count_running;
+  unsigned int churn_up;
+  unsigned int churn_down;
+  struct GNUNET_TIME_Relative timeout;
+  struct FindPeerContext *find_peer_context;
+
+  churn_up = churn_down = 0;
+  count_running = GNUNET_TESTING_daemons_running(pg);
+  if (count_running > churn_array[current_churn_round])
+    churn_down = count_running - churn_array[current_churn_round];
+  else if (count_running < churn_array[current_churn_round])
+    churn_up = churn_array[current_churn_round] - count_running;
+  else
+    GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Not churning any peers, topology unchanged.\n");
+
+  if (churn_up > num_peers - count_running)
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Churn file specified %u peers (up); only have %u!", churn_array[current_churn_round], num_peers);
+      churn_up = num_peers - count_running;
+    }
+  else if (churn_down > count_running)
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Churn file specified %u peers (down); only have %u!", churn_array[current_churn_round], count_running);
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "This will leave NO peers running (mistake in churn configuration?)!");
+      churn_down = count_running;
+    }
+  timeout = GNUNET_TIME_relative_multiply(seconds_per_peer_start, churn_up > 0 ? churn_up : churn_down);
+
+  find_peer_context = NULL;
+  if (churn_up > 0) /* Only need to do find peer requests if we turned new peers on */
+    {
+      find_peer_context = GNUNET_malloc(sizeof(struct FindPeerContext));
+      find_peer_context->count_peers_cb = &count_peers_churn_cb;
+      find_peer_context->previous_peers = 0;
+      find_peer_context->current_peers = 0;
+      find_peer_context->endtime = GNUNET_TIME_relative_to_absolute(timeout);
+    }
+  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "churn_peers: want %u total, %u running, starting %u, stopping %u\n",
+             churn_array[current_churn_round], count_running, churn_up, churn_down);
+  GNUNET_TESTING_daemons_churn(pg, churn_down, churn_up, timeout, &churn_complete, find_peer_context);
+}
+
 /**
  * Task to release DHT handle associated with GET request.
  */
@@ -837,22 +1467,78 @@ get_stop_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
   GNUNET_DHT_disconnect(test_get->dht_handle);
   test_get->dht_handle = NULL;
 
+  /* Reset the uid (which item to search for) and the daemon (which peer to search from) for later get request iterations */
+  test_get->uid = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_puts);
+  test_get->daemon = GNUNET_TESTING_daemon_get(pg, GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers));
+
 #if VERBOSE > 1
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d gets succeeded, %d gets failed!\n", gets_completed, gets_failed);
 #endif
   update_meter(get_meter);
   if ((gets_completed + gets_failed == num_gets) && (outstanding_gets == 0))
     {
+      fprintf(stderr, "Canceling die task (get_stop_finished) %llu gets completed, %llu gets failed\n", gets_completed, gets_failed);
       GNUNET_SCHEDULER_cancel(sched, die_task);
-      //GNUNET_SCHEDULER_add_now(sched, &finish_testing, NULL);
-      if (dhtlog_handle != NULL)
+      reset_meter(put_meter);
+      reset_meter(get_meter);
+      /**
+       *  Handle all cases:
+       *    1) Testing is completely finished, call the topology iteration dealy and die
+       *    2) Testing is not finished, churn the network and do gets again (current_churn_round < churn_rounds)
+       *    3) Testing is not finished, reschedule all the PUTS *and* GETS again (num_rounds > 1)
+       */
+      if (rounds_finished == total_rounds - 1) /* Everything is finished, end testing */
         {
-          topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext));
-          topo_ctx->cont = &log_dht_statistics;
-          GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, topo_ctx);
+          if (dhtlog_handle != NULL)
+            {
+              topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext));
+              topo_ctx->cont = &log_dht_statistics;
+              GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, topo_ctx);
+            }
+          else
+            GNUNET_SCHEDULER_add_now (sched, &finish_testing, NULL);
+        }
+      else if (current_churn_round < churns_per_round * (rounds_finished + 1)) /* Do next round of churn */
+        {
+          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Current churn round %u, real round %u, scheduling next round of churn.\n", current_churn_round, rounds_finished + 1);
+          gets_completed = 0;
+          gets_failed = 0;
+          current_churn_round++;
+
+          if (dhtlog_handle != NULL)
+            dhtlog_handle->insert_round(DHT_ROUND_CHURN, rounds_finished);
+
+          GNUNET_SCHEDULER_add_now(sched, &churn_peers, NULL);
+        }
+      else if (rounds_finished < total_rounds - 1) /* Start a new complete round */
+        {
+          rounds_finished++;
+          gets_completed = 0;
+          gets_failed = 0;
+          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Round %u of %llu finished, scheduling next round.\n", rounds_finished, total_rounds);
+          /** Make sure we only get here after churning appropriately */
+          GNUNET_assert(current_churn_round == churn_rounds);
+          current_churn_round = 0;
+
+          /** We reset the peer daemon for puts and gets on each disconnect, so all we need to do is start another round! */
+          if (GNUNET_YES == in_dht_replication) /* Replication done in DHT, don't redo puts! */
+            {
+              if (dhtlog_handle != NULL)
+                dhtlog_handle->insert_round(DHT_ROUND_GET, rounds_finished);
+
+              die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, round_delay), all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT),
+                                                       &end_badly, "from do gets (next round)");
+              GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, round_delay), &do_get, all_gets);
+            }
+          else
+            {
+              if (dhtlog_handle != NULL)
+                dhtlog_handle->insert_round(DHT_ROUND_NORMAL, rounds_finished);
+              die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, round_delay), GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, num_puts * 2)),
+                                                       &end_badly, "from do puts");
+              GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, round_delay), &do_put, all_puts);
+            }
         }
-      else
-        GNUNET_SCHEDULER_add_now (sched, &finish_testing, NULL);
     }
 }
 
@@ -890,18 +1576,14 @@ void get_result_iterator (void *cls,
                           const void *data)
 {
   struct TestGetContext *test_get = cls;
-  GNUNET_HashCode search_key; /* Key stored under */
-  char original_data[test_data_size]; /* Made up data to store */
-
-  memset(original_data, test_get->uid, sizeof(original_data));
-  GNUNET_CRYPTO_hash(original_data, test_data_size, &search_key);
 
   if (test_get->succeeded == GNUNET_YES)
     return; /* Get has already been successful, probably ending now */
 
-  if ((0 != memcmp(&search_key, key, sizeof (GNUNET_HashCode))) || (0 != memcmp(original_data, data, sizeof(original_data))))
+  if (0 != memcmp(&known_keys[test_get->uid], key, sizeof (GNUNET_HashCode))) /* || (0 != memcmp(original_data, data, sizeof(original_data))))*/
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Key or data is not the same as was inserted!\n");
+      gets_completed++;
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Key or data is not the same as was inserted!\n");
     }
   else
     {
@@ -933,8 +1615,6 @@ static void
 do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
 {
   struct TestGetContext *test_get = cls;
-  GNUNET_HashCode key; /* Made up key to store data under */
-  char data[test_data_size]; /* Made up data to store */
 
   if (num_gets == 0)
     {
@@ -944,23 +1624,26 @@ do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
   if (test_get == NULL)
     return; /* End of the list */
 
-  memset(data, test_get->uid, sizeof(data));
-  GNUNET_CRYPTO_hash(data, test_data_size, &key);
+  /* Set this here in case we are re-running gets */
+  test_get->succeeded = GNUNET_NO;
 
+  /* Check if more gets are outstanding than should be */
   if (outstanding_gets > max_outstanding_gets)
     {
-      GNUNET_SCHEDULER_add_delayed (sched, get_delay, &do_get, test_get);
+      GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 200), &do_get, test_get);
       return;
     }
 
+  /* Connect to the first peer's DHT */
   test_get->dht_handle = GNUNET_DHT_connect(sched, test_get->daemon->cfg, 10);
-  /* Insert the data at the first peer */
   GNUNET_assert(test_get->dht_handle != NULL);
   outstanding_gets++;
+
+  /* Insert the data at the first peer */
   test_get->get_handle = GNUNET_DHT_get_start(test_get->dht_handle,
-                                              GNUNET_TIME_relative_get_forever(),
+                                              get_delay,
                                               1,
-                                              &key,
+                                              &known_keys[test_get->uid],
                                               &get_result_iterator,
                                               test_get,
                                               &get_continuation,
@@ -971,6 +1654,8 @@ do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
              test_get->daemon->shortname);
 #endif
   test_get->disconnect_task = GNUNET_SCHEDULER_add_delayed(sched, get_timeout, &get_stop_task, test_get);
+
+  /* Schedule the next request in the linked list of get requests */
   GNUNET_SCHEDULER_add_now (sched, &do_get, test_get->next);
 }
 
@@ -989,6 +1674,10 @@ put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
   if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT)
     fprintf(stderr, "PUT Request failed!\n");
 
+  /* Reset the daemon (which peer to insert at) for later put request iterations */
+  if (replicate_same == GNUNET_NO)
+    test_put->daemon = GNUNET_TESTING_daemon_get(pg, GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers));
+
   GNUNET_SCHEDULER_cancel(sched, test_put->disconnect_task);
   test_put->disconnect_task = GNUNET_SCHEDULER_add_now(sched, &put_disconnect_task, test_put);
   if (GNUNET_YES == update_meter(put_meter))
@@ -1002,15 +1691,15 @@ put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
           topo_ctx->cls = all_gets;
           topo_ctx->timeout = DEFAULT_GET_TIMEOUT;
           die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, all_get_timeout), DEFAULT_TOPOLOGY_CAPTURE_TIMEOUT),
-                                                   &end_badly, "from do gets");
+                                                   &end_badly, "from do gets (put finished)");
           GNUNET_SCHEDULER_add_now(sched, &capture_current_topology, topo_ctx);
         }
       else
         {
+          fprintf(stderr, "Scheduling die task (put finished)\n");
           die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_add(DEFAULT_GET_TIMEOUT, all_get_timeout),
-                                                       &end_badly, "from do gets");
+                                                   &end_badly, "from do gets (put finished)");
           GNUNET_SCHEDULER_add_delayed(sched, DEFAULT_GET_TIMEOUT, &do_get, all_gets);
-          GNUNET_SCHEDULER_add_now (sched, &finish_testing, NULL);
         }
       return;
     }
@@ -1023,97 +1712,44 @@ static void
 do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
 {
   struct TestPutContext *test_put = cls;
-  GNUNET_HashCode key; /* Made up key to store data under */
   char data[test_data_size]; /* Made up data to store */
   uint32_t rand;
+  int i;
 
   if (test_put == NULL)
     return; /* End of list */
 
-  memset(data, test_put->uid, sizeof(data));
-  GNUNET_CRYPTO_hash(data, test_data_size, &key);
-
-  if (outstanding_puts > max_outstanding_puts)
-    {
-      GNUNET_SCHEDULER_add_delayed (sched, put_delay, &do_put, test_put);
-      return;
-    }
-
-#if VERBOSE > 1
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting put for uid %u from peer %s\n",
-               test_put->uid,
-               test_put->daemon->shortname);
-#endif
-  test_put->dht_handle = GNUNET_DHT_connect(sched, test_put->daemon->cfg, 10);
-
-  GNUNET_assert(test_put->dht_handle != NULL);
-  outstanding_puts++;
-  GNUNET_DHT_put(test_put->dht_handle,
-                 &key,
-                 1,
-                 sizeof(data), data,
-                 GNUNET_TIME_absolute_get_forever(),
-                 GNUNET_TIME_relative_get_forever(),
-                 &put_finished, test_put);
-  test_put->disconnect_task = GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_get_forever(), &put_disconnect_task, test_put);
-  rand = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 2);
-  GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, rand), &do_put, test_put->next);
-}
-
-/**
- * Context for sending out find peer requests.
- */
-struct FindPeerContext
-{
-  /**
-   * How long to send find peer requests, once the settle time
-   * is over don't send any more out!
-   *
-   * TODO: Add option for settle time and find peer sending time?
-   */
-  struct GNUNET_TIME_Absolute endtime;
-
-  /**
-   * Number of connections in the current topology
-   * (after this round of find peer requests has ended).
-   */
-  unsigned int current_peers;
-
-  /**
-   * Number of connections in the current topology
-   * (before this round of find peer requests started).
-   */
-  unsigned int previous_peers;
-
-  /**
-   * Number of find peer requests we have currently
-   * outstanding.
-   */
-  unsigned int outstanding;
-
-  /**
-   * Number of find peer requests to send in this round.
-   */
-  unsigned int total;
-
-  /**
-   * Number of find peer requests sent last time around.
-   */
-  unsigned int last_sent;
+  for (i = 0; i < sizeof(data); i++)
+    {
+      memset(&data[i], GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t)-1), 1);
+    }
 
-  /**
-   * Hashmap of peers in the current topology, value
-   * is a PeerCount, with the number of connections
-   * this peer has.
-   */
-  struct GNUNET_CONTAINER_MultiHashMap *peer_hash;
+  if (outstanding_puts > max_outstanding_puts)
+    {
+      GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 200), &do_put, test_put);
+      return;
+    }
 
-  /**
-   * Min heap which orders values in the peer_hash for
-   * easy lookup.
-   */
-  struct GNUNET_CONTAINER_Heap *peer_min_heap;
-};
+#if VERBOSE > 1
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting put for uid %u from peer %s\n",
+                test_put->uid,
+                test_put->daemon->shortname);
+#endif
+  test_put->dht_handle = GNUNET_DHT_connect(sched, test_put->daemon->cfg, 10);
+
+  GNUNET_assert(test_put->dht_handle != NULL);
+  outstanding_puts++;
+  GNUNET_DHT_put(test_put->dht_handle,
+                 &known_keys[test_put->uid],
+                 1,
+                 sizeof(data), data,
+                 GNUNET_TIME_absolute_get_forever(),
+                 put_delay,
+                 &put_finished, test_put);
+  test_put->disconnect_task = GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_get_forever(), &put_disconnect_task, test_put);
+  rand = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 2);
+  GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, rand), &do_put, test_put->next);
+}
 
 static void
 schedule_find_peer_requests (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc);
@@ -1139,61 +1775,6 @@ static unsigned int connection_estimate(unsigned int peer_count, unsigned int bu
 
 }
 
-struct PeerCount
-{
-  /** Node in the heap */
-  struct GNUNET_CONTAINER_HeapNode *heap_node;
-
-  /** Peer the count refers to */
-  struct GNUNET_PeerIdentity peer_id;
-
-  /** Count of connections this peer has */
-  unsigned int count;
-};
-
-
-/**
- * Add a connection to the find_peer_context given.  This may
- * be complete overkill, but allows us to choose the peers with
- * the least connections to initiate find peer requests from.
- */
-static void add_new_connection(struct FindPeerContext *find_peer_context,
-                               const struct GNUNET_PeerIdentity *first,
-                               const struct GNUNET_PeerIdentity *second)
-{
-  struct PeerCount *first_count;
-  struct PeerCount *second_count;
-
-  if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, &first->hashPubKey))
-  {
-    first_count = GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, &first->hashPubKey);
-    first_count->count++;
-    GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, first_count->heap_node, first_count->count);
-  }
-  else
-  {
-    first_count = GNUNET_malloc(sizeof(struct PeerCount));
-    first_count->count = 1;
-    memcpy(&first_count->peer_id, first, sizeof(struct GNUNET_PeerIdentity));
-    first_count->heap_node = GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, first_count, first_count->count);
-    GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, &first->hashPubKey, first_count, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-  }
-
-  if (GNUNET_CONTAINER_multihashmap_contains(find_peer_context->peer_hash, &second->hashPubKey))
-  {
-    second_count = GNUNET_CONTAINER_multihashmap_get(find_peer_context->peer_hash, &second->hashPubKey);
-    second_count->count++;
-    GNUNET_CONTAINER_heap_update_cost(find_peer_context->peer_min_heap, second_count->heap_node, second_count->count);
-  }
-  else
-  {
-    second_count = GNUNET_malloc(sizeof(struct PeerCount));
-    second_count->count = 1;
-    memcpy(&second_count->peer_id, second, sizeof(struct GNUNET_PeerIdentity));
-    second_count->heap_node = GNUNET_CONTAINER_heap_insert(find_peer_context->peer_min_heap, second_count, second_count->count);
-    GNUNET_CONTAINER_multihashmap_put(find_peer_context->peer_hash, &second->hashPubKey, second_count, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-  }
-}
 
 /**
  * Callback for iterating over all the peer connections of a peer group.
@@ -1213,112 +1794,29 @@ void count_peers_cb (void *cls,
     }
   else
     {
-      GNUNET_assert(dhtlog_handle != NULL);
-      /*GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer count finished (%u connections), %u new peers, connection estimate %u\n", find_peer_context->current_peers, find_peer_context->current_peers - find_peer_context->previous_peers, connection_estimate(num_peers, DEFAULT_BUCKET_SIZE));*/
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Peer count finished (%u connections), %u new peers, connection estimate %u (double %u)\n",
+                                            find_peer_context->current_peers,
+                                            find_peer_context->current_peers - find_peer_context->previous_peers,
+                                            connection_estimate(num_peers, DEFAULT_BUCKET_SIZE),
+                                            2 * connection_estimate(num_peers, DEFAULT_BUCKET_SIZE));
+
       if ((find_peer_context->current_peers - find_peer_context->previous_peers > FIND_PEER_THRESHOLD) &&
-          (find_peer_context->current_peers < connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)) &&
+          (find_peer_context->current_peers < 2 * connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)) &&
           (GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0))
         {
           GNUNET_SCHEDULER_add_now(sched, &schedule_find_peer_requests, find_peer_context);
         }
       else
         {
+          GNUNET_CONTAINER_multihashmap_iterate(find_peer_context->peer_hash, &remove_peer_count, find_peer_context);
+          GNUNET_CONTAINER_multihashmap_destroy(find_peer_context->peer_hash);
+          GNUNET_CONTAINER_heap_destroy(find_peer_context->peer_min_heap);
+          GNUNET_free(find_peer_context);
           fprintf(stderr, "Not sending any more find peer requests.\n");
         }
     }
 }
 
-/**
- * Connect to all peers in the peer group and iterate over their
- * connections.
- */
-static void
-count_new_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
-{
-  struct FindPeerContext *find_peer_context = cls;
-  find_peer_context->previous_peers = find_peer_context->current_peers;
-  find_peer_context->current_peers = 0;
-  GNUNET_TESTING_get_topology (pg, &count_peers_cb, find_peer_context);
-}
-
-
-static void
-decrement_find_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
-{
-  struct TestFindPeer *test_find_peer = cls;
-  GNUNET_assert(test_find_peer->find_peer_context->outstanding > 0);
-  test_find_peer->find_peer_context->outstanding--;
-  test_find_peer->find_peer_context->total--;
-  if ((0 == test_find_peer->find_peer_context->total) &&
-      (GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value > 0))
-  {
-    GNUNET_SCHEDULER_add_now(sched, &count_new_peers, test_find_peer->find_peer_context);
-  }
-  GNUNET_free(test_find_peer);
-}
-
-/**
- * A find peer request has been sent to the server, now we will schedule a task
- * to wait the appropriate time to allow the request to go out and back.
- *
- * @param cls closure - a TestFindPeer struct
- * @param tc context the task is being called with
- */
-static void
-handle_find_peer_sent (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
-{
-  struct TestFindPeer *test_find_peer = cls;
-
-  GNUNET_DHT_disconnect(test_find_peer->dht_handle);
-  GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_divide(find_peer_delay, 2), &decrement_find_peers, test_find_peer);
-}
-
-static void
-send_find_peer_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
-{
-  struct TestFindPeer *test_find_peer = cls;
-
-  if (test_find_peer->find_peer_context->outstanding > max_outstanding_find_peers)
-  {
-    GNUNET_SCHEDULER_add_delayed(sched, find_peer_offset, &send_find_peer_request, test_find_peer);
-    return;
-  }
-
-  test_find_peer->find_peer_context->outstanding++;
-  if (GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value == 0)
-  {
-    GNUNET_SCHEDULER_add_now(sched, &decrement_find_peers, test_find_peer);
-    return;
-  }
-
-  test_find_peer->dht_handle = GNUNET_DHT_connect(sched, test_find_peer->daemon->cfg, 1);
-  GNUNET_assert(test_find_peer->dht_handle != NULL);
-  GNUNET_DHT_find_peers (test_find_peer->dht_handle,
-                         &handle_find_peer_sent, test_find_peer);
-}
-
-/**
- * Iterator over hash map entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int remove_peer_count (void *cls,
-                              const GNUNET_HashCode * key,
-                              void *value)
-{
-  struct FindPeerContext *find_peer_ctx = cls;
-  struct PeerCount *peer_count = value;
-  GNUNET_CONTAINER_heap_remove_node(find_peer_ctx->peer_min_heap, peer_count->heap_node);
-  GNUNET_free(peer_count);
-
-  return GNUNET_YES;
-}
-
 
 /**
  * Set up a single find peer request for each peer in the topology.  Do this
@@ -1352,7 +1850,7 @@ schedule_find_peer_requests (void *cls, const struct GNUNET_SCHEDULER_TaskContex
     find_peer_ctx->total = max_outstanding_find_peers;
 
   find_peer_ctx->last_sent = find_peer_ctx->total;
-  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending %u find peer messages (goal %u connections)\n", find_peer_ctx->total, connection_estimate(num_peers, DEFAULT_BUCKET_SIZE));
+  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending %u find peer messages (goal at least %u connections)\n", find_peer_ctx->total, connection_estimate(num_peers, DEFAULT_BUCKET_SIZE));
 
   find_peer_offset = GNUNET_TIME_relative_divide(find_peer_delay, find_peer_ctx->total);
   for (i = 0; i < find_peer_ctx->total; i++)
@@ -1420,13 +1918,16 @@ setup_puts_and_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
   uint32_t temp_daemon;
   struct TestPutContext *test_put;
   struct TestGetContext *test_get;
+#if REMEMBER
   int remember[num_puts][num_peers];
-
   memset(&remember, 0, sizeof(int) * num_puts * num_peers);
+#endif
+  known_keys = GNUNET_malloc(sizeof(GNUNET_HashCode) * num_puts);
   for (i = 0; i < num_puts; i++)
     {
       test_put = GNUNET_malloc(sizeof(struct TestPutContext));
       test_put->uid = i;
+      GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &known_keys[i]);
       temp_daemon = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers);
       test_put->daemon = GNUNET_TESTING_daemon_get(pg, temp_daemon);
       test_put->next = all_puts;
@@ -1437,11 +1938,12 @@ setup_puts_and_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
     {
       test_get = GNUNET_malloc(sizeof(struct TestGetContext));
       test_get->uid = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_puts);
-      temp_daemon = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers);
+#if REMEMBER
       while (remember[test_get->uid][temp_daemon] == 1)
         temp_daemon = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers);
-      test_get->daemon = GNUNET_TESTING_daemon_get(pg, temp_daemon);
       remember[test_get->uid][temp_daemon] = 1;
+#endif
+      test_get->daemon = GNUNET_TESTING_daemon_get(pg, GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers));
       test_get->next = all_gets;
       all_gets = test_get;
     }
@@ -1488,6 +1990,7 @@ continue_puts_and_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext * t
   if (GNUNET_YES == do_find_peer)
   {
     find_peer_context = GNUNET_malloc(sizeof(struct FindPeerContext));
+    find_peer_context->count_peers_cb = &count_peers_cb;
     find_peer_context->endtime = GNUNET_TIME_relative_to_absolute(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time));
     GNUNET_SCHEDULER_add_now(sched, &schedule_find_peer_requests, find_peer_context);
   }
@@ -1627,20 +2130,22 @@ setup_malicious_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc
       GNUNET_SCHEDULER_add_now (sched, &set_malicious, ctx);
     }
 
+  /**
+   * If we have any malicious peers to set up,
+   * the malicious callback should call continue_gets_and_puts
+   */
   if (malicious_getters + malicious_putters + malicious_droppers > 0)
-    die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, (malicious_getters + malicious_putters + malicious_droppers) * 2),
-                                             &end_badly, "from set malicious");
-  else
     {
-      if (dhtlog_handle != NULL)
-        GNUNET_SCHEDULER_add_now (sched,
-                                  &continue_puts_and_gets, NULL);
-      else
-        GNUNET_SCHEDULER_add_delayed (sched,
-                                    GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time),
-                                    &continue_puts_and_gets, NULL);
+      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Giving malicious set tasks some time before starting testing!\n");
+      die_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, (malicious_getters + malicious_putters + malicious_droppers) * 2),
+                                               &end_badly, "from set malicious");
+    }
+  else /* Otherwise, continue testing */
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Scheduling continue_puts_and_gets now!\n");
+      GNUNET_SCHEDULER_add_now (sched,
+                                &continue_puts_and_gets, NULL);
     }
-
 }
 
 /**
@@ -1674,15 +2179,16 @@ topology_callback (void *cls,
                  distance);
 #endif
     }
-#if VERBOSE
   else
     {
       failed_connections++;
+#if VERBOSE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to connect peer %s to peer %s with error :\n%s\n",
                   first_daemon->shortname,
                   second_daemon->shortname, emsg);
-    }
 #endif
+    }
+
   GNUNET_assert(peer_connect_meter != NULL);
   if (GNUNET_YES == update_meter(peer_connect_meter))
     {
@@ -1855,6 +2361,7 @@ run (void *cls,
   struct GNUNET_DHTLOG_TrialInfo trial_info;
   struct GNUNET_TESTING_Host *hosts;
   struct GNUNET_TESTING_Host *temphost;
+  struct GNUNET_TESTING_Host *tempnext;
   char *topology_str;
   char *connect_topology_str;
   char *blacklist_topology_str;
@@ -1872,10 +2379,15 @@ run (void *cls,
   int strict_kademlia;
   char *buf;
   char *data;
+  char *churn_data;
+  char *churn_filename;
   int count;
+  int ret;
+  unsigned int line_number;
 
   sched = s;
   config = cfg;
+  rounds_finished = 0;
   memset(&trial_info, 0, sizeof(struct GNUNET_DHTLOG_TrialInfo));
   /* Get path from configuration file */
   if (GNUNET_YES != GNUNET_CONFIGURATION_get_value_string(cfg, "paths", "servicehome", &test_directory))
@@ -1917,6 +2429,100 @@ run (void *cls,
                                              &trialmessage))
     trialmessage = NULL;
 
+  churn_data = NULL;
+  /** Check for a churn file to do churny simulation */
+  if (GNUNET_OK ==
+      GNUNET_CONFIGURATION_get_value_string(cfg, "testing", "churn_file",
+                                            &churn_filename))
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Reading churn data from %s\n", churn_filename);
+      if (GNUNET_OK != GNUNET_DISK_file_test (churn_filename))
+        {
+          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Error reading churn file!\n");
+          return;
+        }
+      if ((0 != STAT (churn_filename, &frstat)) || (frstat.st_size == 0))
+        {
+          GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                      "Could not open file specified for churn data, ending test!");
+          ok = 1119;
+          GNUNET_free_non_null(trialmessage);
+          GNUNET_free(churn_filename);
+          return;
+        }
+
+      churn_data = GNUNET_malloc_large (frstat.st_size);
+      GNUNET_assert(churn_data != NULL);
+      if (frstat.st_size !=
+          GNUNET_DISK_fn_read (churn_filename, churn_data, frstat.st_size))
+        {
+          GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                    "Could not read file %s specified for churn, ending test!", churn_filename);
+          GNUNET_free (churn_filename);
+          GNUNET_free (churn_data);
+          GNUNET_free_non_null(trialmessage);
+          return;
+        }
+
+      GNUNET_free_non_null(churn_filename);
+
+      buf = churn_data;
+      count = 0;
+      /* Read the first line */
+      while (count < frstat.st_size)
+        {
+          count++;
+          if (((churn_data[count] == '\n')) && (buf != &churn_data[count]))
+            {
+              churn_data[count] = '\0';
+              if (1 != sscanf(buf, "%u", &churn_rounds))
+                {
+                  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to read number of rounds from %s, ending test!\n", churn_filename);
+                  GNUNET_free_non_null(trialmessage);
+                  GNUNET_free(churn_filename);
+                  ret = 4200;
+                  return;
+                }
+              GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Read %u rounds from churn file\n", churn_rounds);
+              buf = &churn_data[count + 1];
+              churn_array = GNUNET_malloc(sizeof(unsigned int) * churn_rounds);
+            }
+        }
+
+      if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number(cfg, "dht_testing", "churns_per_round", &churns_per_round))
+        {
+          churns_per_round = (unsigned long long)churn_rounds;
+        }
+
+      line_number = 0;
+      while ((count < frstat.st_size) && (line_number < churn_rounds))
+        {
+          count++;
+          if (((churn_data[count] == '\n')) && (buf != &churn_data[count]))
+            {
+              churn_data[count] = '\0';
+
+              ret = sscanf(buf, "%u", &churn_array[line_number]);
+              if (1 == ret)
+                {
+                  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Read %u peers in round %u\n", churn_array[line_number], line_number);
+                  line_number++;
+                }
+              else
+                {
+                  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Error reading line `%s' in hostfile\n", buf);
+                  buf = &churn_data[count + 1];
+                  continue;
+                }
+              buf = &churn_data[count + 1];
+            }
+          else if (churn_data[count] == '\n') /* Blank line */
+            buf = &churn_data[count + 1];
+        }
+    }
+  GNUNET_free_non_null(churn_data);
+
+  /** Check for a hostfile containing user@host:port triples */
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_string (cfg, "testing", "hostfile",
                                              &hostfile))
@@ -1959,11 +2565,24 @@ run (void *cls,
     while (count < frstat.st_size)
       {
         count++;
-        if (((data[count] == '\n') || (data[count] == '\0')) && (buf != &data[count]))
+        /* if (((data[count] == '\n') || (data[count] == '\0')) && (buf != &data[count]))*/
+        if (((data[count] == '\n')) && (buf != &data[count]))
           {
             data[count] = '\0';
             temphost = GNUNET_malloc(sizeof(struct GNUNET_TESTING_Host));
-            temphost->hostname = buf;
+            ret = sscanf(buf, "%a[a-zA-Z0-9]@%a[a-zA-Z0-9.]:%hd", &temphost->username, &temphost->hostname, &temphost->port);
+            if (3 == ret)
+              {
+                GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Successfully read host %s, port %d and user %s from file\n", temphost->hostname, temphost->port, temphost->username);
+              }
+            else
+              {
+                GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Error reading line `%s' in hostfile\n", buf);
+                GNUNET_free(temphost);
+                buf = &data[count + 1];
+                continue;
+              }
+            /* temphost->hostname = buf; */
             temphost->next = hosts;
             hosts = temphost;
             buf = &data[count + 1];
@@ -2043,7 +2662,7 @@ run (void *cls,
                                                &temp_config_number))
     all_get_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, temp_config_number);
   else
-    all_get_timeout.value = get_timeout.value * ((num_gets / max_outstanding_gets) + 1);
+    all_get_timeout.value = get_timeout.value * num_gets;
 
   if (GNUNET_OK ==
         GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", "get_delay",
@@ -2076,6 +2695,10 @@ run (void *cls,
   /**
    * Get testing related options.
    */
+  if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno(cfg, "DHT_TESTING", "REPLICATE_SAME"))
+    {
+      replicate_same = GNUNET_YES;
+    }
 
   if (GNUNET_NO == GNUNET_CONFIGURATION_get_value_number (cfg, "DHT_TESTING",
                                                           "MALICIOUS_GET_FREQUENCY",
@@ -2088,14 +2711,25 @@ run (void *cls,
                                                           &malicious_put_frequency))
     malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
 
+
+  /* The normal behavior of the DHT is to do find peer requests
+   * on its own.  Only if this is explicitly turned off should
+   * the testing driver issue find peer requests (even though
+   * this is likely the default when testing).
+   */
   if (GNUNET_NO ==
         GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
-                                             "find_peers"))
+                                             "do_find_peer"))
     {
-      do_find_peer = GNUNET_NO;
+      do_find_peer = GNUNET_YES;
+    }
+
+  if (GNUNET_YES ==
+        GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
+                                             "republish"))
+    {
+      in_dht_replication = GNUNET_YES;
     }
-  else
-    do_find_peer = GNUNET_YES;
 
   if (GNUNET_YES != GNUNET_CONFIGURATION_get_value_number (cfg, "DHT_TESTING",
                                                           "TRIAL_TO_RUN",
@@ -2113,6 +2747,9 @@ run (void *cls,
   else
     find_peer_delay = DEFAULT_FIND_PEER_DELAY;
 
+  if (GNUNET_YES != GNUNET_CONFIGURATION_get_value_number(cfg, "DHT_TESTING", "ROUND_DELAY", &round_delay))
+    round_delay = 0;
+
   if (GNUNET_NO == GNUNET_CONFIGURATION_get_value_number (cfg, "DHT_TESTING",
                                                             "OUTSTANDING_FIND_PEERS",
                                                             &max_outstanding_find_peers))
@@ -2123,6 +2760,13 @@ run (void *cls,
 
   find_peer_offset = GNUNET_TIME_relative_divide (find_peer_delay, max_outstanding_find_peers);
 
+  if (GNUNET_SYSERR ==
+        GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", "num_rounds",
+                                               &total_rounds))
+    {
+      total_rounds = 1;
+    }
+
   topology_str = NULL;
   if ((GNUNET_YES ==
       GNUNET_CONFIGURATION_get_value_string(cfg, "testing", "topology",
@@ -2214,6 +2858,7 @@ run (void *cls,
   /* Set peers_left so we know when all peers started */
   peers_left = num_peers;
 
+
   /* Set up a task to end testing if peer start fails */
   die_task = GNUNET_SCHEDULER_add_delayed (sched,
                                            GNUNET_TIME_relative_multiply(seconds_per_peer_start, num_peers),
@@ -2268,8 +2913,15 @@ run (void *cls,
                                      &peers_started_callback, NULL,
                                      &topology_callback, NULL,
                                      hosts);
-
-  GNUNET_free_non_null(temphost);
+  temphost = hosts;
+  while (temphost != NULL)
+    {
+      tempnext = temphost->next;
+      GNUNET_free (temphost->username);
+      GNUNET_free (temphost->hostname);
+      GNUNET_free (temphost);
+      temphost = tempnext;
+    }
 }