assorted dht changes, fixes, etc.
author: Nathan S. Evans <evans@in.tum.de>
Mon, 30 Aug 2010 16:33:42 +0000 (16:33 +0000)
committer: Nathan S. Evans <evans@in.tum.de>
Mon, 30 Aug 2010 16:33:42 +0000 (16:33 +0000)
src/dht/dht.h
src/dht/dht_api.c
src/dht/gnunet-dht-driver.c
src/dht/gnunet-service-dht.c

index 0080bec8fb4ae85b3517197be7d4e43705bf5d32..c73a07a66c2b482c7b1f28d8e6ee22d40401cb0f 100644 (file)
@@ -61,6 +61,8 @@
 #define STAT_GET_RESPONSE_START "# DHT GET Responses Initiated"
 #define STAT_HELLOS_PROVIDED "# HELLO Messages given to transport"
 #define STAT_DISCONNECTS "# Disconnects received"
+#define STAT_DUPLICATE_UID "# Duplicate UID's encountered (bad if any!)"
+#define STAT_RECENT_SEEN "# recent requests seen again (routing loops, alternate paths)"
 
 typedef void (*GNUNET_DHT_MessageReceivedHandler) (void *cls,
                                                    const struct GNUNET_MessageHeader
index cb2adf978498c8d241e400dd811ebda4217fc310..4d671148d3f88beb7f8f2435ab94e16fd510ddc4 100644 (file)
@@ -871,6 +871,55 @@ int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequ
   return GNUNET_YES;
 }
 
+/**
+ * Send a message to the DHT telling it to issue a single find
+ * peer request using the peer's unique identifier as key.  This
+ * is used to fill the routing table, and is normally controlled
+ * by the DHT itself.  However, for testing and perhaps closer
+ * control over the DHT, this can be explicitly managed.
+ *
+ * @param handle handle to the DHT service
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
+ */
+int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
+                           GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
+
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  return GNUNET_YES;
+}
+
 /**
  * Send a message to the DHT telling it to start issuing random PUT
  * requests every 'frequency' milliseconds.
index 78e6850f42891392e976fbdddb2a25da385223b5..db7bf215942c73692a70dae93d3ec6d43234d7c3 100644 (file)
 /* Timeout for waiting for puts to be sent to the service */
 #define DEFAULT_PUT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 10)
 
+/* Timeout for waiting for find peer requests to be sent to the service */
+#define DEFAULT_FIND_PEER_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 40)
+
 #define DEFAULT_SECONDS_PER_PEER_START GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 45)
 
 #define DEFAULT_TEST_DATA_SIZE 8
 
+#define DEFAULT_BUCKET_SIZE 4
+
+#define FIND_PEER_THRESHOLD DEFAULT_BUCKET_SIZE * 2
+
 #define DEFAULT_MAX_OUTSTANDING_PUTS 10
 
+#define DEFAULT_MAX_OUTSTANDING_FIND_PEERS 1
+
 #define DEFAULT_MAX_OUTSTANDING_GETS 10
 
 #define DEFAULT_CONNECT_TIMEOUT 60
@@ -97,6 +106,30 @@ struct MaliciousContext
   int malicious_type;
 };
 
+struct TestFindPeer
+{
+  /* This is a linked list */
+  struct TestFindPeer *next;
+
+  /* Handle to the bigger context */
+  struct FindPeerContext *find_peer_context;
+
+  /**
+   * Handle to the peer's DHT service (via the API)
+   */
+  struct GNUNET_DHT_Handle *dht_handle;
+
+  /**
+   *  Handle to the peer daemon
+   */
+  struct GNUNET_TESTING_Daemon *daemon;
+
+  /**
+   * Task for disconnecting DHT handles
+   */
+  GNUNET_SCHEDULER_TaskIdentifier disconnect_task;
+};
+
 struct TestPutContext
 {
   /* This is a linked list */
@@ -232,8 +265,12 @@ static struct GNUNET_TIME_Relative get_delay;
 
 static struct GNUNET_TIME_Relative put_delay;
 
+static struct GNUNET_TIME_Relative find_peer_delay;
+
 static struct GNUNET_TIME_Relative seconds_per_peer_start;
 
+static int do_find_peer;
+
 static unsigned long long test_data_size = DEFAULT_TEST_DATA_SIZE;
 
 static unsigned long long max_outstanding_puts = DEFAULT_MAX_OUTSTANDING_PUTS;
@@ -242,6 +279,8 @@ static unsigned long long max_outstanding_gets = DEFAULT_MAX_OUTSTANDING_GETS;
 
 static unsigned long long malicious_getters;
 
+static unsigned long long max_outstanding_find_peers;
+
 static unsigned long long malicious_putters;
 
 static unsigned long long malicious_droppers;
@@ -651,6 +690,7 @@ static int stats_handle  (void *cls,
       stats_ctx->peer = peer;
       GNUNET_CONTAINER_multihashmap_put(stats_map, &peer->hashPubKey, stats_ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
     }
+  GNUNET_assert(stats_ctx != NULL);
 
   if (strcmp(name, STAT_ROUTES) == 0)
     stats_ctx->stat_routes = value;
@@ -696,6 +736,7 @@ static void
 log_dht_statistics (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
 {
   stats_map = GNUNET_CONTAINER_multihashmap_create(num_peers);
+  fprintf(stderr, "Starting statistics logging\n");
   GNUNET_TESTING_get_statistics(pg, &stats_finished, &stats_handle, NULL);
 }
 
@@ -1005,6 +1046,169 @@ do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
   GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, rand), &do_put, test_put->next);
 }
 
