fixing block reconstruction start/shutdown code
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index 9f8ec98e307a37aceeb2a289640c03d8e12ad4ac..08b01fbbf561819e6a96763e1a432382367ba6e1 100644 (file)
@@ -24,6 +24,8 @@
  * @author Christian Grothoff
  *
  * TODO:
+ * - collect traffic data for anonymity levels > 1
+ * - implement transmission restrictions for anonymity level > 1
  * - more statistics
  */
 #include "platform.h"
@@ -37,6 +39,7 @@
 #include "gnunet_protocols.h"
 #include "gnunet_signatures.h"
 #include "gnunet_statistics_service.h"
+#include "gnunet_transport_service.h"
 #include "gnunet_util_lib.h"
 #include "gnunet-service-fs_indexing.h"
 #include "fs.h"
  * Should we introduce random latency in processing?  Required for proper
  * implementation of GAP, but can be disabled for performance evaluation of
  * the basic routing algorithm.
+ *
+ * Note that with delays enabled, performance can be significantly lower
+ * (several orders of magnitude in 2-peer test runs); if you want to
+ * measure throughput of other components, set this to NO.  Also, you
+ * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
+ * a rather wasteful mode of operation (that might still get the highest
+ * throughput overall).
+ *
+ * Performance measurements (for 50 MB file, 2 peers):
+ *
+ * - Without delays: 3300 kb/s
+ * - With    delays:  101 kb/s
  */
-#define SUPPORT_DELAYS GNUNET_YES
-
-/**
- * Maximum number of outgoing messages we queue per peer.
- */
-#define MAX_QUEUE_PER_PEER 16
+#define SUPPORT_DELAYS GNUNET_NO
 
 /**
  * Size for the hash map for DHT requests from the FS
  */
 #define FS_DHT_HT_SIZE 1024
 
+/**
+ * At what frequency should our datastore load decrease
+ * automatically (since if we don't use it, clearly the
+ * load must be going down).
+ */
+#define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
+
 /**
  * How often do we flush trust values to disk?
  */
 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
 
+/**
+ * How quickly do we age cover traffic?  At the given 
+ * time interval, remaining cover traffic counters are
+ * decremented by 1/16th.
+ */
+#define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+
 /**
  * How often do we at most PUT content into the DHT?
  */
  * repeatedly recently, the probability is multiplied by the inverse
  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
  * plus MAX_CORK_DELAY (so roughly every 3.5s).
+ *
+ * Note that this factor is a key influence to performance in small
+ * networks (especially test networks of 2 peers) because if there is
+ * only a single peer with the data, this value will determine how
+ * soon we might re-try.  For example, a value of 3 can result in 
+ * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
+ * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
+ * rather inefficient in larger networks, hence picking 1 is in 
+ * general not the best choice.
+ *
+ * Performance measurements (for 50 MB file, 2 peers, no delays):
+ *
+ * - 1: 3300 kb/s (consistently)
+ * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s
+ * - 5:  759 kb/s, 968 kb/s, 1160 kb/s
+ *
+ * Note that this does NOT mean that the value should be 1 since
+ * a 2-peer network is far from representative here (and this fails
+ * to take into consideration bandwidth wasted by repeatedly 
+ * sending queries to peers that don't have the content).  Also,
+ * it is expected that higher values lead to more inconsistent
+ * measurements since this only affects lost messages towards the
+ * end of the download.
+ *
+ * Finally, we should probably consider changing this and making
+ * it dependent on the number of connected peers or a related
+ * metric (bad magic constants...).
  */
-#define RETRY_PROBABILITY_INV 3
+#define RETRY_PROBABILITY_INV 1
 
 /**
  * What is the maximum delay for a P2P FS message (in our interaction
 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
 
 /**
- * Maximum number of requests (from other peers) that we're
- * willing to have pending at any given point in time.
+ * Maximum number of requests (from other peers, overall) that we're
+ * willing to have pending at any given point in time.  Can be changed
+ * via the configuration file (32k is just the default).
  */
 static unsigned long long max_pending_requests = (32 * 1024);
 
@@ -206,6 +258,15 @@ struct ConnectedPeer
    */
   struct GNUNET_TIME_Absolute last_migration_block;
 
+  /**
+   * Transmission times for the last MAX_QUEUE_PER_PEER
+   * requests for this peer.  Used as a ring buffer, current
+   * offset is stored in 'last_request_times_off'.  If the
+   * oldest entry is more recent than the 'avg_delay', we should
+   * not send any more requests right now.
+   */
+  struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
+
   /**
    * Handle for an active request for transmission to this
    * peer, or NULL.
@@ -231,6 +292,16 @@ struct ConnectedPeer
    */
   struct GNUNET_LOAD_Value *transmission_delay;
 
+  /**
+   * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
+   */
+  struct GNUNET_CORE_InformationRequestContext *irc;
+
+  /**
+   * Request for which 'irc' is currently active (or NULL).
+   */
+  struct PendingRequest *pr;
+
   /**
    * Time when the last transmission request was issued.
    */
@@ -285,6 +356,11 @@ struct ConnectedPeer
    */
   unsigned int last_client_replies_woff;
 
+  /**
+   * Current offset into 'last_request_times' ring buffer.
+   */
+  unsigned int last_request_times_off;
+
 };
 
 
@@ -401,6 +477,31 @@ struct ClientList
 };
 
 
+/**
+ * Information about a peer that we have forwarded this
+ * request to already.  
+ */
+struct UsedTargetEntry
+{
+  /**
+   * What was the last time we have transmitted this request to this
+   * peer?
+   */
+  struct GNUNET_TIME_Absolute last_request_time;
+
+  /**
+   * How often have we transmitted this request to this peer?
+   */
+  unsigned int num_requests;
+
+  /**
+   * PID of the target peer.
+   */
+  GNUNET_PEER_Id pid;
+
+};
+
+
 /**
  * Doubly-linked list of messages we are performing
  * due to a pending request.
@@ -470,14 +571,14 @@ struct PendingRequest
   struct GNUNET_CONTAINER_BloomFilter *bf;
 
   /**
-   * Context of our GNUNET_CORE_peer_change_preference call.
+   * Reference to DHT get operation for this request (or NULL).
    */
-  struct GNUNET_CORE_InformationRequestContext *irc;
+  struct GNUNET_DHT_GetHandle *dht_get;
 
   /**
-   * Reference to DHT get operation for this request (or NULL).
+   * Context of our GNUNET_CORE_peer_change_preference call.
    */
