fix
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index 6860c377a7682756b0128b13ba7b1b331e59a871..ccdd76de235fb1add6c72bd6d857fd95dd9d53d7 100644 (file)
  * @brief gnunet anonymity protocol implementation
  * @author Christian Grothoff
  *
+ * FIXME:
+ * - TTL/priority calculations are absent!
  * TODO:
- * - forward_request_task (P2P forwarding!) 
- * - track stats for hot-path routing
+ * - have non-zero preference / priority for requests we initiate!
  * - implement hot-path routing decision procedure
- * - detect duplicate requests (P2P and CS)
- * - implement: bound_priority, test_load_too_high, validate_skblock
- * - add content migration support (store locally)
+ * - implement: bound_priority, test_load_too_high
  * - statistics
- * 
  */
 #include "platform.h"
 #include <float.h>
 #include "gnunet_peer_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_signatures.h"
+#include "gnunet_statistics_service.h"
 #include "gnunet_util_lib.h"
-#include "gnunet-service-fs_drq.h"
 #include "gnunet-service-fs_indexing.h"
 #include "fs.h"
 
+#define DEBUG_FS GNUNET_NO
+
 /**
  * Maximum number of outgoing messages we queue per peer.
- * FIXME: set to a tiny value for testing; make configurable.
+ * FIXME: make configurable?
+ */
+#define MAX_QUEUE_PER_PEER 16
+
+/**
+ * Inverse of the probability that we will submit the same query
+ * to the same peer again.  If the same peer already got the query
+ * repeatedly recently, the probability is multiplied by the inverse
+ * of this number each time.  Note that we only try about every TTL_DECREMENT/2
+ * plus MAX_CORK_DELAY (so roughly every 3.5s).
+ */
+#define RETRY_PROBABILITY_INV 3
+
+/**
+ * What is the maximum delay for a P2P FS message (in our interaction
+ * with core)?  FS-internal delays are another story.  The value is
+ * chosen based on the 32k block size.  Given that peers typcially
+ * have at least 1 kb/s bandwidth, 45s waits give us a chance to
+ * transmit one message even to the lowest-bandwidth peers.
  */
-#define MAX_QUEUE_PER_PEER 2
+#define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
 
 
 
 /**
  * Maximum number of requests (from other peers) that we're
  * willing to have pending at any given point in time.
- * FIXME: set from configuration (and 32 is a tiny value for testing only).
+ * FIXME: set from configuration.
  */
-static uint64_t max_pending_requests = 32;
+static uint64_t max_pending_requests = (32 * 1024);
 
 
 /**
@@ -68,6 +86,11 @@ static uint64_t max_pending_requests = 32;
  */
 struct PendingMessage;
 
+/**
+ * Our connection to the datastore.
+ */
+static struct GNUNET_DATASTORE_Handle *dsh;
+
 
 /**
  * Function called upon completion of a transmission.
@@ -80,7 +103,7 @@ typedef void (*TransmissionContinuation)(void * cls,
 
 
 /**
- * Information we keep for each pending reply.  The
+ * Information we keep for each pending message (GET/PUT).  The
  * actual message follows at the end of this struct.
  */
 struct PendingMessage
@@ -323,26 +346,6 @@ struct ClientList
 };
 
 
-/**
- * Hash map entry of requests we are performing
- * on behalf of the same peer.
- */
-struct PeerRequestEntry
-{
-
-  /**
-   * Request this entry represents.
-   */
-  struct PendingRequest *req;
-
-  /**
-   * Entry of peer responsible for this entry.
-   */
-  struct ConnectedPeer *cp;
-
-};
-
-
 /**
  * Doubly-linked list of messages we are performing
  * due to a pending request.
@@ -391,12 +394,12 @@ struct PendingRequest
    * client request list; otherwise NULL.
    */
   struct ClientRequestList *client_request_list;
-  
+
   /**
-   * If this request was made by a peer, this is our entry in the
-   * per-peer multi-hash map; otherwise NULL.
+   * Entry of peer responsible for this entry (if this request
+   * was made by a peer).
    */
-  struct PeerRequestEntry *pht_entry;
+  struct ConnectedPeer *cp;
 
   /**
    * If this is a namespace query, pointer to the hash of the public
@@ -471,12 +474,13 @@ struct PendingRequest
   GNUNET_PEER_Id *used_pids;
   
   /**
-   * Our entry in the DRQ (non-NULL while we wait for our
+   * Our entry in the queue (non-NULL while we wait for our
    * turn to interact with the local database).
    */
-  struct DatastoreRequestQueue *drq;
+  struct GNUNET_DATASTORE_QueueEntry *qe;
 
   /**
+
    * Size of the 'bf' (in bytes).
    */
   size_t bf_size;
@@ -526,8 +530,7 @@ struct PendingRequest
   uint32_t remaining_priority;
 
   /**
-   * Number to mingle hashes for bloom-filter
-   * tests with.
+   * Number to mingle hashes for bloom-filter tests with.
    */
   int32_t mingle;
 
@@ -540,8 +543,67 @@ struct PendingRequest
   /**
    * Type of the content that this request is for.
    */
-  uint32_t type;
+  enum GNUNET_BLOCK_Type type;
+
+  /**
+   * Remove this request after transmission of the current response.
+   */
+  int16_t do_remove;
+
+  /**
+   * GNUNET_YES if we should not forward this request to other peers.
+   */
+  int16_t local_only;
+
+};
+
+
+/**
+ * Block that is ready for migration to other peers.  Actual data is at the end of the block.
+ */
+struct MigrationReadyBlock
+{
+
+  /**
+   * This is a doubly-linked list.
+   */
+  struct MigrationReadyBlock *next;
+
+  /**
+   * This is a doubly-linked list.
+   */
+  struct MigrationReadyBlock *prev;
+
+  /**
+   * Query for the block.
+   */
+  GNUNET_HashCode query;
+
+  /**
+   * When does this block expire? 
+   */
+  struct GNUNET_TIME_Absolute expiration;
+
+  /**
+   * Peers we would consider forwarding this
+   * block to.  Zero for empty entries.
+   */
+  GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
 
+  /**
+   * Size of the block.
+   */
+  size_t size;
+
+  /**
+   *  Number of targets already used.
+   */
+  unsigned int used_targets;
+
+  /**
+   * Type of the block.
+   */
+  enum GNUNET_BLOCK_Type type;
 };
 
 
@@ -553,7 +615,7 @@ static struct GNUNET_SCHEDULER_Handle *sched;
 /**
  * Our configuration.
  */
-const struct GNUNET_CONFIGURATION_Handle *cfg;
+static const struct GNUNET_CONFIGURATION_Handle *cfg;
 
 /**
  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
@@ -580,21 +642,349 @@ static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
  */
 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
 
+/**
+ * Handle for reporting statistics.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+
 /**
  * Linked list of clients we are currently processing requests for.
  */
-struct ClientList *client_list;
+static struct ClientList *client_list;
 
 /**
  * Pointer to handle to the core service (points to NULL until we've
  * connected to it).
  */
-struct GNUNET_CORE_Handle *core;
+static struct GNUNET_CORE_Handle *core;
+
+/**
+ * Head of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_head;
+
+/**
+ * Tail of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_tail;
+
+/**
+ * Request to datastore for migration (or NULL).
+ */
+static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
+
+/**
+ * ID of task that collects blocks for migration.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier mig_task;
+
+/**
+ * What is the maximum frequency at which we are allowed to
+ * poll the datastore for migration content?
+ */
+static struct GNUNET_TIME_Relative min_migration_delay;
+
+/**
+ * Size of the doubly-linked list of migration blocks.
+ */
+static unsigned int mig_size;
+
+/**
+ * Are we allowed to migrate content to this peer.
+ */
+static int active_migration;
+
+
+/**
+ * Transmit messages by copying it to the target buffer
+ * "buf".  "buf" will be NULL and "size" zero if the socket was closed
+ * for writing in the meantime.  In that case, do nothing
+ * (the disconnect or shutdown handler will take care of the rest).
+ * If we were able to transmit messages and there are still more
+ * pending, ask core again for further calls to this function.
+ *
+ * @param cls closure, pointer to the 'struct ConnectedPeer*'
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_to_peer (void *cls,
+                 size_t size, void *buf);
 
 
 /* ******************* clean up functions ************************ */
 
 
