fixing block reconstruction start/shutdown code
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index b8184262f1db16f7a85b4a7bb59098d4d5486c39..08b01fbbf561819e6a96763e1a432382367ba6e1 100644 (file)
@@ -4,7 +4,7 @@
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
  * @brief gnunet anonymity protocol implementation
  * @author Christian Grothoff
  *
- * FIXME:
- * - TTL/priority calculations are absent!
  * TODO:
- * - have non-zero preference / priority for requests we initiate!
- * - implement hot-path routing decision procedure
- * - implement: bound_priority, test_load_too_high
- * - statistics
+ * - collect traffic data for anonymity levels > 1
+ * - implement transmission restrictions for anonymity level > 1
+ * - more statistics
  */
 #include "platform.h"
 #include <float.h>
 #include "gnunet_constants.h"
 #include "gnunet_core_service.h"
+#include "gnunet_dht_service.h"
 #include "gnunet_datastore_service.h"
+#include "gnunet_load_lib.h"
 #include "gnunet_peer_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_signatures.h"
 #include "gnunet_statistics_service.h"
+#include "gnunet_transport_service.h"
 #include "gnunet_util_lib.h"
 #include "gnunet-service-fs_indexing.h"
 #include "fs.h"
 #define DEBUG_FS GNUNET_NO
 
 /**
- * Maximum number of outgoing messages we queue per peer.
- * FIXME: make configurable?
+ * Should we introduce random latency in processing?  Required for proper
+ * implementation of GAP, but can be disabled for performance evaluation of
+ * the basic routing algorithm.
+ *
+ * Note that with delays enabled, performance can be significantly lower
+ * (several orders of magnitude in 2-peer test runs); if you want to
+ * measure throughput of other components, set this to NO.  Also, you
+ * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
+ * a rather wasteful mode of operation (that might still get the highest
+ * throughput overall).
+ *
+ * Performance measurements (for 50 MB file, 2 peers):
+ *
+ * - Without delays: 3300 kb/s
+ * - With    delays:  101 kb/s
+ */
+#define SUPPORT_DELAYS GNUNET_NO
+
+/**
+ * Size for the hash map for DHT requests from the FS
+ * service.  Should be about the number of concurrent
+ * DHT requests we plan to make.
+ */
+#define FS_DHT_HT_SIZE 1024
+
+/**
+ * At what frequency should our datastore load decrease
+ * automatically (since if we don't use it, clearly the
+ * load must be going down).
+ */
+#define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
+
+/**
+ * How often do we flush trust values to disk?
+ */
+#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
+
+/**
+ * How quickly do we age cover traffic?  At the given 
+ * time interval, remaining cover traffic counters are
+ * decremented by 1/16th.
+ */
+#define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+
+/**
+ * How often do we at most PUT content into the DHT?
  */
-#define MAX_QUEUE_PER_PEER 16
+#define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
 /**
  * Inverse of the probability that we will submit the same query
  * repeatedly recently, the probability is multiplied by the inverse
  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
  * plus MAX_CORK_DELAY (so roughly every 3.5s).
+ *
+ * Note that this factor is a key influence to performance in small
+ * networks (especially test networks of 2 peers) because if there is
+ * only a single peer with the data, this value will determine how
+ * soon we might re-try.  For example, a value of 3 can result in 
+ * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
+ * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
+ * rather inefficient in larger networks, hence picking 1 is in 
+ * general not the best choice.
+ *
+ * Performance measurements (for 50 MB file, 2 peers, no delays):
+ *
+ * - 1: 3300 kb/s (consistently)
+ * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s
+ * - 5:  759 kb/s, 968 kb/s, 1160 kb/s
+ *
+ * Note that this does NOT mean that the value should be 1 since
+ * a 2-peer network is far from representative here (and this fails
+ * to take into consideration bandwidth wasted by repeatedly 
+ * sending queries to peers that don't have the content).  Also,
+ * it is expected that higher values lead to more inconsistent
+ * measurements since this only affects lost messages towards the
+ * end of the download.
+ *
+ * Finally, we should probably consider changing this and making
+ * it dependent on the number of connected peers or a related
+ * metric (bad magic constants...).
  */
-#define RETRY_PROBABILITY_INV 3
+#define RETRY_PROBABILITY_INV 1
 
 /**
  * What is the maximum delay for a P2P FS message (in our interaction
  */
 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
 
-
-
 /**
- * Maximum number of requests (from other peers) that we're
- * willing to have pending at any given point in time.
- * FIXME: set from configuration.
+ * Maximum number of requests (from other peers, overall) that we're
+ * willing to have pending at any given point in time.  Can be changed
+ * via the configuration file (32k is just the default).
  */
-static uint64_t max_pending_requests = (32 * 1024);
+static unsigned long long max_pending_requests = (32 * 1024);
 
 
 /**
@@ -86,12 +155,6 @@ static uint64_t max_pending_requests = (32 * 1024);
  */
 struct PendingMessage;
 
-/**
- * Our connection to the datastore.
- */
-static struct GNUNET_DATASTORE_Handle *dsh;
-
-
 /**
  * Function called upon completion of a transmission.
  *
@@ -134,6 +197,11 @@ struct PendingMessage
    */
   void *cont_cls;
 
+  /**
+   * Do not transmit this pending message until this deadline.
+   */
+  struct GNUNET_TIME_Absolute delay_until;
+
   /**
    * Size of the reply; actual reply message follows
    * at the end of this struct.
@@ -178,6 +246,27 @@ struct ConnectedPeer
    */ 
   struct GNUNET_TIME_Relative avg_delay;
 
+  /**
+   * Point in time until which this peer does not want us to migrate content
+   * to it.
+   */
+  struct GNUNET_TIME_Absolute migration_blocked;
+
+  /**
+   * Time until when we blocked this peer from migrating
+   * data to us.
+   */
+  struct GNUNET_TIME_Absolute last_migration_block;
+
+  /**
+   * Transmission times for the last MAX_QUEUE_PER_PEER
+   * requests for this peer.  Used as a ring buffer, current
+   * offset is stored in 'last_request_times_off'.  If the
+   * oldest entry is more recent than the 'avg_delay', we should
+   * not send any more requests right now.
+   */
+  struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
+
   /**
    * Handle for an active request for transmission to this
    * peer, or NULL.
@@ -196,6 +285,33 @@ struct ConnectedPeer
    */
   struct PendingMessage *pending_messages_tail;
 
+  /**
+   * How long does it typically take for us to transmit a message
+   * to this peer?  (delay between the request being issued and
+   * the callback being invoked).
+   */
+  struct GNUNET_LOAD_Value *transmission_delay;
+
+  /**
+   * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
+   */
+  struct GNUNET_CORE_InformationRequestContext *irc;
+
+  /**
+   * Request for which 'irc' is currently active (or NULL).
+   */
+  struct PendingRequest *pr;
+
+  /**
+   * Time when the last transmission request was issued.
+   */
+  struct GNUNET_TIME_Absolute last_transmission_request_start;
+
+  /**
+   * ID of delay task for scheduling transmission.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
+
   /**
    * Average priority of successful replies.  Calculated
    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
@@ -204,9 +320,19 @@ struct ConnectedPeer
 
   /**
    * Increase in traffic preference still to be submitted
-   * to the core service for this peer. FIXME: double or 'uint64_t'?
+   * to the core service for this peer.
+   */
+  uint64_t inc_preference;
+
+  /**
+   * Trust rating for this peer
+   */
+  uint32_t trust;
+
+  /**
+   * Trust rating for this peer on disk.
    */
-  double inc_preference;
+  uint32_t disk_trust;
 
   /**
    * The peer's identity.
@@ -230,6 +356,11 @@ struct ConnectedPeer
    */
   unsigned int last_client_replies_woff;
 
+  /**
+   * Current offset into 'last_request_times' ring buffer.
+   */
+  unsigned int last_request_times_off;
+
 };
 
 
@@ -346,6 +477,31 @@ struct ClientList
 };
 
 
+/**
+ * Information about a peer that we have forwarded this
+ * request to already.  
+ */
+struct UsedTargetEntry
+{
+  /**
+   * What was the last time we have transmitted this request to this
+   * peer?
+   */
+  struct GNUNET_TIME_Absolute last_request_time;
+
+  /**
+   * How often have we transmitted this request to this peer?
+   */
+  unsigned int num_requests;
+
+  /**
+   * PID of the target peer.
+   */
+  GNUNET_PEER_Id pid;
+
+};
+
+
 /**
  * Doubly-linked list of messages we are performing
  * due to a pending request.
@@ -414,10 +570,15 @@ struct PendingRequest
    */
   struct GNUNET_CONTAINER_BloomFilter *bf;
 
+  /**
+   * Reference to DHT get operation for this request (or NULL).
+   */
+  struct GNUNET_DHT_GetHandle *dht_get;
+
   /**
    * Context of our GNUNET_CORE_peer_change_preference call.
    */
-  struct GNUNET_CORE_InformationRequestContext *irc;
+  struct ConnectedPeer *pirc;
 
   /**
    * Hash code of all replies that we have seen so far (only valid
@@ -471,7 +632,7 @@ struct PendingRequest
    * (Interned) Peer identifiers of peers that have already
    * received our query for this content.
    */
-  GNUNET_PEER_Id *used_pids;
+  struct UsedTargetEntry *used_targets;
   
   /**
    * Our entry in the queue (non-NULL while we wait for our
@@ -480,7 +641,6 @@ struct PendingRequest
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
   /**
-
    * Size of the 'bf' (in bytes).
    */
   size_t bf_size;
@@ -491,14 +651,14 @@ struct PendingRequest
   uint32_t anonymity_level;
 
   /**
-   * How many entries in "used_pids" are actually valid?
+   * How many entries in "used_targets" are actually valid?
    */
-  unsigned int used_pids_off;
+  unsigned int used_targets_off;
 
   /**
-   * How long is the "used_pids" array?
+   * How long is the "used_targets" array?
    */
-  unsigned int used_pids_size;
+  unsigned int used_targets_size;
 
   /**
    * Number of results found for this request.
@@ -548,12 +708,17 @@ struct PendingRequest
   /**
    * Remove this request after transmission of the current response.
    */
-  int16_t do_remove;
+  int8_t do_remove;
+
+  /**
+   * GNUNET_YES if we should not forward this request to other peers.
+   */
+  int8_t local_only;
 
   /**
    * GNUNET_YES if we should not forward this request to other peers.
    */
-  int16_t local_only;
+  int8_t forward_only;
 
 };
 
@@ -608,9 +773,19 @@ struct MigrationReadyBlock
 
 
 /**
- * Our scheduler.
+ * Our connection to the datastore.
+ */
+static struct GNUNET_DATASTORE_Handle *dsh;
+
+/**
+ * Our block context.
+ */
+static struct GNUNET_BLOCK_Context *block_ctx;
+
+/**
+ * Our block configuration.
  */
-static struct GNUNET_SCHEDULER_Handle *sched;
+static struct GNUNET_CONFIGURATION_Handle *block_cfg;
 
 /**
  * Our configuration.
@@ -673,17 +848,42 @@ static struct MigrationReadyBlock *mig_tail;
  */
 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
 
+/**
+ * Request to datastore for DHT PUTs (or NULL).
+ */
+static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
+
+/**
+ * Type we will request for the next DHT PUT round from the datastore.
+ */
+static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
+
+/**
+ * Where do we store trust information?
+ */
+static char *trustDirectory;
+
 /**
  * ID of task that collects blocks for migration.
  */
 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
 
+/**
+ * ID of task that collects blocks for DHT PUTs.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier dht_task;
+
 /**
  * What is the maximum frequency at which we are allowed to
  * poll the datastore for migration content?
  */
 static struct GNUNET_TIME_Relative min_migration_delay;
 
+/**
+ * Handle for DHT operations.
+ */
+static struct GNUNET_DHT_Handle *dht_handle;
+
 /**
  * Size of the doubly-linked list of migration blocks.
  */
@@ -692,7 +892,107 @@ static unsigned int mig_size;
 /**
  * Are we allowed to migrate content to this peer.
  */