-  struct GNUNET_DHT_GetHandle *dht_get;
+  struct ConnectedPeer *pirc;
 
   /**
    * Hash code of all replies that we have seen so far (only valid
@@ -531,7 +632,7 @@ struct PendingRequest
    * (Interned) Peer identifiers of peers that have already
    * received our query for this content.
    */
-  GNUNET_PEER_Id *used_pids;
+  struct UsedTargetEntry *used_targets;
   
   /**
    * Our entry in the queue (non-NULL while we wait for our
@@ -550,14 +651,14 @@ struct PendingRequest
   uint32_t anonymity_level;
 
   /**
-   * How many entries in "used_pids" are actually valid?
+   * How many entries in "used_targets" are actually valid?
    */
-  unsigned int used_pids_off;
+  unsigned int used_targets_off;
 
   /**
-   * How long is the "used_pids" array?
+   * How long is the "used_targets" array?
    */
-  unsigned int used_pids_size;
+  unsigned int used_targets_size;
 
   /**
    * Number of results found for this request.
@@ -686,11 +787,6 @@ static struct GNUNET_BLOCK_Context *block_ctx;
  */
 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
 
-/**
- * Our scheduler.
- */
-static struct GNUNET_SCHEDULER_Handle *sched;
-
 /**
  * Our configuration.
  */
@@ -796,7 +892,12 @@ static unsigned int mig_size;
 /**
  * Are we allowed to migrate content to this peer.
  */
-static int active_migration;
+static int active_to_migration;
+
+/**
+ * Are we allowed to push out content from this peer.
+ */
+static int active_from_migration;
 
 /**
  * How many entires with zero anonymity do we currently estimate
@@ -831,6 +932,34 @@ static struct GNUNET_LOAD_Value *datastore_put_load;
  */
 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
 
+/**
+ * How many query messages have we received 'recently' that 
+ * have not yet been claimed as cover traffic?
+ */
+static unsigned int cover_query_count;
+
+/**
+ * How many content messages have we received 'recently' that 
+ * have not yet been claimed as cover traffic?
+ */
+static unsigned int cover_content_count;
+
+/**
+ * ID of our task that we use to age the cover counters.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier cover_age_task;
+
+static void
+age_cover_counters (void *cls,
+                   const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  cover_content_count = (cover_content_count * 15) / 16;
+  cover_query_count = (cover_query_count * 15) / 16;
+  cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
+                                                &age_cover_counters,
+                                                NULL);
+}
+
 /**
  * We've just now completed a datastore request.  Update our
  * datastore load calculations.
@@ -844,7 +973,7 @@ update_datastore_delays (struct GNUNET_TIME_Absolute start)
 
   delay = GNUNET_TIME_absolute_get_duration (start);
   GNUNET_LOAD_update (datastore_get_load,
-                     delay.value);
+                     delay.rel_value);
 }
 
 
@@ -948,7 +1077,7 @@ consider_migration (void *cls,
   unsigned int repl;
   
   /* consider 'cp' as a migration target for mb */
-  if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
+  if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
     return GNUNET_YES; /* peer has requested no migration! */
   if (mb != NULL)
     {
@@ -1022,7 +1151,7 @@ consider_migration (void *cls,
 #endif
   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
     {
-      GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+      GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
     }
   cp->cth 
@@ -1082,8 +1211,7 @@ consider_migration_gathering ()
                                       MAX_MIGRATION_QUEUE);
   delay = GNUNET_TIME_relative_max (delay,
                                    min_migration_delay);
-  mig_task = GNUNET_SCHEDULER_add_delayed (sched,
-                                          delay,
+  mig_task = GNUNET_SCHEDULER_add_delayed (delay,
                                           &gather_migration_blocks,
                                           NULL);
 }
@@ -1117,8 +1245,7 @@ consider_dht_put_gathering (void *cls)
         (hopefully) appear */
       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
     }
-  dht_task = GNUNET_SCHEDULER_add_delayed (sched,
-                                          delay,
+  dht_task = GNUNET_SCHEDULER_add_delayed (delay,
                                           &gather_dht_put_blocks,
                                           cls);
 }
@@ -1264,6 +1391,7 @@ process_dht_put_content (void *cls,
 #endif
   GNUNET_DHT_put (dht_handle,
                  key,
+                 DEFAULT_PUT_REPLICATION,
                  GNUNET_DHT_RO_NONE,
                  type,
                  size,
@@ -1384,6 +1512,7 @@ static void
 destroy_pending_request (struct PendingRequest *pr)
 {
   struct GNUNET_PeerIdentity pid;
+  unsigned int i;
 
   if (pr->hnode != NULL)
     {
@@ -1411,7 +1540,7 @@ destroy_pending_request (struct PendingRequest *pr)
                                            pr))
     {
       GNUNET_LOAD_update (rt_entry_lifetime,
-                         GNUNET_TIME_absolute_get_duration (pr->start_time).value);
+                         GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
     }
   if (pr->qe != NULL)
      {
@@ -1445,10 +1574,11 @@ destroy_pending_request (struct PendingRequest *pr)
       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                       
       pr->bf = NULL;
     }
-  if (pr->irc != NULL)
+  if (pr->pirc != NULL)
     {
-      GNUNET_CORE_peer_change_preference_cancel (pr->irc);
-      pr->irc = NULL;
+      GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc);
+      pr->pirc->irc = NULL;
+      pr->pirc = NULL;
     }
   if (pr->replies_seen != NULL)
     {
@@ -1457,47 +1587,77 @@ destroy_pending_request (struct PendingRequest *pr)
     }
   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
     {
-      GNUNET_SCHEDULER_cancel (sched,
-                              pr->task);
+      GNUNET_SCHEDULER_cancel (pr->task);
       pr->task = GNUNET_SCHEDULER_NO_TASK;
     }
   while (NULL != pr->pending_head)    
     destroy_pending_message_list_entry (pr->pending_head);
   GNUNET_PEER_change_rc (pr->target_pid, -1);
-  if (pr->used_pids != NULL)
+  if (pr->used_targets != NULL)
     {
-      GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
-      GNUNET_free (pr->used_pids);
-      pr->used_pids_off = 0;
-      pr->used_pids_size = 0;
-      pr->used_pids = NULL;
+      for (i=0;i<pr->used_targets_off;i++)
+       GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
+      GNUNET_free (pr->used_targets);
+      pr->used_targets_off = 0;
+      pr->used_targets_size = 0;
+      pr->used_targets = NULL;
     }
   GNUNET_free (pr);
 }
 
 