+/**
+ * Context for sending out find peer requests.
+ */
+struct FindPeerContext
+{
+  struct GNUNET_DHT_Handle *dht_handle;
+  struct GNUNET_TIME_Absolute endtime;
+  unsigned int current_peers;
+  unsigned int previous_peers;
+  unsigned int outstanding;
+  unsigned int total;
+};
+
+static void
+schedule_find_peer_requests (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc);
+
+/**
+ * Given a number of total peers and a bucket size, estimate the number of
+ * connections in a perfect kademlia topology.
+ */
+static unsigned int connection_estimate(unsigned int peer_count, unsigned int bucket_size)
+{
+  unsigned int i;
+  unsigned int filled;
+  i = num_peers;
+
+  filled = 0;
+  while (i > bucket_size)
+    {
+      filled++;
+      i = i/2;
+    }
+  return filled * bucket_size * peer_count;
+
+}
+
+/**
+ * Callback for iterating over all the peer connections of a peer group.
+ */
+void count_peers_cb (void *cls,
+                      const struct GNUNET_PeerIdentity *first,
+                      const struct GNUNET_PeerIdentity *second,
+                      struct GNUNET_TIME_Relative latency,
+                      uint32_t distance,
+                      const char *emsg)
+{
+  struct FindPeerContext *find_peer_context = cls;
+  if ((first != NULL) && (second != NULL))
+    {
+      find_peer_context->current_peers++;
+    }
+  else
+    {
+      GNUNET_assert(dhtlog_handle != NULL);
+      fprintf(stderr, "peer count finished (%u connections), %u new peers, connection estimate %u\n", find_peer_context->current_peers, find_peer_context->current_peers - find_peer_context->previous_peers, connection_estimate(num_peers, DEFAULT_BUCKET_SIZE));
+      if ((find_peer_context->current_peers - find_peer_context->previous_peers > FIND_PEER_THRESHOLD) &&
+          (find_peer_context->current_peers < connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)) &&
+          (GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0))
+        {
+          fprintf(stderr, "Scheduling another round of find peer requests.\n");
+          GNUNET_SCHEDULER_add_now(sched, schedule_find_peer_requests, find_peer_context);
+        }
+      else
+        {
+          fprintf(stderr, "Not sending any more find peer requests.\n");
+        }
+    }
+}
+
+/**
+ * Connect to all peers in the peer group and iterate over their
+ * connections.
+ */
+static void
+count_new_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct FindPeerContext *find_peer_context = cls;
+  find_peer_context->previous_peers = find_peer_context->current_peers;
+  find_peer_context->current_peers = 0;
+  GNUNET_TESTING_get_topology (pg, &count_peers_cb, find_peer_context);
+}
+
+
+static void
+decrement_find_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct TestFindPeer *test_find_peer = cls;
+  GNUNET_assert(test_find_peer->find_peer_context->outstanding > 0);
+  test_find_peer->find_peer_context->outstanding--;
+  test_find_peer->find_peer_context->total--;
+  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%d find_peers remaining\n", test_find_peer->find_peer_context->total);
+  if ((0 == test_find_peer->find_peer_context->total) &&
+      (GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value > 0))
+  {
+    GNUNET_SCHEDULER_add_now(sched, &count_new_peers, test_find_peer->find_peer_context);
+  }
+  GNUNET_free(test_find_peer);
+}
+
+/**
+ * A find peer request has been sent to the server, now we will schedule a task
+ * to wait the appropriate time to allow the request to go out and back.
+ *
+ * @param cls closure - a TestFindPeer struct
+ * @param tc context the task is being called with
+ */
+static void
+handle_find_peer_sent (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct TestFindPeer *test_find_peer = cls;
+
+  GNUNET_DHT_disconnect(test_find_peer->dht_handle);
+  GNUNET_SCHEDULER_add_delayed(sched, find_peer_delay, &decrement_find_peers, test_find_peer);
+}
+
+static void
+send_find_peer_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct TestFindPeer *test_find_peer = cls;
+
+  if (test_find_peer->find_peer_context->outstanding > max_outstanding_find_peers)
+  {
+    GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 300), &send_find_peer_request, test_find_peer);
+    return;
+  }
+
+  test_find_peer->find_peer_context->outstanding++;
+  if (GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value == 0)
+  {
+    GNUNET_SCHEDULER_add_now(sched, &decrement_find_peers, test_find_peer);
+    return;
+  }
+
+  test_find_peer->dht_handle = GNUNET_DHT_connect(sched, test_find_peer->daemon->cfg, 1);
+  GNUNET_assert(test_find_peer->dht_handle != NULL);
+  fprintf(stderr, "calling GNUNET_DHT_find_peers\n");
+  GNUNET_DHT_find_peers (test_find_peer->dht_handle,
+                         &handle_find_peer_sent, test_find_peer);
+}
+
+/**
+ * Set up a single find peer request for each peer in the topology.  Do this
+ * until the settle time is over, limited by the number of outstanding requests
+ * and the time allowed for each one!
+ */
+static void
+schedule_find_peer_requests (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
+{
+  struct FindPeerContext *find_peer_ctx = cls;
+  struct TestFindPeer *test_find_peer;
+  uint32_t i;
+  uint32_t random;
+
+  for (i = 0; i < max_outstanding_find_peers; i++)
+    {
+      test_find_peer = GNUNET_malloc(sizeof(struct TestFindPeer));
+      random = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers);
+      test_find_peer->daemon  = GNUNET_TESTING_daemon_get(pg, random);
+      test_find_peer->find_peer_context = find_peer_ctx;
+      find_peer_ctx->total++;
+      GNUNET_SCHEDULER_add_now(sched, &send_find_peer_request, test_find_peer);
+    }
+}
 
 /**
  * Set up some all of the put and get operations we want
@@ -1061,6 +1265,7 @@ continue_puts_and_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext * t
   int i;
   int max;
   struct TopologyIteratorContext *topo_ctx;
+  struct FindPeerContext *find_peer_context;
   if (dhtlog_handle != NULL)
     {
       if (settle_time >= 60 * 2)
@@ -1078,7 +1283,14 @@ continue_puts_and_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext * t
       GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time), &capture_current_topology, topo_ctx);
     }
   else
-    GNUNET_SCHEDULER_add_now (sched, &setup_puts_and_gets, NULL);
+    GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time), &setup_puts_and_gets, NULL);
+
+  if (GNUNET_YES == do_find_peer)
+  {
+    find_peer_context = GNUNET_malloc(sizeof(struct FindPeerContext));
+    find_peer_context->endtime = GNUNET_TIME_relative_to_absolute(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time));
+    GNUNET_SCHEDULER_add_now(sched, &schedule_find_peer_requests, find_peer_context);
+  }
 }
 
 /**
@@ -1509,6 +1721,7 @@ run (void *cls,
     hostfile = NULL;
 
   hosts = NULL;
+  temphost = NULL;
   if (hostfile != NULL)
     {
       if (GNUNET_OK != GNUNET_DISK_file_test (hostfile))
@@ -1533,6 +1746,7 @@ run (void *cls,
                   "Could not read file %s specified for host list, ending test!", hostfile);
         GNUNET_free (hostfile);
         GNUNET_free (data);
+        GNUNET_free_non_null(trialmessage);
         return;
       }
 
@@ -1587,6 +1801,20 @@ run (void *cls,
                                              &num_gets))
     num_gets = num_peers;
 
+  if (GNUNET_OK ==
+        GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", "find_peer_delay",
+                                               &temp_config_number))
+    find_peer_delay = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, temp_config_number);
+  else
+    find_peer_delay = DEFAULT_FIND_PEER_DELAY;
+
+  if (GNUNET_OK ==
+        GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", "concurrent_find_peers",
+                                               &temp_config_number))
+    max_outstanding_find_peers = temp_config_number;
+  else
+    max_outstanding_find_peers = DEFAULT_MAX_OUTSTANDING_FIND_PEERS;
+
   if (GNUNET_OK ==
         GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", "get_timeout",
                                                &temp_config_number))
@@ -1658,6 +1886,15 @@ run (void *cls,
                                                           &malicious_put_frequency))
     malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
 
+  if (GNUNET_NO ==
+        GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
+                                             "find_peers"))
+    {
+      do_find_peer = GNUNET_NO;
+    }
+  else
+    do_find_peer = GNUNET_YES;
+
   topology_str = NULL;
   if ((GNUNET_YES ==
       GNUNET_CONFIGURATION_get_value_string(cfg, "testing", "topology",
@@ -1745,7 +1982,7 @@ run (void *cls,
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "Number of peers must be specified in section %s option %s\n", topology_str, "TESTING", "NUM_PEERS");
     }
-
+  GNUNET_assert(num_peers > 0 && num_peers < (unsigned long long)-1);
   /* Set peers_left so we know when all peers started */
   peers_left = num_peers;
 