-static int active_migration;
+static int active_to_migration;
+
+/**
+ * Are we allowed to push out content from this peer.
+ */
+static int active_from_migration;
+
+/**
+ * How many entires with zero anonymity do we currently estimate
+ * to have in the database?
+ */
+static unsigned int zero_anonymity_count_estimate;
+
+/**
+ * Typical priorities we're seeing from other peers right now.  Since
+ * most priorities will be zero, this value is the weighted average of
+ * non-zero priorities seen "recently".  In order to ensure that new
+ * values do not dramatically change the ratio, values are first
+ * "capped" to a reasonable range (+N of the current value) and then
+ * averaged into the existing value by a ratio of 1:N.  Hence
+ * receiving the largest possible priority can still only raise our
+ * "current_priorities" by at most 1.
+ */
+static double current_priorities;
+
+/**
+ * Datastore 'GET' load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_get_load;
+
+/**
+ * Datastore 'PUT' load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_put_load;
+
+/**
+ * How long do requests typically stay in the routing table?
+ */
+static struct GNUNET_LOAD_Value *rt_entry_lifetime;
+
+/**
+ * How many query messages have we received 'recently' that 
+ * have not yet been claimed as cover traffic?
+ */
+static unsigned int cover_query_count;
+
+/**
+ * How many content messages have we received 'recently' that 
+ * have not yet been claimed as cover traffic?
+ */
+static unsigned int cover_content_count;
+
+/**
+ * ID of our task that we use to age the cover counters.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier cover_age_task;
+
+static void
+age_cover_counters (void *cls,
+                   const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  cover_content_count = (cover_content_count * 15) / 16;
+  cover_query_count = (cover_query_count * 15) / 16;
+  cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
+                                                &age_cover_counters,
+                                                NULL);
+}
+
+/**
+ * We've just now completed a datastore request.  Update our
+ * datastore load calculations.
+ *
+ * @param start time when the datastore request was issued
+ */
+static void
+update_datastore_delays (struct GNUNET_TIME_Absolute start)
+{
+  struct GNUNET_TIME_Relative delay;
+
+  delay = GNUNET_TIME_absolute_get_duration (start);
+  GNUNET_LOAD_update (datastore_get_load,
+                     delay.rel_value);
+}
+
+
+/**
+ * Get the filename under which we would store the GNUNET_HELLO_Message
+ * for the given host and protocol.
+ * @return filename of the form DIRECTORY/HOSTID
+ */
+static char *
+get_trust_filename (const struct GNUNET_PeerIdentity *id)
+{
+  struct GNUNET_CRYPTO_HashAsciiEncoded fil;
+  char *fn;
+
+  GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
+  GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
+  return fn;
+}
+
 
 
 /**
@@ -715,7 +1015,6 @@ transmit_to_peer (void *cls,
 
 /* ******************* clean up functions ************************ */
 