+/**
+ * Delete the given migration block.
+ *
+ * @param mb block to delete
+ */
+static void
+delete_migration_block (struct MigrationReadyBlock *mb)
+{
+  GNUNET_CONTAINER_DLL_remove (mig_head,
+                              mig_tail,
+                              mb);
+  GNUNET_PEER_decrement_rcs (mb->target_list,
+                            MIGRATION_LIST_SIZE);
+  mig_size--;
+  GNUNET_free (mb);
+}
+
+
+/**
+ * Compare the distance of two peers to a key.
+ *
+ * @param key key
+ * @param p1 first peer
+ * @param p2 second peer
+ * @return GNUNET_YES if P1 is closer to key than P2
+ */
+static int
+is_closer (const GNUNET_HashCode *key,
+          const struct GNUNET_PeerIdentity *p1,
+          const struct GNUNET_PeerIdentity *p2)
+{
+  return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
+                                   &p2->hashPubKey,
+                                   key);
+}
+
+
+/**
+ * Consider migrating content to a given peer.
+ *
+ * @param cls 'struct MigrationReadyBlock*' to select
+ *            targets for (or NULL for none)
+ * @param key ID of the peer 
+ * @param value 'struct ConnectedPeer' of the peer
+ * @return GNUNET_YES (always continue iteration)
+ */
+static int
+consider_migration (void *cls,
+                   const GNUNET_HashCode *key,
+                   void *value)
+{
+  struct MigrationReadyBlock *mb = cls;
+  struct ConnectedPeer *cp = value;
+  struct MigrationReadyBlock *pos;
+  struct GNUNET_PeerIdentity cppid;
+  struct GNUNET_PeerIdentity otherpid;
+  struct GNUNET_PeerIdentity worstpid;
+  size_t msize;
+  unsigned int i;
+  unsigned int repl;
+  
+  /* consider 'cp' as a migration target for mb */
+  if (mb != NULL)
+    {
+      GNUNET_PEER_resolve (cp->pid,
+                          &cppid);
+      repl = MIGRATION_LIST_SIZE;
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
+       {
+         if (mb->target_list[i] == 0)
+           {
+             mb->target_list[i] = cp->pid;
+             GNUNET_PEER_change_rc (mb->target_list[i], 1);
+             repl = MIGRATION_LIST_SIZE;
+             break;
+           }
+         GNUNET_PEER_resolve (mb->target_list[i],
+                              &otherpid);
+         if ( (repl == MIGRATION_LIST_SIZE) &&
+              is_closer (&mb->query,
+                         &cppid,
+                         &otherpid)) 
+           {
+             repl = i;
+             worstpid = otherpid;
+           }
+         else if ( (repl != MIGRATION_LIST_SIZE) &&
+                   (is_closer (&mb->query,
+                               &worstpid,
+                               &otherpid) ) )
+           {
+             repl = i;
+             worstpid = otherpid;
+           }       
+       }
+      if (repl != MIGRATION_LIST_SIZE) 
+       {
+         GNUNET_PEER_change_rc (mb->target_list[repl], -1);
+         mb->target_list[repl] = cp->pid;
+         GNUNET_PEER_change_rc (mb->target_list[repl], 1);
+       }
+    }
+
+  /* consider scheduling transmission to cp for content migration */
+  if (cp->cth != NULL)
+    return GNUNET_YES; 
+  msize = 0;
+  pos = mig_head;
+  while (pos != NULL)
+    {
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
+       {
+         if (cp->pid == pos->target_list[i])
+           {
+             if (msize == 0)
+               msize = pos->size;
+             else
+               msize = GNUNET_MIN (msize,
+                                   pos->size);
+             break;
+           }
+       }
+      pos = pos->next;
+    }
+  if (msize == 0)
+    return GNUNET_YES; /* no content available */
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Trying to migrate at least %u bytes to peer `%s'\n",
+             msize,
+             GNUNET_h2s (key));
+#endif
+  cp->cth 
+    = GNUNET_CORE_notify_transmit_ready (core,
+                                        0, GNUNET_TIME_UNIT_FOREVER_REL,
+                                        (const struct GNUNET_PeerIdentity*) key,
+                                        msize + sizeof (struct PutMessage),
+                                        &transmit_to_peer,
+                                        cp);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ * 
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * If the migration task is not currently running, consider
+ * (re)scheduling it with the appropriate delay.
+ */
+static void
+consider_migration_gathering ()
+{
+  struct GNUNET_TIME_Relative delay;
+
+  if (mig_qe != NULL)
+    return;
+  if (mig_task != GNUNET_SCHEDULER_NO_TASK)
+    return;
+  delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+                                        mig_size);
+  delay = GNUNET_TIME_relative_divide (delay,
+                                      MAX_MIGRATION_QUEUE);
+  delay = GNUNET_TIME_relative_max (delay,
+                                   min_migration_delay);
+  mig_task = GNUNET_SCHEDULER_add_delayed (sched,
+                                          delay,
+                                          &gather_migration_blocks,
+                                          NULL);
+}
+
+
+/**
+ * Process content offered for migration.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ */
+static void
+process_migration_content (void *cls,
+                          const GNUNET_HashCode * key,
+                          uint32_t size,
+                          const void *data,
+                          enum GNUNET_BLOCK_Type type,
+                          uint32_t priority,
+                          uint32_t anonymity,
+                          struct GNUNET_TIME_Absolute
+                          expiration, uint64_t uid)
+{
+  struct MigrationReadyBlock *mb;
+  
+  if (key == NULL)
+    {
+      mig_qe = NULL;
+      if (mig_size < MAX_MIGRATION_QUEUE)  
+       consider_migration_gathering ();
+      return;
+    }
+  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+    {
+      if (GNUNET_OK !=
+         GNUNET_FS_handle_on_demand_block (key, size, data,
+                                           type, priority, anonymity,
+                                           expiration, uid, 
+                                           &process_migration_content,
+                                           NULL))
+       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+      return;
+    }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Retrieved block `%s' of type %u for migration\n",
+             GNUNET_h2s (key),
+             type);
+#endif
+  mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
+  mb->query = *key;
+  mb->expiration = expiration;
+  mb->size = size;
+  mb->type = type;
+  memcpy (&mb[1], data, size);
+  GNUNET_CONTAINER_DLL_insert_after (mig_head,
+                                    mig_tail,
+                                    mig_tail,
+                                    mb);
+  mig_size++;
+  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                        &consider_migration,
+                                        mb);
+  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+}
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ * 
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  mig_task = GNUNET_SCHEDULER_NO_TASK;
+  mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
+                                       GNUNET_TIME_UNIT_FOREVER_REL,
+                                       &process_migration_content, NULL);
+  GNUNET_assert (mig_qe != NULL);
+}
+
+
 /**
  * We're done with a particular message list entry.
  * Free all associated resources.
@@ -628,15 +1018,18 @@ destroy_pending_message (struct PendingMessage *pm,
                         GNUNET_PEER_Id tpid)
 {
   struct PendingMessageList *pml = pm->pml;
+  TransmissionContinuation cont;
+  void *cont_cls;
 
   GNUNET_assert (pml->pm == pm);
   GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
-  pm->cont (pm->cont_cls, 0);  
+  cont = pm->cont;
+  cont_cls = pm->cont_cls;
   destroy_pending_message_list_entry (pml);
+  cont (cont_cls, tpid);  
 }
 
 
-
 /**
  * We're done processing a particular request.
  * Free all associated resources.
@@ -654,16 +1047,30 @@ destroy_pending_request (struct PendingRequest *pr)
                                         pr->hnode);
       pr->hnode = NULL;
     }
-  /* might have already been removed from map
-     in 'process_reply' if there was a unique 
-     reply; hence ignore the return value here */
+  if (NULL == pr->client_request_list)
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# P2P searches active"),
+                               -1,
+                               GNUNET_NO);
+    }
+  else
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# client searches active"),
+                               -1,
+                               GNUNET_NO);
+    }
+  /* might have already been removed from map in 'process_reply' (if
+     there was a unique reply) or never inserted if it was a
+     duplicate; hence ignore the return value here */
   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
                                               &pr->query,
                                               pr);