+/**
+ * Find latency information in 'atsi'.
+ *
+ * @param atsi performance data
+ * @return connection latency
+ */
+static struct GNUNET_TIME_Relative
+get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+{
+  while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
+         (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
+    atsi++;
+  if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
+    {
+      GNUNET_break (0);
+      /* how can we not have latency data? */
+      return GNUNET_TIME_UNIT_SECONDS;
+    }
+  return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                       ntohl (atsi->value));
+}
+
+
 /**
  * Method called whenever a given peer connects.
  *
  * @param cls closure, not used
  * @param peer peer identity this notification is about
- * @param latency reported latency of the connection with 'other'
- * @param distance reported distance (DV) to 'other' 
+ * @param atsi performance information
  */
 static void 
 peer_connect_handler (void *cls,
                      const struct
                      GNUNET_PeerIdentity * peer,
-                     struct GNUNET_TIME_Relative latency,
-                     uint32_t distance)
+                     const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
   struct ConnectedPeer *cp;
   struct MigrationReadyBlock *pos;
   char *fn;
   uint32_t trust;
-  
+  struct GNUNET_TIME_Relative latency;
+
+  latency = get_latency (atsi);
+  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                         &peer->hashPubKey);
+  if (NULL != cp)
+    {
+      GNUNET_break (0);
+      return;
+    }
   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
-  cp->transmission_delay = GNUNET_LOAD_value_init ();
+  cp->transmission_delay = GNUNET_LOAD_value_init (latency);
   cp->pid = GNUNET_PEER_intern (peer);
 
   fn = get_trust_filename (peer);
@@ -1521,6 +1681,43 @@ peer_connect_handler (void *cls,
 }
 
 
+/**
+ * Method called whenever a given peer has a status change.
+ *
+ * @param cls closure
+ * @param peer peer identity this notification is about
+ * @param bandwidth_in available amount of inbound bandwidth
+ * @param bandwidth_out available amount of outbound bandwidth
+ * @param timeout absolute time when this peer will time out
+ *        unless we see some further activity from it
+ * @param atsi status information
+ */
+static void
+peer_status_handler (void *cls,
+                    const struct
+                    GNUNET_PeerIdentity * peer,
+                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
+                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
+                    struct GNUNET_TIME_Absolute timeout,
+                    const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+{
+  struct ConnectedPeer *cp;
+  struct GNUNET_TIME_Relative latency;
+
+  latency = get_latency (atsi);
+  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                         &peer->hashPubKey);
+  if (cp == NULL)
+    {
+      GNUNET_break (0);
+      return;
+    }
+  GNUNET_LOAD_value_set_decline (cp->transmission_delay,
+                                latency);  
+}
+
+
+
 /**
  * Increase the host credit by a value.
  *
@@ -1532,12 +1729,9 @@ peer_connect_handler (void *cls,
 static int
 change_host_trust (struct ConnectedPeer *host, int value)
 {
-  unsigned int old_trust;
-
   if (value == 0)
     return 0;
   GNUNET_assert (host != NULL);
-  old_trust = host->trust;
   if (value > 0)
     {
       if (host->trust + value < host->trust)
@@ -1616,8 +1810,7 @@ cron_flush_trust (void *cls,
     return;
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
-  GNUNET_SCHEDULER_add_delayed (tc->sched,
-                               TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
+  GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
 }
 
 
@@ -1683,6 +1876,14 @@ peer_disconnect_handler (void *cls,
                GNUNET_CONTAINER_multihashmap_remove (connected_peers,
                                                      &peer->hashPubKey,
                                                      cp));
+  if (cp->irc != NULL)
+    {
+      GNUNET_CORE_peer_change_preference_cancel (cp->irc);
+      cp->irc = NULL;
+      cp->pr->pirc = NULL;
+      cp->pr = NULL;
+    }
+
   /* remove this peer from migration considerations; schedule
      alternatives */
   next = mig_head;
@@ -1716,7 +1917,7 @@ peer_disconnect_handler (void *cls,
     }
   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
     {
-      GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+      GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
     }
   while (NULL != (pm = cp->pending_messages_head))
@@ -1857,12 +2058,12 @@ shutdown_task (void *cls,
     }
   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
     {
-      GNUNET_SCHEDULER_cancel (sched, mig_task);
+      GNUNET_SCHEDULER_cancel (mig_task);
       mig_task = GNUNET_SCHEDULER_NO_TASK;
     }
   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
     {
-      GNUNET_SCHEDULER_cancel (sched, dht_task);
+      GNUNET_SCHEDULER_cancel (dht_task);
       dht_task = GNUNET_SCHEDULER_NO_TASK;
     }
   while (client_list != NULL)
@@ -1912,10 +2113,11 @@ shutdown_task (void *cls,
   block_ctx = NULL;
   GNUNET_CONFIGURATION_destroy (block_cfg);
   block_cfg = NULL;
-  sched = NULL;
   cfg = NULL;  
   GNUNET_free_non_null (trustDirectory);
   trustDirectory = NULL;
+  GNUNET_SCHEDULER_cancel (cover_age_task);
+  cover_age_task = GNUNET_SCHEDULER_NO_TASK;
 }
 
 
@@ -1992,12 +2194,32 @@ transmit_to_peer (void *cls,
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Dropping message, core too busy.\n");
 #endif
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                 "Dropping message, core too busy.\n");
       GNUNET_LOAD_update (cp->transmission_delay,
                          UINT64_MAX);
+      
+      if (NULL != (pm = cp->pending_messages_head))
+       {
+         GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
+                                      cp->pending_messages_tail,
+                                      pm);
+         cp->pending_requests--;    
+         destroy_pending_message (pm, 0);
+       }
+      if (NULL != (pm = cp->pending_messages_head))
+       {
+         GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
+         min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until);
+         cp->delayed_transmission_request_task
+           = GNUNET_SCHEDULER_add_delayed (min_delay,
+                                           &delayed_transmission_request,
+                                           cp);
+       }
       return 0;
     }  
   GNUNET_LOAD_update (cp->transmission_delay,
-                     GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
+                     GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).rel_value);
   now = GNUNET_TIME_absolute_get ();
   msize = 0;
   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