-
 /**
  * Delete the given migration block.
  *
@@ -760,7 +1059,7 @@ is_closer (const GNUNET_HashCode *key,
  *            targets for (or NULL for none)
  * @param key ID of the peer 
  * @param value 'struct ConnectedPeer' of the peer
- * @return GNUNET_YES (always continue iteration)2
+ * @return GNUNET_YES (always continue iteration)
  */
 static int
 consider_migration (void *cls,
@@ -778,6 +1077,8 @@ consider_migration (void *cls,
   unsigned int repl;
   
   /* consider 'cp' as a migration target for mb */
+  if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
+    return GNUNET_YES; /* peer has requested no migration! */
   if (mb != NULL)
     {
       GNUNET_PEER_resolve (cp->pid,
@@ -820,7 +1121,7 @@ consider_migration (void *cls,
     }
 
   /* consider scheduling transmission to cp for content migration */
-  if (cp->cth != NULL)
+  if (cp->cth != NULL)        
     return GNUNET_YES; 
   msize = 0;
   pos = mig_head;
@@ -842,6 +1143,17 @@ consider_migration (void *cls,
     }
   if (msize == 0)
     return GNUNET_YES; /* no content available */
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Trying to migrate at least %u bytes to peer `%s'\n",
+             msize,
+             GNUNET_h2s (key));
+#endif
+  if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
+      cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   cp->cth 
     = GNUNET_CORE_notify_transmit_ready (core,
                                         0, GNUNET_TIME_UNIT_FOREVER_REL,
@@ -865,6 +1177,19 @@ gather_migration_blocks (void *cls,
                         const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
+
+
+/**
+ * Task that is run periodically to obtain blocks for DHT PUTs.
+ * 
+ * @param cls type of blocks to gather
+ * @param tc scheduler context (unused)
+ */
+static void
+gather_dht_put_blocks (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
 /**
  * If the migration task is not currently running, consider
  * (re)scheduling it with the appropriate delay.
@@ -874,23 +1199,58 @@ consider_migration_gathering ()
 {
   struct GNUNET_TIME_Relative delay;
 
+  if (dsh == NULL)
+    return;
   if (mig_qe != NULL)
     return;
   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
     return;
   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
                                         mig_size);
-  delay = GNUNET_TIME_relative_divide (GNUNET_TIME_UNIT_SECONDS,
+  delay = GNUNET_TIME_relative_divide (delay,
                                       MAX_MIGRATION_QUEUE);
   delay = GNUNET_TIME_relative_max (delay,
                                    min_migration_delay);
-  mig_task = GNUNET_SCHEDULER_add_delayed (sched,
-                                          delay,
+  mig_task = GNUNET_SCHEDULER_add_delayed (delay,
                                           &gather_migration_blocks,
                                           NULL);
 }
 
 
+/**
+ * If the DHT PUT gathering task is not currently running, consider
+ * (re)scheduling it with the appropriate delay.
+ */
+static void
+consider_dht_put_gathering (void *cls)
+{
+  struct GNUNET_TIME_Relative delay;
+
+  if (dsh == NULL)
+    return;
+  if (dht_qe != NULL)
+    return;
+  if (dht_task != GNUNET_SCHEDULER_NO_TASK)
+    return;
+  if (zero_anonymity_count_estimate > 0)
+    {
+      delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
+                                          zero_anonymity_count_estimate);
+      delay = GNUNET_TIME_relative_min (delay,
+                                       MAX_DHT_PUT_FREQ);
+    }
+  else
+    {
+      /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
+        (hopefully) appear */
+      delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
+    }
+  dht_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                          &gather_dht_put_blocks,
+                                          cls);
+}
+
+
 /**
  * Process content offered for migration.
  *
@@ -908,7 +1268,7 @@ consider_migration_gathering ()
 static void
 process_migration_content (void *cls,
                           const GNUNET_HashCode * key,
-                          uint32_t size,
+                          size_t size,
                           const void *data,
                           enum GNUNET_BLOCK_Type type,
                           uint32_t priority,
@@ -925,7 +1285,7 @@ process_migration_content (void *cls,
        consider_migration_gathering ();
       return;
     }
-  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+  if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
     {
       if (GNUNET_OK !=
          GNUNET_FS_handle_on_demand_block (key, size, data,
@@ -933,9 +1293,17 @@ process_migration_content (void *cls,
                                            expiration, uid, 
                                            &process_migration_content,
                                            NULL))
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+       {
+         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+       }
       return;
     }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Retrieved block `%s' of type %u for migration\n",
+             GNUNET_h2s (key),
+             type);
+#endif
   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
   mb->query = *key;
   mb->expiration = expiration;
@@ -954,6 +1322,87 @@ process_migration_content (void *cls,
 }
 
 
+/**
+ * Function called upon completion of the DHT PUT operation.
+ */
+static void
+dht_put_continuation (void *cls,
+                     const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+}
+
+
+/**
+ * Store content in DHT.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ */
+static void
+process_dht_put_content (void *cls,
+                        const GNUNET_HashCode * key,
+                        size_t size,
+                        const void *data,
+                        enum GNUNET_BLOCK_Type type,
+                        uint32_t priority,
+                        uint32_t anonymity,
+                        struct GNUNET_TIME_Absolute
+                        expiration, uint64_t uid)
+{ 
+  static unsigned int counter;
+  static GNUNET_HashCode last_vhash;
+  static GNUNET_HashCode vhash;
+
+  if (key == NULL)
+    {
+      dht_qe = NULL;
+      consider_dht_put_gathering (cls);
+      return;
+    }
+  /* slightly funky code to estimate the total number of values with zero
+     anonymity from the maximum observed length of a monotonically increasing 
+     sequence of hashes over the contents */
+  GNUNET_CRYPTO_hash (data, size, &vhash);
+  if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
+    {
+      if (zero_anonymity_count_estimate > 0)
+       zero_anonymity_count_estimate /= 2;
+      counter = 0;
+    }
+  last_vhash = vhash;
+  if (counter < 31)
+    counter++;
+  if (zero_anonymity_count_estimate < (1 << counter))
+    zero_anonymity_count_estimate = (1 << counter);
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Retrieved block `%s' of type %u for DHT PUT\n",
+             GNUNET_h2s (key),
+             type);
+#endif
+  GNUNET_DHT_put (dht_handle,
+                 key,
+                 DEFAULT_PUT_REPLICATION,
+                 GNUNET_DHT_RO_NONE,
+                 type,
+                 size,
+                 data,
+                 expiration,
+                 GNUNET_TIME_UNIT_FOREVER_REL,
+                 &dht_put_continuation,
+                 cls);
+}
+
+
 /**
  * Task that is run periodically to obtain blocks for content
  * migration
@@ -966,26 +1415,54 @@ gather_migration_blocks (void *cls,
                         const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   mig_task = GNUNET_SCHEDULER_NO_TASK;
-  mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
-                                       GNUNET_TIME_UNIT_FOREVER_REL,
-                                       &process_migration_content, NULL);
+  if (dsh != NULL)
+    {
+      mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
+                                           GNUNET_TIME_UNIT_FOREVER_REL,
+                                           &process_migration_content, NULL);
+      GNUNET_assert (mig_qe != NULL);
+    }
 }
 
 
 /**
- * We're done with a particular message list entry.
- * Free all associated resources.
+ * Task that is run periodically to obtain blocks for DHT PUTs.
  * 
- * @param pml entry to destroy
+ * @param cls type of blocks to gather
+ * @param tc scheduler context (unused)
  */
 static void
-destroy_pending_message_list_entry (struct PendingMessageList *pml)
+gather_dht_put_blocks (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
-                              pml->req->pending_tail,
-                              pml);
-  GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
-                              pml->target->pending_messages_tail,
+  dht_task = GNUNET_SCHEDULER_NO_TASK;
+  if (dsh != NULL)
+    {
+      if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+       dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
+      dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
+                                                   GNUNET_TIME_UNIT_FOREVER_REL,
+                                                   dht_put_type++,
+                                                   &process_dht_put_content, NULL);
+      GNUNET_assert (dht_qe != NULL);
+    }
+}
+
+
+/**
+ * We're done with a particular message list entry.
+ * Free all associated resources.
+ * 
+ * @param pml entry to destroy
+ */
+static void
+destroy_pending_message_list_entry (struct PendingMessageList *pml)
+{
+  GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
+                              pml->req->pending_tail,
+                              pml);
+  GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
+                              pml->target->pending_messages_tail,
                               pml->pm);
   pml->target->pending_requests--;
   GNUNET_free (pml->pm);
@@ -1008,12 +1485,20 @@ destroy_pending_message (struct PendingMessage *pm,
   TransmissionContinuation cont;
   void *cont_cls;
 
-  GNUNET_assert (pml->pm == pm);
-  GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
   cont = pm->cont;
   cont_cls = pm->cont_cls;
-  destroy_pending_message_list_entry (pml);
-  cont (cont_cls, tpid);  
+  if (pml != NULL)
+    {
+      GNUNET_assert (pml->pm == pm);
+      GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
+      destroy_pending_message_list_entry (pml);
+    }
+  else
+    {
+      GNUNET_free (pm);
+    }
+  if (cont != NULL)
+    cont (cont_cls, tpid);  
 }
 
 
@@ -1027,6 +1512,7 @@ static void
 destroy_pending_request (struct PendingRequest *pr)
 {
   struct GNUNET_PeerIdentity pid;
+  unsigned int i;
 
   if (pr->hnode != NULL)
     {
@@ -1048,17 +1534,24 @@ destroy_pending_request (struct PendingRequest *pr)
                                -1,
                                GNUNET_NO);
     }
-  /* might have already been removed from map in 'process_reply' (if
-     there was a unique reply) or never inserted if it was a
-     duplicate; hence ignore the return value here */
-  (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
-                                              &pr->query,
-                                              pr);
+  if (GNUNET_YES == 
+      GNUNET_CONTAINER_multihashmap_remove (query_request_map,
+                                           &pr->query,
+                                           pr))
+    {
+      GNUNET_LOAD_update (rt_entry_lifetime,
+                         GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
+    }
   if (pr->qe != NULL)
      {
       GNUNET_DATASTORE_cancel (pr->qe);
       pr->qe = NULL;
     }
+  if (pr->dht_get != NULL)
+    {
+      GNUNET_DHT_get_stop (pr->dht_get);
+      pr->dht_get = NULL;
+    }
   if (pr->client_request_list != NULL)
     {
       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
@@ -1081,10 +1574,11 @@ destroy_pending_request (struct PendingRequest *pr)
       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                       
       pr->bf = NULL;
     }
-  if (pr->irc != NULL)
+  if (pr->pirc != NULL)
     {
-      GNUNET_CORE_peer_change_preference_cancel (pr->irc);
-      pr->irc = NULL;
+      GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc);
+      pr->pirc->irc = NULL;
+      pr->pirc = NULL;
     }
   if (pr->replies_seen != NULL)
     {
@@ -1093,45 +1587,85 @@ destroy_pending_request (struct PendingRequest *pr)
     }
   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
     {
-      GNUNET_SCHEDULER_cancel (sched,
-                              pr->task);
+      GNUNET_SCHEDULER_cancel (pr->task);
       pr->task = GNUNET_SCHEDULER_NO_TASK;
     }
   while (NULL != pr->pending_head)    
     destroy_pending_message_list_entry (pr->pending_head);
   GNUNET_PEER_change_rc (pr->target_pid, -1);
-  if (pr->used_pids != NULL)
+  if (pr->used_targets != NULL)
     {
-      GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
-      GNUNET_free (pr->used_pids);
-      pr->used_pids_off = 0;
-      pr->used_pids_size = 0;
-      pr->used_pids = NULL;
+      for (i=0;i<pr->used_targets_off;i++)
+       GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
+      GNUNET_free (pr->used_targets);
+      pr->used_targets_off = 0;
+      pr->used_targets_size = 0;
+      pr->used_targets = NULL;
     }
   GNUNET_free (pr);
 }
 
 
+/**
+ * Find latency information in 'atsi'.
+ *
+ * @param atsi performance data
+ * @return connection latency
+ */
+static struct GNUNET_TIME_Relative
+get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+{
+  while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
+         (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
+    atsi++;
+  if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
+    {
+      GNUNET_break (0);
+      /* how can we not have latency data? */
+      return GNUNET_TIME_UNIT_SECONDS;
+    }
+  return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                       ntohl (atsi->value));
+}
+
+
 /**
  * Method called whenever a given peer connects.
  *
  * @param cls closure, not used
  * @param peer peer identity this notification is about
- * @param latency reported latency of the connection with 'other'
- * @param distance reported distance (DV) to 'other' 
+ * @param atsi performance information
  */
 static void 
 peer_connect_handler (void *cls,
                      const struct
                      GNUNET_PeerIdentity * peer,
-                     struct GNUNET_TIME_Relative latency,
-                     uint32_t distance)
+                     const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
   struct ConnectedPeer *cp;
   struct MigrationReadyBlock *pos;
-  
+  char *fn;
+  uint32_t trust;
+  struct GNUNET_TIME_Relative latency;
+
+  latency = get_latency (atsi);
+  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                         &peer->hashPubKey);
+  if (NULL != cp)
+    {
+      GNUNET_break (0);
+      return;
+    }
   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
+  cp->transmission_delay = GNUNET_LOAD_value_init (latency);
   cp->pid = GNUNET_PEER_intern (peer);
+
+  fn = get_trust_filename (peer);
+  if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
+      (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
+    cp->disk_trust = cp->trust = ntohl (trust);
+  GNUNET_free (fn);
+
   GNUNET_break (GNUNET_OK ==
                GNUNET_CONTAINER_multihashmap_put (connected_peers,
                                                   &peer->hashPubKey,
@@ -1147,6 +1681,138 @@ peer_connect_handler (void *cls,
 }
 
 
+/**
+ * Method called whenever a given peer has a status change.
+ *
+ * @param cls closure
+ * @param peer peer identity this notification is about
+ * @param bandwidth_in available amount of inbound bandwidth
+ * @param bandwidth_out available amount of outbound bandwidth
+ * @param timeout absolute time when this peer will time out
+ *        unless we see some further activity from it
+ * @param atsi status information
+ */
+static void
+peer_status_handler (void *cls,
+                    const struct
+                    GNUNET_PeerIdentity * peer,
+                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
+                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
+                    struct GNUNET_TIME_Absolute timeout,
+                    const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+{
+  struct ConnectedPeer *cp;
+  struct GNUNET_TIME_Relative latency;
+
+  latency = get_latency (atsi);
+  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                         &peer->hashPubKey);
+  if (cp == NULL)
+    {
+      GNUNET_break (0);
+      return;
+    }
+  GNUNET_LOAD_value_set_decline (cp->transmission_delay,
+                                latency);  
+}
+
+
+
+/**
+ * Increase the host credit by a value.
+ *
+ * @param host which peer to change the trust value on
+ * @param value is the int value by which the
+ *  host credit is to be increased or decreased
+ * @returns the actual change in trust (positive or negative)
+ */
+static int
+change_host_trust (struct ConnectedPeer *host, int value)
+{
+  if (value == 0)
+    return 0;
+  GNUNET_assert (host != NULL);
+  if (value > 0)
+    {
+      if (host->trust + value < host->trust)
+        {
+          value = UINT32_MAX - host->trust;
+          host->trust = UINT32_MAX;
+        }
+      else
+        host->trust += value;
+    }
+  else
+    {
+      if (host->trust < -value)
+        {
+          value = -host->trust;
+          host->trust = 0;
+        }
+      else
+        host->trust += value;
+    }
+  return value;
+}
+
+
+/**
+ * Write host-trust information to a file - flush the buffer entry!
+ */
+static int
+flush_trust (void *cls,
+            const GNUNET_HashCode *key,
+            void *value)
+{
+  struct ConnectedPeer *host = value;
+  char *fn;
+  uint32_t trust;
+  struct GNUNET_PeerIdentity pid;
+
+  if (host->trust == host->disk_trust)
+    return GNUNET_OK;                     /* unchanged */
+  GNUNET_PEER_resolve (host->pid,
+                      &pid);
+  fn = get_trust_filename (&pid);
+  if (host->trust == 0)
+    {
+      if ((0 != UNLINK (fn)) && (errno != ENOENT))
+        GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
+                                  GNUNET_ERROR_TYPE_BULK, "unlink", fn);
+    }
+  else
+    {
+      trust = htonl (host->trust);
+      if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
+                                                   sizeof(uint32_t),
+                                                   GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
+                                                   | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
+        host->disk_trust = host->trust;
+    }
+  GNUNET_free (fn);
+  return GNUNET_OK;
+}
+
+/**
+ * Call this method periodically to scan data/hosts for new hosts.
+ */
+static void
+cron_flush_trust (void *cls,
+                 const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+
+  if (NULL == connected_peers)
+    return;
+  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                        &flush_trust,
+                                        NULL);
+  if (NULL == tc)
+    return;
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+    return;
+  GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
+}
+
 
 /**
  * Free (each) request made by the peer.
@@ -1210,6 +1876,14 @@ peer_disconnect_handler (void *cls,
                GNUNET_CONTAINER_multihashmap_remove (connected_peers,
                                                      &peer->hashPubKey,
                                                      cp));
+  if (cp->irc != NULL)
+    {
+      GNUNET_CORE_peer_change_preference_cancel (cp->irc);
+      cp->irc = NULL;
+      cp->pr->pirc = NULL;
+      cp->pr = NULL;
+    }
+
   /* remove this peer from migration considerations; schedule
      alternatives */
   next = mig_head;
@@ -1222,25 +1896,33 @@ peer_disconnect_handler (void *cls,
            {
              GNUNET_PEER_change_rc (pos->target_list[i], -1);
              pos->target_list[i] = 0;
-             if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
-               {
-                 delete_migration_block (pos);
-                 consider_migration_gathering ();
-               }
-             GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
-                                                    &consider_migration,
-                                                    pos);
-             break;
-           }
+            }
+         }
+      if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
+       {
+         delete_migration_block (pos);
+         consider_migration_gathering ();
+          continue;
        }
+      GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                            &consider_migration,
+                                            pos);
     }
-
   GNUNET_PEER_change_rc (cp->pid, -1);
   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
   if (NULL != cp->cth)
-    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+    {
+      GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+      cp->cth = NULL;
+    }
+  if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
+      cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   while (NULL != (pm = cp->pending_messages_head))
     destroy_pending_message (pm, 0 /* delivery failed */);
+  GNUNET_LOAD_value_free (cp->transmission_delay);
   GNUNET_break (0 == cp->pending_requests);
   GNUNET_free (cp);
 }
@@ -1369,14 +2051,25 @@ shutdown_task (void *cls,
       GNUNET_DATASTORE_cancel (mig_qe);
       mig_qe = NULL;
     }
+  if (dht_qe != NULL)
+    {
+      GNUNET_DATASTORE_cancel (dht_qe);
+      dht_qe = NULL;
+    }
   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
     {
-      GNUNET_SCHEDULER_cancel (sched, mig_task);
+      GNUNET_SCHEDULER_cancel (mig_task);
       mig_task = GNUNET_SCHEDULER_NO_TASK;
     }
+  if (GNUNET_SCHEDULER_NO_TASK != dht_task)
+    {
+      GNUNET_SCHEDULER_cancel (dht_task);
+      dht_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   while (client_list != NULL)
     handle_client_disconnect (NULL,
                              client_list->client);
+  cron_flush_trust (NULL, NULL);
   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
                                         &clean_peer,
                                         NULL);
@@ -1388,6 +2081,8 @@ shutdown_task (void *cls,
   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
   query_request_map = NULL;
+  GNUNET_LOAD_value_free (rt_entry_lifetime);
+  rt_entry_lifetime = NULL;
   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
   peer_request_map = NULL;
@@ -1399,20 +2094,69 @@ shutdown_task (void *cls,
       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
       stats = NULL;
     }
-  GNUNET_DATASTORE_disconnect (dsh,
-                              GNUNET_NO);
+  if (dsh != NULL)
+    {
+      GNUNET_DATASTORE_disconnect (dsh,
+                                  GNUNET_NO);
+      dsh = NULL;
+    }
   while (mig_head != NULL)
     delete_migration_block (mig_head);
   GNUNET_assert (0 == mig_size);
-  dsh = NULL;
-  sched = NULL;
+  GNUNET_DHT_disconnect (dht_handle);
+  dht_handle = NULL;
+  GNUNET_LOAD_value_free (datastore_get_load);
+  datastore_get_load = NULL;
+  GNUNET_LOAD_value_free (datastore_put_load);
+  datastore_put_load = NULL;
+  GNUNET_BLOCK_context_destroy (block_ctx);
+  block_ctx = NULL;
+  GNUNET_CONFIGURATION_destroy (block_cfg);
+  block_cfg = NULL;
   cfg = NULL;  
+  GNUNET_free_non_null (trustDirectory);
+  trustDirectory = NULL;
+  GNUNET_SCHEDULER_cancel (cover_age_task);
+  cover_age_task = GNUNET_SCHEDULER_NO_TASK;
 }
 
 
 /* ******************* Utility functions  ******************** */
 
 
+/**
+ * We've had to delay a request for transmission to core, but now
+ * we should be ready.  Run it.
+ *
+ * @param cls the 'struct ConnectedPeer' for which a request was delayed
+ * @param tc task context (unused)
+ */
+static void
+delayed_transmission_request (void *cls,
+                             const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct ConnectedPeer *cp = cls;
+  struct GNUNET_PeerIdentity pid;
+  struct PendingMessage *pm;
+
+  pm = cp->pending_messages_head;
+  cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_assert (cp->cth == NULL);
+  if (pm == NULL)
+    return;
+  GNUNET_PEER_resolve (cp->pid,
+                      &pid);
+  cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
+  cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                              pm->priority,
+                                              GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+                                              &pid,
+                                              pm->msize,
+                                              &transmit_to_peer,
+                                              cp);
+}
+
+
 /**
  * Transmit messages by copying it to the target buffer
  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
@@ -1432,13 +2176,16 @@ transmit_to_peer (void *cls,
 {
   struct ConnectedPeer *cp = cls;
   char *cbuf = buf;
-  struct GNUNET_PeerIdentity pid;
   struct PendingMessage *pm;
+  struct PendingMessage *next_pm;
+  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_TIME_Relative min_delay;
   struct MigrationReadyBlock *mb;
   struct MigrationReadyBlock *next;
   struct PutMessage migm;
   size_t msize;
   unsigned int i;
+  struct GNUNET_PeerIdentity pid;
  
   cp->cth = NULL;
   if (NULL == buf)
@@ -1447,31 +2194,72 @@ transmit_to_peer (void *cls,
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Dropping message, core too busy.\n");
 #endif
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                 "Dropping message, core too busy.\n");
+      GNUNET_LOAD_update (cp->transmission_delay,
+                         UINT64_MAX);
+      
+      if (NULL != (pm = cp->pending_messages_head))
+       {
+         GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
+                                      cp->pending_messages_tail,
+                                      pm);
+         cp->pending_requests--;    
+         destroy_pending_message (pm, 0);
+       }
+      if (NULL != (pm = cp->pending_messages_head))
+       {
+         GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
+         min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until);
+         cp->delayed_transmission_request_task
+           = GNUNET_SCHEDULER_add_delayed (min_delay,
+                                           &delayed_transmission_request,
+                                           cp);
+       }
       return 0;
-    }
+    }  
+  GNUNET_LOAD_update (cp->transmission_delay,
+                     GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).rel_value);
+  now = GNUNET_TIME_absolute_get ();
   msize = 0;
-  while ( (NULL != (pm = cp->pending_messages_head) ) &&
+  min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+  next_pm = cp->pending_messages_head;
+  while ( (NULL != (pm = next_pm) ) &&
          (pm->msize <= size) )
     {
+      next_pm = pm->next;
+      if (pm->delay_until.abs_value > now.abs_value)
+       {
+         min_delay = GNUNET_TIME_relative_min (min_delay,
+                                               GNUNET_TIME_absolute_get_remaining (pm->delay_until));
+         continue;
+       }
       memcpy (&cbuf[msize], &pm[1], pm->msize);
       msize += pm->msize;
       size -= pm->msize;
+      if (NULL == pm->pml)
+       {
+         GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
+                                      cp->pending_messages_tail,
+                                      pm);
+         cp->pending_requests--;
+       }
       destroy_pending_message (pm, cp->pid);
     }
-  if (NULL != pm)
-    {
-      GNUNET_PEER_resolve (cp->pid,
-                          &pid);
-      cp->cth = GNUNET_CORE_notify_transmit_ready (core,
-                                                  pm->priority,
-                                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,
-                                                  &pid,
-                                                  pm->msize,
-                                                  &transmit_to_peer,
-                                                  cp);
+  if (pm != NULL)
+    min_delay = GNUNET_TIME_UNIT_ZERO;
+  if (NULL != cp->pending_messages_head)
+    {     
+      GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
+      cp->delayed_transmission_request_task
+       = GNUNET_SCHEDULER_add_delayed (min_delay,
+                                       &delayed_transmission_request,
+                                       cp);
     }
-  else
+  if (pm == NULL)
     {      
+      GNUNET_PEER_resolve (cp->pid,
+                          &pid);
       next = mig_head;
       while (NULL != (mb = next))
        {
@@ -1484,6 +2272,7 @@ transmit_to_peer (void *cls,
                  GNUNET_PEER_change_rc (mb->target_list[i], -1);
                  mb->target_list[i] = 0;
                  mb->used_targets++;
+                 memset (&migm, 0, sizeof (migm));
                  migm.header.size = htons (sizeof (migm) + mb->size);
                  migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
                  migm.type = htonl (mb->type);
@@ -1493,9 +2282,26 @@ transmit_to_peer (void *cls,
                  size -= sizeof (migm);
                  memcpy (&cbuf[msize], &mb[1], mb->size);
                  msize += mb->size;
-                 size -= mb->size;               
+                 size -= mb->size;
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Pushing migration block `%s' (%u bytes) to `%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             (unsigned int) mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
                  break;
                }
+             else
+               {
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             (unsigned int) mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
+               }
            }
          if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
               (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
@@ -1510,9 +2316,9 @@ transmit_to_peer (void *cls,
     }
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitting %u bytes to peer %u\n",
-             msize,
-             cp->pid);
+             "Transmitting %u bytes to peer with PID %u\n",
+             (unsigned int) msize,
+             (unsigned int) cp->pid);
 #endif
   return msize;
 }