-  if (pr->drq != NULL)
-    {
-      GNUNET_FS_drq_get_cancel (pr->drq);
-      pr->drq = NULL;
+  if (pr->qe != NULL)
+     {
+      GNUNET_DATASTORE_cancel (pr->qe);
+      pr->qe = NULL;
     }
   if (pr->client_request_list != NULL)
     {
@@ -673,15 +1080,14 @@ destroy_pending_request (struct PendingRequest *pr)
       GNUNET_free (pr->client_request_list);
       pr->client_request_list = NULL;
     }
-  if (pr->pht_entry != NULL)
+  if (pr->cp != NULL)
     {
-      GNUNET_PEER_resolve (pr->pht_entry->cp->pid,
+      GNUNET_PEER_resolve (pr->cp->pid,
                           &pid);
-      GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
-                                           &pid.hashPubKey,
-                                           pr->pht_entry);
-      GNUNET_free (pr->pht_entry);
-      pr->pht_entry = NULL;
+      (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
+                                                  &pid.hashPubKey,
+                                                  pr);
+      pr->cp = NULL;
     }
   if (pr->bf != NULL)
     {
@@ -735,16 +1141,26 @@ peer_connect_handler (void *cls,
                      uint32_t distance)
 {
   struct ConnectedPeer *cp;
-
+  struct MigrationReadyBlock *pos;
+  
   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
   cp->pid = GNUNET_PEER_intern (peer);
-  GNUNET_CONTAINER_multihashmap_put (connected_peers,
-                                    &peer->hashPubKey,
-                                    cp,
-                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+  GNUNET_break (GNUNET_OK ==
+               GNUNET_CONTAINER_multihashmap_put (connected_peers,
+                                                  &peer->hashPubKey,
+                                                  cp,
+                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+
+  pos = mig_head;
+  while (NULL != pos)
+    {
+      (void) consider_migration (pos, &peer->hashPubKey, cp);
+      pos = pos->next;
+    }
 }
 
 
+
 /**
  * Free (each) request made by the peer.
  *
@@ -761,9 +1177,10 @@ destroy_request (void *cls,
   const struct GNUNET_PeerIdentity * peer = cls;
   struct PendingRequest *pr = value;
   
-  GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
-                                       &peer->hashPubKey,
-                                       pr);
+  GNUNET_break (GNUNET_YES ==
+               GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
+                                                     &peer->hashPubKey,
+                                                     pr));
   destroy_pending_request (pr);
   return GNUNET_YES;
 }
@@ -783,6 +1200,8 @@ peer_disconnect_handler (void *cls,
   struct ConnectedPeer *cp;
   struct PendingMessage *pm;
   unsigned int i;
+  struct MigrationReadyBlock *pos;
+  struct MigrationReadyBlock *next;
 
   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
                                              &peer->hashPubKey,
@@ -800,9 +1219,35 @@ peer_disconnect_handler (void *cls,
          cp->last_client_replies[i] = NULL;
        }
     }
-  GNUNET_CONTAINER_multihashmap_remove (connected_peers,
-                                       &peer->hashPubKey,
-                                       cp);
+  GNUNET_break (GNUNET_YES ==
+               GNUNET_CONTAINER_multihashmap_remove (connected_peers,
+                                                     &peer->hashPubKey,
+                                                     cp));
+  /* remove this peer from migration considerations; schedule
+     alternatives */
+  next = mig_head;
+  while (NULL != (pos = next))
+    {
+      next = pos->next;
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
+       {
+         if (pos->target_list[i] == cp->pid)
+           {
+             GNUNET_PEER_change_rc (pos->target_list[i], -1);
+             pos->target_list[i] = 0;
+            }
+         }
+      if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
+       {
+         delete_migration_block (pos);
+         consider_migration_gathering ();
+          continue;
+       }
+      GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                            &consider_migration,
+                                            pos);
+    }
+
   GNUNET_PEER_change_rc (cp->pid, -1);
   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
   if (NULL != cp->cth)
@@ -862,7 +1307,7 @@ handle_client_disconnect (void *cls,
   struct ClientResponseMessage *creply;
 
   if (client == NULL)
-    return; /* huh? is this allowed? */
+    return;
   prev = NULL;
   pos = client_list;
   while ( (NULL != pos) &&
@@ -874,7 +1319,12 @@ handle_client_disconnect (void *cls,
   if (pos == NULL)
     return; /* no requests pending for this client */
   while (NULL != (rcl = pos->rl_head))
-    destroy_pending_request (rcl->req);
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 "Destroying pending request `%s' on disconnect\n",
+                 GNUNET_h2s (&rcl->req->query));
+      destroy_pending_request (rcl->req);
+    }
   if (prev == NULL)
     client_list = pos->next;
   else
@@ -927,6 +1377,16 @@ static void
 shutdown_task (void *cls,
               const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  if (mig_qe != NULL)
+    {
+      GNUNET_DATASTORE_cancel (mig_qe);
+      mig_qe = NULL;
+    }
+  if (GNUNET_SCHEDULER_NO_TASK != mig_task)
+    {
+      GNUNET_SCHEDULER_cancel (sched, mig_task);
+      mig_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   while (client_list != NULL)
     handle_client_disconnect (NULL,
                              client_list->client);
@@ -947,6 +1407,17 @@ shutdown_task (void *cls,
   GNUNET_assert (NULL != core);
   GNUNET_CORE_disconnect (core);
   core = NULL;
+  if (stats != NULL)
+    {
+      GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+      stats = NULL;
+    }
+  GNUNET_DATASTORE_disconnect (dsh,
+                              GNUNET_NO);
+  while (mig_head != NULL)
+    delete_migration_block (mig_head);
+  GNUNET_assert (0 == mig_size);
+  dsh = NULL;
   sched = NULL;
   cfg = NULL;  
 }
@@ -956,7 +1427,7 @@ shutdown_task (void *cls,
 
 
 /**
- * Transmit the given message by copying it to the target buffer
+ * Transmit messages by copying it to the target buffer
  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
  * for writing in the meantime.  In that case, do nothing
  * (the disconnect or shutdown handler will take care of the rest).
@@ -976,14 +1447,18 @@ transmit_to_peer (void *cls,
   char *cbuf = buf;
   struct GNUNET_PeerIdentity pid;
   struct PendingMessage *pm;
+  struct MigrationReadyBlock *mb;
+  struct MigrationReadyBlock *next;
+  struct PutMessage migm;
   size_t msize;
-  
-  cp->cth = NULL;
+  unsigned int i;
+  cp->cth = NULL;
   if (NULL == buf)
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Dropping reply, core too busy.\n");
+                 "Dropping message, core too busy.\n");
 #endif
       return 0;
     }
@@ -1006,8 +1481,69 @@ transmit_to_peer (void *cls,
                                                   &pid,
                                                   pm->msize,
                                                   &transmit_to_peer,
-                                                  pm);
+                                                  cp);
+    }
+  else
+    {      
+      next = mig_head;
+      while (NULL != (mb = next))
+       {
+         next = mb->next;
+         for (i=0;i<MIGRATION_LIST_SIZE;i++)
+           {
+             if ( (cp->pid == mb->target_list[i]) &&
+                  (mb->size + sizeof (migm) <= size) )
+               {
+                 GNUNET_PEER_change_rc (mb->target_list[i], -1);
+                 mb->target_list[i] = 0;
+                 mb->used_targets++;
+                 migm.header.size = htons (sizeof (migm) + mb->size);
+                 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+                 migm.type = htonl (mb->type);
+                 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
+                 memcpy (&cbuf[msize], &migm, sizeof (migm));
+                 msize += sizeof (migm);
+                 size -= sizeof (migm);
+                 memcpy (&cbuf[msize], &mb[1], mb->size);
+                 msize += mb->size;
+                 size -= mb->size;
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Pushing migration block `%s' (%u bytes) to `%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
+                 break;
+               }
+             else
+               {
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
+               }
+           }
+         if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
+              (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
+           {
+             delete_migration_block (mb);
+             consider_migration_gathering ();
+           }
+       }
+      consider_migration (NULL, 
+                         &pid.hashPubKey,
+                         cp);
     }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Transmitting %u bytes to peer %u\n",
+             msize,
+             cp->pid);
+#endif
   return msize;
 }
 
@@ -1049,21 +1585,27 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
   cp->pending_requests++;
   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
     destroy_pending_message (cp->pending_messages_tail, 0);  
+  GNUNET_PEER_resolve (cp->pid, &pid);
+  if (NULL != cp->cth)
+    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+  /* need to schedule transmission */
+  cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                              cp->pending_messages_head->priority,
+                                              MAX_TRANSMIT_DELAY,
+                                              &pid,
+                                              cp->pending_messages_head->msize,
+                                              &transmit_to_peer,
+                                              cp);
   if (cp->cth == NULL)
     {
-      /* need to schedule transmission */
-      GNUNET_PEER_resolve (cp->pid, &pid);
-      cp->cth = GNUNET_CORE_notify_transmit_ready (core,
-                                                  cp->pending_messages_head->priority,
-                                                  GNUNET_TIME_UNIT_FOREVER_REL,
-                                                  &pid,
-                                                  cp->pending_messages_head->msize,
-                                                  &transmit_to_peer,
-                                                  cp);
-    }
-  if (cp->cth == NULL)
-    {
-      /* FIXME: call stats (rare, bad case) */
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Failed to schedule transmission with core!\n");
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# CORE transmission failures"),
+                               1,
+                               GNUNET_NO);
     }
 }
 
@@ -1102,12 +1644,51 @@ test_load_too_high ()
 /* ******************* Pending Request Refresh Task ******************** */
 
 
+
+/**
+ * We use a random delay to make the timing of requests less
+ * predictable.  This function returns such a random delay.  We add a base
+ * delay of MAX_CORK_DELAY (1s).
+ *
+ * FIXME: make schedule dependent on the specifics of the request?
+ * Or bandwidth and number of connected peers and load?
+ *
+ * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
+ */
+static struct GNUNET_TIME_Relative
+get_processing_delay ()
+{
+  return 
+    GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
+                             GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                                            GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                                                      TTL_DECREMENT)));
+}
+
+
+/**
+ * We're processing a GET request from another peer and have decided
+ * to forward it to other peers.  This function is called periodically
+ * and should forward the request to other peers until we have all
+ * possible replies.  If we have transmitted the *only* reply to
+ * the initiator we should destroy the pending request.  If we have
+ * many replies in the queue to the initiator, we should delay sending
+ * out more queries until the reply queue has shrunk some.
+ *
+ * @param cls our "struct ProcessGetContext *"
+ * @param tc unused
+ */
+static void
+forward_request_task (void *cls,
+                     const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
 /**
  * Function called after we either failed or succeeded
  * at transmitting a query to a peer.  
  *
  * @param cls the requests "struct PendingRequest*"
- * @param pid ID of receiving peer, 0 on transmission error
+ * @param tpid ID of receiving peer, 0 on transmission error
  */
 static void
 transmit_query_continuation (void *cls,
@@ -1115,18 +1696,41 @@ transmit_query_continuation (void *cls,
 {
   struct PendingRequest *pr = cls;
 
-  if (tpid == 0)    
-    return;    
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# queries scheduled for forwarding"),
+                           -1,
+                           GNUNET_NO);
+  if (tpid == 0)   
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Transmission of request failed, will try again later.\n");
+#endif
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                get_processing_delay (),
+                                                &forward_request_task,
+                                                pr); 
+      return;    
+    }
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# queries forwarded"),
+                           1,
+                           GNUNET_NO);
   GNUNET_PEER_change_rc (tpid, 1);
   if (pr->used_pids_off == pr->used_pids_size)
     GNUNET_array_grow (pr->used_pids,
                       pr->used_pids_size,
                       pr->used_pids_size * 2 + 2);
   pr->used_pids[pr->used_pids_off++] = tpid;
+  if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+    pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                            get_processing_delay (),
+                                            &forward_request_task,
+                                            pr);
 }
 
 
-#if 0
 /**
  * How many bytes should a bloomfilter be if we have already seen
  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
@@ -1158,58 +1762,36 @@ compute_bloomfilter_size (unsigned int entry_count)
 
 
 /**
- * Recalculate our bloom filter for filtering replies.
+ * Recalculate our bloom filter for filtering replies.  This function
+ * will create a new bloom filter from scratch, so it should only be
+ * called if we have no bloomfilter at all (and hence can create a
+ * fresh one of minimal size without problems) OR if our peer is the
+ * initiator (in which case we may resize to larger than mimimum size).
  *
- * @param count number of entries we are filtering right now
- * @param mingle set to our new mingling value
- * @param bf_size set to the size of the bloomfilter
- * @param entries the entries to filter
- * @return updated bloomfilter, NULL for none
- */
-static struct GNUNET_CONTAINER_BloomFilter *
-refresh_bloomfilter (unsigned int count,
-                    int32_t * mingle,
-                    size_t *bf_size,
-                    const GNUNET_HashCode *entries)
+ * @param pr request for which the BF is to be recomputed
+ */
+static void
+refresh_bloomfilter (struct PendingRequest *pr)
 {
-  struct GNUNET_CONTAINER_BloomFilter *bf;
-  size_t nsize;
   unsigned int i;
+  size_t nsize;
   GNUNET_HashCode mhash;
 
-  if (0 == count)
-    return NULL;
-  nsize = compute_bloomfilter_size (count);
-  *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
-  *bf_size = nsize;
-  bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
-                                         nsize,
-                                         BLOOMFILTER_K);
-  for (i=0;i<count;i++)
+  nsize = compute_bloomfilter_size (pr->replies_seen_off);
+  if (nsize == pr->bf_size)
+    return; /* size not changed */
+  if (pr->bf != NULL)
+    GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+  pr->bf_size = nsize;
+  pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
+  pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
+                                             pr->bf_size,
+                                             BLOOMFILTER_K);
+  for (i=0;i<pr->replies_seen_off;i++)
     {
-      mingle_hash (&entries[i], *mingle, &mhash);
-      GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
+      mingle_hash (&pr->replies_seen[i], pr->mingle, &mhash);
+      GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
     }