@@ -1800,6 +2037,7 @@ run (void *cls,
                                      &topology_callback, NULL,
                                      hosts);
 
+  GNUNET_free_non_null(temphost);
 }
 
 
index 98e02d2a0861caf7068a8c5cdab2d74b4ae5a21c..e28682ee45d57f60b128518212203335b25bb038 100644 (file)
  */
 #define MINIMUM_PEER_THRESHOLD 20
 
-
-
 #define DHT_MAX_RECENT 100
 
+#define FIND_PEER_CALC_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
+
 /**
  * Default time to wait to send messages on behalf of other peers.
  */
-#define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
+#define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
 
 /**
  * Default importance for handling messages on behalf of other peers.
  */
 #define DHT_DEFAULT_P2P_IMPORTANCE 0
 
+/**
+ * How long to keep recent requests around by default.
+ */
+#define DEFAULT_RECENT_REMOVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 60)
+
 /**
  * Default time to wait to send find peer messages sent by the dht service.
  */
-#define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
+#define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
 
 /**
  * Default importance for find peer messages sent by the dht service.
 /**
  * Default options for find peer requests sent by the dht service.
  */
-#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE
+#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_NONE
 
 /**
  * How long at least to wait before sending another find peer request.
 /**
  * How long at most to wait before sending another find peer request.
  */
-#define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
+#define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
 
 /**
  * How often to update our preference levels for peers in our routing tables.
  */
 #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
 
+/**
+ * How long at most on average will we allow a reply forward to take
+ * (before we quit sending out new requests)
+ */
+#define MAX_REQUEST_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
+
 /**
  * How many initial requests to send out (in true Kademlia fashion)
  */
  */
 #define MAX_HOPS 20
 
+/**
+ * How many time differences between requesting a core send and
+ * the actual callback to remember.
+ */
+#define MAX_REPLY_TIMES 8
+
 /**
  * Linked list of messages to send to clients.
  */
@@ -164,6 +181,11 @@ struct P2PPendingMessage
    */
   unsigned int importance;
 
+  /**
+   * Time when this request was scheduled to be sent.
+   */
+  struct GNUNET_TIME_Absolute scheduled;
+
   /**
    * How long to wait before sending message.
    */
@@ -251,7 +273,6 @@ struct PeerInfo
    * Task for scheduling periodic ping messages for this peer.
    */
   GNUNET_SCHEDULER_TaskIdentifier ping_task;
-
 };
 
 /**
@@ -329,7 +350,6 @@ struct ClientList
    * Tail of linked list of pending messages for this client
    */
   struct PendingMessage *pending_tail;
-
 };
 
 
@@ -353,7 +373,7 @@ struct DHT_MessageContext
   /**
    * The key this request was about
    */
-  const GNUNET_HashCode *key;
+  GNUNET_HashCode key;
 
   /**
    * The unique identifier of this request
@@ -483,6 +503,21 @@ struct DHTQueryRecord
 
 };
 
+/**
+ * Context used to calculate the number of find peer messages
+ * per X time units since our last scheduled find peer message
+ * was sent.  If we have seen too many messages, delay or don't
+ * send our own out.
+ */
+struct FindPeerMessageContext
+{
+  unsigned int count;
+
+  struct GNUNET_TIME_Absolute start;
+
+  struct GNUNET_TIME_Absolute end;
+};
+
 /**
  * DHT Routing results structure
  */
@@ -518,17 +553,49 @@ struct RecentRequests
 
 struct RecentRequest
 {
+  /**
+   * Position of this node in the min heap.
+   */
+  struct GNUNET_CONTAINER_HeapNode *heap_node;
+
+  /**
+   * Bloomfilter containing entries for peers
+   * we forwarded this request to.
+   */
+  struct GNUNET_CONTAINER_BloomFilter *bloom;
+
+  /**
+   * Timestamp of this request, for ordering
+   * the min heap.
+   */
+  struct GNUNET_TIME_Absolute timestamp;
+
+  /**
+   * Key of this request.
+   */
   GNUNET_HashCode key;
+
+  /**
+   * Unique identifier for this request.
+   */
   uint64_t uid;
-};
 
+  /**
+   * Task to remove this entry on timeout.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier remove_task;
+};
 
-#if 0
 /**
  * Recent requests by hash/uid and by time inserted.
  */
 static struct RecentRequests recent;
-#endif
+
+/**
+ * Context to use to calculate find peer rates.
+ */
+static struct FindPeerMessageContext find_peer_context;
+
 /**
  * Don't use our routing algorithm, always route
  * to closest peer; initially send requests to 3
@@ -546,6 +613,12 @@ static int stop_on_closest;
  */
 static int stop_on_found;
 
+/**
+ * Whether DHT needs to manage find peer requests, or
+ * an external force will do it on behalf of the DHT.
+ */
+static int do_find_peer;
+
 /**
  * How many peers have we added since we sent out our last
  * find peer request?
@@ -662,25 +735,113 @@ static unsigned int malicious_dropper;
  */
 static unsigned int malicious_getter;
 
-/*
+/**
  * GNUNET_YES or GNUNET_NO, whether or not to act as
  * a malicious node which sends out lots of PUTS
  */
 static unsigned int malicious_putter;
 
+/**
+ * Frequency for malicious get requests.
+ */
 static unsigned long long malicious_get_frequency;
 
+/**
+ * Frequency for malicious put requests.
+ */
 static unsigned long long malicious_put_frequency;
 