@@ -1536,14 +2342,17 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
 
   GNUNET_assert (pm->next == NULL);
   GNUNET_assert (pm->pml == NULL);    
-  pml = GNUNET_malloc (sizeof (struct PendingMessageList));
-  pml->req = pr;
-  pml->target = cp;
-  pml->pm = pm;
-  pm->pml = pml;  
-  GNUNET_CONTAINER_DLL_insert (pr->pending_head,
-                              pr->pending_tail,
-                              pml);
+  if (pr != NULL)
+    {
+      pml = GNUNET_malloc (sizeof (struct PendingMessageList));
+      pml->req = pr;
+      pml->target = cp;
+      pml->pm = pm;
+      pm->pml = pml;  
+      GNUNET_CONTAINER_DLL_insert (pr->pending_head,
+                                  pr->pending_tail,
+                                  pml);
+    }
   pos = cp->pending_messages_head;
   while ( (pos != NULL) &&
          (pm->priority < pos->priority) )
@@ -1554,11 +2363,26 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
                                     pm);
   cp->pending_requests++;
   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
-    destroy_pending_message (cp->pending_messages_tail, 0);  
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# P2P searches discarded (queue length bound)"),
+                               1,
+                               GNUNET_NO);
+      destroy_pending_message (cp->pending_messages_tail, 0);  
+    }
   GNUNET_PEER_resolve (cp->pid, &pid);
   if (NULL != cp->cth)
-    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+    {
+      GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+      cp->cth = NULL;
+    }
+  if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
+      cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   /* need to schedule transmission */
+  cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
                                               cp->pending_messages_head->priority,
                                               MAX_TRANSMIT_DELAY,
@@ -1572,39 +2396,62 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Failed to schedule transmission with core!\n");
 #endif
-      /* FIXME: call stats (rare, bad case) */
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# CORE transmission failures"),
+                               1,
+                               GNUNET_NO);
     }
 }
 
 
 /**
- * Mingle hash with the mingle_number to produce different bits.
+ * Test if the DATABASE (GET) load on this peer is too high
+ * to even consider processing the query at
+ * all.  
+ * 
+ * @return GNUNET_YES if the load is too high to do anything (load high)
+ *         GNUNET_NO to process normally (load normal)
+ *         GNUNET_SYSERR to process for free (load low)
  */
-static void
-mingle_hash (const GNUNET_HashCode * in,
-            int32_t mingle_number, 
-            GNUNET_HashCode * hc)
+static int
+test_get_load_too_high (uint32_t priority)
 {
-  GNUNET_HashCode m;
+  double ld;
 
-  GNUNET_CRYPTO_hash (&mingle_number, 
-                     sizeof (int32_t), 
-                     &m);
-  GNUNET_CRYPTO_hash_xor (&m, in, hc);
+  ld = GNUNET_LOAD_get_load (datastore_get_load);
+  if (ld < 1)
+    return GNUNET_SYSERR;    
+  if (ld <= priority)    
+    return GNUNET_NO;    
+  return GNUNET_YES;
 }
 
 
+
+
 /**
- * Test if the load on this peer is too high
+ * Test if the DATABASE (PUT) load on this peer is too high
  * to even consider processing the query at
- * all.
+ * all.  
  * 
- * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
+ * @return GNUNET_YES if the load is too high to do anything (load high)
+ *         GNUNET_NO to process normally (load normal or low)
  */
 static int
-test_load_too_high ()
+test_put_load_too_high (uint32_t priority)
 {
-  return GNUNET_NO; // FIXME
+  double ld;
+
+  if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
+    return GNUNET_NO; /* very fast */
+  ld = GNUNET_LOAD_get_load (datastore_put_load);
+  if (ld < 2.0 * (1 + priority))
+    return GNUNET_NO;
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# storage requests dropped due to high load"),
+                           1,
+                           GNUNET_NO);
+  return GNUNET_YES;
 }
 
 
@@ -1662,6 +2509,7 @@ transmit_query_continuation (void *cls,
                             GNUNET_PEER_Id tpid)
 {
   struct PendingRequest *pr = cls;
+  unsigned int i;
 
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# queries scheduled for forwarding"),
@@ -1674,25 +2522,39 @@ transmit_query_continuation (void *cls,
                  "Transmission of request failed, will try again later.\n");
 #endif
       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                get_processing_delay (),
+       pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
                                                 &forward_request_task,
                                                 pr); 
       return;    
     }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Transmitted query `%s'\n",
+             GNUNET_h2s (&pr->query));
+#endif
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# queries forwarded"),
                            1,
                            GNUNET_NO);
-  GNUNET_PEER_change_rc (tpid, 1);
-  if (pr->used_pids_off == pr->used_pids_size)
-    GNUNET_array_grow (pr->used_pids,
-                      pr->used_pids_size,
-                      pr->used_pids_size * 2 + 2);
-  pr->used_pids[pr->used_pids_off++] = tpid;
+  for (i=0;i<pr->used_targets_off;i++)
+    if (pr->used_targets[i].pid == tpid)
+      break; /* found match! */    
+  if (i == pr->used_targets_off)
+    {
+      /* need to create new entry */
+      if (pr->used_targets_off == pr->used_targets_size)
+       GNUNET_array_grow (pr->used_targets,
+                          pr->used_targets_size,
+                          pr->used_targets_size * 2 + 2);
+      GNUNET_PEER_change_rc (tpid, 1);
+      pr->used_targets[pr->used_targets_off].pid = tpid;
+      pr->used_targets[pr->used_targets_off].num_requests = 0;
+      i = pr->used_targets_off++;
+    }
+  pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
+  pr->used_targets[i].num_requests++;
   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-    pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                            get_processing_delay (),
+    pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
                                             &forward_request_task,
                                             pr);
 }
@@ -1756,7 +2618,9 @@ refresh_bloomfilter (struct PendingRequest *pr)
                                              BLOOMFILTER_K);
   for (i=0;i<pr->replies_seen_off;i++)
     {
-      mingle_hash (&pr->replies_seen[i], pr->mingle, &mhash);
+      GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
+                               pr->mingle,
+                               &mhash);
       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
     }
 }
@@ -1769,7 +2633,6 @@ refresh_bloomfilter (struct PendingRequest *pr)
  *
  * @param cls the requests "struct PendingRequest*"
  * @param peer identifies the peer
- * @param bpm_in set to the current bandwidth limit (receiving) for this peer
  * @param bpm_out set to the current bandwidth limit (sending) for this peer
  * @param amount set to the amount that was actually reserved or unreserved
  * @param preference current traffic preference for the given peer
@@ -1778,7 +2641,6 @@ static void
 target_reservation_cb (void *cls,
                       const struct
                       GNUNET_PeerIdentity * peer,
-                      struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
                       struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
                       int amount,
                       uint64_t preference)
@@ -1793,19 +2655,9 @@ target_reservation_cb (void *cls,
   unsigned int k;
   int no_route;
   uint32_t bm;
+  unsigned int i;
 
-  pr->irc = NULL;
-  if (peer == NULL)
-    {
-      /* error in communication with core, try again later */
-      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                get_processing_delay (),
-                                                &forward_request_task,
-                                                pr);
-      return;
-    }
-  // (3) transmit, update ttl/priority
+  /* (3) transmit, update ttl/priority */
   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                          &peer->hashPubKey);
   if (cp == NULL)
@@ -1816,18 +2668,23 @@ target_reservation_cb (void *cls,
                  "Selected peer disconnected!\n");
 #endif
       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                get_processing_delay (),
+       pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
+                                                &forward_request_task,
+                                                pr);
+      return;
+    }
+  cp->irc = NULL;
+  pr->pirc = NULL;
+  if (peer == NULL)
+    {
+      /* error in communication with core, try again later */
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
                                                 &forward_request_task,
                                                 pr);
       return;
     }
   no_route = GNUNET_NO;
-  /* FIXME: check against DBLOCK_SIZE and possibly return
-     amount to reserve; however, this also needs to work
-     with testcases which currently start out with a far
-     too low per-peer bw limit, so they would never send
-     anything.  Big issue. */
   if (amount == 0)
     {
       if (pr->cp == NULL)
@@ -1843,14 +2700,11 @@ target_reservation_cb (void *cls,
                                    1,
                                    GNUNET_NO);
          if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                    get_processing_delay (),
+           pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
                                                     &forward_request_task,
                                                     pr);
          return;  /* this target round failed */
        }
-      /* FIXME: if we are "quite" busy, we may still want to skip
-        this round; need more load detection code! */
       no_route = GNUNET_YES;
     }
   
@@ -1858,6 +2712,16 @@ target_reservation_cb (void *cls,
                            gettext_noop ("# queries scheduled for forwarding"),
                            1,
                            GNUNET_NO);
+  for (i=0;i<pr->used_targets_off;i++)
+    if (pr->used_targets[i].pid == cp->pid) 
+      {
+       GNUNET_STATISTICS_update (stats,
+                                 gettext_noop ("# queries retransmitted to same target"),
+                                 1,
+                                 GNUNET_NO);
+       break;
+      } 
+
   /* build message and insert message into priority queue */
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1911,6 +2775,7 @@ target_reservation_cb (void *cls,
                                               pr->bf_size);
   pm->cont = &transmit_query_continuation;
   pm->cont_cls = pr;
+  cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
   add_to_pending_messages_for_peer (cp, pm, pr);
 }
 
@@ -1936,6 +2801,11 @@ struct PeerSelectionContext
    */
   double target_score;
 
+  /**
+   * Does it make sense to we re-try quickly again?
+   */
+  int fast_retry;
+
 };
 
 
@@ -1958,49 +2828,118 @@ target_peer_select_cb (void *cls,
   struct PeerSelectionContext *psc = cls;
   struct ConnectedPeer *cp = value;
   struct PendingRequest *pr = psc->pr;
+  struct GNUNET_TIME_Relative delay;
   double score;
   unsigned int i;
   unsigned int pc;
 
   /* 1) check that this peer is not the initiator */
-  if (cp == pr->cp)
-    return GNUNET_YES; /* skip */         
+  if (cp == pr->cp)     
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Skipping initiator in forwarding selection\n");
+#endif
+      return GNUNET_YES; /* skip */       
+    }
+  if (cp->irc != NULL)
+    {
+      psc->fast_retry = GNUNET_YES;
+      return GNUNET_YES; /* skip: already querying core about this peer for other reasons */
+    }
 
   /* 2) check if we have already (recently) forwarded to this peer */
+  /* 2a) this particular request */
   pc = 0;
-  for (i=0;i<pr->used_pids_off;i++)
-    if (pr->used_pids[i] == cp->pid) 
+  for (i=0;i<pr->used_targets_off;i++)
+    if (pr->used_targets[i].pid == cp->pid) 
       {
-       pc++;
+       pc = pr->used_targets[i].num_requests;
+       GNUNET_assert (pc > 0);
+       /* FIXME: make re-enabling a peer independent of how often
+          this function is called??? */
        if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                          RETRY_PROBABILITY_INV))
+                                          RETRY_PROBABILITY_INV * pc))
          {
 #if DEBUG_FS
            GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                        "NOT re-trying query that was previously transmitted %u times\n",
-                       (unsigned int) pr->used_pids_off);
+                       (unsigned int) pc);
 #endif
            return GNUNET_YES; /* skip */
          }
+       break;
       }
 #if DEBUG_FS
   if (0 < pc)
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-               "Re-trying query that was previously transmitted %u times to this peer\n",
-               (unsigned int) pc);
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 "Re-trying query that was previously transmitted %u times to this peer\n",
+                 (unsigned int) pc);
+    }
 #endif
-  // 3) calculate how much we'd like to forward to this peer
-  score = 42; // FIXME!
-  // FIXME: also need API to gather data on responsiveness
-  // of this peer (we have fields for that in 'cp', but
-  // they are never set!)
-  
+  /* 2b) many other requests to this peer */
+  delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
+  if (delay.rel_value <= cp->avg_delay.rel_value)
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "NOT sending query since we send %u others to this peer in the last %llums\n",
+                 MAX_QUEUE_PER_PEER,
+                 cp->avg_delay.rel_value);
+#endif
+      return GNUNET_YES; /* skip */      
+    }
+
+  /* 3) calculate how much we'd like to forward to this peer,
+     starting with a random value that is strong enough
+     to at least give any peer a chance sometimes 
+     (compared to the other factors that come later) */
+  /* 3a) count successful (recent) routes from cp for same source */
+  if (pr->cp != NULL)
+    {
+      score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                       P2P_SUCCESS_LIST_SIZE);
+      for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
+       if (cp->last_p2p_replies[i] == pr->cp->pid)
+         score += 1.0; /* likely successful based on hot path */
+    }
+  else
+    {
+      score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                       CS2P_SUCCESS_LIST_SIZE);
+      for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
+       if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
+         score += 1.0; /* likely successful based on hot path */
+    }
+  /* 3b) include latency */
+  if (cp->avg_delay.rel_value < 4 * TTL_DECREMENT)
+    score += 1.0; /* likely fast based on latency */
+  /* 3c) include priorities */
+  if (cp->avg_priority <= pr->remaining_priority / 2.0)
+    score += 1.0; /* likely successful based on priorities */
+  /* 3d) penalize for queue size */  
+  score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
+  /* 3e) include peer proximity */
+  score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
+                                                   &pr->query)) / (double) UINT32_MAX);
+  /* 4) super-bonus for being the known target */
+  if (pr->target_pid == cp->pid)
+    score += 100.0;
   /* store best-fit in closure */
+  score++; /* avoid zero */
   if (score > psc->target_score)
     {
       psc->target_score = score;
       psc->target.hashPubKey = *key; 
     }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Peer `%s' gets score %f for forwarding query, max is %8f\n",