-  return bf;
-}
-#endif
-
-
-/**
- * We use a random delay to make the timing of requests less
- * predictable.  This function returns such a random delay.
- *
- * FIXME: make schedule dependent on the specifics of the request?
- * Or bandwidth and number of connected peers and load?
- *
- * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
- */
-static struct GNUNET_TIME_Relative
-get_processing_delay ()
-{
-  return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
-                                       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                                                 TTL_DECREMENT));
 }
 
 
@@ -1229,8 +1811,8 @@ static void
 target_reservation_cb (void *cls,
                       const struct
                       GNUNET_PeerIdentity * peer,
-                      unsigned int bpm_in,
-                      unsigned int bpm_out,
+                      struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
+                      struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
                       int amount,
                       uint64_t preference)
 {
@@ -1243,35 +1825,96 @@ target_reservation_cb (void *cls,
   size_t msize;
   unsigned int k;
   int no_route;
+  uint32_t bm;
 
   pr->irc = NULL;
-  GNUNET_assert (peer != NULL);
+  if (peer == NULL)
+    {
+      /* error in communication with core, try again later */
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                get_processing_delay (),
+                                                &forward_request_task,
+                                                pr);
+      return;
+    }
   // (3) transmit, update ttl/priority
   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                          &peer->hashPubKey);
   if (cp == NULL)
     {
       /* Peer must have just left */
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Selected peer disconnected!\n");
+#endif
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                get_processing_delay (),
+                                                &forward_request_task,
+                                                pr);
       return;
     }
   no_route = GNUNET_NO;
-  if (amount != DBLOCK_SIZE) 
+  /* FIXME: check against DBLOCK_SIZE and possibly return
+     amount to reserve; however, this also needs to work
+     with testcases which currently start out with a far
+     too low per-peer bw limit, so they would never send
+     anything.  Big issue. */
+  if (amount == 0)
     {
-      if (pr->pht_entry == NULL)
-       return;  /* this target round failed */
+      if (pr->cp == NULL)
+       {
+#if DEBUG_FS > 1
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
+                     amount,
+                     DBLOCK_SIZE);
+#endif
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# reply bandwidth reservation requests failed"),
+                                   1,
+                                   GNUNET_NO);
+         if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                    get_processing_delay (),
+                                                    &forward_request_task,
+                                                    pr);
+         return;  /* this target round failed */
+       }
       /* FIXME: if we are "quite" busy, we may still want to skip
         this round; need more load detection code! */
       no_route = GNUNET_YES;
     }
   
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# queries scheduled for forwarding"),
+                           1,
+                           GNUNET_NO);
   /* build message and insert message into priority queue */
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Forwarding request `%s' to `%4s'!\n",
+             GNUNET_h2s (&pr->query),
+             GNUNET_i2s (peer));
+#endif
   k = 0;
+  bm = 0;
+  if (GNUNET_YES == no_route)
+    {
+      bm |= GET_MESSAGE_BIT_RETURN_TO;
+      k++;      
+    }
   if (pr->namespace != NULL)
-    k++;
+    {
+      bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
+      k++;
+    }
   if (pr->target_pid != 0)
-    k++;
-  if (GNUNET_YES == no_route)
-    k++;      
+    {
+      bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
+      k++;
+    }
   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
@@ -1283,17 +1926,17 @@ target_reservation_cb (void *cls,
   pr->remaining_priority /= 2;
   gm->priority = htonl (pr->remaining_priority);
   gm->ttl = htonl (pr->ttl);
-  gm->filter_mutator = htonl(pr->mingle); // FIXME: bad endianess conversion?
-  gm->hash_bitmap = htonl (42); // FIXME!
+  gm->filter_mutator = htonl(pr->mingle); 
+  gm->hash_bitmap = htonl (bm);
   gm->query = pr->query;
   ext = (GNUNET_HashCode*) &gm[1];
   k = 0;
+  if (GNUNET_YES == no_route)
+    GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
   if (pr->namespace != NULL)
     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
   if (pr->target_pid != 0)
     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
-  if (GNUNET_YES == no_route)
-    GNUNET_PEER_resolve (pr->pht_entry->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
   bfdata = (char *) &ext[k];
   if (pr->bf != NULL)
     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
@@ -1350,12 +1993,36 @@ target_peer_select_cb (void *cls,
   struct PendingRequest *pr = psc->pr;
   double score;
   unsigned int i;
-  
-  /* 1) check if we have already (recently) forwarded to this peer */
+  unsigned int pc;
+
+  /* 1) check that this peer is not the initiator */
+  if (cp == pr->cp)
+    return GNUNET_YES; /* skip */         
+
+  /* 2) check if we have already (recently) forwarded to this peer */
+  pc = 0;
   for (i=0;i<pr->used_pids_off;i++)
-    if (pr->used_pids[i] == cp->pid)
-      return GNUNET_YES; /* skip */
-  // 2) calculate how much we'd like to forward to this peer
+    if (pr->used_pids[i] == cp->pid) 
+      {
+       pc++;
+       if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                          RETRY_PROBABILITY_INV))
+         {
+#if DEBUG_FS
+           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                       "NOT re-trying query that was previously transmitted %u times\n",
+                       (unsigned int) pr->used_pids_off);
+#endif
+           return GNUNET_YES; /* skip */
+         }
+      }
+#if DEBUG_FS
+  if (0 < pc)
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+               "Re-trying query that was previously transmitted %u times to this peer\n",
+               (unsigned int) pc);
+#endif
+  // 3) calculate how much we'd like to forward to this peer
   score = 42; // FIXME!
   // FIXME: also need API to gather data on responsiveness
   // of this peer (we have fields for that in 'cp', but
@@ -1371,6 +2038,33 @@ target_peer_select_cb (void *cls,
 }
   
 
+/**
+ * The priority level imposes a bound on the maximum
+ * value for the ttl that can be requested.
+ *
+ * @param ttl_in requested ttl
+ * @param prio given priority
+ * @return ttl_in if ttl_in is below the limit,
+ *         otherwise the ttl-limit for the given priority
+ */
+static int32_t
+bound_ttl (int32_t ttl_in, uint32_t prio)
+{
+  unsigned long long allowed;
+
+  if (ttl_in <= 0)
+    return ttl_in;
+  allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
+  if (ttl_in > allowed)      
+    {
+      if (allowed >= (1 << 30))
+        return 1 << 30;
+      return allowed;
+    }
+  return ttl_in;
+}
+
+
 /**
  * We're processing a GET request from another peer and have decided
  * to forward it to other peers.  This function is called periodically
@@ -1390,13 +2084,20 @@ forward_request_task (void *cls,
   struct PendingRequest *pr = cls;
   struct PeerSelectionContext psc;
   struct ConnectedPeer *cp; 
+  struct GNUNET_TIME_Relative delay;
 
-  pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                          get_processing_delay (),
-                                          &forward_request_task,
-                                          pr);
+  pr->task = GNUNET_SCHEDULER_NO_TASK;
   if (pr->irc != NULL)
-    return; /* previous request still pending */
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Forwarding of query `%s' not attempted due to pending local lookup!\n",
+                 GNUNET_h2s (&pr->query));
+#endif
+      return; /* already pending */
+    }
+  if (GNUNET_YES == pr->local_only)
+    return; /* configured to not do P2P search */
   /* (1) select target */
   psc.pr = pr;
   psc.target_score = DBL_MIN;
@@ -1404,16 +2105,54 @@ forward_request_task (void *cls,
                                         &target_peer_select_cb,
                                         &psc);  
   if (psc.target_score == DBL_MIN)
-    return; /* nobody selected */
+    {
+      delay = get_processing_delay ();
+#if DEBUG_FS 
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
+                 GNUNET_h2s (&pr->query),
+                 delay.value);
+#endif
+      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                              delay,
+                                              &forward_request_task,
+                                              pr);
+      return; /* nobody selected */
+    }
+  /* (3) update TTL/priority */
+  
+  if (pr->client_request_list != NULL)
+    {
+      /* FIXME: use better algorithm!? */
+      if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                        4))
+       pr->priority++;
+      /* FIXME: bound priority by "customary" priority used by other peers
+        at this time! */
+      pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
+                          pr->priority);
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Trying query `%s' with priority %u and TTL %d.\n",
+                 GNUNET_h2s (&pr->query),
+                 pr->priority,
+                 pr->ttl);
+#endif
+    }
+  else
+    {
+      /* FIXME: should we do something here as well!? */
+    }
 
-  /* (2) reserve reply bandwidth */
+  /* (3) reserve reply bandwidth */
   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                          &psc.target.hashPubKey);
+  GNUNET_assert (NULL != cp);
   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
                                                &psc.target,
                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
-                                               (uint32_t) -1 /* no limit */, 
-                                               DBLOCK_SIZE, 
+                                               GNUNET_BANDWIDTH_value_init (UINT32_MAX),
+                                               DBLOCK_SIZE * 2
                                                (uint64_t) cp->inc_preference,
                                                &target_reservation_cb,
                                                pr);