@@ -2006,7 +2228,7 @@ transmit_to_peer (void *cls,
          (pm->msize <= size) )
     {
       next_pm = pm->next;
-      if (pm->delay_until.value > now.value)
+      if (pm->delay_until.abs_value > now.abs_value)
        {
          min_delay = GNUNET_TIME_relative_min (min_delay,
                                                GNUNET_TIME_absolute_get_remaining (pm->delay_until));
@@ -2030,8 +2252,7 @@ transmit_to_peer (void *cls,
     {     
       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
       cp->delayed_transmission_request_task
-       = GNUNET_SCHEDULER_add_delayed (sched,
-                                       min_delay,
+       = GNUNET_SCHEDULER_add_delayed (min_delay,
                                        &delayed_transmission_request,
                                        cp);
     }
@@ -2142,7 +2363,13 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
                                     pm);
   cp->pending_requests++;
   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
-    destroy_pending_message (cp->pending_messages_tail, 0);  
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# P2P searches discarded (queue length bound)"),
+                               1,
+                               GNUNET_NO);
+      destroy_pending_message (cp->pending_messages_tail, 0);  
+    }
   GNUNET_PEER_resolve (cp->pid, &pid);
   if (NULL != cp->cth)
     {
@@ -2151,7 +2378,7 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
     }
   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
     {
-      GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+      GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
     }
   /* need to schedule transmission */
@@ -2193,25 +2420,9 @@ test_get_load_too_high (uint32_t priority)
 
   ld = GNUNET_LOAD_get_load (datastore_get_load);
   if (ld < 1)
-    {
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# requests done for free (low load)"),
-                               1,
-                               GNUNET_NO);
-      return GNUNET_SYSERR;
-    }
-  if (ld <= priority)
-    {
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# requests done for a price (normal load)"),
-                               1,
-                               GNUNET_NO);
-      return GNUNET_NO;
-    }
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# requests dropped due to high load"),
-                           1,
-                           GNUNET_NO);
+    return GNUNET_SYSERR;    
+  if (ld <= priority)    
+    return GNUNET_NO;    
   return GNUNET_YES;
 }
 