+/**
+ * Reply times for requests, if we are busy, don't send any
+ * more requests!
+ */
+static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES];
+
+/**
+ * Current counter for replies.
+ */
+static unsigned int reply_counter;
+
 /**
  * Forward declaration.
  */
 static size_t send_generic_reply (void *cls, size_t size, void *buf);
 
-/* Declare here so retry_core_send is aware of it */
+/** Declare here so retry_core_send is aware of it */
 size_t core_transmit_notify (void *cls,
                              size_t size, void *buf);
 
+/**
+ * Convert unique ID to hash code.
+ *
+ * @param uid unique ID to convert
+ * @param hash set to uid (extended with zeros)
+ */
+static void
+hash_from_uid (uint64_t uid,
+               GNUNET_HashCode *hash)
+{
+  memset (hash, 0, sizeof(GNUNET_HashCode));
+  *((uint64_t*)hash) = uid;
+}
+
+#if AVG
+/**
+ * Calculate the average send time between messages so that we can
+ * ignore certain requests if we get too busy.
+ *
+ * @return the average time between asking core to send a message
+ *         and when the buffer for copying it is passed
+ */
+static struct GNUNET_TIME_Relative get_average_send_delay()
+{
+  unsigned int i;
+  unsigned int divisor;
+  struct GNUNET_TIME_Relative average_time;
+  average_time = GNUNET_TIME_relative_get_zero();
+  divisor = 0;
+  for (i = 0; i < MAX_REPLY_TIMES; i++)
+  {
+    average_time = GNUNET_TIME_relative_add(average_time, reply_times[i]);
+    if (reply_times[i].value == (uint64_t)0)
+      continue;
+    else
+      divisor++;
+  }
+  if (divisor == 0)
+  {
+    return average_time;
+  }
+
+  average_time = GNUNET_TIME_relative_divide(average_time, divisor);
+  fprintf(stderr, "Avg send delay: %u sends is %llu\n", divisor, (long long unsigned int)average_time.value);
+  return average_time;
+}
+#endif
+
+/**
+ * Find the maximum send time of the recently sent values.
+ *
+ * @return the maximum time between asking core to send a message
+ *         and when the buffer for copying it is passed
+ */
+static struct GNUNET_TIME_Relative get_max_send_delay()
+{
+  unsigned int i;
+  struct GNUNET_TIME_Relative max_time;
+  max_time = GNUNET_TIME_relative_get_zero();
+
+  for (i = 0; i < MAX_REPLY_TIMES; i++)
+  {
+    if (reply_times[i].value > max_time.value)
+      max_time.value = reply_times[i].value;
+  }
+
+  if (max_time.value > MAX_REQUEST_TIME.value)
+    GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Max send delay was %llu\n", (long long unsigned int)max_time.value);
+  return max_time;
+}
+
 static void
 increment_stats(const char *value)
 {
@@ -718,6 +879,10 @@ try_core_send (void *cls,
                 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n", my_short_id,
                 "DHT", ssize, GNUNET_i2s(&peer->id));
 #endif
+      pending->scheduled = GNUNET_TIME_absolute_get();
+      reply_counter++;
+      if (reply_counter >= MAX_REPLY_TIMES)
+       reply_counter = 0;
       peer->th = GNUNET_CORE_notify_transmit_ready(coreAPI, pending->importance,
                                                    pending->timeout, &peer->id,
                                                    ssize, &core_transmit_notify, peer);
@@ -759,7 +924,7 @@ static void forward_result_message (void *cls,
   result_message->hop_count = htonl(msg_ctx->hop_count + 1);
   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, result_message->bloomfilter, DHT_BLOOM_SIZE));
   result_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
-  memcpy(&result_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
+  memcpy(&result_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
   memcpy(&result_message[1], msg, ntohs(msg->size));
 #if DEBUG_DHT > 1
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
@@ -802,6 +967,7 @@ size_t core_transmit_notify (void *cls,
   peer->th = NULL;
   off = 0;
   pending = peer->head;
+  reply_times[reply_counter] = GNUNET_TIME_absolute_get_difference(pending->scheduled, GNUNET_TIME_absolute_get());
   msize = ntohs(pending->msg->size);
   if (msize <= size)
     {
@@ -1246,6 +1412,7 @@ static int move_lowest_bucket (void *cls,
   struct PeerInfo *peer = value;
   int new_bucket;
 
+  GNUNET_assert(lowest_bucket > 0);
   new_bucket = lowest_bucket - 1;
   remove_peer(peer, lowest_bucket);
   GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head,
@@ -1358,7 +1525,7 @@ static void forward_message (void *cls,
 
   increment_stats(STAT_ROUTE_FORWARDS);
 
-  if ((msg_ctx->closest != GNUNET_YES) && (peer == find_closest_peer(msg_ctx->key)))
+  if ((msg_ctx->closest != GNUNET_YES) && (peer == find_closest_peer(&msg_ctx->key)))
     increment_stats(STAT_ROUTE_FORWARDS_CLOSEST);
 
   msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
@@ -1378,8 +1545,7 @@ static void forward_message (void *cls,
   route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
   if (msg_ctx->bloom != NULL)
     GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE));
-  if (msg_ctx->key != NULL)
-    memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
+  memcpy(&route_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
   memcpy(&route_message[1], msg, ntohs(msg->size));
 #if DEBUG_DHT > 1
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
@@ -1625,6 +1791,8 @@ static int consider_peer (struct GNUNET_PeerIdentity *peer)
   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey))
     return GNUNET_NO; /* We already know this peer (are connected even!) */
   bucket = find_current_bucket(&peer->hashPubKey);
+  if (bucket == GNUNET_SYSERR)
+    return GNUNET_NO;
   if ((k_buckets[bucket].peers_size < bucket_size) || ((bucket == lowest_bucket) && (lowest_bucket > 0)))
     return GNUNET_YES;
 
@@ -1667,6 +1835,7 @@ static int route_result_message(void *cls,
       }
       else /* We have a valid hello, and peer id stored in new_peer */
       {
+        find_peer_context.count++;
         increment_stats(STAT_FIND_PEER_REPLY);
         if (GNUNET_YES == consider_peer(&new_peer))
         {
@@ -1682,7 +1851,7 @@ static int route_result_message(void *cls,
   if (malicious_dropper == GNUNET_YES)
     record = NULL;
   else
-    record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, message_context->key);
+    record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &message_context->key);
 
   if (record == NULL) /* No record of this message! */
     {
@@ -1701,7 +1870,7 @@ static int route_result_message(void *cls,
                                        message_context->hop_count,
                                        GNUNET_SYSERR,
                                        &my_identity,
-                                       message_context->key,
+                                       &message_context->key,
                                        message_context->peer, NULL);
         }
 #endif
@@ -1728,7 +1897,7 @@ static int route_result_message(void *cls,
             {
               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_RESULT,
                                            message_context->hop_count,
-                                           GNUNET_YES, &my_identity, message_context->key,
+                                           GNUNET_YES, &my_identity, &message_context->key,
                                            message_context->peer, NULL);
             }
 #endif
@@ -1763,7 +1932,7 @@ static int route_result_message(void *cls,
                   dhtlog_handle->insert_route (NULL, message_context->unique_id,
                                                DHTLOG_RESULT,
                                                message_context->hop_count,
-                                               GNUNET_NO, &my_identity, message_context->key,
+                                               GNUNET_NO, &my_identity, &message_context->key,
                                                message_context->peer, &pos->source);
                 }
 #endif
@@ -1876,7 +2045,7 @@ handle_dht_get (void *cls,
 
   if (datacache != NULL)
     results =
-      GNUNET_DATACACHE_get (datacache, message_context->key, get_type,
+      GNUNET_DATACACHE_get (datacache, &message_context->key, get_type,
                             &datacache_get_iterator, message_context);
 
   if (results >= 1)
@@ -1891,14 +2060,14 @@ handle_dht_get (void *cls,
         {
           dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
                                 message_context->hop_count, GNUNET_YES, &my_identity,
-                                message_context->key);
+                                &message_context->key);
         }
 
       if ((debug_routes_extended) && (dhtlog_handle != NULL))
         {
           dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
                                        message_context->hop_count, GNUNET_YES,
-                                       &my_identity, message_context->key, message_context->peer,
+                                       &my_identity, &message_context->key, message_context->peer,
                                        NULL);
         }
 #endif
@@ -1911,7 +2080,7 @@ handle_dht_get (void *cls,
       {
         dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
                                       message_context->hop_count, GNUNET_NO, &my_identity,
-                                      message_context->key);
+                                      &message_context->key);
       }
 #endif
     }
@@ -2000,7 +2169,7 @@ handle_dht_find_peer (void *cls,
     {
       dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_FIND_PEER,
                                    message_context->hop_count, GNUNET_YES, &my_identity,
-                                   message_context->key);
+                                   &message_context->key);
     }
 #endif
   GNUNET_free(find_peer_result);
@@ -2046,7 +2215,7 @@ handle_dht_put (void *cls,
         {
           dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
                                        message_context->hop_count, GNUNET_NO, &my_identity,
-                                       message_context->key);
+                                       &message_context->key);
         }
     }
 #endif