@@ -1429,23 +2168,24 @@ forward_request_task (void *cls,
  * at transmitting a reply to a peer.  
  *
  * @param cls the requests "struct PendingRequest*"
- * @param pid ID of receiving peer, 0 on transmission error
+ * @param tpid ID of receiving peer, 0 on transmission error
  */
 static void
 transmit_reply_continuation (void *cls,
                             GNUNET_PEER_Id tpid)
 {
   struct PendingRequest *pr = cls;
-
+  
   switch (pr->type)
     {
-    case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
-    case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_DBLOCK:
+    case GNUNET_BLOCK_TYPE_IBLOCK:
       /* only one reply expected, done with the request! */
       destroy_pending_request (pr);
       break;
-    case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
-    case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
+    case GNUNET_BLOCK_TYPE_ANY:
+    case GNUNET_BLOCK_TYPE_KBLOCK:
+    case GNUNET_BLOCK_TYPE_SBLOCK:
       break;
     default:
       GNUNET_break (0);
@@ -1454,94 +2194,6 @@ transmit_reply_continuation (void *cls,
 }
 
 
-/**
- * Check if the given KBlock is well-formed.
- *
- * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
- * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
- * @param query where to store the query that this block answers
- * @return GNUNET_OK if this is actually a well-formed KBlock
- */
-static int
-check_kblock (const struct KBlock *kb,
-             size_t dsize,
-             GNUNET_HashCode *query)
-{
-  if (dsize < sizeof (struct KBlock))
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-  if (dsize - sizeof (struct KBlock) !=
-      ntohs (kb->purpose.size) 
-      - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) 
-      - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) 
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-  if (GNUNET_OK !=
-      GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
-                               &kb->purpose,
-                               &kb->signature,
-                               &kb->keyspace)) 
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-  if (query != NULL)
-    GNUNET_CRYPTO_hash (&kb->keyspace,
-                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
-                       query);
-  return GNUNET_OK;
-}
-
-
-/**
- * Check if the given SBlock is well-formed.
- *
- * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
- * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
- * @param query where to store the query that this block answers
- * @param namespace where to store the namespace that this block belongs to
- * @return GNUNET_OK if this is actually a well-formed SBlock
- */
-static int
-check_sblock (const struct SBlock *sb,
-             size_t dsize,
-             GNUNET_HashCode *query,   
-             GNUNET_HashCode *namespace)
-{
-  if (dsize < sizeof (struct SBlock))
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-  if (dsize !=
-      ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-  if (GNUNET_OK !=
-      GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
-                               &sb->purpose,
-                               &sb->signature,
-                               &sb->subspace)) 
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-  if (query != NULL)
-    *query = sb->identifier;
-  if (namespace != NULL)
-    GNUNET_CRYPTO_hash (&sb->subspace,
-                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
-                       namespace);
-  return GNUNET_OK;
-}
-
-
 /**
  * Transmit the given message by copying it to the target buffer
  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
@@ -1591,6 +2243,11 @@ transmit_to_client (void *cls,
                                                  GNUNET_TIME_UNIT_FOREVER_REL,
                                                  &transmit_to_client,
                                                  cl);
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Transmitted %u bytes to client\n",
+             (unsigned int) msize);
+#endif
   return msize;
 }
 
@@ -1605,7 +2262,10 @@ struct ProcessReplyClosure
    */
   const void *data;
 
-  // FIXME: add 'struct ConnectedPeer' to track 'last_xxx_replies' here!
+  /**
+   * Who gave us this reply? NULL for local host.
+   */
+  struct ConnectedPeer *sender;
 
   /**
    * When the reply expires.
@@ -1626,12 +2286,17 @@ struct ProcessReplyClosure
   /**
    * Type of the block.
    */
-  uint32_t type;
+  enum GNUNET_BLOCK_Type type;
 
   /**
    * How much was this reply worth to us?
    */
   uint32_t priority;
+
+  /**
+   * Did we finish processing the associated request?
+   */ 
+  int finished;
 };
 
 
@@ -1654,69 +2319,151 @@ process_reply (void *cls,
   struct ClientResponseMessage *creply;
   struct ClientList *cl;
   struct PutMessage *pm;
-  struct ContentMessage *cm;
   struct ConnectedPeer *cp;
+  struct GNUNET_TIME_Relative cur_delay;
   GNUNET_HashCode chash;
   GNUNET_HashCode mhash;
   size_t msize;
-  uint32_t prio;
 
-  
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Matched result (type %u) for query `%s' with pending request\n",
+             (unsigned int) prq->type,
+             GNUNET_h2s (key));
+#endif  
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# replies received and matched"),
+                           1,
+                           GNUNET_NO);
+  if (prq->sender != NULL)
+    {
+      /* FIXME: should we be more precise here and not use
+        "start_time" but a peer-specific time stamp? */
+      cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
+      prq->sender->avg_delay.value
+       = (prq->sender->avg_delay.value * 
+          (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
+      prq->sender->avg_priority
+       = (prq->sender->avg_priority * 
+          (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+      if (pr->cp != NULL)
+       {
+         GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
+                                [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
+                                -1);
+         GNUNET_PEER_change_rc (pr->cp->pid, 1);
+         prq->sender->last_p2p_replies
+           [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
+           = pr->cp->pid;
+       }
+      else
+       {
+         if (NULL != prq->sender->last_client_replies
+             [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
+           GNUNET_SERVER_client_drop (prq->sender->last_client_replies
+                                      [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
+         prq->sender->last_client_replies
+           [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
+           = pr->client_request_list->client_list->client;
+         GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
+       }
+    }
   GNUNET_CRYPTO_hash (prq->data,
                      prq->size,
                      &chash);
   switch (prq->type)
     {
-    case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
-    case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_DBLOCK:
+    case GNUNET_BLOCK_TYPE_IBLOCK:
       /* only possible reply, stop requesting! */
       while (NULL != pr->pending_head)
        destroy_pending_message_list_entry (pr->pending_head);
+      if (pr->qe != NULL)
+       {
+         if (pr->client_request_list != NULL)
+           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
+                                       GNUNET_YES);
+         GNUNET_DATASTORE_cancel (pr->qe);
+         pr->qe = NULL;
+       }
+      pr->do_remove = GNUNET_YES;
+      if (pr->task != GNUNET_SCHEDULER_NO_TASK)
+       {
+         GNUNET_SCHEDULER_cancel (sched,
+                                  pr->task);
+         pr->task = GNUNET_SCHEDULER_NO_TASK;
+       }
       GNUNET_break (GNUNET_YES ==
                    GNUNET_CONTAINER_multihashmap_remove (query_request_map,
                                                          key,
                                                          pr));
       break;
-    case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
+    case GNUNET_BLOCK_TYPE_SBLOCK:
+      if (pr->namespace == NULL)
+       {
+         GNUNET_break (0);
+         return GNUNET_YES;
+       }
       if (0 != memcmp (pr->namespace,
                       &prq->namespace,
                       sizeof (GNUNET_HashCode)))
-       return GNUNET_YES; /* wrong namespace */        
+       {
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                     _("Reply mismatched in terms of namespace.  Discarded.\n"));
+         return GNUNET_YES; /* wrong namespace */      
+       }
       /* then: fall-through! */
-    case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
+    case GNUNET_BLOCK_TYPE_KBLOCK:
+    case GNUNET_BLOCK_TYPE_NBLOCK:
       if (pr->bf != NULL) 
        {
          mingle_hash (&chash, pr->mingle, &mhash);
          if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
                                                               &mhash))
-           return GNUNET_YES; /* duplicate */
-         GNUNET_CONTAINER_bloomfilter_add (pr->bf,
-                                           &mhash);
+           {
+             GNUNET_STATISTICS_update (stats,
+                                       gettext_noop ("# duplicate replies discarded (bloomfilter)"),
+                                       1,
+                                       GNUNET_NO);
+#if DEBUG_FS
+             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                         "Duplicate response `%s', discarding.\n",
+                         GNUNET_h2s (&mhash));
+#endif
+             return GNUNET_YES; /* duplicate */
+           }
+#if DEBUG_FS
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "New response `%s', adding to filter.\n",
+                     GNUNET_h2s (&mhash));
+#endif
        }
       if (pr->client_request_list != NULL)
        {
          if (pr->replies_seen_size == pr->replies_seen_off)
-           {
-             GNUNET_array_grow (pr->replies_seen,
-                                pr->replies_seen_size,
-                                pr->replies_seen_size * 2 + 4);
-             // FIXME: recalculate BF!
-           }
-         pr->replies_seen[pr->replies_seen_off++] = chash;
+           GNUNET_array_grow (pr->replies_seen,
+                              pr->replies_seen_size,
+                              pr->replies_seen_size * 2 + 4);  
+           pr->replies_seen[pr->replies_seen_off++] = chash;         
        }
-      break;
-    case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
-      // FIXME: any checks against duplicates for SKBlocks?
+      if ( (pr->bf == NULL) ||
+          (pr->client_request_list != NULL) )
+       refresh_bloomfilter (pr);
+      GNUNET_CONTAINER_bloomfilter_add (pr->bf,
+                                       &mhash);
       break;
     default:
       GNUNET_break (0);
       return GNUNET_YES;
     }
-  prio = pr->priority;
   prq->priority += pr->remaining_priority;
   pr->remaining_priority = 0;
-  if (pr->client_request_list != NULL)
+  if (NULL != pr->client_request_list)
     {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# replies received for local clients"),
+                               1,
+                               GNUNET_NO);
       cl = pr->client_request_list->client_list;
       msize = sizeof (struct PutMessage) + prq->size;
       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
@@ -1730,37 +2477,73 @@ process_reply (void *cls,
       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
       pm->header.size = htons (msize);
       pm->type = htonl (prq->type);
-      pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
-      memcpy (&creply[1], prq->data, prq->size);      
+      pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
+      memcpy (&pm[1], prq->data, prq->size);      
       if (NULL == cl->th)
-       cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
-                                                     msize,
-                                                     GNUNET_TIME_UNIT_FOREVER_REL,
-                                                     &transmit_to_client,
-                                                     cl);
+       {
+#if DEBUG_FS
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Transmitting result for query `%s' to client\n",
+                     GNUNET_h2s (key));
+#endif  
+         cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
+                                                       msize,
+                                                       GNUNET_TIME_UNIT_FOREVER_REL,
+                                                       &transmit_to_client,
+                                                       cl);
+       }
       GNUNET_break (cl->th != NULL);
+      if (pr->do_remove)               
+       {
+         prq->finished = GNUNET_YES;
+         destroy_pending_request (pr);         
+       }
     }
   else
     {
-      cp = pr->pht_entry->cp;
-      msize = sizeof (struct ContentMessage) + prq->size;
+      cp = pr->cp;
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Transmitting result for query `%s' to other peer (PID=%u)\n",
+                 GNUNET_h2s (key),
+                 (unsigned int) cp->pid);
+#endif  
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# replies received for other peers"),
+                               1,
+                               GNUNET_NO);
+      msize = sizeof (struct PutMessage) + prq->size;
       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
       reply->cont = &transmit_reply_continuation;
       reply->cont_cls = pr;
       reply->msize = msize;