@@ -2298,6 +2509,7 @@ transmit_query_continuation (void *cls,
                             GNUNET_PEER_Id tpid)
 {
   struct PendingRequest *pr = cls;
+  unsigned int i;
 
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# queries scheduled for forwarding"),
@@ -2310,25 +2522,39 @@ transmit_query_continuation (void *cls,
                  "Transmission of request failed, will try again later.\n");
 #endif
       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                get_processing_delay (),
+       pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
                                                 &forward_request_task,
                                                 pr); 
       return;    
     }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Transmitted query `%s'\n",
+             GNUNET_h2s (&pr->query));
+#endif
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# queries forwarded"),
                            1,
                            GNUNET_NO);
-  GNUNET_PEER_change_rc (tpid, 1);
-  if (pr->used_pids_off == pr->used_pids_size)
-    GNUNET_array_grow (pr->used_pids,
-                      pr->used_pids_size,
-                      pr->used_pids_size * 2 + 2);
-  pr->used_pids[pr->used_pids_off++] = tpid;
+  for (i=0;i<pr->used_targets_off;i++)
+    if (pr->used_targets[i].pid == tpid)
+      break; /* found match! */    
+  if (i == pr->used_targets_off)
+    {
+      /* need to create new entry */
+      if (pr->used_targets_off == pr->used_targets_size)
+       GNUNET_array_grow (pr->used_targets,
+                          pr->used_targets_size,
+                          pr->used_targets_size * 2 + 2);
+      GNUNET_PEER_change_rc (tpid, 1);
+      pr->used_targets[pr->used_targets_off].pid = tpid;
+      pr->used_targets[pr->used_targets_off].num_requests = 0;
+      i = pr->used_targets_off++;
+    }
+  pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
+  pr->used_targets[i].num_requests++;
   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-    pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                            get_processing_delay (),
+    pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
                                             &forward_request_task,
                                             pr);
 }
@@ -2407,7 +2633,6 @@ refresh_bloomfilter (struct PendingRequest *pr)
  *
  * @param cls the requests "struct PendingRequest*"
  * @param peer identifies the peer
- * @param bpm_in set to the current bandwidth limit (receiving) for this peer
  * @param bpm_out set to the current bandwidth limit (sending) for this peer
  * @param amount set to the amount that was actually reserved or unreserved
  * @param preference current traffic preference for the given peer
@@ -2416,7 +2641,6 @@ static void
 target_reservation_cb (void *cls,
                       const struct
                       GNUNET_PeerIdentity * peer,
-                      struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
                       struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
                       int amount,
                       uint64_t preference)
@@ -2431,19 +2655,9 @@ target_reservation_cb (void *cls,
   unsigned int k;
   int no_route;
   uint32_t bm;
+  unsigned int i;
 
-  pr->irc = NULL;
-  if (peer == NULL)
-    {
-      /* error in communication with core, try again later */
-      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                get_processing_delay (),
-                                                &forward_request_task,
-                                                pr);
-      return;
-    }
-  // (3) transmit, update ttl/priority
+  /* (3) transmit, update ttl/priority */
   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                          &peer->hashPubKey);
   if (cp == NULL)
@@ -2454,8 +2668,18 @@ target_reservation_cb (void *cls,
                  "Selected peer disconnected!\n");
 #endif
       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                get_processing_delay (),
+       pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
+                                                &forward_request_task,
+                                                pr);
+      return;
+    }
+  cp->irc = NULL;
+  pr->pirc = NULL;
+  if (peer == NULL)
+    {
+      /* error in communication with core, try again later */
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
                                                 &forward_request_task,
                                                 pr);
       return;
@@ -2476,8 +2700,7 @@ target_reservation_cb (void *cls,
                                    1,
                                    GNUNET_NO);
          if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                    get_processing_delay (),
+           pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
                                                     &forward_request_task,
                                                     pr);
          return;  /* this target round failed */
@@ -2489,6 +2712,16 @@ target_reservation_cb (void *cls,
                            gettext_noop ("# queries scheduled for forwarding"),
                            1,
                            GNUNET_NO);
+  for (i=0;i<pr->used_targets_off;i++)
+    if (pr->used_targets[i].pid == cp->pid) 
+      {
+       GNUNET_STATISTICS_update (stats,
+                                 gettext_noop ("# queries retransmitted to same target"),
+                                 1,
+                                 GNUNET_NO);
+       break;
+      } 
+
   /* build message and insert message into priority queue */
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2542,6 +2775,7 @@ target_reservation_cb (void *cls,
                                               pr->bf_size);
   pm->cont = &transmit_query_continuation;
   pm->cont_cls = pr;
+  cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
   add_to_pending_messages_for_peer (cp, pm, pr);
 }
 
@@ -2567,6 +2801,11 @@ struct PeerSelectionContext
    */
   double target_score;
 
+  /**
+   * Does it make sense to we re-try quickly again?
+   */
+  int fast_retry;
+
 };
 
 
@@ -2589,12 +2828,13 @@ target_peer_select_cb (void *cls,
   struct PeerSelectionContext *psc = cls;
   struct ConnectedPeer *cp = value;
   struct PendingRequest *pr = psc->pr;
+  struct GNUNET_TIME_Relative delay;
   double score;
   unsigned int i;
   unsigned int pc;
 
   /* 1) check that this peer is not the initiator */
-  if (cp == pr->cp)
+  if (cp == pr->cp)     
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2602,30 +2842,55 @@ target_peer_select_cb (void *cls,
 #endif
       return GNUNET_YES; /* skip */       
     }
+  if (cp->irc != NULL)
+    {
+      psc->fast_retry = GNUNET_YES;
+      return GNUNET_YES; /* skip: already querying core about this peer for other reasons */
+    }
 
   /* 2) check if we have already (recently) forwarded to this peer */
+  /* 2a) this particular request */
   pc = 0;
-  for (i=0;i<pr->used_pids_off;i++)
-    if (pr->used_pids[i] == cp->pid) 
+  for (i=0;i<pr->used_targets_off;i++)
+    if (pr->used_targets[i].pid == cp->pid) 
       {
-       pc++;
+       pc = pr->used_targets[i].num_requests;
+       GNUNET_assert (pc > 0);
+       /* FIXME: make re-enabling a peer independent of how often
+          this function is called??? */
        if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                          RETRY_PROBABILITY_INV))
+                                          RETRY_PROBABILITY_INV * pc))
          {
 #if DEBUG_FS
            GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                        "NOT re-trying query that was previously transmitted %u times\n",
-                       (unsigned int) pr->used_pids_off);
+                       (unsigned int) pc);
 #endif
            return GNUNET_YES; /* skip */
          }
+       break;
       }
 #if DEBUG_FS
   if (0 < pc)
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-               "Re-trying query that was previously transmitted %u times to this peer\n",
-               (unsigned int) pc);
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 "Re-trying query that was previously transmitted %u times to this peer\n",
+                 (unsigned int) pc);
+    }
 #endif
+  /* 2b) many other requests to this peer */
+  delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
+  if (delay.rel_value <= cp->avg_delay.rel_value)
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "NOT sending query since we send %u others to this peer in the last %llums\n",
+                 MAX_QUEUE_PER_PEER,
+                 cp->avg_delay.rel_value);
+#endif
+      return GNUNET_YES; /* skip */      
+    }
+
   /* 3) calculate how much we'd like to forward to this peer,
      starting with a random value that is strong enough
      to at least give any peer a chance sometimes 
@@ -2648,7 +2913,7 @@ target_peer_select_cb (void *cls,
          score += 1.0; /* likely successful based on hot path */
     }
   /* 3b) include latency */
-  if (cp->avg_delay.value < 4 * TTL_DECREMENT)
+  if (cp->avg_delay.rel_value < 4 * TTL_DECREMENT)
     score += 1.0; /* likely fast based on latency */
   /* 3c) include priorities */
   if (cp->avg_priority <= pr->remaining_priority / 2.0)
@@ -2662,19 +2927,19 @@ target_peer_select_cb (void *cls,
   if (pr->target_pid == cp->pid)
     score += 100.0;
   /* store best-fit in closure */
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Peer `%s' gets score %f for forwarding query, max is %f\n",
-             GNUNET_h2s (key),
-             score,
-             psc->target_score);
-#endif  
   score++; /* avoid zero */
   if (score > psc->target_score)
     {
       psc->target_score = score;
       psc->target.hashPubKey = *key; 
     }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Peer `%s' gets score %f for forwarding query, max is %8f\n",
+             GNUNET_h2s (key),
+             score,
+             psc->target_score);
+#endif
   return GNUNET_YES;
 }
   
@@ -2754,7 +3019,7 @@ forward_request_task (void *cls,
   struct GNUNET_TIME_Relative delay;
 
   pr->task = GNUNET_SCHEDULER_NO_TASK;
-  if (pr->irc != NULL)
+  if (pr->pirc != NULL)
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2775,6 +3040,7 @@ forward_request_task (void *cls,
                                          GNUNET_TIME_UNIT_FOREVER_REL,
                                          pr->type,
                                          &pr->query,
+                                         DEFAULT_GET_REPLICATION,
                                          GNUNET_DHT_RO_NONE,
                                          pr->bf,
                                          pr->mingle,
@@ -2783,23 +3049,46 @@ forward_request_task (void *cls,
                                          &process_dht_reply,
                                          pr);
     }
+
+  if ( (pr->anonymity_level > 1) &&
+       (cover_query_count < pr->anonymity_level - 1) )
+    {
+      delay = get_processing_delay ();
+#if DEBUG_FS 
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Not enough cover traffic to forward query `%s', will try again in %llu ms!\n",
+                 GNUNET_h2s (&pr->query),
+                 delay.rel_value);
+#endif
+      pr->task = GNUNET_SCHEDULER_add_delayed (delay,
+                                              &forward_request_task,
+                                              pr);
+      return;
+    }
+  /* consume cover traffic */
+  if (pr->anonymity_level > 1) 
+    cover_query_count -= pr->anonymity_level - 1;
+
   /* (1) select target */
   psc.pr = pr;
   psc.target_score = -DBL_MAX;
+  psc.fast_retry = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
                                         &target_peer_select_cb,
                                         &psc);  
   if (psc.target_score == -DBL_MAX)
     {
-      delay = get_processing_delay ();
+      if (psc.fast_retry == GNUNET_YES)
+       delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */
+      else
+       delay = get_processing_delay ();
 #if DEBUG_FS 
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
                  GNUNET_h2s (&pr->query),
-                 delay.value);
+                 delay.rel_value);
 #endif
-      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                              delay,
+      pr->task = GNUNET_SCHEDULER_add_delayed (delay,
                                               &forward_request_task,
                                               pr);
       return; /* nobody selected */
@@ -2834,7 +3123,10 @@ forward_request_task (void *cls,
       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                              &psc.target.hashPubKey);
       GNUNET_assert (NULL != cp);
-      pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
+      GNUNET_assert (cp->irc == NULL);
+      pr->pirc = cp;
+      cp->pr = pr;
+      cp->irc = GNUNET_CORE_peer_change_preference (core,
                                                    &psc.target,
                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
                                                    GNUNET_BANDWIDTH_value_init (UINT32_MAX),
@@ -2842,6 +3134,7 @@ forward_request_task (void *cls,
                                                    cp->inc_preference,
                                                    &target_reservation_cb,
                                                    pr);
+      GNUNET_assert (cp->irc != NULL);
       cp->inc_preference = 0;
     }
   else