@@ -2059,7 +2228,7 @@ handle_dht_put (void *cls,
     {
       dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
                                    message_context->hop_count, GNUNET_YES,
-                                   &my_identity, message_context->key, message_context->peer,
+                                   &my_identity, &message_context->key, message_context->peer,
                                    NULL);
     }
 
@@ -2067,13 +2236,13 @@ handle_dht_put (void *cls,
     {
       dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
                                    message_context->hop_count, GNUNET_YES, &my_identity,
-                                   message_context->key);
+                                   &message_context->key);
     }
 #endif
 
   increment_stats(STAT_PUTS_INSERTED);
   if (datacache != NULL)
-    GNUNET_DATACACHE_put (datacache, message_context->key, data_size,
+    GNUNET_DATACACHE_put (datacache, &message_context->key, data_size,
                           (char *) &put_msg[1], put_type,
                           GNUNET_TIME_absolute_ntoh(put_msg->expiration));
   else
@@ -2154,9 +2323,8 @@ get_forward_count (unsigned int hop_count, size_t target_replication)
     }
   target_count = /* target_count is ALWAYS < 1 unless replication is < 1 */
     target_replication / (target_replication * (hop_count + 1) + diameter);
-  target_value = 0;
-
 #if NONSENSE
+  target_value = 0;
   while (target_value < target_count)
     target_value++; /* target_value is ALWAYS 1 after this "loop" */
 #else
@@ -2171,33 +2339,44 @@ get_forward_count (unsigned int hop_count, size_t target_replication)
 
 /*
  * Check whether my identity is closer than any known peers.
+ * If a non-null bloomfilter is given, check if this is the closest
+ * peer that hasn't already been routed to.
  *
  * @param target hash code to check closeness to
+ * @param bloom bloomfilter, exclude these entries from the decision
  *
  * Return GNUNET_YES if node location is closest, GNUNET_NO
  * otherwise.
  */
 int
-am_closest_peer (const GNUNET_HashCode * target)
+am_closest_peer (const GNUNET_HashCode * target, struct GNUNET_CONTAINER_BloomFilter *bloom)
 {
   int bits;
   int other_bits;
   int bucket_num;
   int count;
   struct PeerInfo *pos;
+#if INTEGER_DISTANCE
   unsigned int my_distance;
-
+#endif
   bucket_num = find_current_bucket(target);
   if (bucket_num == GNUNET_SYSERR) /* Same key! */
     return GNUNET_YES;
 
   bits = matching_bits(&my_identity.hashPubKey, target);
+#if INTEGER_DISTANCE
   my_distance = distance(&my_identity.hashPubKey, target);
-
+#endif
   pos = k_buckets[bucket_num].head;
   count = 0;
   while ((pos != NULL) && (count < bucket_size))
     {
+      if ((bloom != NULL) && (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test(bloom, &pos->id.hashPubKey)))
+        {
+          pos = pos->next;
+          continue; /* Skip already checked entries */
+        }
+
       other_bits = matching_bits(&pos->id.hashPubKey, target);
       if (other_bits > bits)
         return GNUNET_NO;
@@ -2361,6 +2540,33 @@ select_peer (const GNUNET_HashCode * target,
     }
 }
 
+/**
+ * Task used to remove recent entries, either
+ * after timeout, when full, or on shutdown.
+ *
+ * @param cls the entry to remove
+ * @param tc context, reason, etc.
+ */
+static void
+remove_recent (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct RecentRequest *req = cls;
+  GNUNET_HashCode hash;
+
+  GNUNET_assert(req != NULL);
+  hash_from_uid(req->uid, &hash);
+  GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(recent.hashmap, &hash, req));
+  GNUNET_CONTAINER_heap_remove_node(recent.minHeap, req->heap_node);
+  GNUNET_CONTAINER_bloomfilter_free(req->bloom);
+  GNUNET_free(req);
+
+  if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) && (0 == GNUNET_CONTAINER_multihashmap_size(recent.hashmap)) && (0 == GNUNET_CONTAINER_heap_get_size(recent.minHeap)))
+  {
+    GNUNET_CONTAINER_multihashmap_destroy(recent.hashmap);
+    GNUNET_CONTAINER_heap_destroy(recent.minHeap);
+  }
+}
+
 
 /**
  * Task used to remove forwarding entries, either
@@ -2408,6 +2614,7 @@ static int cache_response(void *cls, struct DHT_MessageContext *msg_ctx)
   while (current_size >= MAX_OUTSTANDING_FORWARDS)
     {
       source_info = GNUNET_CONTAINER_heap_remove_root(forward_list.minHeap);
+      GNUNET_assert(source_info != NULL);
       record = source_info->record;
       GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
       if (record->head == NULL) /* No more entries in DLL */