-      reply->priority = (uint32_t) -1; /* send replies first! */
-      cm = (struct ContentMessage*) &reply[1];
-      cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
-      cm->header.size = htons (msize);
-      cm->type = htonl (prq->type);
-      cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
-      memcpy (&reply[1], prq->data, prq->size);
+      reply->priority = UINT32_MAX; /* send replies first! */
+      pm = (struct PutMessage*) &reply[1];
+      pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+      pm->header.size = htons (msize);
+      pm->type = htonl (prq->type);
+      pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
+      memcpy (&pm[1], prq->data, prq->size);
       add_to_pending_messages_for_peer (cp, reply, pr);
     }
+  return GNUNET_YES;
+}
 
 
-  // FIXME: implement hot-path routing statistics keeping!
-  return GNUNET_YES;
+/**
+ * Continuation called to notify client about result of the
+ * operation.
+ *
+ * @param cls closure
+ * @param success GNUNET_SYSERR on failure
+ * @param msg NULL on success, otherwise an error message
+ */
+static void 
+put_migration_continuation (void *cls,
+                           int success,
+                           const char *msg)
+{
+  /* FIXME */
 }
 
 
@@ -1786,10 +2569,11 @@ handle_p2p_put (void *cls,
   const struct PutMessage *put;
   uint16_t msize;
   size_t dsize;
-  uint32_t type;
+  enum GNUNET_BLOCK_Type type;
   struct GNUNET_TIME_Absolute expiration;
   GNUNET_HashCode query;
   struct ProcessReplyClosure prq;
+  const struct SBlock *sb;
 
   msize = ntohs (message->size);
   if (msize < sizeof (struct PutMessage))
@@ -1800,53 +2584,68 @@ handle_p2p_put (void *cls,
   put = (const struct PutMessage*) message;
   dsize = msize - sizeof (struct PutMessage);
   type = ntohl (put->type);
-  expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
+  expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
 
-  /* first, validate! */
-  switch (type)
+  if (GNUNET_OK !=
+      GNUNET_BLOCK_check_block (type,
+                               &put[1],
+                               dsize,
+                               &query))
     {
-    case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
-    case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
-      GNUNET_CRYPTO_hash (&put[1], dsize, &query);
-      break;
-    case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
-      if (GNUNET_OK !=
-         check_kblock ((const struct KBlock*) &put[1],
-                       dsize,
-                       &query))
-       return GNUNET_SYSERR;
-      break;
-    case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
-      if (GNUNET_OK !=
-         check_sblock ((const struct SBlock*) &put[1],
-                       dsize,
-                       &query,
-                       &prq.namespace))
-       return GNUNET_SYSERR;
-      break;
-    case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
-      // FIXME -- validate SKBLOCK!
-      GNUNET_break (0);
-      return GNUNET_OK;
-    default:
-      /* unknown block type */
       GNUNET_break_op (0);
       return GNUNET_SYSERR;
     }
+  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+    return GNUNET_SYSERR;
+  if (GNUNET_BLOCK_TYPE_SBLOCK == type)
+    { 
+      sb = (const struct SBlock*) &put[1];
+      GNUNET_CRYPTO_hash (&sb->subspace,
+                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+                         &prq.namespace);
+    }
 
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received result for query `%s' from peer `%4s'\n",
+             GNUNET_h2s (&query),
+             GNUNET_i2s (other));
+#endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# replies received (overall)"),
+                           1,
+                           GNUNET_NO);
   /* now, lookup 'query' */
   prq.data = (const void*) &put[1];
+  if (other != NULL)
+    prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                                   &other->hashPubKey);
   prq.size = dsize;
   prq.type = type;
   prq.expiration = expiration;
   prq.priority = 0;
+  prq.finished = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
                                              &query,
                                              &process_reply,
                                              &prq);
-  // FIXME: if migration is on and load is low,
-  // queue to store data in datastore;
-  // use "prq.priority" for that!
+  if (GNUNET_YES == active_migration)
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Replicating result for query `%s' with priority %u\n",
+                 GNUNET_h2s (&query),
+                 prq.priority);
+#endif
+      GNUNET_DATASTORE_put (dsh,
+                           0, &query, dsize, &put[1],
+                           type, prq.priority, 1 /* anonymity */, 
+                           expiration, 
+                           1 + prq.priority, MAX_DATASTORE_QUEUE,
+                           GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+                           &put_migration_continuation, 
+                           NULL);
+    }
   return GNUNET_OK;
 }
 
@@ -1854,6 +2653,56 @@ handle_p2p_put (void *cls,
 /* **************************** P2P GET Handling ************************ */
 
 
+/**
+ * Closure for 'check_duplicate_request_{peer,client}'.
+ */
+struct CheckDuplicateRequestClosure
+{
+  /**
+   * The new request we should check if it already exists.
+   */
+  const struct PendingRequest *pr;
+
+  /**
+   * Existing request found by the checker, NULL if none.
+   */
+  struct PendingRequest *have;
+};
+
+
+/**
+ * Iterator over entries in the 'query_request_map' that
+ * tries to see if we have the same request pending from
+ * the same client already.
+ *
+ * @param cls closure (our 'struct CheckDuplicateRequestClosure')
+ * @param key current key code (query, ignored, must match)
+ * @param value value in the hash map (a 'struct PendingRequest' 
+ *              that already exists)
+ * @return GNUNET_YES if we should continue to
+ *         iterate (no match yet)
+ *         GNUNET_NO if not (match found).
+ */
+static int
+check_duplicate_request_client (void *cls,
+                               const GNUNET_HashCode * key,
+                               void *value)
+{
+  struct CheckDuplicateRequestClosure *cdc = cls;
+  struct PendingRequest *have = value;
+
+  if (have->client_request_list == NULL)
+    return GNUNET_YES;
+  if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
+       (cdc->pr != have) )
+    {
+      cdc->have = have;
+      return GNUNET_NO;
+    }
+  return GNUNET_YES;
+}
+
+
 /**
  * We're processing (local) results for a search request
  * from another peer.  Pass applicable results to the
@@ -1876,7 +2725,7 @@ process_local_reply (void *cls,
                     const GNUNET_HashCode * key,
                     uint32_t size,
                     const void *data,
-                    uint32_t type,
+                    enum GNUNET_BLOCK_Type type,
                     uint32_t priority,
                     uint32_t anonymity,
                     struct GNUNET_TIME_Absolute
@@ -1885,13 +2734,44 @@ process_local_reply (void *cls,
 {
   struct PendingRequest *pr = cls;
   struct ProcessReplyClosure prq;
+  struct CheckDuplicateRequestClosure cdrc;
+  const struct SBlock *sb;
   GNUNET_HashCode dhash;
   GNUNET_HashCode mhash;
   GNUNET_HashCode query;
   
-  pr->drq = NULL;
   if (NULL == key)
     {
+#if DEBUG_FS > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Done processing local replies, forwarding request to other peers.\n");
+#endif
+      pr->qe = NULL;
+      if (pr->client_request_list != NULL)
+       {
+         GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
+                                     GNUNET_YES);
+         /* Figure out if this is a duplicate request and possibly
+            merge 'struct PendingRequest' entries */
+         cdrc.have = NULL;
+         cdrc.pr = pr;
+         GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+                                                     &pr->query,
+                                                     &check_duplicate_request_client,
+                                                     &cdrc);
+         if (cdrc.have != NULL)
+           {
+#if DEBUG_FS
+             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                         "Received request for block `%s' twice from client, will only request once.\n",
+                         GNUNET_h2s (&pr->query));
+#endif
+             
+             destroy_pending_request (pr);
+             return;
+           }
+       }
+
       /* no more results */
       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
        pr->task = GNUNET_SCHEDULER_add_now (sched,
@@ -1899,14 +2779,29 @@ process_local_reply (void *cls,
                                             pr);      
       return;
     }
-  if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "New local response to `%s' of type %u.\n",
+             GNUNET_h2s (key),
+             type);
+#endif
+  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
     {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Found ONDEMAND block, performing on-demand encoding\n");
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# on-demand blocks matched requests"),
+                               1,
+                               GNUNET_NO);
       if (GNUNET_OK != 
          GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
                                            anonymity, expiration, uid, 
                                            &process_local_reply,
                                            pr))
-       GNUNET_FS_drq_get_next (GNUNET_YES);
+      if (pr->qe != NULL)
+       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
       return;
     }
   /* check for duplicates */