@@ -2849,7 +3142,7 @@ forward_request_task (void *cls,
       /* force forwarding */
       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
       target_reservation_cb (pr, &psc.target,
-                            zerobw, zerobw, 0, 0.0);
+                            zerobw, 0, 0.0);
     }
 }
 
@@ -2981,6 +3274,11 @@ struct ProcessReplyClosure
    */
   uint32_t priority;
 
+  /**
+   * Anonymity requirements for this reply.
+   */
+  uint32_t anonymity_level;
+
   /**
    * Evaluation result (returned).
    */
@@ -3019,8 +3317,28 @@ process_reply (void *cls,
   struct PutMessage *pm;
   struct ConnectedPeer *cp;
   struct GNUNET_TIME_Relative cur_delay;
+#if SUPPORT_DELAYS  
+struct GNUNET_TIME_Relative art_delay;
+#endif
   size_t msize;
+  unsigned int i;
 
+  if (NULL == pr->client_request_list)
+    {
+      /* reply will go over the network, check for cover traffic */
+      if ( (prq->anonymity_level >  1) &&
+          (cover_content_count < prq->anonymity_level - 1) )
+       {
+         /* insufficient cover traffic, skip */
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# replies suppressed due to lack of cover traffic"),
+                                   1,
+                                   GNUNET_NO);
+         return GNUNET_YES;
+       }       
+      if (prq->anonymity_level >  1) 
+       cover_content_count -= prq->anonymity_level - 1;
+    }
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Matched result (type %u) for query `%s' with pending request\n",
@@ -3033,13 +3351,19 @@ process_reply (void *cls,
                            GNUNET_NO);
   if (prq->sender != NULL)
     {
-      cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
-      prq->sender->avg_delay.value
-       = (prq->sender->avg_delay.value * 
-          (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
-      prq->sender->avg_priority
-       = (prq->sender->avg_priority * 
-          (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+      for (i=0;i<pr->used_targets_off;i++)
+       if (pr->used_targets[i].pid == prq->sender->pid)
+         break;
+      if (i < pr->used_targets_off)
+       {
+         cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
+         prq->sender->avg_delay.rel_value
+           = (prq->sender->avg_delay.rel_value * 
+              (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
+         prq->sender->avg_priority
+           = (prq->sender->avg_priority * 
+              (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+       }
       if (pr->cp != NULL)
        {
          GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
@@ -3088,8 +3412,7 @@ process_reply (void *cls,
       pr->do_remove = GNUNET_YES;
       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
        {
-         GNUNET_SCHEDULER_cancel (sched,
-                                  pr->task);
+         GNUNET_SCHEDULER_cancel (pr->task);
          pr->task = GNUNET_SCHEDULER_NO_TASK;
        }
       GNUNET_break (GNUNET_YES ==
@@ -3097,7 +3420,7 @@ process_reply (void *cls,
                                                          key,
                                                          pr));
       GNUNET_LOAD_update (rt_entry_lifetime,
-                         GNUNET_TIME_absolute_get_duration (pr->start_time).value);
+                         GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
       break;
     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
       GNUNET_STATISTICS_update (stats,
@@ -3105,9 +3428,9 @@ process_reply (void *cls,
                                1,
                                GNUNET_NO);
 #if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+/*      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Duplicate response `%s', discarding.\n",
-                 GNUNET_h2s (&mhash));
+                 GNUNET_h2s (&mhash));*/
 #endif
       return GNUNET_YES; /* duplicate */
     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
@@ -3210,10 +3533,15 @@ process_reply (void *cls,
       reply->cont = &transmit_reply_continuation;
       reply->cont_cls = pr;
 #if SUPPORT_DELAYS
+      art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                                GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                                          TTL_DECREMENT));
       reply->delay_until 
-       = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
-                                                                          GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                                                                                    TTL_DECREMENT)));
+       = GNUNET_TIME_relative_to_absolute (art_delay);
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("cummulative artificial delay introduced (ms)"),
+                               art_delay.abs_value,
+                               GNUNET_NO);
 #endif
       reply->msize = msize;
       reply->priority = UINT32_MAX; /* send replies first! */