@@ -2420,7 +2627,7 @@ static int cache_response(void *cls, struct DHT_MessageContext *msg_ctx)
       current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
     }
   now = GNUNET_TIME_absolute_get();
-  record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, msg_ctx->key);
+  record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &msg_ctx->key);
   if (record != NULL) /* Already know this request! */
     {
       pos = record->head;
@@ -2439,8 +2646,8 @@ static int cache_response(void *cls, struct DHT_MessageContext *msg_ctx)
   else
     {
       record = GNUNET_malloc(sizeof (struct DHTQueryRecord));
-      GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, msg_ctx->key, record, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-      memcpy(&record->key, msg_ctx->key, sizeof(GNUNET_HashCode));
+      GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, &msg_ctx->key, record, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+      memcpy(&record->key, &msg_ctx->key, sizeof(GNUNET_HashCode));
     }
 
   source_info = GNUNET_malloc(sizeof(struct DHTRouteSource));
@@ -2480,11 +2687,13 @@ static int route_message(void *cls,
 {
   int i;
   struct PeerInfo *selected;
+#if DEBUG_DHT_ROUTING > 1
   struct PeerInfo *nearest;
-  unsigned int forward_count;
-#if DEBUG_DHT
   char *nearest_buf;
 #endif
+  unsigned int forward_count;
+  struct RecentRequest *recent_req;
+  GNUNET_HashCode unique_hash;
 #if DEBUG_DHT_ROUTING
   int ret;
 #endif
@@ -2496,7 +2704,7 @@ static int route_message(void *cls,
         {
           dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
                                        message_context->hop_count, GNUNET_SYSERR,
-                                       &my_identity, message_context->key, message_context->peer,
+                                       &my_identity, &message_context->key, message_context->peer,
                                        NULL);
         }
 #endif
@@ -2506,15 +2714,16 @@ static int route_message(void *cls,
     }
 
   increment_stats(STAT_ROUTES);
-  message_context->closest = am_closest_peer(message_context->key);
+  /* Semantics of this call means we find whether we are the closest peer out of those already
+   * routed to on this messages path.
+   */
+  message_context->closest = am_closest_peer(&message_context->key, message_context->bloom);
   forward_count = get_forward_count(message_context->hop_count, message_context->replication);
-  nearest = find_closest_peer(message_context->key);
-
+  
   if (message_context->bloom == NULL)
     message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
 
   if ((stop_on_closest == GNUNET_YES) && (message_context->closest == GNUNET_YES) && (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT))
-/*      || ((strict_kademlia == GNUNET_YES) && (message_context->closest == GNUNET_YES))) */
     forward_count = 0;
 
 #if DEBUG_DHT_ROUTING
@@ -2527,7 +2736,7 @@ static int route_message(void *cls,
     {
       dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
                                    message_context->hop_count, ret,
-                                   &my_identity, message_context->key, message_context->peer,
+                                   &my_identity, &message_context->key, message_context->peer,
                                    NULL);
     }
 #endif
@@ -2556,10 +2765,10 @@ static int route_message(void *cls,
         {
           if ((debug_routes) && (dhtlog_handle != NULL))
             {
-              dhtlog_handle->insert_dhtkey(NULL, message_context->key);
+              dhtlog_handle->insert_dhtkey(NULL, &message_context->key);
               dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_FIND_PEER,
                                            message_context->hop_count, GNUNET_NO, &my_identity,
-                                           message_context->key);
+                                           &message_context->key);
             }
         }
 #endif
@@ -2570,42 +2779,47 @@ static int route_message(void *cls,
     }
 
   GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey);
-#if 0
-  if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(recent->hashmap, message_context->key))
-    {
-      if (GNUNET_SYSERR = GNUNET_CONTAINER_multihashmap_get_multiple (recent->hashmap, message_context->key, &find_matching_recent, &message_context)) /* Have too recently seen this request! */
-        {
-          forward_count = 0;
-        }
-      else /* Exact match not found, but same key found */
-        {
-          recent_req = GNUNET_CONTAINER_multihashmap_get(recent->hashmap, message_context->key);
-        }
+  hash_from_uid(message_context->unique_id, &unique_hash);
+  if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(recent.hashmap, &unique_hash))
+  {
+      recent_req = GNUNET_CONTAINER_multihashmap_get(recent.hashmap, &unique_hash);
+      GNUNET_assert(recent_req != NULL);
+      if (0 != memcmp(&recent_req->key, &message_context->key, sizeof(GNUNET_HashCode)))
+        increment_stats(STAT_DUPLICATE_UID);
+      else
+      {
+        increment_stats(STAT_RECENT_SEEN);
+        GNUNET_CONTAINER_bloomfilter_or2(message_context->bloom, recent_req->bloom, DHT_BLOOM_SIZE);
+      }
     }
   else
     {
       recent_req = GNUNET_malloc(sizeof(struct RecentRequest));
       recent_req->uid = message_context->unique_id;
-      memcmp(&recent_req->key, message_context->key, sizeof(GNUNET_HashCode));
+      memcpy(&recent_req->key, &message_context->key, sizeof(GNUNET_HashCode));
       recent_req->remove_task = GNUNET_SCHEDULER_add_delayed(sched, DEFAULT_RECENT_REMOVAL, &remove_recent, recent_req);
-      GNUNET_CONTAINER_heap_insert(recent->minHeap, recent_req, GNUNET_TIME_absolute_get());
-      GNUNET_CONTAINER_multihashmap_put(recent->hashmap, message_context->key, recent_req, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+      recent_req->heap_node = GNUNET_CONTAINER_heap_insert(recent.minHeap, recent_req, GNUNET_TIME_absolute_get().value);
+      recent_req->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
+      GNUNET_CONTAINER_multihashmap_put(recent.hashmap, &unique_hash, recent_req, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
     }
 
-  if (GNUNET_CONTAINER_multihashmap_size(recent->hashmap) > DHT_MAX_RECENT)
+  if (GNUNET_CONTAINER_multihashmap_size(recent.hashmap) > DHT_MAX_RECENT)
     {
-      remove_oldest_recent();
+      struct RecentRequest *oldest = GNUNET_CONTAINER_heap_peek(recent.minHeap);
+      GNUNET_assert(oldest != NULL);
+      GNUNET_SCHEDULER_cancel(sched, oldest->remove_task);
+      GNUNET_SCHEDULER_add_now(sched, &remove_recent, oldest);
     }
-#endif
 
   for (i = 0; i < forward_count; i++)
     {
-      selected = select_peer(message_context->key, message_context->bloom);
+      selected = select_peer(&message_context->key, message_context->bloom);
 
       if (selected != NULL)
         {
           GNUNET_CONTAINER_bloomfilter_add(message_context->bloom, &selected->id.hashPubKey);
 #if DEBUG_DHT_ROUTING > 1
+          nearest = find_closest_peer(&message_context->key);
           nearest_buf = GNUNET_strdup(GNUNET_i2s(&nearest->id));
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                       "`%s:%s': Forwarding request key %s uid %llu to peer %s (closest %s, bits %d, distance %u)\n", my_short_id,
@@ -2616,7 +2830,7 @@ static int route_message(void *cls,
             {
               dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
                                            message_context->hop_count, GNUNET_NO,
-                                           &my_identity, message_context->key, message_context->peer,
+                                           &my_identity, &message_context->key, message_context->peer,
                                            &selected->id);
             }
           forward_message(cls, msg, selected, message_context);
@@ -2640,7 +2854,10 @@ static int route_message(void *cls,
 #endif
 
   if (message_context->bloom != NULL)
-    GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
+    {
+      GNUNET_CONTAINER_bloomfilter_or2(recent_req->bloom, message_context->bloom, DHT_BLOOM_SIZE);
+      GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
+    }
 
   return forward_count;
 }
@@ -2684,7 +2901,6 @@ malicious_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   static struct GNUNET_DHT_PutMessage put_message;
   static struct DHT_MessageContext message_context;
   static GNUNET_HashCode key;
-  unsigned int mcsize;
   uint32_t random_key;
 
   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
@@ -2694,12 +2910,11 @@ malicious_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   put_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
   put_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE);
   put_message.expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_absolute_get_forever());
-  mcsize = sizeof(struct DHT_MessageContext) + sizeof(GNUNET_HashCode);
   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
   message_context.client = NULL;
   random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t)-1);
   GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key);