@@ -1921,79 +2816,82 @@ process_local_reply (void *cls,
     {      
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Result from datastore filtered by bloomfilter.\n");
+                 "Result from datastore filtered by bloomfilter (duplicate).\n");
 #endif
-      GNUNET_FS_drq_get_next (GNUNET_YES);
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# results filtered by query bloomfilter"),
+                               1,
+                               GNUNET_NO);
+      if (pr->qe != NULL)
+       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
       return;
     }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Found result for query `%s' in local datastore\n",
+             GNUNET_h2s (key));
+#endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# results found locally"),
+                           1,
+                           GNUNET_NO);
   pr->results_found++;
-  if ( (pr->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
-       (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
-       (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
-    {
-      if (pr->bf == NULL)
-       {
-         pr->bf_size = 32;
-         pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
-                                                     pr->bf_size, 
-                                                     BLOOMFILTER_K);
-       }
-      GNUNET_CONTAINER_bloomfilter_add (pr->bf, 
-                                       &mhash);
-    }
   memset (&prq, 0, sizeof (prq));
   prq.data = data;
   prq.expiration = expiration;
   prq.size = size;  
-  if ( (type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) &&
-       (GNUNET_OK != check_sblock ((const struct SBlock*) data,
-                                  size,
-                                  &query,
-                                  &prq.namespace)) )
+  if (GNUNET_BLOCK_TYPE_SBLOCK == type)
+    { 
+      sb = (const struct SBlock*) data;
+      GNUNET_CRYPTO_hash (&sb->subspace,
+                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+                         &prq.namespace);
+    }
+  if (GNUNET_OK != GNUNET_BLOCK_check_block (type,
+                                            data,
+                                            size,
+                                            &query))
     {
       GNUNET_break (0);
-      /* FIXME: consider removing the block? */
-      GNUNET_FS_drq_get_next (GNUNET_YES);
+      GNUNET_DATASTORE_remove (dsh,
+                              key,
+                              size, data,
+                              -1, -1, 
+                              GNUNET_TIME_UNIT_FOREVER_REL,
+                              NULL, NULL);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
       return;
     }
   prq.type = type;
   prq.priority = priority;  
+  prq.finished = GNUNET_NO;
   process_reply (&prq, key, pr);
-  
-  if ( (GNUNET_YES == test_load_too_high()) ||
-       (pr->results_found > 5 + 2 * pr->priority) )
+  if (prq.finished == GNUNET_YES)
+    return;
+  if (pr->qe == NULL)
+    return; /* done here */
+  if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
+       (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
     {
-      GNUNET_FS_drq_get_next (GNUNET_NO);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
       return;
     }
-  GNUNET_FS_drq_get_next (GNUNET_YES);
-}
-
-
-/**
- * The priority level imposes a bound on the maximum
- * value for the ttl that can be requested.
- *
- * @param ttl_in requested ttl
- * @param prio given priority
- * @return ttl_in if ttl_in is below the limit,
- *         otherwise the ttl-limit for the given priority
- */
-static int32_t
-bound_ttl (int32_t ttl_in, uint32_t prio)
-{
-  unsigned long long allowed;
-
-  if (ttl_in <= 0)
-    return ttl_in;
-  allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
-  if (ttl_in > allowed)      
+  if ( (pr->client_request_list == NULL) &&
+       ( (GNUNET_YES == test_load_too_high()) ||
+        (pr->results_found > 5 + 2 * pr->priority) ) )
     {
-      if (allowed >= (1 << 30))
-        return 1 << 30;
-      return allowed;
+#if DEBUG_FS > 2
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Load too high, done with request\n");
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# processing result set cut short due to load"),
+                               1,
+                               GNUNET_NO);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      return;
     }
-  return ttl_in;
+  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
 }
 
 
@@ -2013,6 +2911,36 @@ bound_priority (uint32_t prio_in,
 }
 
 
+/**
+ * Iterator over entries in the 'query_request_map' that
+ * tries to see if we have the same request pending from
+ * the same peer already.
+ *
+ * @param cls closure (our 'struct CheckDuplicateRequestClosure')
+ * @param key current key code (query, ignored, must match)
+ * @param value value in the hash map (a 'struct PendingRequest' 
+ *              that already exists)
+ * @return GNUNET_YES if we should continue to
+ *         iterate (no match yet)
+ *         GNUNET_NO if not (match found).
+ */
+static int
+check_duplicate_request_peer (void *cls,
+                             const GNUNET_HashCode * key,
+                             void *value)
+{
+  struct CheckDuplicateRequestClosure *cdc = cls;
+  struct PendingRequest *have = value;
+
+  if (cdc->pr->target_pid == have->target_pid)
+    {
+      cdc->have = have;
+      return GNUNET_NO;
+    }
+  return GNUNET_YES;
+}
+
+
 /**
  * Handle P2P "GET" request.
  *
@@ -2033,9 +2961,9 @@ handle_p2p_get (void *cls,
                uint32_t distance)
 {
   struct PendingRequest *pr;
-  struct PeerRequestEntry *pre;
   struct ConnectedPeer *cp;
   struct ConnectedPeer *cps;
+  struct CheckDuplicateRequestClosure cdc;
   struct GNUNET_TIME_Relative timeout;
   uint16_t msize;
   const struct GetMessage *gm;
@@ -2044,8 +2972,9 @@ handle_p2p_get (void *cls,
   uint32_t bm;
   size_t bfsize;
   uint32_t ttl_decrement;
-  uint32_t type;
+  enum GNUNET_BLOCK_Type type;
   double preference;
+  int have_ns;
 
   msize = ntohs(message->size);
   if (msize < sizeof (struct GetMessage))
@@ -2054,6 +2983,19 @@ handle_p2p_get (void *cls,
       return GNUNET_SYSERR;
     }
   gm = (const struct GetMessage*) message;
+  type = ntohl (gm->type);
+  switch (type)
+    {
+    case GNUNET_BLOCK_TYPE_ANY:
+    case GNUNET_BLOCK_TYPE_DBLOCK:
+    case GNUNET_BLOCK_TYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_KBLOCK:
+    case GNUNET_BLOCK_TYPE_SBLOCK:
+      break;
+    default:
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }
   bm = ntohl (gm->hash_bitmap);
   bits = 0;
   while (bm > 0)
@@ -2069,10 +3011,9 @@ handle_p2p_get (void *cls,
     }  
   opt = (const GNUNET_HashCode*) &gm[1];
   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
-
   bm = ntohl (gm->hash_bitmap);
   if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
-       (ntohl (gm->type) == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) )
+       (type != GNUNET_BLOCK_TYPE_SBLOCK) )
     {
       GNUNET_break_op (0);
       return GNUNET_SYSERR;      
@@ -2080,7 +3021,15 @@ handle_p2p_get (void *cls,
   bits = 0;
   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                           &other->hashPubKey);
-  GNUNET_assert (NULL != cps);
+  if (NULL == cps)
+    {
+      /* peer must have just disconnected */
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests dropped due to initiator not being connected"),
+                               1,
+                               GNUNET_NO);
+      return GNUNET_SYSERR;
+    }
   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                            &opt[bits++]);
@@ -2088,7 +3037,22 @@ handle_p2p_get (void *cls,
     cp = cps;
   if (cp == NULL)
     {
-      /* FIXME: try connect? */
+#if DEBUG_FS
+      if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
+       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                   "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
+                   GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
+      
+      else
+       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                   "Failed to find peer `%4s' in connection set. Dropping query.\n",
+                   GNUNET_i2s (other));
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests dropped due to missing reverse route"),
+                               1,
+                               GNUNET_NO);
+     /* FIXME: try connect? */
       return GNUNET_OK;
     }
   /* note that we can really only check load here since otherwise
@@ -2101,23 +3065,31 @@ handle_p2p_get (void *cls,
                  "Dropping query from `%s', this peer is too busy.\n",
                  GNUNET_i2s (other));
 #endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests dropped due to high load"),
+                               1,
+                               GNUNET_NO);
       return GNUNET_OK;
     }
 
+#if DEBUG_FS 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
+             GNUNET_h2s (&gm->query),
+             (unsigned int) type,
+             GNUNET_i2s (other),
+             (unsigned int) bm);
+#endif
+  have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
-                     (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)?sizeof(GNUNET_HashCode):0);
-  if ((bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
-    pr->namespace = (GNUNET_HashCode*) &pr[1];
-  pr->type = ntohl (gm->type);
-  pr->mingle = gm->filter_mutator;
-  if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
-    memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
-  else if (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
+                     (have_ns ? sizeof(GNUNET_HashCode) : 0));
+  if (have_ns)
     {
-      GNUNET_break_op (0);
-      GNUNET_free (pr);
-      return GNUNET_SYSERR;
+      pr->namespace = (GNUNET_HashCode*) &pr[1];
+      memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
     }
+  pr->type = type;
+  pr->mingle = ntohl (gm->filter_mutator);
   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
 
@@ -2130,14 +3102,20 @@ handle_p2p_get (void *cls,
     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                              TTL_DECREMENT);
   if ( (pr->ttl < 0) &&
-       (pr->ttl - ttl_decrement > 0) )
+       (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Dropping query from `%s' due to TTL underflow.\n",
-                 GNUNET_i2s (other));
+                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
+                 GNUNET_i2s (other),
+                 pr->ttl,
+                 ttl_decrement);
 #endif
-      /* integer underflow => drop (should be very rare)! */
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests dropped due TTL underflow"),
+                               1,
+                               GNUNET_NO);
+      /* integer underflow => drop (should be very rare)! */      
       GNUNET_free (pr);
       return GNUNET_OK;
     } 
@@ -2153,20 +3131,68 @@ handle_p2p_get (void *cls,
       pr->bf_size = bfsize;
     }
 
-  /* FIXME: check somewhere if request already exists, and if so,
-     recycle old state... */
-  pre = GNUNET_malloc (sizeof (struct PeerRequestEntry));
-  pre->cp = cp;
-  pre->req = pr;
-  GNUNET_CONTAINER_multihashmap_put (query_request_map,
-                                    &gm->query,
-                                    pr,
-                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  cdc.have = NULL;
+  cdc.pr = pr;
+  GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+                                             &gm->query,
+                                             &check_duplicate_request_peer,
+                                             &cdc);
+  if (cdc.have != NULL)
+    {
+      if (cdc.have->start_time.value + cdc.have->ttl >=
+         pr->start_time.value + pr->ttl)
+       {
+         /* existing request has higher TTL, drop new one! */
+         cdc.have->priority += pr->priority;
+         destroy_pending_request (pr);
+#if DEBUG_FS
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Have existing request with higher TTL, dropping new request.\n",
+                     GNUNET_i2s (other));
+#endif
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# requests dropped due to higher-TTL request"),
+                                   1,
+                                   GNUNET_NO);
+         return GNUNET_OK;
+       }
+      else
+       {
+         /* existing request has lower TTL, drop old one! */
+         pr->priority += cdc.have->priority;
+         /* Possible optimization: if we have applicable pending
+            replies in 'cdc.have', we might want to move those over
+            (this is a really rare special-case, so it is not clear
+            that this would be worth it) */
+         destroy_pending_request (cdc.have);
+         /* keep processing 'pr'! */
+       }
+    }
+
+  pr->cp = cp;
+  GNUNET_break (GNUNET_OK ==
+               GNUNET_CONTAINER_multihashmap_put (query_request_map,
+                                                  &gm->query,
+                                                  pr,
+                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+  GNUNET_break (GNUNET_OK ==
+               GNUNET_CONTAINER_multihashmap_put (peer_request_map,
+                                                  &other->hashPubKey,
+                                                  pr,
+                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
   
   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
                                            pr,
-                                           GNUNET_TIME_absolute_get().value + pr->ttl);
+                                           pr->start_time.value + pr->ttl);
 
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# P2P searches received"),
+                           1,
+                           GNUNET_NO);
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# P2P searches active"),
+                           1,
+                           GNUNET_NO);
 
   /* calculate change in traffic preference */
   preference = (double) pr->priority;