+             GNUNET_h2s (key),
+             score,
+             psc->target_score);
+#endif
   return GNUNET_YES;
 }
   
@@ -2033,7 +2972,33 @@ bound_ttl (int32_t ttl_in, uint32_t prio)
 
 
 /**
- * We're processing a GET request from another peer and have decided
+ * Iterator called on each result obtained for a DHT
+ * operation that expects a reply
+ *
+ * @param cls closure
+ * @param exp when will this value expire
+ * @param key key of the result
+ * @param get_path NULL-terminated array of pointers
+ *                 to the peers on reverse GET path (or NULL if not recorded)
+ * @param put_path NULL-terminated array of pointers
+ *                 to the peers on the PUT path (or NULL if not recorded)
+ * @param type type of the result
+ * @param size number of bytes in data
+ * @param data pointer to the result data
+ */
+static void
+process_dht_reply (void *cls,
+                  struct GNUNET_TIME_Absolute exp,
+                  const GNUNET_HashCode * key,
+                  const struct GNUNET_PeerIdentity * const *get_path,
+                  const struct GNUNET_PeerIdentity * const *put_path,
+                  enum GNUNET_BLOCK_Type type,
+                  size_t size,
+                  const void *data);
+
+
+/**
+ * We're processing a GET request and have decided
  * to forward it to other peers.  This function is called periodically
  * and should forward the request to other peers until we have all
  * possible replies.  If we have transmitted the *only* reply to
@@ -2054,7 +3019,7 @@ forward_request_task (void *cls,
   struct GNUNET_TIME_Relative delay;
 
   pr->task = GNUNET_SCHEDULER_NO_TASK;
-  if (pr->irc != NULL)
+  if (pr->pirc != NULL)
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2065,37 +3030,82 @@ forward_request_task (void *cls,
     }
   if (GNUNET_YES == pr->local_only)
     return; /* configured to not do P2P search */