@@ -3286,7 +3614,7 @@ put_migration_continuation (void *cls,
   delay = GNUNET_TIME_absolute_get_duration (*start);
   GNUNET_free (start);
   GNUNET_LOAD_update (datastore_put_load,
-                     delay.value);
+                     delay.rel_value);
   if (GNUNET_OK == success)
     return;
   GNUNET_STATISTICS_update (stats,
@@ -3303,8 +3631,7 @@ put_migration_continuation (void *cls,
  * @param other the other peer involved (sender or receiver, NULL
  *        for loopback messages where we are both sender and receiver)
  * @param message the actual message
- * @param latency reported latency of the connection with 'other'
- * @param distance reported distance (DV) to 'other' 
+ * @param atsi performance information
  * @return GNUNET_OK to keep the connection open,
  *         GNUNET_SYSERR to close it (signal serious error)
  */
@@ -3312,8 +3639,7 @@ static int
 handle_p2p_put (void *cls,
                const struct GNUNET_PeerIdentity *other,
                const struct GNUNET_MessageHeader *message,
-               struct GNUNET_TIME_Relative latency,
-               uint32_t distance)
+               const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
   const struct PutMessage *put;
   uint16_t msize;
@@ -3352,6 +3678,7 @@ handle_p2p_put (void *cls,
       GNUNET_break_op (0);
       return GNUNET_SYSERR;
     }
+  cover_content_count++;
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Received result for query `%s' from peer `%4s'\n",
@@ -3373,6 +3700,7 @@ handle_p2p_put (void *cls,
   prq.type = type;
   prq.expiration = expiration;
   prq.priority = 0;
+  prq.anonymity_level = 1;
   prq.finished = GNUNET_NO;
   prq.request_found = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
@@ -3382,9 +3710,9 @@ handle_p2p_put (void *cls,
   if (prq.sender != NULL)
     {
       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
-      prq.sender->trust += prq.priority;
+      change_host_trust (prq.sender, prq.priority);
     }
-  if ( (GNUNET_YES == active_migration) &&
+  if ( (GNUNET_YES == active_to_migration) &&
        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
     {      
 #if DEBUG_FS
@@ -3406,15 +3734,15 @@ handle_p2p_put (void *cls,
     }
   putl = GNUNET_LOAD_get_load (datastore_put_load);
   if ( (GNUNET_NO == prq.request_found) &&
-       ( (GNUNET_YES != active_migration) ||
+       ( (GNUNET_YES != active_to_migration) ||
                 (putl > 2.5 * (1 + prq.priority)) ) )
     {
       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                              &other->hashPubKey);
-      if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
+      if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000)
        return GNUNET_OK; /* already blocked */
       /* We're too busy; send MigrationStop message! */
-      if (GNUNET_YES != active_migration) 
+      if (GNUNET_YES != active_to_migration) 
        putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
                                                  5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
@@ -3444,8 +3772,7 @@ handle_p2p_put (void *cls,
  * @param other the other peer involved (sender or receiver, NULL
  *        for loopback messages where we are both sender and receiver)
  * @param message the actual message
- * @param latency reported latency of the connection with 'other'
- * @param distance reported distance (DV) to 'other' 
+ * @param atsi performance information
  * @return GNUNET_OK to keep the connection open,
  *         GNUNET_SYSERR to close it (signal serious error)
  */
@@ -3453,8 +3780,7 @@ static int
 handle_p2p_migration_stop (void *cls,
                           const struct GNUNET_PeerIdentity *other,
                           const struct GNUNET_MessageHeader *message,
-                          struct GNUNET_TIME_Relative latency,
-                          uint32_t distance)
+                          const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
   struct ConnectedPeer *cp; 
   const struct MigrationStopMessage *msm;
@@ -3592,11 +3918,14 @@ process_local_reply (void *cls,
              return;
            }
        }
-
+      if (pr->local_only == GNUNET_YES)
+       {
+         destroy_pending_request (pr);
+         return;
+       }
       /* no more results */
       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_now (sched,
-                                            &forward_request_task,
+       pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
                                             pr);      
       return;
     }
@@ -3653,6 +3982,7 @@ process_local_reply (void *cls,
   prq.priority = priority;  
   prq.finished = GNUNET_NO;
   prq.request_found = GNUNET_NO;
+  prq.anonymity_level = anonymity;
   if ( (old_rf == 0) &&
        (pr->results_found == 0) )
     update_datastore_delays (pr->start_time);
@@ -3663,6 +3993,7 @@ process_local_reply (void *cls,
     return; /* done here */
   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
     {
+      pr->local_only = GNUNET_YES; /* do not forward */
       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
       return;
     }
@@ -3704,8 +4035,16 @@ bound_priority (uint32_t prio_in,
 
   ld = test_get_load_too_high (0);
   if (ld == GNUNET_SYSERR)
-    return 0; /* excess resources */
-  ret = change_host_trust (cp, prio_in);
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests done for free (low load)"),
+                               1,
+                               GNUNET_NO);
+      return 0; /* excess resources */
+    }
+  if (prio_in > INT32_MAX)
+    prio_in = INT32_MAX;
+  ret = - change_host_trust (cp, - (int) prio_in);
   if (ret > 0)
     {
       if (ret > current_priorities + N)
@@ -3722,11 +4061,21 @@ bound_priority (uint32_t prio_in,
     }
   if (ld == GNUNET_YES)
     {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# request dropped, priority insufficient"),
+                               1,
+                               GNUNET_NO);
       /* undo charge */
-      if (ret != 0)
-       change_host_trust (cp, -ret);
+      change_host_trust (cp, (int) ret);
       return -1; /* not enough resources */
     }
+  else
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests done for a price (normal load)"),
+                               1,
+                               GNUNET_NO);
+    }
 #undef N
   return ret;
 }
@@ -3769,8 +4118,7 @@ check_duplicate_request_peer (void *cls,
  * @param other the other peer involved (sender or receiver, NULL
  *        for loopback messages where we are both sender and receiver)
  * @param message the actual message
- * @param latency reported latency of the connection with 'other'
- * @param distance reported distance (DV) to 'other' 
+ * @param atsi performance information
  * @return GNUNET_OK to keep the connection open,
  *         GNUNET_SYSERR to close it (signal serious error)
  */
@@ -3778,8 +4126,7 @@ static int
 handle_p2p_get (void *cls,
                const struct GNUNET_PeerIdentity *other,
                const struct GNUNET_MessageHeader *message,
-               struct GNUNET_TIME_Relative latency,
-               uint32_t distance)
+               const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
   struct PendingRequest *pr;
   struct ConnectedPeer *cp;
@@ -3804,6 +4151,11 @@ handle_p2p_get (void *cls,
       return GNUNET_SYSERR;
     }
   gm = (const struct GetMessage*) message;
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received request for `%s'\n",
+             GNUNET_h2s (&gm->query));
+#endif
   type = ntohl (gm->type);
   bm = ntohl (gm->hash_bitmap);
   bits = 0;
@@ -3819,7 +4171,14 @@ handle_p2p_get (void *cls,
       return GNUNET_SYSERR;
     }  
   opt = (const GNUNET_HashCode*) &gm[1];
-  bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
+  bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
+  /* bfsize must be power of 2, check! */
+  if (0 != ( (bfsize - 1) & bfsize))
+    {
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }
+  cover_query_count++;
   bm = ntohl (gm->hash_bitmap);
   bits = 0;
   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
@@ -3869,10 +4228,6 @@ handle_p2p_get (void *cls,
                  "Dropping query from `%s', this peer is too busy.\n",
                  GNUNET_i2s (other));
 #endif
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# requests dropped due to high load"),
-                               1,
-                               GNUNET_NO);
       return GNUNET_OK;
     }
 #if DEBUG_FS 
@@ -3893,7 +4248,7 @@ handle_p2p_get (void *cls,
     }
   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
-       GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
+       GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
     {
       /* don't have BW to send to peer, or would likely take longer than we have for it,
         so at best indirect the query */
@@ -3941,7 +4296,6 @@ handle_p2p_get (void *cls,
                                                  BLOOMFILTER_K);
       pr->bf_size = bfsize;
     }
-
   cdc.have = NULL;
   cdc.pr = pr;
   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
@@ -3950,8 +4304,8 @@ handle_p2p_get (void *cls,
                                              &cdc);
   if (cdc.have != NULL)
     {
-      if (cdc.have->start_time.value + cdc.have->ttl >=
-         pr->start_time.value + pr->ttl)
+      if (cdc.have->start_time.abs_value + cdc.have->ttl >=
+         pr->start_time.abs_value + pr->ttl)
        {
          /* existing request has higher TTL, drop new one! */
          cdc.have->priority += pr->priority;
@@ -3994,7 +4348,7 @@ handle_p2p_get (void *cls,
   
   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
                                            pr,
-                                           pr->start_time.value + pr->ttl);
+                                           pr->start_time.abs_value + pr->ttl);
 
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# P2P searches received"),
@@ -4013,19 +4367,35 @@ handle_p2p_get (void *cls,
   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
                                           (pr->priority + 1)); 
   if (GNUNET_YES != pr->forward_only)
-    pr->qe = GNUNET_DATASTORE_get (dsh,
-                                  &gm->query,
-                                  type,                               
-                                  pr->priority + 1,
-                                  MAX_DATASTORE_QUEUE,                          
-                                  timeout,
-                                  &process_local_reply,
-                                  pr);
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Handing request for `%s' to datastore\n",
+                 GNUNET_h2s (&gm->query));
+#endif
+      pr->qe = GNUNET_DATASTORE_get (dsh,
+                                    &gm->query,
+                                    type,                             
+                                    pr->priority + 1,
+                                    MAX_DATASTORE_QUEUE,                                
+                                    timeout,
+                                    &process_local_reply,
+                                    pr);
+      if (NULL == pr->qe)
+       {
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# requests dropped by datastore (queue length limit)"),
+                                   1,
+                                   GNUNET_NO);
+       }
+    }
   else
-    GNUNET_STATISTICS_update (stats,
-                             gettext_noop ("# requests forwarded due to high load"),
-                             1,
-                             GNUNET_NO);
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests forwarded due to high load"),
+                               1,
+                               GNUNET_NO);
+    }
 
   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
   switch (pr->type)
@@ -4034,11 +4404,16 @@ handle_p2p_get (void *cls,
     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
       /* only one result, wait for datastore */
       if (GNUNET_YES != pr->forward_only)
-       break;
+       {
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
+                                   1,
+                                   GNUNET_NO);
+         break;
+       }
     default:
       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_now (sched,
-                                            &forward_request_task,
+       pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
                                             pr);
     }
 
@@ -4218,13 +4593,11 @@ handle_start_search (void *cls,
 /**
  * Process fs requests.
  *
- * @param s scheduler to use
  * @param server the initialized server
  * @param c configuration to use
  */
 static int
-main_init (struct GNUNET_SCHEDULER_Handle *s,
-          struct GNUNET_SERVER_Handle *server,
+main_init (struct GNUNET_SERVER_Handle *server,
           const struct GNUNET_CONFIGURATION_Handle *c)
 {
   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
@@ -4251,9 +4624,8 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
   };
   unsigned long long enc = 128;
 
-  sched = s;
   cfg = c;
-  stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
+  stats = GNUNET_STATISTICS_create ("fs", cfg);
   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
   if ( (GNUNET_OK !=
        GNUNET_CONFIGURATION_get_value_number (cfg,
@@ -4276,17 +4648,16 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
     }
   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
-  rt_entry_lifetime = GNUNET_LOAD_value_init ();
+  rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
-  core = GNUNET_CORE_connect (sched,
-                             cfg,
-                             GNUNET_TIME_UNIT_FOREVER_REL,
+  core = GNUNET_CORE_connect (cfg,
+                             1, /* larger? */
                              NULL,
                              NULL,
                              &peer_connect_handler,
                              &peer_disconnect_handler,
-                             NULL,
+                             &peer_status_handler,
                              NULL, GNUNET_NO,
                              NULL, GNUNET_NO,
                              p2p_handlers);
@@ -4312,8 +4683,7 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
        }
       return GNUNET_SYSERR;
     }
-  /* FIXME: distinguish between sending and storing in options? */
-  if (active_migration) 
+  if (active_from_migration) 
     {
       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                  _("Content migration is enabled, will start to gather data\n"));
@@ -4329,14 +4699,15 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
                                                           "TRUST",
                                                           &trustDirectory));
   GNUNET_DISK_directory_create (trustDirectory);
-  GNUNET_SCHEDULER_add_with_priority (sched,
-                                     GNUNET_SCHEDULER_PRIORITY_HIGH,
+  GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
                                      &cron_flush_trust, NULL);
 
 
   GNUNET_SERVER_add_handlers (server, handlers);
-  GNUNET_SCHEDULER_add_delayed (sched,
-                               GNUNET_TIME_UNIT_FOREVER_REL,
+  cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
+                                                &age_cover_counters,
+                                                NULL);
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
                                &shutdown_task,
                                NULL);
   return GNUNET_OK;
@@ -4347,28 +4718,28 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
  * Process fs requests.
  *
  * @param cls closure
- * @param sched scheduler to use
  * @param server the initialized server
  * @param cfg configuration to use
  */
 static void
 run (void *cls,
-     struct GNUNET_SCHEDULER_Handle *sched,
      struct GNUNET_SERVER_Handle *server,
      const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
-                                                          "FS",
-                                                          "ACTIVEMIGRATION");
-  dsh = GNUNET_DATASTORE_connect (cfg,
-                                 sched);
+  active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                                             "FS",
+                                                             "CONTENT_CACHING");
+  active_from_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                                               "FS",
+                                                               "CONTENT_PUSHING");
+  dsh = GNUNET_DATASTORE_connect (cfg);
   if (dsh == NULL)
     {
-      GNUNET_SCHEDULER_shutdown (sched);
+      GNUNET_SCHEDULER_shutdown ();
       return;
     }
-  datastore_get_load = GNUNET_LOAD_value_init ();
-  datastore_put_load = GNUNET_LOAD_value_init ();
+  datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
+  datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
   block_cfg = GNUNET_CONFIGURATION_create ();
   GNUNET_CONFIGURATION_set_value_string (block_cfg,
                                         "block",
@@ -4376,13 +4747,12 @@ run (void *cls,
                                         "fs");
   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
   GNUNET_assert (NULL != block_ctx);
-  dht_handle = GNUNET_DHT_connect (sched,
-                                  cfg,
+  dht_handle = GNUNET_DHT_connect (cfg,
                                   FS_DHT_HT_SIZE);
-  if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
-       (GNUNET_OK != main_init (sched, server, cfg)) )
+  if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, dsh)) ||
+       (GNUNET_OK != main_init (server, cfg)) )
     {    
-      GNUNET_SCHEDULER_shutdown (sched);
+      GNUNET_SCHEDULER_shutdown ();
       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
       dsh = NULL;
       GNUNET_DHT_disconnect (dht_handle);