@@ -2175,29 +3201,31 @@ handle_p2p_get (void *cls,
   cps->inc_preference += preference;
 
   /* process locally */
-  type = pr->type;
-  if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
-    type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
+  if (type == GNUNET_BLOCK_TYPE_DBLOCK)
+    type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
                                           (pr->priority + 1)); 
-  pr->drq = GNUNET_FS_drq_get (&gm->query,
-                              pr->type,                               
-                              &process_local_reply,
-                              pr,
-                              timeout,
-                              GNUNET_NO);
+  pr->qe = GNUNET_DATASTORE_get (dsh,
+                                &gm->query,
+                                type,                         
+                                pr->priority + 1,
+                                MAX_DATASTORE_QUEUE,                            
+                                timeout,
+                                &process_local_reply,
+                                pr);
 
   /* Are multiple results possible?  If so, start processing remotely now! */
   switch (pr->type)
     {
-    case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
-    case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_DBLOCK:
+    case GNUNET_BLOCK_TYPE_IBLOCK:
       /* only one result, wait for datastore */
       break;
     default:
-      pr->task = GNUNET_SCHEDULER_add_now (sched,
-                                          &forward_request_task,
-                                          pr);
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_now (sched,
+                                            &forward_request_task,
+                                            pr);
     }
 
   /* make sure we don't track too many requests */
@@ -2232,8 +3260,8 @@ handle_start_search (void *cls,
   struct PendingRequest *pr;
   uint16_t msize;
   unsigned int sc;
-  uint32_t type;
-  
+  enum GNUNET_BLOCK_Type type;
+
   msize = ntohs (message->size);
   if ( (msize < sizeof (struct SearchMessage)) ||
        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
@@ -2243,8 +3271,34 @@ handle_start_search (void *cls,
                                  GNUNET_SYSERR);
       return;
     }
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# client searches received"),
+                           1,
+                           GNUNET_NO);
   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
   sm = (const struct SearchMessage*) message;
+  type = ntohl (sm->type);
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received request for `%s' of type %u from local client\n",
+             GNUNET_h2s (&sm->query),
+             (unsigned int) type);
+#endif
+  switch (type)
+    {
+    case GNUNET_BLOCK_TYPE_ANY:
+    case GNUNET_BLOCK_TYPE_DBLOCK:
+    case GNUNET_BLOCK_TYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_KBLOCK:
+    case GNUNET_BLOCK_TYPE_SBLOCK:
+    case GNUNET_BLOCK_TYPE_NBLOCK:
+      break;
+    default:
+      GNUNET_break (0);
+      GNUNET_SERVER_receive_done (client,
+                                 GNUNET_SYSERR);
+      return;
+    }  
 
   cl = client_list;
   while ( (cl != NULL) &&
@@ -2258,12 +3312,51 @@ handle_start_search (void *cls,
       cl->next = client_list;
       client_list = cl;
     }
-  type = ntohl (sm->type);
-
-  /* FIXME: detect duplicate request; if duplicate, simply update (merge)
-     'pr->replies_seen'! */
+  /* detect duplicate KBLOCK requests */
+  if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
+       (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
+       (type == GNUNET_BLOCK_TYPE_ANY) )
+    {
+      crl = cl->rl_head;
+      while ( (crl != NULL) &&
+             ( (0 != memcmp (&crl->req->query,
+                             &sm->query,
+                             sizeof (GNUNET_HashCode))) ||
+               (crl->req->type != type) ) )
+       crl = crl->next;
+      if (crl != NULL)         
+       { 
+#if DEBUG_FS
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Have existing request, merging content-seen lists.\n");
+#endif
+         pr = crl->req;
+         /* Duplicate request (used to send long list of
+            known/blocked results); merge 'pr->replies_seen'
+            and update bloom filter */
+         GNUNET_array_grow (pr->replies_seen,
+                            pr->replies_seen_size,
+                            pr->replies_seen_off + sc);
+         memcpy (&pr->replies_seen[pr->replies_seen_off],
+                 &sm[1],
+                 sc * sizeof (GNUNET_HashCode));
+         pr->replies_seen_off += sc;
+         refresh_bloomfilter (pr);
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# client searches updated (merged content seen list)"),
+                                   1,
+                                   GNUNET_NO);
+         GNUNET_SERVER_receive_done (client,
+                                     GNUNET_OK);
+         return;
+       }
+    }
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# client searches active"),
+                           1,
+                           GNUNET_NO);
   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
-                     ((type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)?sizeof(GNUNET_HashCode):0));
+                     ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
   memset (crl, 0, sizeof (struct ClientRequestList));
   crl->client_list = cl;
@@ -2280,77 +3373,51 @@ handle_start_search (void *cls,
          &sm[1],
          sc * sizeof (GNUNET_HashCode));
   pr->replies_seen_off = sc;
-  pr->anonymity_level = ntohl (sm->anonymity_level);
-  pr->mingle = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
-                                       (uint32_t) -1);
+  pr->anonymity_level = ntohl (sm->anonymity_level); 
+  refresh_bloomfilter (pr);
   pr->query = sm->query;
+  if (0 == (1 & ntohl (sm->options)))
+    pr->local_only = GNUNET_NO;
+  else
+    pr->local_only = GNUNET_YES;
   switch (type)
     {
-    case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
-    case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_DBLOCK:
+    case GNUNET_BLOCK_TYPE_IBLOCK:
       if (0 != memcmp (&sm->target,
                       &all_zeros,
                       sizeof (GNUNET_HashCode)))
        pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
       break;
-    case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
+    case GNUNET_BLOCK_TYPE_SBLOCK:
       pr->namespace = (GNUNET_HashCode*) &pr[1];
       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
       break;
     default:
       break;
     }
-  GNUNET_CONTAINER_multihashmap_put (query_request_map,
-                                    &sm->query,
-                                    pr,
-                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  pr->drq = GNUNET_FS_drq_get (&sm->query,
-                              pr->type,                               
-                              &process_local_reply,
-                              pr,
-                              GNUNET_TIME_UNIT_FOREVER_REL,
-                              GNUNET_YES);
+  GNUNET_break (GNUNET_OK ==
+               GNUNET_CONTAINER_multihashmap_put (query_request_map,
+                                                  &sm->query,
+                                                  pr,
+                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+  if (type == GNUNET_BLOCK_TYPE_DBLOCK)
+    type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
+  pr->qe = GNUNET_DATASTORE_get (dsh,
+                                &sm->query,
+                                type,
+                                -3, -1,
+                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
+                                &process_local_reply,
+                                pr);
 }
 
 
 /* **************************** Startup ************************ */
 
-
-/**
- * List of handlers for P2P messages
- * that we care about.
- */
-static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
-  {
-    { &handle_p2p_get, 
-      GNUNET_MESSAGE_TYPE_FS_GET, 0 },
-    { &handle_p2p_put, 
-      GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
-    { NULL, 0, 0 }
-  };
-
-
-/**
- * List of handlers for the messages understood by this
- * service.
- */
-static struct GNUNET_SERVER_MessageHandler handlers[] = {
-  {&GNUNET_FS_handle_index_start, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
-  {&GNUNET_FS_handle_index_list_get, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
-  {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
-   sizeof (struct UnindexMessage) },
-  {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
-   0 },
-  {NULL, NULL, 0, 0}
-};
-
-
 /**
  * Process fs requests.
  *
- * @param cls closure
  * @param s scheduler to use
  * @param server the initialized server
  * @param c configuration to use
@@ -2360,8 +3427,30 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
           struct GNUNET_SERVER_Handle *server,
           const struct GNUNET_CONFIGURATION_Handle *c)
 {
+  static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
+    {
+      { &handle_p2p_get, 
+       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
+      { &handle_p2p_put, 
+       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
+      { NULL, 0, 0 }
+    };
+  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+    {&GNUNET_FS_handle_index_start, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
+    {&GNUNET_FS_handle_index_list_get, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
+    {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
+     sizeof (struct UnindexMessage) },
+    {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
+     0 },
+    {NULL, NULL, 0, 0}
+  };
+
   sched = s;
   cfg = c;
+  stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
+  min_migration_delay = GNUNET_TIME_UNIT_SECONDS; // FIXME: get from config
   connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
   query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
   peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
@@ -2371,7 +3460,6 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
                              GNUNET_TIME_UNIT_FOREVER_REL,
                              NULL,
                              NULL,
-                             NULL,
                              &peer_connect_handler,
                              &peer_disconnect_handler,
                              NULL, GNUNET_NO,
@@ -2390,9 +3478,20 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
       requests_by_expiration_heap = NULL;
       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
       peer_request_map = NULL;
-
+      if (dsh != NULL)
+       {
+         GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
+         dsh = NULL;
+       }
       return GNUNET_SYSERR;
-    }  
+    }
+  /* FIXME: distinguish between sending and storing in options? */
+  if (active_migration) 
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 _("Content migration is enabled, will start to gather data\n"));
+      consider_migration_gathering ();
+    }
   GNUNET_SERVER_disconnect_notify (server, 
                                   &handle_client_disconnect,
                                   NULL);
@@ -2419,11 +3518,22 @@ run (void *cls,
      struct GNUNET_SERVER_Handle *server,
      const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  if ( (GNUNET_OK != GNUNET_FS_drq_init (sched, cfg)) ||
-       (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg)) ||
+  active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                                          "FS",
+                                                          "ACTIVEMIGRATION");
+  dsh = GNUNET_DATASTORE_connect (cfg,
+                                 sched);
+  if (dsh == NULL)
+    {
+      GNUNET_SCHEDULER_shutdown (sched);
+      return;
+    }
+  if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
        (GNUNET_OK != main_init (sched, server, cfg)) )
     {    
       GNUNET_SCHEDULER_shutdown (sched);
+      GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
+      dsh = NULL;
       return;   
     }
 }