+  /* (0) try DHT */
+  if ( (0 == pr->anonymity_level) &&
+       (GNUNET_YES != pr->forward_only) &&
+       (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
+       (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
+    {
+      pr->dht_get = GNUNET_DHT_get_start (dht_handle,
+                                         GNUNET_TIME_UNIT_FOREVER_REL,
+                                         pr->type,
+                                         &pr->query,
+                                         DEFAULT_GET_REPLICATION,
+                                         GNUNET_DHT_RO_NONE,
+                                         pr->bf,
+                                         pr->mingle,
+                                         pr->namespace,
+                                         (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
+                                         &process_dht_reply,
+                                         pr);
+    }
+
+  if ( (pr->anonymity_level > 1) &&
+       (cover_query_count < pr->anonymity_level - 1) )
+    {
+      delay = get_processing_delay ();
+#if DEBUG_FS 
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Not enough cover traffic to forward query `%s', will try again in %llu ms!\n",
+                 GNUNET_h2s (&pr->query),
+                 delay.rel_value);
+#endif
+      pr->task = GNUNET_SCHEDULER_add_delayed (delay,
+                                              &forward_request_task,
+                                              pr);
+      return;
+    }
+  /* consume cover traffic */
+  if (pr->anonymity_level > 1) 
+    cover_query_count -= pr->anonymity_level - 1;
+
   /* (1) select target */
   psc.pr = pr;
-  psc.target_score = DBL_MIN;
+  psc.target_score = -DBL_MAX;
+  psc.fast_retry = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
                                         &target_peer_select_cb,
                                         &psc);  
-  if (psc.target_score == DBL_MIN)
+  if (psc.target_score == -DBL_MAX)
     {
-      delay = get_processing_delay ();
+      if (psc.fast_retry == GNUNET_YES)
+       delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */
+      else
+       delay = get_processing_delay ();
 #if DEBUG_FS 
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
                  GNUNET_h2s (&pr->query),
-                 delay.value);
+                 delay.rel_value);
 #endif
-      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                              delay,
+      pr->task = GNUNET_SCHEDULER_add_delayed (delay,
                                               &forward_request_task,
                                               pr);
       return; /* nobody selected */
     }
   /* (3) update TTL/priority */
-  
   if (pr->client_request_list != NULL)
     {
       /* FIXME: use better algorithm!? */
       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                                         4))
        pr->priority++;
-      /* FIXME: bound priority by "customary" priority used by other peers
-        at this time! */
+      /* bound priority we use by priorities we see from other peers
+        rounded up (must round up so that we can see non-zero
+        priorities, but round up as little as possible to make it
+        plausible that we forwarded another peers request) */
+      if (pr->priority > current_priorities + 1.0)
+       pr->priority = (uint32_t) current_priorities + 1.0;
       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
                           pr->priority);
 #if DEBUG_FS
@@ -2106,24 +3116,34 @@ forward_request_task (void *cls,
                  pr->ttl);
 #endif
     }
+
+  /* (3) reserve reply bandwidth */
+  if (GNUNET_NO == pr->forward_only)
+    {
+      cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                             &psc.target.hashPubKey);
+      GNUNET_assert (NULL != cp);
+      GNUNET_assert (cp->irc == NULL);
+      pr->pirc = cp;
+      cp->pr = pr;
+      cp->irc = GNUNET_CORE_peer_change_preference (core,
+                                                   &psc.target,
+                                                   GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
+                                                   GNUNET_BANDWIDTH_value_init (UINT32_MAX),
+                                                   DBLOCK_SIZE * 2, 
+                                                   cp->inc_preference,
+                                                   &target_reservation_cb,
+                                                   pr);
+      GNUNET_assert (cp->irc != NULL);
+      cp->inc_preference = 0;
+    }
   else
     {
-      /* FIXME: should we do something here as well!? */
+      /* force forwarding */
+      static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
+      target_reservation_cb (pr, &psc.target,
+                            zerobw, 0, 0.0);
     }
-
-  /* (3) reserve reply bandwidth */
-  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
-                                         &psc.target.hashPubKey);
-  GNUNET_assert (NULL != cp);
-  pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
-                                               &psc.target,
-                                               GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
-                                               GNUNET_BANDWIDTH_value_init ((uint32_t) -1 /* no limit */), 
-                                               DBLOCK_SIZE * 2, 
-                                               (uint64_t) cp->inc_preference,
-                                               &target_reservation_cb,
-                                               pr);
-  cp->inc_preference = 0.0;
 }
 
 
@@ -2145,14 +3165,14 @@ transmit_reply_continuation (void *cls,
   
   switch (pr->type)
     {
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_IBLOCK:
       /* only one reply expected, done with the request! */
       destroy_pending_request (pr);
       break;
     case GNUNET_BLOCK_TYPE_ANY:
-    case GNUNET_BLOCK_TYPE_KBLOCK:
-    case GNUNET_BLOCK_TYPE_SBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_KBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_SBLOCK:
       break;
     default:
       GNUNET_break (0);
@@ -2230,7 +3250,7 @@ struct ProcessReplyClosure
   const void *data;
 
   /**
-   * Who gave us this reply? NULL for local host.
+   * Who gave us this reply? NULL for local host (or DHT)
    */
   struct ConnectedPeer *sender;
 
@@ -2244,12 +3264,6 @@ struct ProcessReplyClosure
    */
   size_t size;
 
-  /**
-   * Namespace that this reply belongs to
-   * (if it is of type SBLOCK).
-   */
-  GNUNET_HashCode namespace;
-
   /**
    * Type of the block.
    */
@@ -2259,6 +3273,26 @@ struct ProcessReplyClosure
    * How much was this reply worth to us?
    */
   uint32_t priority;
+
+  /**
+   * Anonymity requirements for this reply.
+   */
+  uint32_t anonymity_level;
+
+  /**
+   * Evaluation result (returned).
+   */
+  enum GNUNET_BLOCK_EvaluationResult eval;
+
+  /**
+   * Did we finish processing the associated request?
+   */ 
+  int finished;
+
+  /**
+   * Did we find a matching request?
+   */
+  int request_found;
 };
 
 
@@ -2283,10 +3317,28 @@ process_reply (void *cls,
   struct PutMessage *pm;
   struct ConnectedPeer *cp;
   struct GNUNET_TIME_Relative cur_delay;
-  GNUNET_HashCode chash;
-  GNUNET_HashCode mhash;
+#if SUPPORT_DELAYS  
+struct GNUNET_TIME_Relative art_delay;
+#endif
   size_t msize;
+  unsigned int i;
 
+  if (NULL == pr->client_request_list)
+    {
+      /* reply will go over the network, check for cover traffic */
+      if ( (prq->anonymity_level >  1) &&
+          (cover_content_count < prq->anonymity_level - 1) )
+       {
+         /* insufficient cover traffic, skip */
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# replies suppressed due to lack of cover traffic"),
+                                   1,
+                                   GNUNET_NO);
+         return GNUNET_YES;
+       }       
+      if (prq->anonymity_level >  1) 
+       cover_content_count -= prq->anonymity_level - 1;
+    }
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Matched result (type %u) for query `%s' with pending request\n",
@@ -2299,19 +3351,24 @@ process_reply (void *cls,
                            GNUNET_NO);
   if (prq->sender != NULL)
     {
-      /* FIXME: should we be more precise here and not use
-        "start_time" but a peer-specific time stamp? */
-      cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
-      prq->sender->avg_delay.value
-       = (prq->sender->avg_delay.value * 
-          (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
-      prq->sender->avg_priority
-       = (prq->sender->avg_priority * 
-          (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+      for (i=0;i<pr->used_targets_off;i++)
+       if (pr->used_targets[i].pid == prq->sender->pid)
+         break;
+      if (i < pr->used_targets_off)
+       {
+         cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
+         prq->sender->avg_delay.rel_value
+           = (prq->sender->avg_delay.rel_value * 
+              (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
+         prq->sender->avg_priority
+           = (prq->sender->avg_priority * 
+              (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+       }
       if (pr->cp != NULL)
        {
          GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
-                                [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
+                                [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
+                                -1);
          GNUNET_PEER_change_rc (pr->cp->pid, 1);
          prq->sender->last_p2p_replies
            [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
@@ -2329,14 +3386,19 @@ process_reply (void *cls,
          GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
        }
     }
-  GNUNET_CRYPTO_hash (prq->data,
-                     prq->size,
-                     &chash);
-  switch (prq->type)
+  prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
+                                    prq->type,
+                                    key,
+                                    &pr->bf,
+                                    pr->mingle,
+                                    pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
+                                    prq->data,
+                                    prq->size);
+  switch (prq->eval)
     {
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
-      /* only possible reply, stop requesting! */
+    case GNUNET_BLOCK_EVALUATION_OK_MORE:
+      break;
+    case GNUNET_BLOCK_EVALUATION_OK_LAST:
       while (NULL != pr->pending_head)
        destroy_pending_message_list_entry (pr->pending_head);
       if (pr->qe != NULL)
@@ -2344,82 +3406,75 @@ process_reply (void *cls,
          if (pr->client_request_list != NULL)
            GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
                                        GNUNET_YES);
-         GNUNET_DATASTORE_cancel (pr->qe);
+         GNUNET_DATASTORE_cancel (pr->qe);
          pr->qe = NULL;
        }
       pr->do_remove = GNUNET_YES;
       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
        {
-         GNUNET_SCHEDULER_cancel (sched,
-                                  pr->task);
+         GNUNET_SCHEDULER_cancel (pr->task);
          pr->task = GNUNET_SCHEDULER_NO_TASK;
        }
       GNUNET_break (GNUNET_YES ==
                    GNUNET_CONTAINER_multihashmap_remove (query_request_map,
                                                          key,
                                                          pr));
+      GNUNET_LOAD_update (rt_entry_lifetime,
+                         GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
       break;
-    case GNUNET_BLOCK_TYPE_SBLOCK:
-      if (pr->namespace == NULL)
-       {
-         GNUNET_break (0);
-         return GNUNET_YES;
-       }
-      if (0 != memcmp (pr->namespace,
-                      &prq->namespace,
-                      sizeof (GNUNET_HashCode)))
-       {
-         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                     _("Reply mismatched in terms of namespace.  Discarded.\n"));
-         return GNUNET_YES; /* wrong namespace */      
-       }
-      /* then: fall-through! */
-    case GNUNET_BLOCK_TYPE_KBLOCK:
-    case GNUNET_BLOCK_TYPE_NBLOCK:
-      if (pr->bf != NULL) 
-       {
-         mingle_hash (&chash, pr->mingle, &mhash);
-         if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
-                                                              &mhash))
-           {
-             GNUNET_STATISTICS_update (stats,
-                                       gettext_noop ("# duplicate replies discarded (bloomfilter)"),
-                                       1,
-                                       GNUNET_NO);
-#if DEBUG_FS
-             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                         "Duplicate response `%s', discarding.\n",
-                         GNUNET_h2s (&mhash));
-#endif
-             return GNUNET_YES; /* duplicate */
-           }
+    case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# duplicate replies discarded (bloomfilter)"),
+                               1,
+                               GNUNET_NO);
 #if DEBUG_FS
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "New response `%s', adding to filter.\n",
-                     GNUNET_h2s (&mhash));
+/*      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Duplicate response `%s', discarding.\n",
+                 GNUNET_h2s (&mhash));*/
 #endif
-       }
-      if (pr->client_request_list != NULL)
-       {
-         if (pr->replies_seen_size == pr->replies_seen_off)
-           GNUNET_array_grow (pr->replies_seen,
-                              pr->replies_seen_size,
-                              pr->replies_seen_size * 2 + 4);  
-           pr->replies_seen[pr->replies_seen_off++] = chash;         
-       }
-      if ( (pr->bf == NULL) ||
-          (pr->client_request_list != NULL) )
-       refresh_bloomfilter (pr);
-      GNUNET_CONTAINER_bloomfilter_add (pr->bf,
-                                       &mhash);
-      break;
-    default:
+      return GNUNET_YES; /* duplicate */
+    case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
+      return GNUNET_YES; /* wrong namespace */ 
+    case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
+      GNUNET_break (0);
+      return GNUNET_YES;
+    case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
       GNUNET_break (0);
       return GNUNET_YES;
+    case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                 _("Unsupported block type %u\n"),
+                 prq->type);
+      return GNUNET_NO;
+    }
+  if (pr->client_request_list != NULL)
+    {
+      if (pr->replies_seen_size == pr->replies_seen_off)
+       GNUNET_array_grow (pr->replies_seen,
+                          pr->replies_seen_size,
+                          pr->replies_seen_size * 2 + 4);      
+      GNUNET_CRYPTO_hash (prq->data,
+                         prq->size,
+                         &pr->replies_seen[pr->replies_seen_off++]);         
+      refresh_bloomfilter (pr);
+    }
+  if (NULL == prq->sender)
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Found result for query `%s' in local datastore\n",
+                 GNUNET_h2s (key));
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# results found locally"),
+                               1,
+                               GNUNET_NO);      
     }
   prq->priority += pr->remaining_priority;
   pr->remaining_priority = 0;
-  if (pr->client_request_list != NULL)
+  pr->results_found++;
+  prq->request_found = GNUNET_YES;
+  if (NULL != pr->client_request_list)
     {
       GNUNET_STATISTICS_update (stats,
                                gettext_noop ("# replies received for local clients"),
@@ -2455,7 +3510,10 @@ process_reply (void *cls,
        }
       GNUNET_break (cl->th != NULL);
       if (pr->do_remove)               
-       destroy_pending_request (pr);           
+       {
+         prq->finished = GNUNET_YES;
+         destroy_pending_request (pr);         
+       }
     }
   else
     {
@@ -2474,8 +3532,19 @@ process_reply (void *cls,
       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
       reply->cont = &transmit_reply_continuation;
       reply->cont_cls = pr;
+#if SUPPORT_DELAYS
+      art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                                GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                                          TTL_DECREMENT));
+      reply->delay_until 
+       = GNUNET_TIME_relative_to_absolute (art_delay);
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("cummulative artificial delay introduced (ms)"),
+                               art_delay.abs_value,
+                               GNUNET_NO);
+#endif
       reply->msize = msize;
-      reply->priority = (uint32_t) -1; /* send replies first! */
+      reply->priority = UINT32_MAX; /* send replies first! */
       pm = (struct PutMessage*) &reply[1];
       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
       pm->header.size = htons (msize);
@@ -2488,6 +3557,44 @@ process_reply (void *cls,
 }
 
 
+/**
+ * Iterator called on each result obtained for a DHT
+ * operation that expects a reply
+ *
+ * @param cls closure
+ * @param exp when will this value expire
+ * @param key key of the result
+ * @param get_path NULL-terminated array of pointers
+ *                 to the peers on reverse GET path (or NULL if not recorded)
+ * @param put_path NULL-terminated array of pointers
+ *                 to the peers on the PUT path (or NULL if not recorded)
+ * @param type type of the result
+ * @param size number of bytes in data
+ * @param data pointer to the result data
+ */
+static void
+process_dht_reply (void *cls,
+                  struct GNUNET_TIME_Absolute exp,
+                  const GNUNET_HashCode * key,
+                  const struct GNUNET_PeerIdentity * const *get_path,
+                  const struct GNUNET_PeerIdentity * const *put_path,
+                  enum GNUNET_BLOCK_Type type,
+                  size_t size,
+                  const void *data)
+{
+  struct PendingRequest *pr = cls;
+  struct ProcessReplyClosure prq;
+
+  memset (&prq, 0, sizeof (prq));
+  prq.data = data;
+  prq.expiration = exp;
+  prq.size = size;  
+  prq.type = type;
+  process_reply (&prq, key, pr);
+}
+
+
+
 /**
  * Continuation called to notify client about result of the
  * operation.
@@ -2501,7 +3608,19 @@ put_migration_continuation (void *cls,
                            int success,
                            const char *msg)
 {
-  /* FIXME */
+  struct GNUNET_TIME_Absolute *start = cls;
+  struct GNUNET_TIME_Relative delay;
+  
+  delay = GNUNET_TIME_absolute_get_duration (*start);
+  GNUNET_free (start);
+  GNUNET_LOAD_update (datastore_put_load,
+                     delay.rel_value);
+  if (GNUNET_OK == success)
+    return;
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# datastore 'put' failures"),
+                           1,
+                           GNUNET_NO);
 }
 
 
@@ -2512,8 +3631,7 @@ put_migration_continuation (void *cls,
  * @param other the other peer involved (sender or receiver, NULL
  *        for loopback messages where we are both sender and receiver)
  * @param message the actual message
- * @param latency reported latency of the connection with 'other'
- * @param distance reported distance (DV) to 'other' 
+ * @param atsi performance information
  * @return GNUNET_OK to keep the connection open,
  *         GNUNET_SYSERR to close it (signal serious error)
  */
@@ -2521,8 +3639,7 @@ static int
 handle_p2p_put (void *cls,
                const struct GNUNET_PeerIdentity *other,
                const struct GNUNET_MessageHeader *message,
-               struct GNUNET_TIME_Relative latency,
-               uint32_t distance)
+               const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
   const struct PutMessage *put;
   uint16_t msize;
@@ -2531,7 +3648,12 @@ handle_p2p_put (void *cls,
   struct GNUNET_TIME_Absolute expiration;
   GNUNET_HashCode query;
   struct ProcessReplyClosure prq;
-  const struct SBlock *sb;
+  struct GNUNET_TIME_Absolute *start;
+  struct GNUNET_TIME_Relative block_time;  
+  double putl;
+  struct ConnectedPeer *cp; 
+  struct PendingMessage *pm;
+  struct MigrationStopMessage *msm;
 
   msize = ntohs (message->size);
   if (msize < sizeof (struct PutMessage))
@@ -2544,25 +3666,19 @@ handle_p2p_put (void *cls,
   type = ntohl (put->type);
   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
 
+  if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+    return GNUNET_SYSERR;
   if (GNUNET_OK !=
-      GNUNET_BLOCK_check_block (type,
-                               &put[1],
-                               dsize,
-                               &query))
+      GNUNET_BLOCK_get_key (block_ctx,
+                           type,
+                           &put[1],
+                           dsize,
+                           &query))
     {
       GNUNET_break_op (0);
       return GNUNET_SYSERR;
     }
-  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
-    return GNUNET_SYSERR;
-  if (GNUNET_BLOCK_TYPE_SBLOCK == type)
-    { 
-      sb = (const struct SBlock*) &put[1];
-      GNUNET_CRYPTO_hash (&sb->subspace,
-                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
-                         &prq.namespace);
-    }
-
+  cover_content_count++;
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Received result for query `%s' from peer `%4s'\n",
@@ -2578,16 +3694,35 @@ handle_p2p_put (void *cls,
   if (other != NULL)
     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                                    &other->hashPubKey);
+  else
+    prq.sender = NULL;
   prq.size = dsize;
   prq.type = type;
   prq.expiration = expiration;
   prq.priority = 0;
+  prq.anonymity_level = 1;
+  prq.finished = GNUNET_NO;
+  prq.request_found = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
                                              &query,
                                              &process_reply,
                                              &prq);
-  if (GNUNET_YES == active_migration)
+  if (prq.sender != NULL)
     {
+      prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
+      change_host_trust (prq.sender, prq.priority);
+    }
+  if ( (GNUNET_YES == active_to_migration) &&
+       (GNUNET_NO == test_put_load_too_high (prq.priority)) )
+    {      
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Replicating result for query `%s' with priority %u\n",
+                 GNUNET_h2s (&query),
+                 prq.priority);
+#endif
+      start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
+      *start = GNUNET_TIME_absolute_get ();
       GNUNET_DATASTORE_put (dsh,
                            0, &query, dsize, &put[1],
                            type, prq.priority, 1 /* anonymity */, 
@@ -2595,12 +3730,75 @@ handle_p2p_put (void *cls,
                            1 + prq.priority, MAX_DATASTORE_QUEUE,
                            GNUNET_CONSTANTS_SERVICE_TIMEOUT,
                            &put_migration_continuation, 
-                           NULL);
+                           start);
+    }
+  putl = GNUNET_LOAD_get_load (datastore_put_load);
+  if ( (GNUNET_NO == prq.request_found) &&
+       ( (GNUNET_YES != active_to_migration) ||
+                (putl > 2.5 * (1 + prq.priority)) ) )
+    {
+      cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                             &other->hashPubKey);
+      if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000)
+       return GNUNET_OK; /* already blocked */
+      /* We're too busy; send MigrationStop message! */
+      if (GNUNET_YES != active_to_migration) 
+       putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
+      block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                                 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                                                  (unsigned int) (60000 * putl * putl)));
+      
+      cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
+      pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
+                         sizeof (struct MigrationStopMessage));
+      pm->msize = sizeof (struct MigrationStopMessage);
+      pm->priority = UINT32_MAX;
+      msm = (struct MigrationStopMessage*) &pm[1];
+      msm->header.size = htons (sizeof (struct MigrationStopMessage));
+      msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+      msm->duration = GNUNET_TIME_relative_hton (block_time);
+      add_to_pending_messages_for_peer (cp,
+                                       pm,
+                                       NULL);
+    }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle P2P "MIGRATION_STOP" message.
+ *
+ * @param cls closure, always NULL
+ * @param other the other peer involved (sender or receiver, NULL
+ *        for loopback messages where we are both sender and receiver)
+ * @param message the actual message
+ * @param atsi performance information
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_p2p_migration_stop (void *cls,
+                          const struct GNUNET_PeerIdentity *other,
+                          const struct GNUNET_MessageHeader *message,
+                          const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+{
+  struct ConnectedPeer *cp; 
+  const struct MigrationStopMessage *msm;
+
+  msm = (const struct MigrationStopMessage*) message;
+  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                         &other->hashPubKey);
+  if (cp == NULL)
+    {
+      GNUNET_break (0);
+      return GNUNET_OK;
     }
+  cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
   return GNUNET_OK;
 }
 
 
+
 /* **************************** P2P GET Handling ************************ */
 
 
@@ -2674,7 +3872,7 @@ check_duplicate_request_client (void *cls,
 static void
 process_local_reply (void *cls,
                     const GNUNET_HashCode * key,
-                    uint32_t size,
+                    size_t size,
                     const void *data,
                     enum GNUNET_BLOCK_Type type,
                     uint32_t priority,
@@ -2686,10 +3884,8 @@ process_local_reply (void *cls,
   struct PendingRequest *pr = cls;
   struct ProcessReplyClosure prq;
   struct CheckDuplicateRequestClosure cdrc;
-  const struct SBlock *sb;
-  GNUNET_HashCode dhash;
-  GNUNET_HashCode mhash;
   GNUNET_HashCode query;
+  unsigned int old_rf;
   
   if (NULL == key)
     {
@@ -2722,11 +3918,14 @@ process_local_reply (void *cls,
              return;
            }
        }
-
+      if (pr->local_only == GNUNET_YES)
+       {
+         destroy_pending_request (pr);
+         return;
+       }
       /* no more results */
       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_now (sched,
-                                            &forward_request_task,
+       pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
                                             pr);      
       return;
     }
@@ -2736,7 +3935,7 @@ process_local_reply (void *cls,
              GNUNET_h2s (key),
              type);
 #endif
-  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+  if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2752,56 +3951,22 @@ process_local_reply (void *cls,
                                            &process_local_reply,
                                            pr))
       if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-      return;
-    }
-  /* check for duplicates */
-  GNUNET_CRYPTO_hash (data, size, &dhash);
-  mingle_hash (&dhash, 
-              pr->mingle,
-              &mhash);
-  if ( (pr->bf != NULL) &&
-       (GNUNET_YES ==
-       GNUNET_CONTAINER_bloomfilter_test (pr->bf,
-                                          &mhash)) )
-    {      
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Result from datastore filtered by bloomfilter (duplicate).\n");
-#endif
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# results filtered by query bloomfilter"),
-                               1,
-                               GNUNET_NO);
-      if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+       {
+         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+       }
       return;
     }
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Found result for query `%s' in local datastore\n",
-             GNUNET_h2s (key));
-#endif
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# results found locally"),
-                           1,
-                           GNUNET_NO);
-  pr->results_found++;
+  old_rf = pr->results_found;
   memset (&prq, 0, sizeof (prq));
   prq.data = data;
   prq.expiration = expiration;
   prq.size = size;  
-  if (GNUNET_BLOCK_TYPE_SBLOCK == type)
-    { 
-      sb = (const struct SBlock*) data;
-      GNUNET_CRYPTO_hash (&sb->subspace,
-                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
-                         &prq.namespace);
-    }
-  if (GNUNET_OK != GNUNET_BLOCK_check_block (type,
-                                            data,
-                                            size,
-                                            &query))
+  if (GNUNET_OK != 
+      GNUNET_BLOCK_get_key (block_ctx,
+                           type,
+                           data,
+                           size,
+                           &query))
     {
       GNUNET_break (0);
       GNUNET_DATASTORE_remove (dsh,
@@ -2813,14 +3978,27 @@ process_local_reply (void *cls,
       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
       return;
     }
-  if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
-       (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
+  prq.type = type;
+  prq.priority = priority;  
+  prq.finished = GNUNET_NO;
+  prq.request_found = GNUNET_NO;
+  prq.anonymity_level = anonymity;
+  if ( (old_rf == 0) &&
+       (pr->results_found == 0) )
+    update_datastore_delays (pr->start_time);
+  process_reply (&prq, key, pr);
+  if (prq.finished == GNUNET_YES)
+    return;
+  if (pr->qe == NULL)
+    return; /* done here */
+  if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
     {
-      if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      pr->local_only = GNUNET_YES; /* do not forward */
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      return;
     }
-  else if ( (pr->client_request_list == NULL) &&
-       ( (GNUNET_YES == test_load_too_high()) ||
+  if ( (pr->client_request_list == NULL) &&
+       ( (GNUNET_YES == test_get_load_too_high (0)) ||
         (pr->results_found > 5 + 2 * pr->priority) ) )
     {
 #if DEBUG_FS > 2
@@ -2831,14 +4009,10 @@ process_local_reply (void *cls,
                                gettext_noop ("# processing result set cut short due to load"),
                                1,
                                GNUNET_NO);
-      if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      return;
     }
-  else if (pr->qe != NULL)
-    GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-  prq.type = type;
-  prq.priority = priority;  
-  process_reply (&prq, key, pr);
+  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
 }
 
 
@@ -2850,11 +4024,60 @@ process_local_reply (void *cls,
  * @param cp the peer making the request
  * @return effective priority
  */
-static uint32_t
+static int32_t
 bound_priority (uint32_t prio_in,
                struct ConnectedPeer *cp)
 {
-  return 0; // FIXME!
+#define N ((double)128.0)
+  uint32_t ret;
+  double rret;
+  int ld;
+
+  ld = test_get_load_too_high (0);
+  if (ld == GNUNET_SYSERR)
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests done for free (low load)"),
+                               1,
+                               GNUNET_NO);
+      return 0; /* excess resources */
+    }
+  if (prio_in > INT32_MAX)
+    prio_in = INT32_MAX;
+  ret = - change_host_trust (cp, - (int) prio_in);
+  if (ret > 0)
+    {
+      if (ret > current_priorities + N)
+       rret = current_priorities + N;
+      else
+       rret = ret;
+      current_priorities 
+       = (current_priorities * (N-1) + rret)/N;
+    }
+  if ( (ld == GNUNET_YES) && (ret > 0) )
+    {
+      /* try with charging */
+      ld = test_get_load_too_high (ret);
+    }
+  if (ld == GNUNET_YES)
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# request dropped, priority insufficient"),
+                               1,
+                               GNUNET_NO);
+      /* undo charge */
+      change_host_trust (cp, (int) ret);
+      return -1; /* not enough resources */
+    }
+  else
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests done for a price (normal load)"),
+                               1,
+                               GNUNET_NO);
+    }
+#undef N
+  return ret;
 }
 
 
@@ -2895,8 +4118,7 @@ check_duplicate_request_peer (void *cls,
  * @param other the other peer involved (sender or receiver, NULL
  *        for loopback messages where we are both sender and receiver)
  * @param message the actual message
- * @param latency reported latency of the connection with 'other'
- * @param distance reported distance (DV) to 'other' 
+ * @param atsi performance information
  * @return GNUNET_OK to keep the connection open,
  *         GNUNET_SYSERR to close it (signal serious error)
  */
@@ -2904,8 +4126,7 @@ static int
 handle_p2p_get (void *cls,
                const struct GNUNET_PeerIdentity *other,
                const struct GNUNET_MessageHeader *message,
-               struct GNUNET_TIME_Relative latency,
-               uint32_t distance)
+               const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
   struct PendingRequest *pr;
   struct ConnectedPeer *cp;
@@ -2919,8 +4140,8 @@ handle_p2p_get (void *cls,
   uint32_t bm;
   size_t bfsize;
   uint32_t ttl_decrement;
+  int32_t priority;
   enum GNUNET_BLOCK_Type type;
-  double preference;
   int have_ns;
 
   msize = ntohs(message->size);
@@ -2930,19 +4151,12 @@ handle_p2p_get (void *cls,
       return GNUNET_SYSERR;
     }
   gm = (const struct GetMessage*) message;
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received request for `%s'\n",
+             GNUNET_h2s (&gm->query));
+#endif
   type = ntohl (gm->type);
-  switch (type)
-    {
-    case GNUNET_BLOCK_TYPE_ANY:
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
-    case GNUNET_BLOCK_TYPE_KBLOCK:
-    case GNUNET_BLOCK_TYPE_SBLOCK:
-      break;
-    default:
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
   bm = ntohl (gm->hash_bitmap);
   bits = 0;
   while (bm > 0)
@@ -2957,14 +4171,15 @@ handle_p2p_get (void *cls,
       return GNUNET_SYSERR;
     }  
   opt = (const GNUNET_HashCode*) &gm[1];
-  bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
-  bm = ntohl (gm->hash_bitmap);
-  if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
-       (type != GNUNET_BLOCK_TYPE_SBLOCK) )
+  bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
+  /* bfsize must be power of 2, check! */
+  if (0 != ( (bfsize - 1) & bfsize))
     {
       GNUNET_break_op (0);
-      return GNUNET_SYSERR;      
+      return GNUNET_SYSERR;
     }
+  cover_query_count++;
+  bm = ntohl (gm->hash_bitmap);
   bits = 0;
   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                           &other->hashPubKey);
@@ -3005,20 +4220,16 @@ handle_p2p_get (void *cls,
   /* note that we can really only check load here since otherwise
      peers could find out that we are overloaded by not being
      disconnected after sending us a malformed query... */
-  if (GNUNET_YES == test_load_too_high ())
+  priority = bound_priority (ntohl (gm->priority), cps);
+  if (priority < 0)
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Dropping query from `%s', this peer is too busy.\n",
                  GNUNET_i2s (other));
 #endif
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# requests dropped due to high load"),
-                               1,
-                               GNUNET_NO);
       return GNUNET_OK;
     }
-
 #if DEBUG_FS 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
@@ -3031,16 +4242,25 @@ handle_p2p_get (void *cls,
   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
                      (have_ns ? sizeof(GNUNET_HashCode) : 0));
   if (have_ns)
-    pr->namespace = (GNUNET_HashCode*) &pr[1];
+    {
+      pr->namespace = (GNUNET_HashCode*) &pr[1];
+      memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
+    }
+  if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
+       (GNUNET_LOAD_get_average (cp->transmission_delay) > 
+       GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
+    {
+      /* don't have BW to send to peer, or would likely take longer than we have for it,
+        so at best indirect the query */
+      priority = 0;
+      pr->forward_only = GNUNET_YES;
+    }
   pr->type = type;
   pr->mingle = ntohl (gm->filter_mutator);
-  if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))    
-    memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
-
   pr->anonymity_level = 1;
-  pr->priority = bound_priority (ntohl (gm->priority), cps);
+  pr->priority = (uint32_t) priority;
   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
   pr->query = gm->query;
   /* decrement ttl (always) */
@@ -3061,7 +4281,7 @@ handle_p2p_get (void *cls,
                                gettext_noop ("# requests dropped due TTL underflow"),
                                1,
                                GNUNET_NO);
-      /* integer underflow => drop (should be very rare)! */
+      /* integer underflow => drop (should be very rare)! */      
       GNUNET_free (pr);
       return GNUNET_OK;
     } 
@@ -3076,7 +4296,6 @@ handle_p2p_get (void *cls,
                                                  BLOOMFILTER_K);
       pr->bf_size = bfsize;
     }
-
   cdc.have = NULL;
   cdc.pr = pr;
   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
@@ -3085,8 +4304,8 @@ handle_p2p_get (void *cls,
                                              &cdc);
   if (cdc.have != NULL)
     {
-      if (cdc.have->start_time.value + cdc.have->ttl >=
-         pr->start_time.value + pr->ttl)
+      if (cdc.have->start_time.abs_value + cdc.have->ttl >=
+         pr->start_time.abs_value + pr->ttl)
        {
          /* existing request has higher TTL, drop new one! */
          cdc.have->priority += pr->priority;
@@ -3129,7 +4348,7 @@ handle_p2p_get (void *cls,
   
   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
                                            pr,
-                                           pr->start_time.value + pr->ttl);
+                                           pr->start_time.abs_value + pr->ttl);
 
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# P2P searches received"),
@@ -3141,36 +4360,60 @@ handle_p2p_get (void *cls,
                            GNUNET_NO);
 
   /* calculate change in traffic preference */
-  preference = (double) pr->priority;
-  if (preference < QUERY_BANDWIDTH_VALUE)
-    preference = QUERY_BANDWIDTH_VALUE;
-  cps->inc_preference += preference;
-
+  cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
   /* process locally */
-  if (type == GNUNET_BLOCK_TYPE_DBLOCK)
+  if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
                                           (pr->priority + 1)); 
-  pr->qe = GNUNET_DATASTORE_get (dsh,
-                                &gm->query,
-                                type,                         
-                                pr->priority + 1,
-                                MAX_DATASTORE_QUEUE,                            
-                                timeout,
-                                &process_local_reply,
-                                pr);
+  if (GNUNET_YES != pr->forward_only)
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Handing request for `%s' to datastore\n",
+                 GNUNET_h2s (&gm->query));
+#endif
+      pr->qe = GNUNET_DATASTORE_get (dsh,
+                                    &gm->query,
+                                    type,                             
+                                    pr->priority + 1,
+                                    MAX_DATASTORE_QUEUE,                                
+                                    timeout,
+                                    &process_local_reply,
+                                    pr);
+      if (NULL == pr->qe)
+       {
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# requests dropped by datastore (queue length limit)"),
+                                   1,
+                                   GNUNET_NO);
+       }
+    }
+  else
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests forwarded due to high load"),
+                               1,
+                               GNUNET_NO);
+    }
 
-  /* Are multiple results possible?  If so, start processing remotely now! */
+  /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
   switch (pr->type)
     {
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_IBLOCK:
       /* only one result, wait for datastore */
-      break;
+      if (GNUNET_YES != pr->forward_only)
+       {
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
+                                   1,
+                                   GNUNET_NO);
+         break;
+       }
     default:
       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_now (sched,
-                                            &forward_request_task,
+       pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
                                             pr);
     }
 
@@ -3178,6 +4421,7 @@ handle_p2p_get (void *cls,
   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
     {
       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
+      GNUNET_assert (pr != NULL);
       destroy_pending_request (pr);
     }
   return GNUNET_OK;
@@ -3230,22 +4474,6 @@ handle_start_search (void *cls,
              GNUNET_h2s (&sm->query),
              (unsigned int) type);
 #endif
-  switch (type)
-    {
-    case GNUNET_BLOCK_TYPE_ANY:
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
-    case GNUNET_BLOCK_TYPE_KBLOCK:
-    case GNUNET_BLOCK_TYPE_SBLOCK:
-    case GNUNET_BLOCK_TYPE_NBLOCK:
-      break;
-    default:
-      GNUNET_break (0);
-      GNUNET_SERVER_receive_done (client,
-                                 GNUNET_SYSERR);
-      return;
-    }  
-
   cl = client_list;
   while ( (cl != NULL) &&
          (cl->client != client) )
@@ -3259,8 +4487,8 @@ handle_start_search (void *cls,
       client_list = cl;
     }
   /* detect duplicate KBLOCK requests */
-  if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
-       (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
+  if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
+       (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
        (type == GNUNET_BLOCK_TYPE_ANY) )
     {
       crl = cl->rl_head;
@@ -3302,7 +4530,7 @@ handle_start_search (void *cls,
                            1,
                            GNUNET_NO);
   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
-                     ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
+                     ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
   memset (crl, 0, sizeof (struct ClientRequestList));
   crl->client_list = cl;
@@ -3320,6 +4548,7 @@ handle_start_search (void *cls,
          sc * sizeof (GNUNET_HashCode));
   pr->replies_seen_off = sc;
   pr->anonymity_level = ntohl (sm->anonymity_level); 
+  pr->start_time = GNUNET_TIME_absolute_get ();
   refresh_bloomfilter (pr);
   pr->query = sm->query;
   if (0 == (1 & ntohl (sm->options)))
@@ -3328,14 +4557,14 @@ handle_start_search (void *cls,
     pr->local_only = GNUNET_YES;
   switch (type)
     {
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_IBLOCK:
       if (0 != memcmp (&sm->target,
                       &all_zeros,
                       sizeof (GNUNET_HashCode)))
        pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
       break;
-    case GNUNET_BLOCK_TYPE_SBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_SBLOCK:
       pr->namespace = (GNUNET_HashCode*) &pr[1];
       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
       break;
@@ -3347,7 +4576,7 @@ handle_start_search (void *cls,
                                                   &sm->query,
                                                   pr,
                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-  if (type == GNUNET_BLOCK_TYPE_DBLOCK)
+  if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
   pr->qe = GNUNET_DATASTORE_get (dsh,
                                 &sm->query,
@@ -3361,65 +4590,74 @@ handle_start_search (void *cls,
 
 /* **************************** Startup ************************ */
 
-
-/**
- * List of handlers for P2P messages
- * that we care about.
- */
-static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
-  {
-    { &handle_p2p_get, 
-      GNUNET_MESSAGE_TYPE_FS_GET, 0 },
-    { &handle_p2p_put, 
-      GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
-    { NULL, 0, 0 }
-  };
-
-
-/**
- * List of handlers for the messages understood by this
- * service.
- */
-static struct GNUNET_SERVER_MessageHandler handlers[] = {
-  {&GNUNET_FS_handle_index_start, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
-  {&GNUNET_FS_handle_index_list_get, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
-  {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
-   sizeof (struct UnindexMessage) },
-  {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
-   0 },
-  {NULL, NULL, 0, 0}
-};
-
-
 /**
  * Process fs requests.
  *
- * @param s scheduler to use
  * @param server the initialized server
  * @param c configuration to use
  */
 static int
-main_init (struct GNUNET_SCHEDULER_Handle *s,
-          struct GNUNET_SERVER_Handle *server,
+main_init (struct GNUNET_SERVER_Handle *server,
           const struct GNUNET_CONFIGURATION_Handle *c)
 {
-  sched = s;
+  static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
+    {
+      { &handle_p2p_get, 
+       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
+      { &handle_p2p_put, 
+       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
+      { &handle_p2p_migration_stop, 
+       GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
+       sizeof (struct MigrationStopMessage) },
+      { NULL, 0, 0 }
+    };
+  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+    {&GNUNET_FS_handle_index_start, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
+    {&GNUNET_FS_handle_index_list_get, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
+    {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
+     sizeof (struct UnindexMessage) },
+    {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
+     0 },
+    {NULL, NULL, 0, 0}
+  };
+  unsigned long long enc = 128;
+
   cfg = c;
-  stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
-  min_migration_delay = GNUNET_TIME_UNIT_SECONDS; // FIXME: get from config
-  connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
-  query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
-  peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+  stats = GNUNET_STATISTICS_create ("fs", cfg);
+  min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
+  if ( (GNUNET_OK !=
+       GNUNET_CONFIGURATION_get_value_number (cfg,
+                                              "fs",
+                                              "MAX_PENDING_REQUESTS",
+                                              &max_pending_requests)) ||
+       (GNUNET_OK !=
+       GNUNET_CONFIGURATION_get_value_number (cfg,
+                                              "fs",
+                                              "EXPECTED_NEIGHBOUR_COUNT",
+                                              &enc)) ||
+       (GNUNET_OK != 
+       GNUNET_CONFIGURATION_get_value_time (cfg,
+                                            "fs",
+                                            "MIN_MIGRATION_DELAY",
+                                            &min_migration_delay)) )
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 _("Configuration fails to specify certain parameters, assuming default values."));
+    }
+  connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
+  query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
+  rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
+  peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
-  core = GNUNET_CORE_connect (sched,
-                             cfg,
-                             GNUNET_TIME_UNIT_FOREVER_REL,
+  core = GNUNET_CORE_connect (cfg,
+                             1, /* larger? */
                              NULL,
                              NULL,
                              &peer_connect_handler,
                              &peer_disconnect_handler,
+                             &peer_status_handler,
                              NULL, GNUNET_NO,
                              NULL, GNUNET_NO,
                              p2p_handlers);
@@ -3432,6 +4670,8 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
       connected_peers = NULL;
       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
       query_request_map = NULL;
+      GNUNET_LOAD_value_free (rt_entry_lifetime);
+      rt_entry_lifetime = NULL;
       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
       requests_by_expiration_heap = NULL;
       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
@@ -3443,15 +4683,31 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
        }
       return GNUNET_SYSERR;
     }
-  /* FIXME: distinguish between sending and storing in options? */
-  if (active_migration) 
-    consider_migration_gathering ();
+  if (active_from_migration) 
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 _("Content migration is enabled, will start to gather data\n"));
+      consider_migration_gathering ();
+    }
+  consider_dht_put_gathering (NULL);
   GNUNET_SERVER_disconnect_notify (server, 
                                   &handle_client_disconnect,
                                   NULL);
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CONFIGURATION_get_value_filename (cfg,
+                                                          "fs",
+                                                          "TRUST",
+                                                          &trustDirectory));
+  GNUNET_DISK_directory_create (trustDirectory);
+  GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
+                                     &cron_flush_trust, NULL);
+
+
   GNUNET_SERVER_add_handlers (server, handlers);
-  GNUNET_SCHEDULER_add_delayed (sched,
-                               GNUNET_TIME_UNIT_FOREVER_REL,
+  cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
+                                                &age_cover_counters,
+                                                NULL);
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
                                &shutdown_task,
                                NULL);
   return GNUNET_OK;
@@ -3462,32 +4718,53 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
  * Process fs requests.
  *
  * @param cls closure
- * @param sched scheduler to use
  * @param server the initialized server
  * @param cfg configuration to use
  */
 static void
 run (void *cls,
-     struct GNUNET_SCHEDULER_Handle *sched,
      struct GNUNET_SERVER_Handle *server,
      const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
-                                                          "FS",
-                                                          "ACTIVEMIGRATION");
-  dsh = GNUNET_DATASTORE_connect (cfg,
-                                 sched);
+  active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                                             "FS",
+                                                             "CONTENT_CACHING");
+  active_from_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                                               "FS",
+                                                               "CONTENT_PUSHING");
+  dsh = GNUNET_DATASTORE_connect (cfg);
   if (dsh == NULL)
     {
-      GNUNET_SCHEDULER_shutdown (sched);
+      GNUNET_SCHEDULER_shutdown ();
       return;
     }
-  if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
-       (GNUNET_OK != main_init (sched, server, cfg)) )
+  datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
+  datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
+  block_cfg = GNUNET_CONFIGURATION_create ();
+  GNUNET_CONFIGURATION_set_value_string (block_cfg,
+                                        "block",
+                                        "PLUGINS",
+                                        "fs");
+  block_ctx = GNUNET_BLOCK_context_create (block_cfg);
+  GNUNET_assert (NULL != block_ctx);
+  dht_handle = GNUNET_DHT_connect (cfg,
+                                  FS_DHT_HT_SIZE);
+  if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, dsh)) ||
+       (GNUNET_OK != main_init (server, cfg)) )
     {    
-      GNUNET_SCHEDULER_shutdown (sched);
+      GNUNET_SCHEDULER_shutdown ();
       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
       dsh = NULL;
+      GNUNET_DHT_disconnect (dht_handle);
+      dht_handle = NULL;
+      GNUNET_BLOCK_context_destroy (block_ctx);
+      block_ctx = NULL;
+      GNUNET_CONFIGURATION_destroy (block_cfg);
+      block_cfg = NULL;
+      GNUNET_LOAD_value_free (datastore_get_load);
+      datastore_get_load = NULL;
+      GNUNET_LOAD_value_free (datastore_put_load);
+      datastore_put_load = NULL;
       return;   
     }
 }