-  message_context.key = &key;
+  memcpy(&message_context.key, &key, sizeof(GNUNET_HashCode));
   message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
   message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
   message_context.msg_options = ntohl (0);
@@ -2726,9 +2941,8 @@ static void
 malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   static struct GNUNET_DHT_GetMessage get_message;
-  static struct DHT_MessageContext message_context;
+  struct DHT_MessageContext message_context;
   static GNUNET_HashCode key;
-  unsigned int mcsize;
   uint32_t random_key;
 
   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
@@ -2737,12 +2951,11 @@ malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   get_message.header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
   get_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
   get_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE);
-  mcsize = sizeof(struct DHT_MessageContext) + sizeof(GNUNET_HashCode);
   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
   message_context.client = NULL;
   random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t)-1);
   GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key);
-  message_context.key = &key;
+  memcpy(&message_context.key, &key, sizeof(GNUNET_HashCode));
   message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1));
   message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
   message_context.msg_options = ntohl (0);
@@ -2754,13 +2967,10 @@ malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     dhtlog_handle->insert_dhtkey(NULL, &key);
   increment_stats(STAT_GET_START);
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Sending malicious GET message with hash %s", my_short_id, "DHT", GNUNET_h2s(&key));
-  route_message(NULL, &get_message.header, &message_context);
+  route_message (NULL, &get_message.header, &message_context);
   GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, malicious_get_frequency), &malicious_get_task, NULL);
 }
 
-#if DO_FIND_PEER
-
-
 /**
  * Iterator over hash map entries.
  *
@@ -2797,11 +3007,41 @@ send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
   int ret;
   struct GNUNET_TIME_Relative next_send_time;
   struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
-
+#if COUNT_INTERVAL
+  struct GNUNET_TIME_Relative time_diff;
+  struct GNUNET_TIME_Absolute end;
+  double multiplier;
+  double count_per_interval;
+#endif
   if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
     return;
 
+  if ((newly_found_peers > bucket_size) && (GNUNET_YES == do_find_peer)) /* If we are finding peers already, no need to send out our request right now! */
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Have %d newly found peers since last find peer message sent!\n", newly_found_peers);
+      GNUNET_SCHEDULER_add_delayed (sched,
+                                    GNUNET_TIME_UNIT_MINUTES,
+                                    &send_find_peer_message, NULL);
+      newly_found_peers = 0;
+      return;
+    }
+    
   increment_stats(STAT_FIND_PEER_START);
+#if COUNT_INTERVAL
+  end = GNUNET_TIME_absolute_get();
+  time_diff = GNUNET_TIME_absolute_get_difference(find_peer_context.start, end);
+
+  if (time_diff.value > FIND_PEER_CALC_INTERVAL.value)
+    {
+      multiplier = time_diff.value / FIND_PEER_CALC_INTERVAL.value;
+      count_per_interval = find_peer_context.count / multiplier;
+    }
+  else
+    {
+      multiplier = FIND_PEER_CALC_INTERVAL.value / time_diff.value;
+      count_per_interval = find_peer_context.count * multiplier;
+    }
+#endif
 
   find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_FindPeerMessage));
   find_peer_msg->header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage));
@@ -2810,10 +3050,10 @@ send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
   GNUNET_CONTAINER_multihashmap_iterate(all_known_peers, &add_known_to_bloom, temp_bloom);
   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(temp_bloom, find_peer_msg->bloomfilter, DHT_BLOOM_SIZE));
   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
-  message_context.key = &my_identity.hashPubKey;
+  memcpy(&message_context.key, &my_identity.hashPubKey, sizeof(GNUNET_HashCode));
   message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, (uint64_t)-1));
-  message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION);
-  message_context.msg_options = ntohl (DHT_DEFAULT_FIND_PEER_OPTIONS);
+  message_context.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
+  message_context.msg_options = DHT_DEFAULT_FIND_PEER_OPTIONS;
   message_context.network_size = estimate_diameter();
   message_context.peer = &my_identity;
   message_context.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
@@ -2836,12 +3076,18 @@ send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
                              GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
                                                       DHT_MAXIMUM_FIND_PEER_INTERVAL.value - DHT_MINIMUM_FIND_PEER_INTERVAL.value);
     }
+
+  GNUNET_assert (next_send_time.value != 0);
+  find_peer_context.count = 0;
   newly_found_peers = 0;
-  GNUNET_SCHEDULER_add_delayed (sched,
-                                next_send_time,
-                                &send_find_peer_message, NULL);
+  find_peer_context.start = GNUNET_TIME_absolute_get();
+  if (GNUNET_YES == do_find_peer)
+  {
+    GNUNET_SCHEDULER_add_delayed (sched,
+                                  next_send_time,
+                                  &send_find_peer_message, NULL);
+  }
 }
-#endif
 
 /**
  * Handler for any generic DHT messages, calls the appropriate handler
@@ -2859,11 +3105,7 @@ handle_dht_local_route_request (void *cls, struct GNUNET_SERVER_Client *client,
   const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message;
   const struct GNUNET_MessageHeader *enc_msg;
   struct DHT_MessageContext message_context;
-  size_t enc_type;
-
   enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
-  enc_type = ntohs (enc_msg->type);
-
 #if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s:%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
@@ -2876,7 +3118,7 @@ handle_dht_local_route_request (void *cls, struct GNUNET_SERVER_Client *client,
 #endif
   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
   message_context.client = find_active_client (client);
-  message_context.key = &dht_msg->key;
+  memcpy(&message_context.key, &dht_msg->key, sizeof(GNUNET_HashCode));
   message_context.unique_id = GNUNET_ntohll (dht_msg->unique_id);
   message_context.replication = ntohl (dht_msg->desired_replication_level);
   message_context.msg_options = ntohl (dht_msg->options);
@@ -2920,6 +3162,10 @@ handle_dht_control_message (void *cls, struct GNUNET_SERVER_Client *client,
 
   switch (ntohs(dht_control_msg->command))
   {
+  case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
+    GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending self seeking find peer request!\n");
+    GNUNET_SCHEDULER_add_now(sched, &send_find_peer_message, NULL);
+    break;
   case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET:
     if (ntohs(dht_control_msg->variable) > 0)
       malicious_get_frequency = ntohs(dht_control_msg->variable);
@@ -2971,15 +3217,11 @@ handle_dht_local_route_stop(void *cls, struct GNUNET_SERVER_Client *client,
     (const struct GNUNET_DHT_StopMessage *) message;
   struct DHTQueryRecord *record;
   struct DHTRouteSource *pos;
-  uint64_t uid;
 #if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s:%s': Received `%s' request from client, uid %llu\n", my_short_id, "DHT",
               "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
 #endif
-
-  uid = GNUNET_ntohll(dht_stop_msg->unique_id);
-
   record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &dht_stop_msg->key);
   if (record != NULL)
     {
@@ -3017,6 +3259,12 @@ handle_dht_p2p_route_request (void *cls,
   struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
   struct DHT_MessageContext *message_context;
 
+  if (get_max_send_delay().value > MAX_REQUEST_TIME.value)
+  {
+    GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending of previous requests has taken far too long, backing off!\n");
+    return GNUNET_YES;
+  }
+
   if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_P2P_PING) /* Throw these away. FIXME: Don't throw these away? (reply)*/
     {
 #if DEBUG_PING
@@ -3034,7 +3282,7 @@ handle_dht_p2p_route_request (void *cls,
   message_context->bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
   GNUNET_assert(message_context->bloom != NULL);
   message_context->hop_count = ntohl(incoming->hop_count);
-  message_context->key = &incoming->key;
+  memcpy(&message_context->key, &incoming->key, sizeof(GNUNET_HashCode));
   message_context->replication = ntohl(incoming->desired_replication_level);
   message_context->unique_id = GNUNET_ntohll(incoming->unique_id);
   message_context->msg_options = ntohl(incoming->options);
@@ -3074,7 +3322,7 @@ handle_dht_p2p_route_result (void *cls,
   memset(&message_context, 0, sizeof(struct DHT_MessageContext));
   message_context.bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
   GNUNET_assert(message_context.bloom != NULL);
-  message_context.key = &incoming->key;
+  memcpy(&message_context.key, &incoming->key, sizeof(GNUNET_HashCode));
   message_context.unique_id = GNUNET_ntohll(incoming->unique_id);
   message_context.msg_options = ntohl(incoming->options);
   message_context.hop_count = ntohl(incoming->hop_count);
@@ -3322,7 +3570,7 @@ run (void *cls,
   coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */
                                  cfg,   /* Main configuration */
                                  GNUNET_TIME_UNIT_FOREVER_REL,
-                                 NULL,  /* Closure passed to DHT functionas around? */
+                                 NULL,  /* Closure passed to DHT functions */
                                  &core_init,    /* Call core_init once connected */
                                  &handle_core_connect,  /* Handle connects */
                                  &handle_core_disconnect,  /* remove peers on disconnects */
@@ -3402,6 +3650,15 @@ run (void *cls,
       malicious_dropper = GNUNET_YES;
     }
 
+  if (GNUNET_NO ==
+        GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
+                                             "do_find_peer"))
+    {
+      do_find_peer = GNUNET_NO;
+    }
+  else
+    do_find_peer = GNUNET_YES;
+
   if (GNUNET_YES ==
       GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing",
                                            "mysql_logging_extended"))
@@ -3445,14 +3702,19 @@ run (void *cls,
       GNUNET_STATISTICS_set(stats, STAT_HELLOS_PROVIDED, 0, GNUNET_NO);
       GNUNET_STATISTICS_set(stats, STAT_DISCONNECTS, 0, GNUNET_NO);
     }
-#if DO_FIND_PEER
-  next_send_time.value = DHT_MINIMUM_FIND_PEER_INTERVAL.value +
-                         GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
-                                                  (DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2) - DHT_MINIMUM_FIND_PEER_INTERVAL.value);
-  GNUNET_SCHEDULER_add_delayed (sched,
-                                next_send_time,
-                                &send_find_peer_message, NULL);
-#endif
+  /* FIXME: if there are no recent requests then these never get freed, but alternative is _annoying_! */
+  recent.hashmap = GNUNET_CONTAINER_multihashmap_create(DHT_MAX_RECENT / 2);
+  recent.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
+  if (GNUNET_YES == do_find_peer)
+  {
+    next_send_time.value = DHT_MINIMUM_FIND_PEER_INTERVAL.value +
+                           GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
+                                                    (DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2) - DHT_MINIMUM_FIND_PEER_INTERVAL.value);
+    find_peer_context.start = GNUNET_TIME_absolute_get();
+    GNUNET_SCHEDULER_add_delayed (sched,
+                                  next_send_time,
+                                  &send_find_peer_message, &find_peer_context);
+  }
 
   /* Scheduled the task to clean up when shutdown is called */
   cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,