fix
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index 0e592e094a201a3b4acd1c58462c6ccd5d64ebca..ccdd76de235fb1add6c72bd6d857fd95dd9d53d7 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2009 Christian Grothoff (and other contributing authors)
+     (C) 2009, 2010 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
 /**
  * @file fs/gnunet-service-fs.c
- * @brief program that provides the file-sharing service
+ * @brief gnunet anonymity protocol implementation
  * @author Christian Grothoff
  *
+ * FIXME:
+ * - TTL/priority calculations are absent!
  * TODO:
- * - fix gazillion of minor FIXME's
- * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
- *   core to transmit to another peer; need to make sure this is bounded overall...
- * - randomly delay processing for improved anonymity (can wait)
- * - content migration (put in local DS) (can wait)
- * - handle some special cases when forwarding replies based on tracked requests (can wait)
- * - tracking of success correlations for hot-path routing (can wait)
- * - various load-based actions (can wait)
- * - validation of KSBLOCKS (can wait)
- * - remove on-demand blocks if they keep failing (can wait)
- * - check that we decrement PIDs always where necessary (can wait)
- * - find out how to use core-pulling instead of pushing (at least for some cases)
+ * - have non-zero preference / priority for requests we initiate!
+ * - implement hot-path routing decision procedure
+ * - implement: bound_priority, test_load_too_high
+ * - statistics
  */
 #include "platform.h"
 #include <float.h>
+#include "gnunet_constants.h"
 #include "gnunet_core_service.h"
 #include "gnunet_datastore_service.h"
 #include "gnunet_peer_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_signatures.h"
+#include "gnunet_statistics_service.h"
 #include "gnunet_util_lib.h"
+#include "gnunet-service-fs_indexing.h"
 #include "fs.h"
 
 #define DEBUG_FS GNUNET_NO
 
+/**
+ * Maximum number of outgoing messages we queue per peer.
+ * FIXME: make configurable?
+ */
+#define MAX_QUEUE_PER_PEER 16
 
 /**
- * In-memory information about indexed files (also available
- * on-disk).
+ * Inverse of the probability that we will submit the same query
+ * to the same peer again.  If the same peer already got the query
+ * repeatedly recently, the probability is multiplied by the inverse
+ * of this number each time.  Note that we only try about every TTL_DECREMENT/2
+ * plus MAX_CORK_DELAY (so roughly every 3.5s).
  */
-struct IndexInfo
-{
-  
-  /**
-   * This is a linked list.
-   */
-  struct IndexInfo *next;
+#define RETRY_PROBABILITY_INV 3
 
-  /**
-   * Name of the indexed file.  Memory allocated
-   * at the end of this struct (do not free).
-   */
-  const char *filename;
+/**
+ * What is the maximum delay for a P2P FS message (in our interaction
+ * with core)?  FS-internal delays are another story.  The value is
+ * chosen based on the 32k block size.  Given that peers typcially
+ * have at least 1 kb/s bandwidth, 45s waits give us a chance to
+ * transmit one message even to the lowest-bandwidth peers.
+ */
+#define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
 
-  /**
-   * Context for transmitting confirmation to client,
-   * NULL if we've done this already.
-   */
-  struct GNUNET_SERVER_TransmitContext *tc;
-  
-  /**
-   * Hash of the contents of the file.
-   */
-  GNUNET_HashCode file_id;
 
-};
+
+/**
+ * Maximum number of requests (from other peers) that we're
+ * willing to have pending at any given point in time.
+ * FIXME: set from configuration.
+ */
+static uint64_t max_pending_requests = (32 * 1024);
+
+
+/**
+ * Information we keep for each pending reply.  The
+ * actual message follows at the end of this struct.
+ */
+struct PendingMessage;
+
+/**
+ * Our connection to the datastore.
+ */
+static struct GNUNET_DATASTORE_Handle *dsh;
 
 
 /**
- * Signature of a function that is called whenever a datastore
- * request can be processed (or an entry put on the queue times out).
+ * Function called upon completion of a transmission.
  *
  * @param cls closure
- * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
+ * @param pid ID of receiving peer, 0 on transmission error
  */
-typedef void (*RequestFunction)(void *cls,
-                               int ok);
+typedef void (*TransmissionContinuation)(void * cls, 
+                                        GNUNET_PEER_Id tpid);
 
 
 /**
- * Doubly-linked list of our requests for the datastore.
+ * Information we keep for each pending message (GET/PUT).  The
+ * actual message follows at the end of this struct.
  */
-struct DatastoreRequestQueue
+struct PendingMessage
 {
-
   /**
-   * This is a doubly-linked list.
+   * This is a doubly-linked list of messages to the same peer.
    */
-  struct DatastoreRequestQueue *next;
+  struct PendingMessage *next;
 
   /**
-   * This is a doubly-linked list.
+   * This is a doubly-linked list of messages to the same peer.
    */
-  struct DatastoreRequestQueue *prev;
+  struct PendingMessage *prev;
+
+  /**
+   * Entry in pending message list for this pending message.
+   */ 
+  struct PendingMessageList *pml;  
 
   /**
-   * Function to call (will issue the request).
+   * Function to call immediately once we have transmitted this
+   * message.
    */
-  RequestFunction req;
+  TransmissionContinuation cont;
 
   /**
-   * Closure for req.
+   * Closure for cont.
    */
-  void *req_cls;
+  void *cont_cls;
 
   /**
-   * When should this request time-out because we don't care anymore?
+   * Size of the reply; actual reply message follows
+   * at the end of this struct.
    */
-  struct GNUNET_TIME_Absolute timeout;
-    
+  size_t msize;
+  
   /**
-   * ID of task used for signaling timeout.
+   * How important is this message for us?
    */
-  GNUNET_SCHEDULER_TaskIdentifier task;
-
+  uint32_t priority;
 };
 
 
 /**
- * Closure for processing START_SEARCH messages from a client.
+ * Information about a peer that we are connected to.
+ * We track data that is useful for determining which
+ * peers should receive our requests.  We also keep
+ * a list of messages to transmit to this peer.
  */
-struct LocalGetContext
+struct ConnectedPeer
 {
 
   /**
-   * This is a doubly-linked list.
+   * List of the last clients for which this peer successfully
+   * answered a query.
    */
-  struct LocalGetContext *next;
+  struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
 
   /**
-   * This is a doubly-linked list.
+   * List of the last PIDs for which
+   * this peer successfully answered a query;
+   * We use 0 to indicate no successful reply.
    */
-  struct LocalGetContext *prev;
+  GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
 
   /**
-   * Client that initiated the search.
-   */
-  struct GNUNET_SERVER_Client *client;
+   * Average delay between sending the peer a request and
+   * getting a reply (only calculated over the requests for
+   * which we actually got a reply).   Calculated
+   * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
+   */ 
+  struct GNUNET_TIME_Relative avg_delay;
 
   /**
-   * Array of results that we've already received 
-   * (can be NULL).
+   * Handle for an active request for transmission to this
+   * peer, or NULL.
    */
-  GNUNET_HashCode *results; 
+  struct GNUNET_CORE_TransmitHandle *cth;
 
   /**
-   * Bloomfilter over all results (for fast query construction);
-   * NULL if we don't have any results.
-   *
-   * FIXME: this member is not used, is that OK? If so, it should
-   * be removed!
+   * Messages (replies, queries, content migration) we would like to
+   * send to this peer in the near future.  Sorted by priority, head.
    */
-  struct GNUNET_CONTAINER_BloomFilter *results_bf; 
+  struct PendingMessage *pending_messages_head;
 
   /**
-   * DS request associated with this operation.
+   * Messages (replies, queries, content migration) we would like to
+   * send to this peer in the near future.  Sorted by priority, tail.
    */
-  struct DatastoreRequestQueue *req;
+  struct PendingMessage *pending_messages_tail;
 
   /**
-   * Current result message to transmit to client (or NULL).
-   */
-  struct ContentMessage *result;
-  
-  /**
-   * Type of the content that we're looking for.
-   * 0 for any.
+   * Average priority of successful replies.  Calculated
+   * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
    */
-  uint32_t type;
+  double avg_priority;
 
   /**
-   * Desired anonymity level.
+   * Increase in traffic preference still to be submitted
+   * to the core service for this peer. FIXME: double or 'uint64_t'?
    */
-  uint32_t anonymity_level;
+  double inc_preference;
 
   /**
-   * Number of results actually stored in the results array.
-   */
-  unsigned int results_used;
-  
-  /**
-   * Size of the results array in memory.
-   */
-  unsigned int results_size;
-  
-  /**
-   * Size (in bytes) of the 'results_bf' bloomfilter.
-   *
-   * FIXME: this member is not used, is that OK? If so, it should
-   * be removed!
+   * The peer's identity.
    */
-  size_t results_bf_size;
+  GNUNET_PEER_Id pid;  
 
   /**
-   * If the request is for a DBLOCK or IBLOCK, this is the identity of
-   * the peer that is known to have a response.  Set to all-zeros if
-   * such a target is not known (note that even if OUR anonymity
-   * level is >0 we may happen to know the responder's identity;
-   * nevertheless, we should probably not use it for a DHT-lookup
-   * or similar blunt actions in order to avoid exposing ourselves).
+   * Size of the linked list of 'pending_messages'.
    */
-  struct GNUNET_PeerIdentity target;
+  unsigned int pending_requests;
 
   /**
-   * If the request is for an SBLOCK, this is the identity of the
-   * pseudonym to which the SBLOCK belongs. 
+   * Which offset in "last_p2p_replies" will be updated next?
+   * (we go round-robin).
    */
-  GNUNET_HashCode namespace;
+  unsigned int last_p2p_replies_woff;
 
   /**
-   * Hash of the keyword (aka query) for KBLOCKs; Hash of
-   * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
-   * and hash of the identifier XORed with the target for
-   * SBLOCKS (aka query).
+   * Which offset in "last_client_replies" will be updated next?
+   * (we go round-robin).
    */
-  GNUNET_HashCode query;
+  unsigned int last_client_replies_woff;
 
 };
 
 
 /**
- * Possible routing policies for an FS-GET request.
- */
-enum RoutingPolicy
-  {
-    /**
-     * Simply drop the request.
-     */
-    ROUTING_POLICY_NONE = 0,
-    
-    /**
-     * Answer it if we can from local datastore.
-     */
-    ROUTING_POLICY_ANSWER = 1,
-
-    /**
-     * Forward the request to other peers (if possible).
-     */
-    ROUTING_POLICY_FORWARD = 2,
-
-    /**
-     * Forward to other peers, and ask them to route
-     * the response via ourselves.
-     */
-    ROUTING_POLICY_INDIRECT = 6,
-    
-    /**
-     * Do everything we could possibly do (that would
-     * make sense).
-     */
-    ROUTING_POLICY_ALL = 7
-  };
+ * Information we keep for each pending request.  We should try to
+ * keep this struct as small as possible since its memory consumption
+ * is key to how many requests we can have pending at once.
+ */
+struct PendingRequest;
 
 
 /**
- * Internal context we use for our initial processing
- * of a GET request.
+ * Doubly-linked list of requests we are performing
+ * on behalf of the same client.
  */
-struct ProcessGetContext
+struct ClientRequestList
 {
+
   /**
-   * The search query (used for datastore lookup).
+   * This is a doubly-linked list.
    */
-  GNUNET_HashCode query;
-  
+  struct ClientRequestList *next;
+
   /**
-   * Which peer we should forward the response to.
+   * This is a doubly-linked list.
    */
-  struct GNUNET_PeerIdentity reply_to;
+  struct ClientRequestList *prev;
 
   /**
-   * Namespace for the result (only set for SKS requests)
+   * Request this entry represents.
    */
-  GNUNET_HashCode namespace;
+  struct PendingRequest *req;
 
   /**
-   * Peer that we should forward the query to if possible
-   * (since that peer likely has the content).
+   * Client list this request belongs to.
    */
-  struct GNUNET_PeerIdentity prime_target;
+  struct ClientList *client_list;
 
+};
+
+
+/**
+ * Replies to be transmitted to the client.  The actual
+ * response message is allocated after this struct.
+ */
+struct ClientResponseMessage
+{
   /**
-   * When did we receive this request?
+   * This is a doubly-linked list.
    */
-  struct GNUNET_TIME_Absolute start_time;
+  struct ClientResponseMessage *next;
 
   /**
-   * Our entry in the DRQ (non-NULL while we wait for our
-   * turn to interact with the local database).
+   * This is a doubly-linked list.
    */
-  struct DatastoreRequestQueue *drq;
+  struct ClientResponseMessage *prev;
 
   /**
-   * Filter used to eliminate duplicate results.  Can be NULL if we
-   * are not yet filtering any results.
+   * Client list entry this response belongs to.
    */
-  struct GNUNET_CONTAINER_BloomFilter *bf;
+  struct ClientList *client_list;
 
   /**
-   * Bitmap describing which of the optional
-   * hash codes / peer identities were given to us.
+   * Number of bytes in the response.
    */
-  uint32_t bm;
+  size_t msize;
+};
 
+
+/**
+ * Linked list of clients we are performing requests
+ * for right now.
+ */
+struct ClientList
+{
   /**
-   * Desired block type.
+   * This is a linked list.
    */
-  uint32_t type;
+  struct ClientList *next;
 
   /**
-   * Priority of the request.
+   * ID of a client making a request, NULL if this entry is for a
+   * peer.
    */
-  uint32_t priority;
+  struct GNUNET_SERVER_Client *client;
 
   /**
-   * Size of the 'bf' (in bytes).
+   * Head of list of requests performed on behalf
+   * of this client right now.
    */
-  size_t bf_size;
+  struct ClientRequestList *rl_head;
 
   /**
-   * In what ways are we going to process
-   * the request?
+   * Tail of list of requests performed on behalf
+   * of this client right now.
    */
-  enum RoutingPolicy policy;
+  struct ClientRequestList *rl_tail;
 
   /**
-   * Time-to-live for the request (value
-   * we use).
+   * Head of linked list of responses.
    */
-  int32_t ttl;
+  struct ClientResponseMessage *res_head;
 
   /**
-   * Number to mingle hashes for bloom-filter
-   * tests with.
+   * Tail of linked list of responses.
    */
-  int32_t mingle;
+  struct ClientResponseMessage *res_tail;
 
   /**
-   * Number of results that were found so far.
+   * Context for sending replies.
    */
-  unsigned int results_found;
+  struct GNUNET_CONNECTION_TransmitHandle *th;
+
 };
 
 
 /**
- * Information we keep for each pending reply.
+ * Doubly-linked list of messages we are performing
+ * due to a pending request.
  */
-struct PendingReply
+struct PendingMessageList
 {
+
   /**
-   * This is a linked list.
+   * This is a doubly-linked list of messages on behalf of the same request.
    */
-  struct PendingReply *next;
+  struct PendingMessageList *next;
 
   /**
-   * Size of the reply; actual reply message follows
-   * at the end of this struct.
+   * This is a doubly-linked list of messages on behalf of the same request.
    */
-  size_t msize;
+  struct PendingMessageList *prev;
 
-};
+  /**
+   * Message this entry represents.
+   */
+  struct PendingMessage *pm;
 
+  /**
+   * Request this entry belongs to.
+   */
+  struct PendingRequest *req;
 
-/**
- * All requests from a client are kept in a doubly-linked list.
- */
-struct ClientRequestList;
+  /**
+   * Peer this message is targeted for.
+   */
+  struct ConnectedPeer *target;
+
+};
 
 
 /**
@@ -392,23 +390,23 @@ struct PendingRequest
 {
 
   /**
-   * ID of a client making a request, NULL if this entry is for a
-   * peer.
+   * If this request was made by a client, this is our entry in the
+   * client request list; otherwise NULL.
    */
-  struct GNUNET_SERVER_Client *client;
+  struct ClientRequestList *client_request_list;
 
   /**
-   * If this request was made by a client,
-   * this is our entry in the client request
-   * list; otherwise NULL.
+   * Entry of peer responsible for this entry (if this request
+   * was made by a peer).
    */
-  struct ClientRequestList *crl_entry;
+  struct ConnectedPeer *cp;
 
   /**
    * If this is a namespace query, pointer to the hash of the public
-   * key of the namespace; otherwise NULL.
+   * key of the namespace; otherwise NULL.  Pointer will be to the 
+   * end of this struct (so no need to free it).
    */
-  GNUNET_HashCode *namespace;
+  const GNUNET_HashCode *namespace;
 
   /**
    * Bloomfilter we use to filter out replies that we don't care about
@@ -417,33 +415,34 @@ struct PendingRequest
   struct GNUNET_CONTAINER_BloomFilter *bf;
 
   /**
-   * Replies that we have received but were unable to forward yet
-   * (typically non-null only if we have a pending transmission
-   * request with the client or the respective peer).
+   * Context of our GNUNET_CORE_peer_change_preference call.
+   */
+  struct GNUNET_CORE_InformationRequestContext *irc;
+
+  /**
+   * Hash code of all replies that we have seen so far (only valid
+   * if client is not NULL since we only track replies like this for
+   * our own clients).
    */
-  struct PendingReply *replies_pending;
+  GNUNET_HashCode *replies_seen;
 
   /**
-   * Pending transmission request with the core service for the target
-   * peer (for processing of 'replies_pending') or Handle for a
-   * pending query-request for P2P-transmission with the core service.
-   * If non-NULL, this request must be cancelled should this struct be
-   * destroyed!
+   * Node in the heap representing this entry; NULL
+   * if we have no heap node.
    */
-  struct GNUNET_CORE_TransmitHandle *cth;
+  struct GNUNET_CONTAINER_HeapNode *hnode;
 
   /**
-   * Pending transmission request for the target client (for processing of
-   * 'replies_pending').
+   * Head of list of messages being performed on behalf of this
+   * request.
    */
-  struct GNUNET_CONNECTION_TransmitHandle *th;
+  struct PendingMessageList *pending_head;
 
   /**
-   * Hash code of all replies that we have seen so far (only valid
-   * if client is not NULL since we only track replies like this for
-   * our own clients).
+   * Tail of list of messages being performed on behalf of this
+   * request.
    */
-  GNUNET_HashCode *replies_seen;
+  struct PendingMessageList *pending_tail;
 
   /**
    * When did we first see this request (form this peer), or, if our
@@ -462,12 +461,6 @@ struct PendingRequest
    */
   GNUNET_SCHEDULER_TaskIdentifier task;
 
-  /**
-   * (Interned) Peer identifier (only valid if "client" is NULL)
-   * that identifies a peer that gave us this request.
-   */
-  GNUNET_PEER_Id source_pid;
-
   /**
    * (Interned) Peer identifier that identifies a preferred target
    * for requests.
@@ -479,8 +472,15 @@ struct PendingRequest
    * received our query for this content.
    */
   GNUNET_PEER_Id *used_pids;
+  
+  /**
+   * Our entry in the queue (non-NULL while we wait for our
+   * turn to interact with the local database).
+   */
+  struct GNUNET_DATASTORE_QueueEntry *qe;
 
   /**
+
    * Size of the 'bf' (in bytes).
    */
   size_t bf_size;
@@ -500,6 +500,11 @@ struct PendingRequest
    */
   unsigned int used_pids_size;
 
+  /**
+   * Number of results found for this request.
+   */
+  unsigned int results_found;
+
   /**
    * How many entries in "replies_seen" are actually valid?
    */
@@ -525,8 +530,7 @@ struct PendingRequest
   uint32_t remaining_priority;
 
   /**
-   * Number to mingle hashes for bloom-filter
-   * tests with.
+   * Number to mingle hashes for bloom-filter tests with.
    */
   int32_t mingle;
 
@@ -539,854 +543,1075 @@ struct PendingRequest
   /**
    * Type of the content that this request is for.
    */
-  uint32_t type;
-
-};
+  enum GNUNET_BLOCK_Type type;
 
-
-/**
- * All requests from a client are kept in a doubly-linked list.
- */
-struct ClientRequestList
-{
   /**
-   * This is a doubly-linked list.
+   * Remove this request after transmission of the current response.
    */
-  struct ClientRequestList *next;
-
-  /**
-   * This is a doubly-linked list.
-   */ 
-  struct ClientRequestList *prev;
+  int16_t do_remove;
 
   /**
-   * A request from this client.
+   * GNUNET_YES if we should not forward this request to other peers.
    */
-  struct PendingRequest *req;
+  int16_t local_only;
 
-  /**
-   * Client list with the head and tail of this DLL.
-   */
-  struct ClientList *cl;
 };
 
 
 /**
- * Linked list of all clients that we are 
- * currently processing requests for.
+ * Block that is ready for migration to other peers.  Actual data is at the end of the block.
  */
-struct ClientList
+struct MigrationReadyBlock
 {
 
   /**
-   * This is a linked list.
-   */
-  struct ClientList *next;
-
-  /**
-   * What client is this entry for?
+   * This is a doubly-linked list.
    */
-  struct GNUNET_SERVER_Client* client;
+  struct MigrationReadyBlock *next;
 
   /**
-   * Head of the DLL of requests from this client.
+   * This is a doubly-linked list.
    */
-  struct ClientRequestList *head;
+  struct MigrationReadyBlock *prev;
 
   /**
-   * Tail of the DLL of requests from this client.
+   * Query for the block.
    */
-  struct ClientRequestList *tail;
-
-};
-
+  GNUNET_HashCode query;
 
-/**
- * Closure for "process_reply" function.
- */
-struct ProcessReplyClosure
-{
   /**
-   * The data for the reply.
+   * When does this block expire? 
    */
-  const void *data;
+  struct GNUNET_TIME_Absolute expiration;
 
   /**
-   * When the reply expires.
+   * Peers we would consider forwarding this
+   * block to.  Zero for empty entries.
    */
-  struct GNUNET_TIME_Absolute expiration;
+  GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
 
   /**
-   * Size of data.
+   * Size of the block.
    */
   size_t size;
 
   /**
-   * Namespace that this reply belongs to
-   * (if it is of type SBLOCK).
+   *  Number of targets already used.
    */
-  GNUNET_HashCode namespace;
+  unsigned int used_targets;
 
   /**
    * Type of the block.
    */
-  uint32_t type;
-
-  /**
-   * How much was this reply worth to us?
-   */
-  uint32_t priority;
+  enum GNUNET_BLOCK_Type type;
 };
 
 
 /**
- * Information about a peer that we are connected to.
- * We track data that is useful for determining which
- * peers should receive our requests.
+ * Our scheduler.
  */
-struct ConnectedPeer
-{
+static struct GNUNET_SCHEDULER_Handle *sched;
 
-  /**
-   * List of the last clients for which this peer
-   * successfully answered a query. 
-   */
-  struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
-
-  /**
-   * List of the last PIDs for which
-   * this peer successfully answered a query;
-   * We use 0 to indicate no successful reply.
-   */
-  GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
-
-  /**
-   * Average delay between sending the peer a request and
-   * getting a reply (only calculated over the requests for
-   * which we actually got a reply).   Calculated
-   * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
-   */ 
-  struct GNUNET_TIME_Relative avg_delay;
-
-  /**
-   * Average priority of successful replies.  Calculated
-   * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
-   */
-  double avg_priority;
-
-  /**
-   * The peer's identity.
-   */
-  GNUNET_PEER_Id pid;  
-
-  /**
-   * Number of requests we have currently pending
-   * with this peer (that is, requests that were
-   * transmitted so recently that we would not retransmit
-   * them right now).
-   */
-  unsigned int pending_requests;
-
-  /**
-   * Which offset in "last_p2p_replies" will be updated next?
-   * (we go round-robin).
-   */
-  unsigned int last_p2p_replies_woff;
-
-  /**
-   * Which offset in "last_client_replies" will be updated next?
-   * (we go round-robin).
-   */
-  unsigned int last_client_replies_woff;
-
-};
+/**
+ * Our configuration.
+ */
+static const struct GNUNET_CONFIGURATION_Handle *cfg;
 
+/**
+ * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
 
 /**
- * Our connection to the datastore.
+ * Map of peer identifiers to "struct PendingRequest" (for that peer).
  */
-static struct GNUNET_DATASTORE_Handle *dsh;
+static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
 
 /**
- * Our scheduler.
+ * Map of query identifiers to "struct PendingRequest" (for that query).
  */
-static struct GNUNET_SCHEDULER_Handle *sched;
+static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
 
 /**
- * Our configuration.
+ * Heap with the request that will expire next at the top.  Contains
+ * pointers of type "struct PendingRequest*"; these will *also* be
+ * aliased from the "requests_by_peer" data structures and the
+ * "requests_by_query" table.  Note that requests from our clients
+ * don't expire and are thus NOT in the "requests_by_expiration"
+ * (or the "requests_by_peer" tables).
  */
-const struct GNUNET_CONFIGURATION_Handle *cfg;
+static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
 
 /**
- * Handle to the core service (NULL until we've connected to it).
+ * Handle for reporting statistics.
  */
-struct GNUNET_CORE_Handle *core;
+static struct GNUNET_STATISTICS_Handle *stats;
 
 /**
- * Head of doubly-linked LGC list.
+ * Linked list of clients we are currently processing requests for.
  */
-static struct LocalGetContext *lgc_head;
+static struct ClientList *client_list;
 
 /**
- * Tail of doubly-linked LGC list.
+ * Pointer to handle to the core service (points to NULL until we've
+ * connected to it).
  */
-static struct LocalGetContext *lgc_tail;
+static struct GNUNET_CORE_Handle *core;
 
 /**
- * Head of request queue for the datastore, sorted by timeout.
+ * Head of linked list of blocks that can be migrated.
  */
-static struct DatastoreRequestQueue *drq_head;
+static struct MigrationReadyBlock *mig_head;
 
 /**
- * Tail of request queue for the datastore.
+ * Tail of linked list of blocks that can be migrated.
  */
-static struct DatastoreRequestQueue *drq_tail;
+static struct MigrationReadyBlock *mig_tail;
 
 /**
- * Linked list of indexed files.
+ * Request to datastore for migration (or NULL).
  */
-static struct IndexInfo *indexed_files;
+static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
 
 /**
- * Maps hash over content of indexed files to the respective filename.
- * The filenames are pointers into the indexed_files linked list and
- * do not need to be freed.
+ * ID of task that collects blocks for migration.
  */
-static struct GNUNET_CONTAINER_MultiHashMap *ifm;
+static GNUNET_SCHEDULER_TaskIdentifier mig_task;
 
 /**
- * Map of query hash codes to requests.
+ * What is the maximum frequency at which we are allowed to
+ * poll the datastore for migration content?
  */
-static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
+static struct GNUNET_TIME_Relative min_migration_delay;
 
 /**
- * Map of peer IDs to requests (for those requests coming
- * from other peers).
+ * Size of the doubly-linked list of migration blocks.
  */
-static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
+static unsigned int mig_size;
 
 /**
- * Linked list of all of our clients and their requests.
+ * Are we allowed to migrate content to this peer.
  */
-static struct ClientList *clients;
+static int active_migration;
 
-/**
- * Heap with the request that will expire next at the top.  Contains
- * pointers of type "struct PendingRequest*"; these will *also* be
- * aliased from the "requests_by_peer" data structures and the
- * "requests_by_query" table.  Note that requests from our clients
- * don't expire and are thus NOT in the "requests_by_expiration"
- * (or the "requests_by_peer" tables).
- */
-static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
 
 /**
- * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
+ * Transmit messages by copying it to the target buffer
+ * "buf".  "buf" will be NULL and "size" zero if the socket was closed
+ * for writing in the meantime.  In that case, do nothing
+ * (the disconnect or shutdown handler will take care of the rest).
+ * If we were able to transmit messages and there are still more
+ * pending, ask core again for further calls to this function.
+ *
+ * @param cls closure, pointer to the 'struct ConnectedPeer*'
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
  */
-static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
+static size_t
+transmit_to_peer (void *cls,
+                 size_t size, void *buf);
 
-/**
- * Maximum number of requests (from other peers) that we're
- * willing to have pending at any given point in time.
- * FIXME: set from configuration (and 32 is a tiny value for testing only).
- */
-static uint64_t max_pending_requests = 32;
+
+/* ******************* clean up functions ************************ */
 
 
 /**
- * Write the current index information list to disk.
- */ 
+ * Delete the given migration block.
+ *
+ * @param mb block to delete
+ */
 static void
-write_index_list ()
+delete_migration_block (struct MigrationReadyBlock *mb)
 {
-  struct GNUNET_BIO_WriteHandle *wh;
-  char *fn;
-  struct IndexInfo *pos;  
-
-  if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_filename (cfg,
-                                              "FS",
-                                              "INDEXDB",
-                                              &fn))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                 _("Configuration option `%s' in section `%s' missing.\n"),
-                 "INDEXDB",
-                 "FS");
-      return;
-    }
-  wh = GNUNET_BIO_write_open (fn);
-  if (NULL == wh)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                 _("Could not open `%s'.\n"),
-                 fn);
-      GNUNET_free (fn);
-      return;
-    }
-  pos = indexed_files;
-  while (pos != NULL)
-    {
-      if ( (GNUNET_OK !=
-           GNUNET_BIO_write (wh,
-                             &pos->file_id,
-                             sizeof (GNUNET_HashCode))) ||
-          (GNUNET_OK !=
-           GNUNET_BIO_write_string (wh,
-                                    pos->filename)) )
-       break;
-      pos = pos->next;
-    }
-  if (GNUNET_OK != 
-      GNUNET_BIO_write_close (wh))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                 _("Error writing `%s'.\n"),
-                 fn);
-      GNUNET_free (fn);
-      return;
-    }
-  GNUNET_free (fn);
+  GNUNET_CONTAINER_DLL_remove (mig_head,
+                              mig_tail,
+                              mb);
+  GNUNET_PEER_decrement_rcs (mb->target_list,
+                            MIGRATION_LIST_SIZE);
+  mig_size--;
+  GNUNET_free (mb);
 }
 
 
 /**
- * Read index information from disk.
+ * Compare the distance of two peers to a key.
+ *
+ * @param key key
+ * @param p1 first peer
+ * @param p2 second peer
+ * @return GNUNET_YES if P1 is closer to key than P2
  */
-static void
-read_index_list ()
+static int
+is_closer (const GNUNET_HashCode *key,
+          const struct GNUNET_PeerIdentity *p1,
+          const struct GNUNET_PeerIdentity *p2)
 {
-  struct GNUNET_BIO_ReadHandle *rh;
-  char *fn;
-  struct IndexInfo *pos;  
-  char *fname;
-  GNUNET_HashCode hc;
-  size_t slen;
-  char *emsg;
+  return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
+                                   &p2->hashPubKey,
+                                   key);
+}
 
-  if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_filename (cfg,
-                                              "FS",
-                                              "INDEXDB",
-                                              &fn))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                 _("Configuration option `%s' in section `%s' missing.\n"),
-                 "INDEXDB",
-                 "FS");
-      return;
-    }
-  if (GNUNET_NO == GNUNET_DISK_file_test (fn))
-    {
-      /* no index info  yet */
-      GNUNET_free (fn);
-      return;
-    }
-  rh = GNUNET_BIO_read_open (fn);
-  if (NULL == rh)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                 _("Could not open `%s'.\n"),
-                 fn);
-      GNUNET_free (fn);
-      return;
-    }
 
-  while ( (GNUNET_OK ==
-          GNUNET_BIO_read (rh,
-                           "Hash of indexed file",
-                           &hc,
-                           sizeof (GNUNET_HashCode))) &&
-         (GNUNET_OK ==
-          GNUNET_BIO_read_string (rh, 
-                                  "Name of indexed file",
-                                  &fname,
-                                  1024 * 16)) )
-    {
-      slen = strlen (fname) + 1;
-      pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
-      pos->file_id = hc;
-      pos->filename = (const char *) &pos[1];
-      memcpy (&pos[1], fname, slen);
-      if (GNUNET_SYSERR ==
-         GNUNET_CONTAINER_multihashmap_put (ifm,
-                                            &hc,
-                                            (void*) pos->filename,
-                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
+/**
+ * Consider migrating content to a given peer.
+ *
+ * @param cls 'struct MigrationReadyBlock*' to select
+ *            targets for (or NULL for none)
+ * @param key ID of the peer 
+ * @param value 'struct ConnectedPeer' of the peer
+ * @return GNUNET_YES (always continue iteration)
+ */
+static int
+consider_migration (void *cls,
+                   const GNUNET_HashCode *key,
+                   void *value)
+{
+  struct MigrationReadyBlock *mb = cls;
+  struct ConnectedPeer *cp = value;
+  struct MigrationReadyBlock *pos;
+  struct GNUNET_PeerIdentity cppid;
+  struct GNUNET_PeerIdentity otherpid;
+  struct GNUNET_PeerIdentity worstpid;
+  size_t msize;
+  unsigned int i;
+  unsigned int repl;
+  
+  /* consider 'cp' as a migration target for mb */
+  if (mb != NULL)
+    {
+      GNUNET_PEER_resolve (cp->pid,
+                          &cppid);
+      repl = MIGRATION_LIST_SIZE;
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
        {
-         GNUNET_free (pos);
+         if (mb->target_list[i] == 0)
+           {
+             mb->target_list[i] = cp->pid;
+             GNUNET_PEER_change_rc (mb->target_list[i], 1);
+             repl = MIGRATION_LIST_SIZE;
+             break;
+           }
+         GNUNET_PEER_resolve (mb->target_list[i],
+                              &otherpid);
+         if ( (repl == MIGRATION_LIST_SIZE) &&
+              is_closer (&mb->query,
+                         &cppid,
+                         &otherpid)) 
+           {
+             repl = i;
+             worstpid = otherpid;
+           }
+         else if ( (repl != MIGRATION_LIST_SIZE) &&
+                   (is_closer (&mb->query,
+                               &worstpid,
+                               &otherpid) ) )
+           {
+             repl = i;
+             worstpid = otherpid;
+           }       
        }
-      else
+      if (repl != MIGRATION_LIST_SIZE) 
        {
-         pos->next = indexed_files;
-         indexed_files = pos;
+         GNUNET_PEER_change_rc (mb->target_list[repl], -1);
+         mb->target_list[repl] = cp->pid;
+         GNUNET_PEER_change_rc (mb->target_list[repl], 1);
        }
-      GNUNET_free (fname);
     }
-  if (GNUNET_OK != 
-      GNUNET_BIO_read_close (rh, &emsg))
-    GNUNET_free (emsg);
-  GNUNET_free (fn);
+
+  /* consider scheduling transmission to cp for content migration */
+  if (cp->cth != NULL)
+    return GNUNET_YES; 
+  msize = 0;
+  pos = mig_head;
+  while (pos != NULL)
+    {
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
+       {
+         if (cp->pid == pos->target_list[i])
+           {
+             if (msize == 0)
+               msize = pos->size;
+             else
+               msize = GNUNET_MIN (msize,
+                                   pos->size);
+             break;
+           }
+       }
+      pos = pos->next;
+    }
+  if (msize == 0)
+    return GNUNET_YES; /* no content available */
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Trying to migrate at least %u bytes to peer `%s'\n",
+             msize,
+             GNUNET_h2s (key));
+#endif
+  cp->cth 
+    = GNUNET_CORE_notify_transmit_ready (core,
+                                        0, GNUNET_TIME_UNIT_FOREVER_REL,
+                                        (const struct GNUNET_PeerIdentity*) key,
+                                        msize + sizeof (struct PutMessage),
+                                        &transmit_to_peer,
+                                        cp);
+  return GNUNET_YES;
 }
 
 
 /**
- * We've validated the hash of the file we're about to
- * index.  Signal success to the client and update
- * our internal data structures.
- *
- * @param ii the index info entry for the request
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ * 
+ * @param cls unused
+ * @param tc scheduler context (also unused)
  */
 static void
-signal_index_ok (struct IndexInfo *ii)
-{
-  if (GNUNET_SYSERR ==
-      GNUNET_CONTAINER_multihashmap_put (ifm,
-                                        &ii->file_id,
-                                        (void*) ii->filename,
-                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Index request received for file `%s' is indexed as `%s'.  Permitting anyway.\n"),
-                 ii->filename,
-                 (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
-                                                                  &ii->file_id));
-      GNUNET_SERVER_transmit_context_append (ii->tc,
-                                            NULL, 0,
-                                            GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
-      GNUNET_SERVER_transmit_context_run (ii->tc,
-                                         GNUNET_TIME_UNIT_MINUTES);
-      GNUNET_free (ii);
-      return;
-    }
-  ii->next = indexed_files;
-  indexed_files = ii;
-  write_index_list ();
-  GNUNET_SERVER_transmit_context_append (ii->tc,
-                                        NULL, 0,
-                                        GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
-  GNUNET_SERVER_transmit_context_run (ii->tc,
-                                     GNUNET_TIME_UNIT_MINUTES);
-  ii->tc = NULL;
-}
+gather_migration_blocks (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
 /**
- * Function called once the hash computation over an
- * indexed file has completed.
- *
- * @param cls closure, our publishing context
- * @param res resulting hash, NULL on error
+ * If the migration task is not currently running, consider
+ * (re)scheduling it with the appropriate delay.
  */
-static void 
-hash_for_index_val (void *cls,
-                   const GNUNET_HashCode *
-                   res)
+static void
+consider_migration_gathering ()
 {
-  struct IndexInfo *ii = cls;
-  
-  if ( (res == NULL) ||
-       (0 != memcmp (res,
-                    &ii->file_id,
-                    sizeof(GNUNET_HashCode))) )
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Hash mismatch trying to index file `%s' which has hash `%s'\n"),
-                 ii->filename,
-                 GNUNET_h2s (res));
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Wanted `%s'\n",
-                 GNUNET_h2s (&ii->file_id));
-#endif
-      GNUNET_SERVER_transmit_context_append (ii->tc,
-                                            NULL, 0,
-                                            GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
-      GNUNET_SERVER_transmit_context_run (ii->tc,
-                                         GNUNET_TIME_UNIT_MINUTES);
-      GNUNET_free (ii);
-      return;
-    }
-  signal_index_ok (ii);
+  struct GNUNET_TIME_Relative delay;
+
+  if (mig_qe != NULL)
+    return;
+  if (mig_task != GNUNET_SCHEDULER_NO_TASK)
+    return;
+  delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+                                        mig_size);
+  delay = GNUNET_TIME_relative_divide (delay,
+                                      MAX_MIGRATION_QUEUE);
+  delay = GNUNET_TIME_relative_max (delay,
+                                   min_migration_delay);
+  mig_task = GNUNET_SCHEDULER_add_delayed (sched,
+                                          delay,
+                                          &gather_migration_blocks,
+                                          NULL);
 }
 
 
 /**
- * Handle INDEX_START-message.
+ * Process content offered for migration.
  *
  * @param cls closure
- * @param client identification of the client
- * @param message the actual message
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
  */
 static void
-handle_index_start (void *cls,
-                   struct GNUNET_SERVER_Client *client,
-                   const struct GNUNET_MessageHeader *message)
+process_migration_content (void *cls,
+                          const GNUNET_HashCode * key,
+                          uint32_t size,
+                          const void *data,
+                          enum GNUNET_BLOCK_Type type,
+                          uint32_t priority,
+                          uint32_t anonymity,
+                          struct GNUNET_TIME_Absolute
+                          expiration, uint64_t uid)
 {
-  const struct IndexStartMessage *ism;
-  const char *fn;
-  uint16_t msize;
-  struct IndexInfo *ii;
-  size_t slen;
-  uint32_t dev;
-  uint64_t ino;
-  uint32_t mydev;
-  uint64_t myino;
-
-  msize = ntohs(message->size);
-  if ( (msize <= sizeof (struct IndexStartMessage)) ||
-       ( ((const char *)message)[msize-1] != '\0') )
+  struct MigrationReadyBlock *mb;
+  
+  if (key == NULL)
     {
-      GNUNET_break (0);
-      GNUNET_SERVER_receive_done (client,
-                                 GNUNET_SYSERR);
+      mig_qe = NULL;
+      if (mig_size < MAX_MIGRATION_QUEUE)  
+       consider_migration_gathering ();
       return;
     }
-  ism = (const struct IndexStartMessage*) message;
-  fn = (const char*) &ism[1];
-  dev = ntohl (ism->device);
-  ino = GNUNET_ntohll (ism->inode);
-  ism = (const struct IndexStartMessage*) message;
-  slen = strlen (fn) + 1;
-  ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
-  ii->filename = (const char*) &ii[1];
-  memcpy (&ii[1], fn, slen);
-  ii->file_id = ism->file_id;  
-  ii->tc = GNUNET_SERVER_transmit_context_create (client);
-  if ( ( (dev != 0) ||
-        (ino != 0) ) &&
-       (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
-                                                      &mydev,
-                                                      &myino)) &&
-       ( (dev == mydev) &&
-        (ino == myino) ) )
-    {      
-      /* fast validation OK! */
-      signal_index_ok (ii);
+  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+    {
+      if (GNUNET_OK !=
+         GNUNET_FS_handle_on_demand_block (key, size, data,
+                                           type, priority, anonymity,
+                                           expiration, uid, 
+                                           &process_migration_content,
+                                           NULL))
+       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
       return;
     }
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Mismatch in file identifiers (%llu != %llu or %u != %u), need to hash.\n",
-             (unsigned long long) ino,
-             (unsigned long long) myino,
-             (unsigned int) dev,
-             (unsigned int) mydev);
+             "Retrieved block `%s' of type %u for migration\n",
+             GNUNET_h2s (key),
+             type);
 #endif
-  /* slow validation, need to hash full file (again) */
-  GNUNET_CRYPTO_hash_file (sched,
-                          GNUNET_SCHEDULER_PRIORITY_IDLE,
-                          fn,
-                          HASHING_BLOCKSIZE,
-                          &hash_for_index_val,
-                          ii);
+  mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
+  mb->query = *key;
+  mb->expiration = expiration;
+  mb->size = size;
+  mb->type = type;
+  memcpy (&mb[1], data, size);
+  GNUNET_CONTAINER_DLL_insert_after (mig_head,
+                                    mig_tail,
+                                    mig_tail,
+                                    mb);
+  mig_size++;
+  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                        &consider_migration,
+                                        mb);
+  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
 }
 
 
 /**
- * Handle INDEX_LIST_GET-message.
- *
- * @param cls closure
- * @param client identification of the client
- * @param message the actual message
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ * 
+ * @param cls unused
+ * @param tc scheduler context (also unused)
  */
 static void
-handle_index_list_get (void *cls,
-                      struct GNUNET_SERVER_Client *client,
-                      const struct GNUNET_MessageHeader *message)
+gather_migration_blocks (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct GNUNET_SERVER_TransmitContext *tc;
-  struct IndexInfoMessage *iim;
-  char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
-  size_t slen;
-  const char *fn;
-  struct GNUNET_MessageHeader *msg;
-  struct IndexInfo *pos;
-
-  tc = GNUNET_SERVER_transmit_context_create (client);
-  iim = (struct IndexInfoMessage*) buf;
-  msg = &iim->header;
-  pos = indexed_files;
-  while (NULL != pos)
-    {
-      iim->reserved = 0;
-      iim->file_id = pos->file_id;
-      fn = pos->filename;
-      slen = strlen (fn) + 1;
-      if (slen + sizeof (struct IndexInfoMessage) > 
-         GNUNET_SERVER_MAX_MESSAGE_SIZE)
-       {
-         GNUNET_break (0);
-         break;
-       }
-      memcpy (&iim[1], fn, slen);
-      GNUNET_SERVER_transmit_context_append
-       (tc,
-        &msg[1],
-        sizeof (struct IndexInfoMessage) 
-        - sizeof (struct GNUNET_MessageHeader) + slen,
-        GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
-      pos = pos->next;
-    }
-  GNUNET_SERVER_transmit_context_append (tc,
-                                        NULL, 0,
-                                        GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
-  GNUNET_SERVER_transmit_context_run (tc,
-                                     GNUNET_TIME_UNIT_MINUTES);
+  mig_task = GNUNET_SCHEDULER_NO_TASK;
+  mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
+                                       GNUNET_TIME_UNIT_FOREVER_REL,
+                                       &process_migration_content, NULL);
+  GNUNET_assert (mig_qe != NULL);
 }
 
 
 /**
- * Handle UNINDEX-message.
- *
- * @param cls closure
- * @param client identification of the client
- * @param message the actual message
+ * We're done with a particular message list entry.
+ * Free all associated resources.
+ * 
+ * @param pml entry to destroy
  */
 static void
-handle_unindex (void *cls,
-               struct GNUNET_SERVER_Client *client,
-               const struct GNUNET_MessageHeader *message)
+destroy_pending_message_list_entry (struct PendingMessageList *pml)
 {
-  const struct UnindexMessage *um;
-  struct IndexInfo *pos;
-  struct IndexInfo *prev;
-  struct IndexInfo *next;
-  struct GNUNET_SERVER_TransmitContext *tc;
-  int found;
-  
-  um = (const struct UnindexMessage*) message;
-  found = GNUNET_NO;
-  prev = NULL;
-  pos = indexed_files;
-  while (NULL != pos)
-    {
-      next = pos->next;
-      if (0 == memcmp (&pos->file_id,
-                      &um->file_id,
-                      sizeof (GNUNET_HashCode)))
-       {
-         if (prev == NULL)
-           indexed_files = pos->next;
-         else
-           prev->next = pos->next;
-         GNUNET_free (pos);
-         found = GNUNET_YES;
-       }
-      else
-       {
-         prev = pos;
-       }
-      pos = next;
-    }
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Client requested unindexing of file `%s': %s\n",
-             GNUNET_h2s (&um->file_id),
-             found ? "found" : "not found");
-#endif
-  if (GNUNET_YES == found)    
-    write_index_list ();
-  tc = GNUNET_SERVER_transmit_context_create (client);
-  GNUNET_SERVER_transmit_context_append (tc,
-                                        NULL, 0,
-                                        GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
-  GNUNET_SERVER_transmit_context_run (tc,
-                                     GNUNET_TIME_UNIT_MINUTES);
+  GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
+                              pml->req->pending_tail,
+                              pml);
+  GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
+                              pml->target->pending_messages_tail,
+                              pml->pm);
+  pml->target->pending_requests--;
+  GNUNET_free (pml->pm);
+  GNUNET_free (pml);
 }
 
 
 /**
- * Run the next DS request in our
- * queue, we're done with the current one.
+ * Destroy the given pending message (and call the respective
+ * continuation).
+ *
+ * @param pm message to destroy
+ * @param tpid id of peer that the message was delivered to, or 0 for none
  */
 static void
-next_ds_request ()
+destroy_pending_message (struct PendingMessage *pm,
+                        GNUNET_PEER_Id tpid)
 {
-  struct DatastoreRequestQueue *e;
-  
-  while (NULL != (e = drq_head))
-    {
-      if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
-       break;
-      if (e->task != GNUNET_SCHEDULER_NO_TASK)
-       GNUNET_SCHEDULER_cancel (sched, e->task);
-      GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
-      e->req (e->req_cls, GNUNET_NO);
-      GNUNET_free (e);  
-    }
-  if (e == NULL)
-    return;
-  if (e->task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel (sched, e->task);
-  e->task = GNUNET_SCHEDULER_NO_TASK;
-  e->req (e->req_cls, GNUNET_YES);
-  GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
-  GNUNET_free (e);  
+  struct PendingMessageList *pml = pm->pml;
+  TransmissionContinuation cont;
+  void *cont_cls;
+
+  GNUNET_assert (pml->pm == pm);
+  GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
+  cont = pm->cont;
+  cont_cls = pm->cont_cls;
+  destroy_pending_message_list_entry (pml);
+  cont (cont_cls, tpid);  
 }
 
 
 /**
- * A datastore request had to be timed out. 
+ * We're done processing a particular request.
+ * Free all associated resources.
  *
- * @param cls closure (of type "struct DatastoreRequestQueue*")
- * @param tc task context, unused
+ * @param pr request to destroy
  */
 static void
-timeout_ds_request (void *cls,
-                   const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct DatastoreRequestQueue *e = cls;
-
-  e->task = GNUNET_SCHEDULER_NO_TASK;
-  GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
-  e->req (e->req_cls, GNUNET_NO);
-  GNUNET_free (e);  
-}
-
-
-/**
- * Queue a request for the datastore.
- *
- * @param deadline by when the request should run
- * @param fun function to call once the request can be run
- * @param fun_cls closure for fun
- */
-static struct DatastoreRequestQueue *
-queue_ds_request (struct GNUNET_TIME_Relative deadline,
-                 RequestFunction fun,
-                 void *fun_cls)
+destroy_pending_request (struct PendingRequest *pr)
 {
-  struct DatastoreRequestQueue *e;
-  struct DatastoreRequestQueue *bef;
+  struct GNUNET_PeerIdentity pid;
 
-  if (drq_head == NULL)
+  if (pr->hnode != NULL)
     {
-      /* no other requests pending, run immediately */
-      fun (fun_cls, GNUNET_OK);
-      return NULL;
+      GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
+                                        pr->hnode);
+      pr->hnode = NULL;
     }
-  e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
-  e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
-  e->req = fun;
-  e->req_cls = fun_cls;
-  if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
+  if (NULL == pr->client_request_list)
     {
-      /* local request, highest prio, put at head of queue
-        regardless of deadline */
-      bef = NULL;
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# P2P searches active"),
+                               -1,
+                               GNUNET_NO);
     }
   else
     {
-      bef = drq_tail;
-      while ( (NULL != bef) &&
-             (e->timeout.value < bef->timeout.value) )
-       bef = bef->prev;
-    }
-  GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
-  if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
-    return e;
-  e->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                         deadline,
-                                         &timeout_ds_request,
-                                         e);
-  return e;                                   
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# client searches active"),
+                               -1,
+                               GNUNET_NO);
+    }
+  /* might have already been removed from map in 'process_reply' (if
+     there was a unique reply) or never inserted if it was a
+     duplicate; hence ignore the return value here */
+  (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
+                                              &pr->query,
+                                              pr);
+  if (pr->qe != NULL)
+     {
+      GNUNET_DATASTORE_cancel (pr->qe);
+      pr->qe = NULL;
+    }
+  if (pr->client_request_list != NULL)
+    {
+      GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
+                                  pr->client_request_list->client_list->rl_tail,
+                                  pr->client_request_list);
+      GNUNET_free (pr->client_request_list);
+      pr->client_request_list = NULL;
+    }
+  if (pr->cp != NULL)
+    {
+      GNUNET_PEER_resolve (pr->cp->pid,
+                          &pid);
+      (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
+                                                  &pid.hashPubKey,
+                                                  pr);
+      pr->cp = NULL;
+    }
+  if (pr->bf != NULL)
+    {
+      GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                       
+      pr->bf = NULL;
+    }
+  if (pr->irc != NULL)
+    {
+      GNUNET_CORE_peer_change_preference_cancel (pr->irc);
+      pr->irc = NULL;
+    }
+  if (pr->replies_seen != NULL)
+    {
+      GNUNET_free (pr->replies_seen);
+      pr->replies_seen = NULL;
+    }
+  if (pr->task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (sched,
+                              pr->task);
+      pr->task = GNUNET_SCHEDULER_NO_TASK;
+    }
+  while (NULL != pr->pending_head)    
+    destroy_pending_message_list_entry (pr->pending_head);
+  GNUNET_PEER_change_rc (pr->target_pid, -1);
+  if (pr->used_pids != NULL)
+    {
+      GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
+      GNUNET_free (pr->used_pids);
+      pr->used_pids_off = 0;
+      pr->used_pids_size = 0;
+      pr->used_pids = NULL;
+    }
+  GNUNET_free (pr);
 }
 
 
 /**
- * Free the state associated with a local get context.
+ * Method called whenever a given peer connects.
  *
- * @param lgc the lgc to free
+ * @param cls closure, not used
+ * @param peer peer identity this notification is about
+ * @param latency reported latency of the connection with 'other'
+ * @param distance reported distance (DV) to 'other' 
  */
-static void
-local_get_context_free (struct LocalGetContext *lgc) 
+static void 
+peer_connect_handler (void *cls,
+                     const struct
+                     GNUNET_PeerIdentity * peer,
+                     struct GNUNET_TIME_Relative latency,
+                     uint32_t distance)
 {
-  GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
-  GNUNET_SERVER_client_drop (lgc->client); 
-  GNUNET_free_non_null (lgc->results);
-  if (lgc->results_bf != NULL)
-    GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
-  if (lgc->req != NULL)
-    {
-      if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
-       GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
-      GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
-      GNUNET_free (lgc->req);
-    }
-  GNUNET_free (lgc);
+  struct ConnectedPeer *cp;
+  struct MigrationReadyBlock *pos;
+  
+  cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
+  cp->pid = GNUNET_PEER_intern (peer);
+  GNUNET_break (GNUNET_OK ==
+               GNUNET_CONTAINER_multihashmap_put (connected_peers,
+                                                  &peer->hashPubKey,
+                                                  cp,
+                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+
+  pos = mig_head;
+  while (NULL != pos)
+    {
+      (void) consider_migration (pos, &peer->hashPubKey, cp);
+      pos = pos->next;
+    }
 }
 
 
+
 /**
- * We're able to transmit the next (local) result to the client.
- * Do it and ask the datastore for more.  Or, on error, tell
- * the datastore to stop giving us more.
+ * Free (each) request made by the peer.
  *
- * @param cls our closure (struct LocalGetContext)
- * @param max maximum number of bytes we can transmit
- * @param buf where to copy our message
- * @return number of bytes copied to buf
- */
-static size_t
-transmit_local_result (void *cls,
-                      size_t max,
-                      void *buf)
+ * @param cls closure, points to peer that the request belongs to
+ * @param key current key code
+ * @param value value in the hash map
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int
+destroy_request (void *cls,
+                const GNUNET_HashCode * key,
+                void *value)
 {
-  struct LocalGetContext *lgc = cls;  
-  uint16_t msize;
+  const struct GNUNET_PeerIdentity * peer = cls;
+  struct PendingRequest *pr = value;
+  
+  GNUNET_break (GNUNET_YES ==
+               GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
+                                                     &peer->hashPubKey,
+                                                     pr));
+  destroy_pending_request (pr);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Method called whenever a peer disconnects.
+ *
+ * @param cls closure, not used
+ * @param peer peer identity this notification is about
+ */
+static void
+peer_disconnect_handler (void *cls,
+                        const struct
+                        GNUNET_PeerIdentity * peer)
+{
+  struct ConnectedPeer *cp;
+  struct PendingMessage *pm;
+  unsigned int i;
+  struct MigrationReadyBlock *pos;
+  struct MigrationReadyBlock *next;
+
+  GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
+                                             &peer->hashPubKey,
+                                             &destroy_request,
+                                             (void*) peer);
+  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                         &peer->hashPubKey);
+  if (cp == NULL)
+    return;
+  for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
+    {
+      if (NULL != cp->last_client_replies[i])
+       {
+         GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
+         cp->last_client_replies[i] = NULL;
+       }
+    }
+  GNUNET_break (GNUNET_YES ==
+               GNUNET_CONTAINER_multihashmap_remove (connected_peers,
+                                                     &peer->hashPubKey,
+                                                     cp));
+  /* remove this peer from migration considerations; schedule
+     alternatives */
+  next = mig_head;
+  while (NULL != (pos = next))
+    {
+      next = pos->next;
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
+       {
+         if (pos->target_list[i] == cp->pid)
+           {
+             GNUNET_PEER_change_rc (pos->target_list[i], -1);
+             pos->target_list[i] = 0;
+            }
+         }
+      if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
+       {
+         delete_migration_block (pos);
+         consider_migration_gathering ();
+          continue;
+       }
+      GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                            &consider_migration,
+                                            pos);
+    }
+
+  GNUNET_PEER_change_rc (cp->pid, -1);
+  GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
+  if (NULL != cp->cth)
+    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+  while (NULL != (pm = cp->pending_messages_head))
+    destroy_pending_message (pm, 0 /* delivery failed */);
+  GNUNET_break (0 == cp->pending_requests);
+  GNUNET_free (cp);
+}
+
+
+/**
+ * Iterator over hash map entries that removes all occurences
+ * of the given 'client' from the 'last_client_replies' of the
+ * given connected peer.
+ *
+ * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
+ * @param key current key code (unused)
+ * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int
+remove_client_from_last_client_replies (void *cls,
+                                       const GNUNET_HashCode * key,
+                                       void *value)
+{
+  struct GNUNET_SERVER_Client *client = cls;
+  struct ConnectedPeer *cp = value;
+  unsigned int i;
+
+  for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
+    {
+      if (cp->last_client_replies[i] == client)
+       {
+         GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
+         cp->last_client_replies[i] = NULL;
+       }
+    }  
+  return GNUNET_YES;
+}
+
+
+/**
+ * A client disconnected.  Remove all of its pending queries.
+ *
+ * @param cls closure, NULL
+ * @param client identification of the client
+ */
+static void
+handle_client_disconnect (void *cls,
+                         struct GNUNET_SERVER_Client
+                         * client)
+{
+  struct ClientList *pos;
+  struct ClientList *prev;
+  struct ClientRequestList *rcl;
+  struct ClientResponseMessage *creply;
+
+  if (client == NULL)
+    return;
+  prev = NULL;
+  pos = client_list;
+  while ( (NULL != pos) &&
+         (pos->client != client) )
+    {
+      prev = pos;
+      pos = pos->next;
+    }
+  if (pos == NULL)
+    return; /* no requests pending for this client */
+  while (NULL != (rcl = pos->rl_head))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 "Destroying pending request `%s' on disconnect\n",
+                 GNUNET_h2s (&rcl->req->query));
+      destroy_pending_request (rcl->req);
+    }
+  if (prev == NULL)
+    client_list = pos->next;
+  else
+    prev->next = pos->next;
+  if (pos->th != NULL)
+    {
+      GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
+      pos->th = NULL;
+    }
+  while (NULL != (creply = pos->res_head))
+    {
+      GNUNET_CONTAINER_DLL_remove (pos->res_head,
+                                  pos->res_tail,
+                                  creply);
+      GNUNET_free (creply);
+    }    
+  GNUNET_SERVER_client_drop (pos->client);
+  GNUNET_free (pos);
+  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                        &remove_client_from_last_client_replies,
+                                        client);
+}
+
+
+/**
+ * Iterator to free peer entries.
+ *
+ * @param cls closure, unused
+ * @param key current key code
+ * @param value value in the hash map (peer entry)
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int 
+clean_peer (void *cls,
+           const GNUNET_HashCode * key,
+           void *value)
+{
+  peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Task run during shutdown.
+ *
+ * @param cls unused
+ * @param tc unused
+ */
+static void
+shutdown_task (void *cls,
+              const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  if (mig_qe != NULL)
+    {
+      GNUNET_DATASTORE_cancel (mig_qe);
+      mig_qe = NULL;
+    }
+  if (GNUNET_SCHEDULER_NO_TASK != mig_task)
+    {
+      GNUNET_SCHEDULER_cancel (sched, mig_task);
+      mig_task = GNUNET_SCHEDULER_NO_TASK;
+    }
+  while (client_list != NULL)
+    handle_client_disconnect (NULL,
+                             client_list->client);
+  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                        &clean_peer,
+                                        NULL);
+  GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
+  GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
+  requests_by_expiration_heap = 0;
+  GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
+  connected_peers = NULL;
+  GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
+  GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
+  query_request_map = NULL;
+  GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
+  GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
+  peer_request_map = NULL;
+  GNUNET_assert (NULL != core);
+  GNUNET_CORE_disconnect (core);
+  core = NULL;
+  if (stats != NULL)
+    {
+      GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+      stats = NULL;
+    }
+  GNUNET_DATASTORE_disconnect (dsh,
+                              GNUNET_NO);
+  while (mig_head != NULL)
+    delete_migration_block (mig_head);
+  GNUNET_assert (0 == mig_size);
+  dsh = NULL;
+  sched = NULL;
+  cfg = NULL;  
+}
 
+
+/* ******************* Utility functions  ******************** */
+
+
+/**
+ * Transmit messages by copying it to the target buffer
+ * "buf".  "buf" will be NULL and "size" zero if the socket was closed
+ * for writing in the meantime.  In that case, do nothing
+ * (the disconnect or shutdown handler will take care of the rest).
+ * If we were able to transmit messages and there are still more
+ * pending, ask core again for further calls to this function.
+ *
+ * @param cls closure, pointer to the 'struct ConnectedPeer*'
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_to_peer (void *cls,
+                 size_t size, void *buf)
+{
+  struct ConnectedPeer *cp = cls;
+  char *cbuf = buf;
+  struct GNUNET_PeerIdentity pid;
+  struct PendingMessage *pm;
+  struct MigrationReadyBlock *mb;
+  struct MigrationReadyBlock *next;
+  struct PutMessage migm;
+  size_t msize;
+  unsigned int i;
+  cp->cth = NULL;
   if (NULL == buf)
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Failed to transmit result to local client, aborting datastore iteration.\n");
+                 "Dropping message, core too busy.\n");
 #endif
-      /* error, abort! */
-      GNUNET_free (lgc->result);
-      lgc->result = NULL;
-      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
       return 0;
     }
-  msize = ntohs (lgc->result->header.size);
+  msize = 0;
+  while ( (NULL != (pm = cp->pending_messages_head) ) &&
+         (pm->msize <= size) )
+    {
+      memcpy (&cbuf[msize], &pm[1], pm->msize);
+      msize += pm->msize;
+      size -= pm->msize;
+      destroy_pending_message (pm, cp->pid);
+    }
+  if (NULL != pm)
+    {
+      GNUNET_PEER_resolve (cp->pid,
+                          &pid);
+      cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                                  pm->priority,
+                                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+                                                  &pid,
+                                                  pm->msize,
+                                                  &transmit_to_peer,
+                                                  cp);
+    }
+  else
+    {      
+      next = mig_head;
+      while (NULL != (mb = next))
+       {
+         next = mb->next;
+         for (i=0;i<MIGRATION_LIST_SIZE;i++)
+           {
+             if ( (cp->pid == mb->target_list[i]) &&
+                  (mb->size + sizeof (migm) <= size) )
+               {
+                 GNUNET_PEER_change_rc (mb->target_list[i], -1);
+                 mb->target_list[i] = 0;
+                 mb->used_targets++;
+                 migm.header.size = htons (sizeof (migm) + mb->size);
+                 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+                 migm.type = htonl (mb->type);
+                 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
+                 memcpy (&cbuf[msize], &migm, sizeof (migm));
+                 msize += sizeof (migm);
+                 size -= sizeof (migm);
+                 memcpy (&cbuf[msize], &mb[1], mb->size);
+                 msize += mb->size;
+                 size -= mb->size;
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Pushing migration block `%s' (%u bytes) to `%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
+                 break;
+               }
+             else
+               {
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
+               }
+           }
+         if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
+              (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
+           {
+             delete_migration_block (mb);
+             consider_migration_gathering ();
+           }
+       }
+      consider_migration (NULL, 
+                         &pid.hashPubKey,
+                         cp);
+    }
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitting %u bytes of result to local client.\n",
-             msize);
+             "Transmitting %u bytes to peer %u\n",
+             msize,
+             cp->pid);
 #endif
-  GNUNET_assert (max >= msize);
-  memcpy (buf, lgc->result, msize);
-  GNUNET_free (lgc->result);
-  lgc->result = NULL;
-  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
   return msize;
 }
 
 
 /**
- * Continuation called from datastore's remove
- * function.
+ * Add a message to the set of pending messages for the given peer.
  *
- * @param cls unused
- * @param success did the deletion work?
- * @param msg error message
+ * @param cp peer to send message to
+ * @param pm message to queue
+ * @param pr request on which behalf this message is being queued
  */
 static void
-remove_cont (void *cls,
-            int success,
-            const char *msg)
+add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
+                                 struct PendingMessage *pm,
+                                 struct PendingRequest *pr)
 {
-  if (GNUNET_OK != success)
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-               _("Failed to delete bogus block: %s\n"),
-               msg);
-  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+  struct PendingMessage *pos;
+  struct PendingMessageList *pml;
+  struct GNUNET_PeerIdentity pid;
+
+  GNUNET_assert (pm->next == NULL);
+  GNUNET_assert (pm->pml == NULL);    
+  pml = GNUNET_malloc (sizeof (struct PendingMessageList));
+  pml->req = pr;
+  pml->target = cp;
+  pml->pm = pm;
+  pm->pml = pml;  
+  GNUNET_CONTAINER_DLL_insert (pr->pending_head,
+                              pr->pending_tail,
+                              pml);
+  pos = cp->pending_messages_head;
+  while ( (pos != NULL) &&
+         (pm->priority < pos->priority) )
+    pos = pos->next;    
+  GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
+                                    cp->pending_messages_tail,
+                                    pos,
+                                    pm);
+  cp->pending_requests++;
+  if (cp->pending_requests > MAX_QUEUE_PER_PEER)
+    destroy_pending_message (cp->pending_messages_tail, 0);  
+  GNUNET_PEER_resolve (cp->pid, &pid);
+  if (NULL != cp->cth)
+    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+  /* need to schedule transmission */
+  cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                              cp->pending_messages_head->priority,
+                                              MAX_TRANSMIT_DELAY,
+                                              &pid,
+                                              cp->pending_messages_head->msize,
+                                              &transmit_to_peer,
+                                              cp);
+  if (cp->cth == NULL)
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Failed to schedule transmission with core!\n");
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# CORE transmission failures"),
+                               1,
+                               GNUNET_NO);
+    }
 }
 
 
 /**
- * Mingle hash with the mingle_number to
- * produce different bits.
+ * Mingle hash with the mingle_number to produce different bits.
  */
 static void
 mingle_hash (const GNUNET_HashCode * in,
@@ -1403,126 +1628,106 @@ mingle_hash (const GNUNET_HashCode * in,
 
 
 /**
- * We've received an on-demand encoded block
- * from the datastore.  Attempt to do on-demand
- * encoding and (if successful), call the 
- * continuation with the resulting block.  On
- * error, clean up and ask the datastore for
- * more results.
+ * Test if the load on this peer is too high
+ * to even consider processing the query at
+ * all.
+ * 
+ * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
+ */
+static int
+test_load_too_high ()
+{
+  return GNUNET_NO; // FIXME
+}
+
+
+/* ******************* Pending Request Refresh Task ******************** */
+
+
+
+/**
+ * We use a random delay to make the timing of requests less
+ * predictable.  This function returns such a random delay.  We add a base
+ * delay of MAX_CORK_DELAY (1s).
  *
- * @param key key for the content
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param expiration expiration time for the content
- * @param uid unique identifier for the datum;
- *        maybe 0 if no unique identifier is available
- * @param cont function to call with the actual block
- * @param cont_cls closure for cont
+ * FIXME: make schedule dependent on the specifics of the request?
+ * Or bandwidth and number of connected peers and load?
+ *
+ * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
+ */
+static struct GNUNET_TIME_Relative
+get_processing_delay ()
+{
+  return 
+    GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
+                             GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                                            GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                                                      TTL_DECREMENT)));
+}
+
+
+/**
+ * We're processing a GET request from another peer and have decided
+ * to forward it to other peers.  This function is called periodically
+ * and should forward the request to other peers until we have all
+ * possible replies.  If we have transmitted the *only* reply to
+ * the initiator we should destroy the pending request.  If we have
+ * many replies in the queue to the initiator, we should delay sending
+ * out more queries until the reply queue has shrunk some.
+ *
+ * @param cls our "struct ProcessGetContext *"
+ * @param tc unused
+ */
+static void
+forward_request_task (void *cls,
+                     const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Function called after we either failed or succeeded
+ * at transmitting a query to a peer.  
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param tpid ID of receiving peer, 0 on transmission error
  */
 static void
-handle_on_demand_block (const GNUNET_HashCode * key,
-                       uint32_t size,
-                       const void *data,
-                       uint32_t type,
-                       uint32_t priority,
-                       uint32_t anonymity,
-                       struct GNUNET_TIME_Absolute
-                       expiration, uint64_t uid,
-                       GNUNET_DATASTORE_Iterator cont,
-                       void *cont_cls)
+transmit_query_continuation (void *cls,
+                            GNUNET_PEER_Id tpid)
 {
-  const struct OnDemandBlock *odb;
-  GNUNET_HashCode nkey;
-  struct GNUNET_CRYPTO_AesSessionKey skey;
-  struct GNUNET_CRYPTO_AesInitializationVector iv;
-  GNUNET_HashCode query;
-  ssize_t nsize;
-  char ndata[DBLOCK_SIZE];
-  char edata[DBLOCK_SIZE];
-  const char *fn;
-  struct GNUNET_DISK_FileHandle *fh;
-  uint64_t off;
+  struct PendingRequest *pr = cls;
 
-  if (size != sizeof (struct OnDemandBlock))
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# queries scheduled for forwarding"),
+                           -1,
+                           GNUNET_NO);
+  if (tpid == 0)   
     {
-      GNUNET_break (0);
-      GNUNET_DATASTORE_remove (dsh, 
-                              key,
-                              size,
-                              data,
-                              &remove_cont,
-                              NULL,
-                              GNUNET_TIME_UNIT_FOREVER_REL);     
-      return;
-    }
-  odb = (const struct OnDemandBlock*) data;
-  off = GNUNET_ntohll (odb->offset);
-  fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
-                                                       &odb->file_id);
-  fh = NULL;
-  if ( (NULL == fn) ||
-       (NULL == (fh = GNUNET_DISK_file_open (fn, 
-                                            GNUNET_DISK_OPEN_READ,
-                                            GNUNET_DISK_PERM_NONE))) ||
-       (off !=
-       GNUNET_DISK_file_seek (fh,
-                              off,
-                              GNUNET_DISK_SEEK_SET)) ||
-       (-1 ==
-       (nsize = GNUNET_DISK_file_read (fh,
-                                       ndata,
-                                       sizeof (ndata)))) )
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Could not access indexed file `%s' at offset %llu: %s\n"),
-                 GNUNET_h2s (&odb->file_id),
-                 (unsigned long long) off,
-                 STRERROR (errno));
-      if (fh != NULL)
-       GNUNET_DISK_file_close (fh);
-      /* FIXME: if this happens often, we need
-        to remove the OnDemand block from the DS! */
-      GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);       
-      return;
-    }
-  GNUNET_DISK_file_close (fh);
-  GNUNET_CRYPTO_hash (ndata,
-                     nsize,
-                     &nkey);
-  GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
-  GNUNET_CRYPTO_aes_encrypt (ndata,
-                            nsize,
-                            &skey,
-                            &iv,
-                            edata);
-  GNUNET_CRYPTO_hash (edata,
-                     nsize,
-                     &query);
-  if (0 != memcmp (&query, 
-                  key,
-                  sizeof (GNUNET_HashCode)))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Indexed file `%s' changed at offset %llu\n"),
-                 fn,
-                 (unsigned long long) off);
-      /* FIXME: if this happens often, we need
-        to remove the OnDemand block from the DS! */
-      GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-      return;
-    }
-  cont (cont_cls,
-       key,
-       nsize,
-       edata,
-       GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
-       priority,
-       anonymity,
-       expiration,
-       uid);
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Transmission of request failed, will try again later.\n");
+#endif
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                get_processing_delay (),
+                                                &forward_request_task,
+                                                pr); 
+      return;    
+    }
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# queries forwarded"),
+                           1,
+                           GNUNET_NO);
+  GNUNET_PEER_change_rc (tpid, 1);
+  if (pr->used_pids_off == pr->used_pids_size)
+    GNUNET_array_grow (pr->used_pids,
+                      pr->used_pids_size,
+                      pr->used_pids_size * 2 + 2);
+  pr->used_pids[pr->used_pids_off++] = tpid;
+  if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+    pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                            get_processing_delay (),
+                                            &forward_request_task,
+                                            pr);
 }
 
 
@@ -1557,51 +1762,201 @@ compute_bloomfilter_size (unsigned int entry_count)
 
 
 /**
- * Recalculate our bloom filter for filtering replies.
+ * Recalculate our bloom filter for filtering replies.  This function
+ * will create a new bloom filter from scratch, so it should only be
+ * called if we have no bloomfilter at all (and hence can create a
+ * fresh one of minimal size without problems) OR if our peer is the
+ * initiator (in which case we may resize to larger than mimimum size).
  *
- * @param count number of entries we are filtering right now
- * @param mingle set to our new mingling value
- * @param bf_size set to the size of the bloomfilter
- * @param entries the entries to filter
- * @return updated bloomfilter, NULL for none
- */
-static struct GNUNET_CONTAINER_BloomFilter *
-refresh_bloomfilter (unsigned int count,
-                    int32_t * mingle,
-                    size_t *bf_size,
-                    const GNUNET_HashCode *entries)
+ * @param pr request for which the BF is to be recomputed
+ */
+static void
+refresh_bloomfilter (struct PendingRequest *pr)
 {
-  struct GNUNET_CONTAINER_BloomFilter *bf;
-  size_t nsize;
   unsigned int i;
+  size_t nsize;
   GNUNET_HashCode mhash;
 
-  if (0 == count)
-    return NULL;
-  nsize = compute_bloomfilter_size (count);
-  *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
-  *bf_size = nsize;
-  bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
-                                         nsize,
-                                         BLOOMFILTER_K);
-  for (i=0;i<count;i++)
+  nsize = compute_bloomfilter_size (pr->replies_seen_off);
+  if (nsize == pr->bf_size)
+    return; /* size not changed */
+  if (pr->bf != NULL)
+    GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+  pr->bf_size = nsize;
+  pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
+  pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
+                                             pr->bf_size,
+                                             BLOOMFILTER_K);
+  for (i=0;i<pr->replies_seen_off;i++)
     {
-      mingle_hash (&entries[i], *mingle, &mhash);
-      GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
+      mingle_hash (&pr->replies_seen[i], pr->mingle, &mhash);
+      GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
     }
-  return bf;
 }
 
 
 /**
- * Closure used for "target_peer_select_cb".
+ * Function called after we've tried to reserve a certain amount of
+ * bandwidth for a reply.  Check if we succeeded and if so send our
+ * query.
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param peer identifies the peer
+ * @param bpm_in set to the current bandwidth limit (receiving) for this peer
+ * @param bpm_out set to the current bandwidth limit (sending) for this peer
+ * @param amount set to the amount that was actually reserved or unreserved
+ * @param preference current traffic preference for the given peer
  */
-struct PeerSelectionContext 
+static void
+target_reservation_cb (void *cls,
+                      const struct
+                      GNUNET_PeerIdentity * peer,
+                      struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
+                      struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
+                      int amount,
+                      uint64_t preference)
 {
-  /**
-   * The request for which we are selecting
-   * peers.
-   */
+  struct PendingRequest *pr = cls;
+  struct ConnectedPeer *cp;
+  struct PendingMessage *pm;
+  struct GetMessage *gm;
+  GNUNET_HashCode *ext;
+  char *bfdata;
+  size_t msize;
+  unsigned int k;
+  int no_route;
+  uint32_t bm;
+
+  pr->irc = NULL;
+  if (peer == NULL)
+    {
+      /* error in communication with core, try again later */
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                get_processing_delay (),
+                                                &forward_request_task,
+                                                pr);
+      return;
+    }
+  // (3) transmit, update ttl/priority
+  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                         &peer->hashPubKey);
+  if (cp == NULL)
+    {
+      /* Peer must have just left */
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Selected peer disconnected!\n");
+#endif
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                get_processing_delay (),
+                                                &forward_request_task,
+                                                pr);
+      return;
+    }
+  no_route = GNUNET_NO;
+  /* FIXME: check against DBLOCK_SIZE and possibly return
+     amount to reserve; however, this also needs to work
+     with testcases which currently start out with a far
+     too low per-peer bw limit, so they would never send
+     anything.  Big issue. */
+  if (amount == 0)
+    {
+      if (pr->cp == NULL)
+       {
+#if DEBUG_FS > 1
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
+                     amount,
+                     DBLOCK_SIZE);
+#endif
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# reply bandwidth reservation requests failed"),
+                                   1,
+                                   GNUNET_NO);
+         if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                    get_processing_delay (),
+                                                    &forward_request_task,
+                                                    pr);
+         return;  /* this target round failed */
+       }
+      /* FIXME: if we are "quite" busy, we may still want to skip
+        this round; need more load detection code! */
+      no_route = GNUNET_YES;
+    }
+  
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# queries scheduled for forwarding"),
+                           1,
+                           GNUNET_NO);
+  /* build message and insert message into priority queue */
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Forwarding request `%s' to `%4s'!\n",
+             GNUNET_h2s (&pr->query),
+             GNUNET_i2s (peer));
+#endif
+  k = 0;
+  bm = 0;
+  if (GNUNET_YES == no_route)
+    {
+      bm |= GET_MESSAGE_BIT_RETURN_TO;
+      k++;      
+    }
+  if (pr->namespace != NULL)
+    {
+      bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
+      k++;
+    }
+  if (pr->target_pid != 0)
+    {
+      bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
+      k++;
+    }
+  msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
+  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
+  pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
+  pm->msize = msize;
+  gm = (struct GetMessage*) &pm[1];
+  gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
+  gm->header.size = htons (msize);
+  gm->type = htonl (pr->type);
+  pr->remaining_priority /= 2;
+  gm->priority = htonl (pr->remaining_priority);
+  gm->ttl = htonl (pr->ttl);
+  gm->filter_mutator = htonl(pr->mingle); 
+  gm->hash_bitmap = htonl (bm);
+  gm->query = pr->query;
+  ext = (GNUNET_HashCode*) &gm[1];
+  k = 0;
+  if (GNUNET_YES == no_route)
+    GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
+  if (pr->namespace != NULL)
+    memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
+  if (pr->target_pid != 0)
+    GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
+  bfdata = (char *) &ext[k];
+  if (pr->bf != NULL)
+    GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
+                                              bfdata,
+                                              pr->bf_size);
+  pm->cont = &transmit_query_continuation;
+  pm->cont_cls = pr;
+  add_to_pending_messages_for_peer (cp, pm, pr);
+}
+
+
+/**
+ * Closure used for "target_peer_select_cb".
+ */
+struct PeerSelectionContext 
+{
+  /**
+   * The request for which we are selecting
+   * peers.
+   */
   struct PendingRequest *pr;
 
   /**
@@ -1638,13 +1993,40 @@ target_peer_select_cb (void *cls,
   struct PendingRequest *pr = psc->pr;
   double score;
   unsigned int i;
+  unsigned int pc;
+
+  /* 1) check that this peer is not the initiator */
+  if (cp == pr->cp)
+    return GNUNET_YES; /* skip */         
 
-  /* 1) check if we have already (recently) forwarded to this peer */
+  /* 2) check if we have already (recently) forwarded to this peer */
+  pc = 0;
   for (i=0;i<pr->used_pids_off;i++)
-    if (pr->used_pids[i] == cp->pid)
-      return GNUNET_YES; /* skip */
-  // 2) calculate how much we'd like to forward to this peer
-  score = 0; // FIXME!
+    if (pr->used_pids[i] == cp->pid) 
+      {
+       pc++;
+       if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                          RETRY_PROBABILITY_INV))
+         {
+#if DEBUG_FS
+           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                       "NOT re-trying query that was previously transmitted %u times\n",
+                       (unsigned int) pr->used_pids_off);
+#endif
+           return GNUNET_YES; /* skip */
+         }
+      }
+#if DEBUG_FS
+  if (0 < pc)
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+               "Re-trying query that was previously transmitted %u times to this peer\n",
+               (unsigned int) pc);
+#endif
+  // 3) calculate how much we'd like to forward to this peer
+  score = 42; // FIXME!
+  // FIXME: also need API to gather data on responsiveness
+  // of this peer (we have fields for that in 'cp', but
+  // they are never set!)
   
   /* store best-fit in closure */
   if (score > psc->target_score)
@@ -1654,900 +2036,670 @@ target_peer_select_cb (void *cls,
     }
   return GNUNET_YES;
 }
-
-
-/**
- * We use a random delay to make the timing of requests
- * less predictable.  This function returns such a random
- * delay.
- *
- * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
- */
-static struct GNUNET_TIME_Relative
-get_processing_delay ()
-{
-  return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
-                                       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                                                 TTL_DECREMENT));
-}
-
-
-/**
- * Task that is run for each request with the
- * goal of forwarding the associated query to
- * other peers.  The task should re-schedule
- * itself to be re-run once the TTL has expired.
- * (or at a later time if more peers should
- * be queried earlier).
- *
- * @param cls the requests "struct PendingRequest*"
- * @param tc task context (unused)
- */
-static void
-forward_request_task (void *cls,
-                     const struct GNUNET_SCHEDULER_TaskContext *tc);
-
+  
 
 /**
- * We've selected a peer for forwarding of a query.
- * Construct the message and then re-schedule the
- * task to forward again to (other) peers.
+ * The priority level imposes a bound on the maximum
+ * value for the ttl that can be requested.
  *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param ttl_in requested ttl
+ * @param prio given priority
+ * @return ttl_in if ttl_in is below the limit,
+ *         otherwise the ttl-limit for the given priority
  */
-static size_t
-transmit_request_cb (void *cls,
-                    size_t size, 
-                    void *buf)
+static int32_t
+bound_ttl (int32_t ttl_in, uint32_t prio)
 {
-  struct PendingRequest *pr = cls;
-  struct GetMessage *gm;
-  GNUNET_HashCode *ext;
-  char *bfdata;
-  size_t msize;
-  unsigned int k;
+  unsigned long long allowed;
 
-  pr->cth = NULL;
-  /* (1) check for timeout */
-  if (NULL == buf)
+  if (ttl_in <= 0)
+    return ttl_in;
+  allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
+  if (ttl_in > allowed)      
     {
-      /* timeout, try another peer immediately again */
-      pr->task = GNUNET_SCHEDULER_add_with_priority (sched,
-                                                    GNUNET_SCHEDULER_PRIORITY_IDLE,
-                                                    &forward_request_task,
-                                                    pr);
-      return 0;
+      if (allowed >= (1 << 30))
+        return 1 << 30;
+      return allowed;
     }
-  /* (2) build query message */
-  k = 0; // FIXME: count hash codes!
-  msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
-  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  gm = (struct GetMessage*) buf;
-  gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
-  gm->header.size = htons (msize);
-  gm->type = htonl (pr->type);
-  pr->remaining_priority /= 2;
-  gm->priority = htonl (pr->remaining_priority);
-  gm->ttl = htonl (pr->ttl);
-  gm->filter_mutator = htonl(pr->mingle);
-  gm->hash_bitmap = htonl (42);
-  gm->query = pr->query;
-  ext = (GNUNET_HashCode*) &gm[1];
-  // FIXME: setup "ext[0]..[k-1]"
-  bfdata = (char *) &ext[k];
-  if (pr->bf != NULL)
-    GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
-                                              bfdata,
-                                              pr->bf_size);
-  
-  /* (3) schedule job to do it again (or another peer, etc.) */
-  pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                          get_processing_delay (), // FIXME!
-                                          &forward_request_task,
-                                          pr);
-
-  return msize;
+  return ttl_in;
 }
 
 
 /**
- * Function called after we've tried to reserve
- * a certain amount of bandwidth for a reply.
- * Check if we succeeded and if so send our query.
+ * We're processing a GET request from another peer and have decided
+ * to forward it to other peers.  This function is called periodically
+ * and should forward the request to other peers until we have all
+ * possible replies.  If we have transmitted the *only* reply to
+ * the initiator we should destroy the pending request.  If we have
+ * many replies in the queue to the initiator, we should delay sending
+ * out more queries until the reply queue has shrunk some.
  *
- * @param cls the requests "struct PendingRequest*"
- * @param peer identifies the peer
- * @param latency current latency estimate, "FOREVER" if we have been
- *                disconnected
- * @param bpm_in set to the current bandwidth limit (receiving) for this peer
- * @param bpm_out set to the current bandwidth limit (sending) for this peer
- * @param amount set to the amount that was actually reserved or unreserved
- * @param preference current traffic preference for the given peer
+ * @param cls our "struct ProcessGetContext *"
+ * @param tc unused
  */
 static void
-target_reservation_cb (void *cls,
-                      const struct
-                      GNUNET_PeerIdentity * peer,
-                      unsigned int bpm_in,
-                      unsigned int bpm_out,
-                      struct GNUNET_TIME_Relative
-                      latency, int amount,
-                      unsigned long long preference)
+forward_request_task (void *cls,
+                    const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct PendingRequest *pr = cls;
-  uint32_t priority;
-  uint16_t size;
-  struct GNUNET_TIME_Relative maxdelay;
+  struct PeerSelectionContext psc;
+  struct ConnectedPeer *cp; 
+  struct GNUNET_TIME_Relative delay;
 
-  GNUNET_assert (peer != NULL);
-  if ( (amount != DBLOCK_SIZE) ||
-       (pr->cth != NULL) )
+  pr->task = GNUNET_SCHEDULER_NO_TASK;
+  if (pr->irc != NULL)
     {
-      /* try again later; FIXME: we may need to un-reserve "amount"? */
-      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                              get_processing_delay (), // FIXME: longer?
-                                              &forward_request_task,
-                                              pr);
-      return;
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Forwarding of query `%s' not attempted due to pending local lookup!\n",
+                 GNUNET_h2s (&pr->query));
+#endif
+      return; /* already pending */
     }
-  // (2) transmit, update ttl/priority
-  // FIXME: calculate priority, maxdelay, size properly!
-  priority = 0;
-  size = 60000;
-  maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
-  pr->cth = GNUNET_CORE_notify_transmit_ready (core,
-                                              priority,
-                                              maxdelay,
-                                              peer,
-                                              size,
-                                              &transmit_request_cb,
-                                              pr);
-  if (pr->cth == NULL)
+  if (GNUNET_YES == pr->local_only)
+    return; /* configured to not do P2P search */
+  /* (1) select target */
+  psc.pr = pr;
+  psc.target_score = DBL_MIN;
+  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                        &target_peer_select_cb,
+                                        &psc);  
+  if (psc.target_score == DBL_MIN)
     {
-      /* try again later */
+      delay = get_processing_delay ();
+#if DEBUG_FS 
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
+                 GNUNET_h2s (&pr->query),
+                 delay.value);
+#endif
       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                              get_processing_delay (), // FIXME: longer?
+                                              delay,
                                               &forward_request_task,
                                               pr);
+      return; /* nobody selected */
+    }
+  /* (3) update TTL/priority */
+  
+  if (pr->client_request_list != NULL)
+    {
+      /* FIXME: use better algorithm!? */
+      if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                        4))
+       pr->priority++;
+      /* FIXME: bound priority by "customary" priority used by other peers
+        at this time! */
+      pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
+                          pr->priority);
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Trying query `%s' with priority %u and TTL %d.\n",
+                 GNUNET_h2s (&pr->query),
+                 pr->priority,
+                 pr->ttl);
+#endif
+    }
+  else
+    {
+      /* FIXME: should we do something here as well!? */
     }
+
+  /* (3) reserve reply bandwidth */
+  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                         &psc.target.hashPubKey);
+  GNUNET_assert (NULL != cp);
+  pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
+                                               &psc.target,
+                                               GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
+                                               GNUNET_BANDWIDTH_value_init (UINT32_MAX),
+                                               DBLOCK_SIZE * 2, 
+                                               (uint64_t) cp->inc_preference,
+                                               &target_reservation_cb,
+                                               pr);
+  cp->inc_preference = 0.0;
 }
 
 
+/* **************************** P2P PUT Handling ************************ */
+
+
 /**
- * Task that is run for each request with the
- * goal of forwarding the associated query to
- * other peers.  The task should re-schedule
- * itself to be re-run once the TTL has expired.
- * (or at a later time if more peers should
- * be queried earlier).
+ * Function called after we either failed or succeeded
+ * at transmitting a reply to a peer.  
  *
  * @param cls the requests "struct PendingRequest*"
- * @param tc task context (unused)
+ * @param tpid ID of receiving peer, 0 on transmission error
  */
 static void
-forward_request_task (void *cls,
-                     const struct GNUNET_SCHEDULER_TaskContext *tc)
+transmit_reply_continuation (void *cls,
+                            GNUNET_PEER_Id tpid)
 {
   struct PendingRequest *pr = cls;
-  struct PeerSelectionContext psc;
-
-  pr->task = GNUNET_SCHEDULER_NO_TASK;
-  if (pr->cth != NULL) 
-    {
-      /* we're busy transmitting a result, wait a bit */
-      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                              get_processing_delay (), 
-                                              &forward_request_task,
-                                              pr);
-      return;
-    }
-  /* (1) select target */
-  psc.pr = pr;
-  psc.target_score = DBL_MIN;
-  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
-                                        &target_peer_select_cb,
-                                        &psc);
-  if (psc.target_score == DBL_MIN)
+  
+  switch (pr->type)
     {
-      /* no possible target found, wait some time */
-      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                              get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
-                                              &forward_request_task,
-                                              pr);
-      return;
+    case GNUNET_BLOCK_TYPE_DBLOCK:
+    case GNUNET_BLOCK_TYPE_IBLOCK:
+      /* only one reply expected, done with the request! */
+      destroy_pending_request (pr);
+      break;
+    case GNUNET_BLOCK_TYPE_ANY:
+    case GNUNET_BLOCK_TYPE_KBLOCK:
+    case GNUNET_BLOCK_TYPE_SBLOCK:
+      break;
+    default:
+      GNUNET_break (0);
+      break;
     }
-  /* (2) reserve reply bandwidth */
-  // FIXME: need a way to cancel; this
-  // async operation is problematic (segv-problematic)
-  // if "pr" is destroyed while it happens!
-  GNUNET_CORE_peer_configure (core,
-                             &psc.target,
-                             GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
-                             -1,
-                             DBLOCK_SIZE, // FIXME: make dependent on type?
-                             0,
-                             &target_reservation_cb,
-                             pr);
 }
 
 
 /**
- * We're processing (local) results for a search request
- * from a (local) client.  Pass applicable results to the
- * client and if we are done either clean up (operation
- * complete) or switch to P2P search (more results possible).
+ * Transmit the given message by copying it to the target buffer
+ * "buf".  "buf" will be NULL and "size" zero if the socket was closed
+ * for writing in the meantime.  In that case, do nothing
+ * (the disconnect or shutdown handler will take care of the rest).
+ * If we were able to transmit messages and there are still more
+ * pending, ask core again for further calls to this function.
  *
- * @param cls our closure (struct LocalGetContext)
- * @param key key for the content
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param expiration expiration time for the content
- * @param uid unique identifier for the datum;
- *        maybe 0 if no unique identifier is available
+ * @param cls closure, pointer to the 'struct ClientList*'
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
  */
-static void
-process_local_get_result (void *cls,
-                         const GNUNET_HashCode * key,
-                         uint32_t size,
-                         const void *data,
-                         uint32_t type,
-                         uint32_t priority,
-                         uint32_t anonymity,
-                         struct GNUNET_TIME_Absolute
-                         expiration, 
-                         uint64_t uid)
+static size_t
+transmit_to_client (void *cls,
+                 size_t size, void *buf)
 {
-  struct LocalGetContext *lgc = cls;
-  struct PendingRequest *pr;
-  struct ClientRequestList *crl;
-  struct ClientList *cl;
+  struct ClientList *cl = cls;
+  char *cbuf = buf;
+  struct ClientResponseMessage *creply;
   size_t msize;
-  unsigned int i;
-
-  if (key == NULL)
+  
+  cl->th = NULL;
+  if (NULL == buf)
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Received last result for `%s' from local datastore, deciding what to do next.\n",
-                 GNUNET_h2s (&lgc->query));
-#endif
-      /* no further results from datastore; continue
-        processing further requests from the client and
-        allow the next task to use the datastore; also,
-        switch to P2P requests or clean up our state. */
-      next_ds_request ();
-      GNUNET_SERVER_receive_done (lgc->client,
-                                 GNUNET_OK);
-      if ( (lgc->results_used == 0) ||
-          (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
-          (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
-          (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
-       {
-#if DEBUG_FS
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Forwarding query for `%s' to network.\n",
-                     GNUNET_h2s (&lgc->query));
-#endif
-         cl = clients;
-         while ( (NULL != cl) &&
-                 (cl->client != lgc->client) )
-           cl = cl->next;
-         if (cl == NULL)
-           {
-             cl = GNUNET_malloc (sizeof (struct ClientList));
-             cl->client = lgc->client;
-             cl->next = clients;
-             clients = cl;
-           }
-         crl = GNUNET_malloc (sizeof (struct ClientRequestList));
-         crl->cl = cl;
-         GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
-         pr = GNUNET_malloc (sizeof (struct PendingRequest));
-         pr->client = lgc->client;
-         GNUNET_SERVER_client_keep (pr->client);
-         pr->crl_entry = crl;
-         crl->req = pr;
-         if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
-           {
-             pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
-             *pr->namespace = lgc->namespace;
-           }
-         pr->replies_seen = lgc->results;
-         lgc->results = NULL;
-         pr->start_time = GNUNET_TIME_absolute_get ();
-         pr->query = lgc->query;
-         pr->target_pid = GNUNET_PEER_intern (&lgc->target);
-         pr->replies_seen_off = lgc->results_used;
-         pr->replies_seen_size = lgc->results_size;
-         lgc->results_size = 0;
-         pr->type = lgc->type;
-         pr->anonymity_level = lgc->anonymity_level;
-         pr->bf = refresh_bloomfilter (pr->replies_seen_off,
-                                       &pr->mingle,
-                                       &pr->bf_size,
-                                       pr->replies_seen);
-         GNUNET_CONTAINER_multihashmap_put (requests_by_query,
-                                            &pr->query,
-                                            pr,
-                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                  get_processing_delay (),
-                                                  &forward_request_task,
-                                                  pr);
-         local_get_context_free (lgc);
-         return;
-       }
-      /* got all possible results, clean up! */
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Found all possible results for query for `%s', done!\n",
-                 GNUNET_h2s (&lgc->query));
+                 "Not sending reply, client communication problem.\n");
 #endif
-      local_get_context_free (lgc);
-      return;
+      return 0;
     }
-  if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
-    {
+  msize = 0;
+  while ( (NULL != (creply = cl->res_head) ) &&
+         (creply->msize <= size) )
+    {
+      memcpy (&cbuf[msize], &creply[1], creply->msize);
+      msize += creply->msize;
+      size -= creply->msize;
+      GNUNET_CONTAINER_DLL_remove (cl->res_head,
+                                  cl->res_tail,
+                                  creply);
+      GNUNET_free (creply);
+    }
+  if (NULL != creply)
+    cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
+                                                 creply->msize,
+                                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                                 &transmit_to_client,
+                                                 cl);
 #if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Received on-demand block for `%s' from local datastore, fetching data.\n",
-                 GNUNET_h2s (&lgc->query));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Transmitted %u bytes to client\n",
+             (unsigned int) msize);
 #endif
-      handle_on_demand_block (key, size, data, type, priority, 
-                             anonymity, expiration, uid,
-                             &process_local_get_result,
-                             lgc);
-      return;
-    }
-  if ( (type != lgc->type) &&
-       (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) )
-    {
-      /* this should be virtually impossible to reach (DBLOCK 
-        query hash being identical to KBLOCK/SBLOCK query hash);
-        nevertheless, if it happens, the correct thing is to
-        simply skip the result. */
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n",
-                 type,
-                 lgc->type,
-                 GNUNET_h2s (&lgc->query));
-#endif
-      GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);       
-      return;
-    }
-  /* check if this is a result we've alredy
-     received */
-  for (i=0;i<lgc->results_used;i++)
-    if (0 == memcmp (key,
-                    &lgc->results[i],
-                    sizeof (GNUNET_HashCode)))
-      {
-#if DEBUG_FS
-       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                   "Received duplicate result for `%s' from local datastore, ignoring.\n",
-                   GNUNET_h2s (&lgc->query));
-#endif
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-       return; 
-      }
-  if (lgc->results_used == lgc->results_size)
-    GNUNET_array_grow (lgc->results,
-                      lgc->results_size,
-                      lgc->results_size * 2 + 2);
-  GNUNET_CRYPTO_hash (data, 
-                     size, 
-                     &lgc->results[lgc->results_used++]);    
-  msize = size + sizeof (struct ContentMessage);
-  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  lgc->result = GNUNET_malloc (msize);
-  lgc->result->header.size = htons (msize);
-  lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
-  lgc->result->type = htonl (type);
-  lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
-  memcpy (&lgc->result[1],
-         data,
-         size);
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received new result for `%s' from local datastore, passing to client.\n",
-             GNUNET_h2s (&lgc->query));
-#endif
-  GNUNET_SERVER_notify_transmit_ready (lgc->client,
-                                      msize,
-                                      GNUNET_TIME_UNIT_FOREVER_REL,
-                                      &transmit_local_result,
-                                      lgc);
+  return msize;
 }
 
 
 /**
- * We're processing a search request from a local
- * client.  Now it is our turn to query the datastore.
- * 
- * @param cls our closure (struct LocalGetContext)
- * @param tc unused
+ * Closure for "process_reply" function.
  */
-static void
-transmit_local_get (void *cls,
-                   const struct GNUNET_SCHEDULER_TaskContext *tc)
+struct ProcessReplyClosure
 {
-  struct LocalGetContext *lgc = cls;
-  uint32_t type;
-  
-  type = lgc->type;
-  if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
-    type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
-  GNUNET_DATASTORE_get (dsh,
-                       &lgc->query,
-                       type,
-                       &process_local_get_result,
-                       lgc,
-                       GNUNET_TIME_UNIT_FOREVER_REL);
-}
+  /**
+   * The data for the reply.
+   */
+  const void *data;
 
+  /**
+   * Who gave us this reply? NULL for local host.
+   */
+  struct ConnectedPeer *sender;
 
-/**
- * We're processing a search request from a local
- * client.  Now it is our turn to query the datastore.
- * 
- * @param cls our closure (struct LocalGetContext)
- * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
- */
-static void 
-transmit_local_get_ready (void *cls,
-                         int ok)
-{
-  struct LocalGetContext *lgc = cls;
+  /**
+   * When the reply expires.
+   */
+  struct GNUNET_TIME_Absolute expiration;
 
-  GNUNET_assert (GNUNET_OK == ok);
-  GNUNET_SCHEDULER_add_continuation (sched,
-                                    &transmit_local_get,
-                                    lgc,
-                                    GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-}
+  /**
+   * Size of data.
+   */
+  size_t size;
 
+  /**
+   * Namespace that this reply belongs to
+   * (if it is of type SBLOCK).
+   */
+  GNUNET_HashCode namespace;
 
-/**
- * Handle START_SEARCH-message (search request from client).
- *
- * @param cls closure
- * @param client identification of the client
- * @param message the actual message
- */
-static void
-handle_start_search (void *cls,
-                    struct GNUNET_SERVER_Client *client,
-                    const struct GNUNET_MessageHeader *message)
-{
-  const struct SearchMessage *sm;
-  struct LocalGetContext *lgc;
-  uint16_t msize;
-  unsigned int sc;
-  
-  msize = ntohs (message->size);
-  if ( (msize < sizeof (struct SearchMessage)) ||
-       (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
-    {
-      GNUNET_break (0);
-      GNUNET_SERVER_receive_done (client,
-                                 GNUNET_SYSERR);
-      return;
-    }
-  sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
-  sm = (const struct SearchMessage*) message;
-  GNUNET_SERVER_client_keep (client);
-  lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
-  if  (sc > 0)
-    {
-      lgc->results_used = sc;
-      GNUNET_array_grow (lgc->results,
-                        lgc->results_size,
-                        sc * 2);
-      memcpy (lgc->results,
-             &sm[1],
-             sc * sizeof (GNUNET_HashCode));
-    }
-  lgc->client = client;
-  lgc->type = ntohl (sm->type);
-  lgc->anonymity_level = ntohl (sm->anonymity_level);
-  switch (lgc->type)
-    {
-    case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
-    case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
-      lgc->target.hashPubKey = sm->target;
-      break;
-    case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
-      lgc->namespace = sm->target;
-      break;
-    default:
-      break;
-    }
-  lgc->query = sm->query;
-  GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
-  lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
-                              &transmit_local_get_ready,
-                              lgc);
-}
+  /**
+   * Type of the block.
+   */
+  enum GNUNET_BLOCK_Type type;
 
+  /**
+   * How much was this reply worth to us?
+   */
+  uint32_t priority;
 
-/**
- * List of handlers for the messages understood by this
- * service.
- */
-static struct GNUNET_SERVER_MessageHandler handlers[] = {
-  {&handle_index_start, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
-  {&handle_index_list_get, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
-  {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
-   sizeof (struct UnindexMessage) },
-  {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
-   0 },
-  {NULL, NULL, 0, 0}
+  /**
+   * Did we finish processing the associated request?
+   */ 
+  int finished;
 };
 
 
 /**
- * Clean up the memory used by the PendingRequest structure (except
- * for the client or peer list that the request may be part of).
+ * We have received a reply; handle it!
  *
- * @param pr request to clean up
+ * @param cls response (struct ProcessReplyClosure)
+ * @param key our query
+ * @param value value in the hash map (info about the query)
+ * @return GNUNET_YES (we should continue to iterate)
  */
-static void
-destroy_pending_request (struct PendingRequest *pr)
+static int
+process_reply (void *cls,
+              const GNUNET_HashCode * key,
+              void *value)
 {
-  struct PendingReply *reply;
+  struct ProcessReplyClosure *prq = cls;
+  struct PendingRequest *pr = value;
+  struct PendingMessage *reply;
+  struct ClientResponseMessage *creply;
   struct ClientList *cl;
+  struct PutMessage *pm;
+  struct ConnectedPeer *cp;
+  struct GNUNET_TIME_Relative cur_delay;
+  GNUNET_HashCode chash;
+  GNUNET_HashCode mhash;
+  size_t msize;
 
-  GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
-                                       &pr->query,
-                                       pr);
-  // FIXME: not sure how this can work (efficiently)
-  // also, what does the return value mean?
-  if (pr->client == NULL)
-    {
-      GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
-                                        pr);
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Matched result (type %u) for query `%s' with pending request\n",
+             (unsigned int) prq->type,
+             GNUNET_h2s (key));
+#endif  
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# replies received and matched"),
+                           1,
+                           GNUNET_NO);
+  if (prq->sender != NULL)
+    {
+      /* FIXME: should we be more precise here and not use
+        "start_time" but a peer-specific time stamp? */
+      cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
+      prq->sender->avg_delay.value
+       = (prq->sender->avg_delay.value * 
+          (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
+      prq->sender->avg_priority
+       = (prq->sender->avg_priority * 
+          (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+      if (pr->cp != NULL)
+       {
+         GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
+                                [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
+                                -1);
+         GNUNET_PEER_change_rc (pr->cp->pid, 1);
+         prq->sender->last_p2p_replies
+           [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
+           = pr->cp->pid;
+       }
+      else
+       {
+         if (NULL != prq->sender->last_client_replies
+             [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
+           GNUNET_SERVER_client_drop (prq->sender->last_client_replies
+                                      [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
+         prq->sender->last_client_replies
+           [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
+           = pr->client_request_list->client_list->client;
+         GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
+       }
     }
-  else
+  GNUNET_CRYPTO_hash (prq->data,
+                     prq->size,
+                     &chash);
+  switch (prq->type)
     {
-      cl = pr->crl_entry->cl;
-      GNUNET_CONTAINER_DLL_remove (cl->head,
-                                  cl->tail,
-                                  pr->crl_entry);
+    case GNUNET_BLOCK_TYPE_DBLOCK:
+    case GNUNET_BLOCK_TYPE_IBLOCK:
+      /* only possible reply, stop requesting! */
+      while (NULL != pr->pending_head)
+       destroy_pending_message_list_entry (pr->pending_head);
+      if (pr->qe != NULL)
+       {
+         if (pr->client_request_list != NULL)
+           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
+                                       GNUNET_YES);
+         GNUNET_DATASTORE_cancel (pr->qe);
+         pr->qe = NULL;
+       }
+      pr->do_remove = GNUNET_YES;
+      if (pr->task != GNUNET_SCHEDULER_NO_TASK)
+       {
+         GNUNET_SCHEDULER_cancel (sched,
+                                  pr->task);
+         pr->task = GNUNET_SCHEDULER_NO_TASK;
+       }
+      GNUNET_break (GNUNET_YES ==
+                   GNUNET_CONTAINER_multihashmap_remove (query_request_map,
+                                                         key,
+                                                         pr));
+      break;
+    case GNUNET_BLOCK_TYPE_SBLOCK:
+      if (pr->namespace == NULL)
+       {
+         GNUNET_break (0);
+         return GNUNET_YES;
+       }
+      if (0 != memcmp (pr->namespace,
+                      &prq->namespace,
+                      sizeof (GNUNET_HashCode)))
+       {
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                     _("Reply mismatched in terms of namespace.  Discarded.\n"));
+         return GNUNET_YES; /* wrong namespace */      
+       }
+      /* then: fall-through! */
+    case GNUNET_BLOCK_TYPE_KBLOCK:
+    case GNUNET_BLOCK_TYPE_NBLOCK:
+      if (pr->bf != NULL) 
+       {
+         mingle_hash (&chash, pr->mingle, &mhash);
+         if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
+                                                              &mhash))
+           {
+             GNUNET_STATISTICS_update (stats,
+                                       gettext_noop ("# duplicate replies discarded (bloomfilter)"),
+                                       1,
+                                       GNUNET_NO);
+#if DEBUG_FS
+             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                         "Duplicate response `%s', discarding.\n",
+                         GNUNET_h2s (&mhash));
+#endif
+             return GNUNET_YES; /* duplicate */
+           }
+#if DEBUG_FS
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "New response `%s', adding to filter.\n",
+                     GNUNET_h2s (&mhash));
+#endif
+       }
+      if (pr->client_request_list != NULL)
+       {
+         if (pr->replies_seen_size == pr->replies_seen_off)
+           GNUNET_array_grow (pr->replies_seen,
+                              pr->replies_seen_size,
+                              pr->replies_seen_size * 2 + 4);  
+           pr->replies_seen[pr->replies_seen_off++] = chash;         
+       }
+      if ( (pr->bf == NULL) ||
+          (pr->client_request_list != NULL) )
+       refresh_bloomfilter (pr);
+      GNUNET_CONTAINER_bloomfilter_add (pr->bf,
+                                       &mhash);
+      break;
+    default:
+      GNUNET_break (0);
+      return GNUNET_YES;
     }
-  if (GNUNET_SCHEDULER_NO_TASK != pr->task)
-    GNUNET_SCHEDULER_cancel (sched, pr->task);
-  if (NULL != pr->cth)
-    GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
-  if (NULL != pr->bf)
-    GNUNET_CONTAINER_bloomfilter_free (pr->bf);
-  if (NULL != pr->th)
-    GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
-  while (NULL != (reply = pr->replies_pending))
+  prq->priority += pr->remaining_priority;
+  pr->remaining_priority = 0;
+  if (NULL != pr->client_request_list)
     {
-      pr->replies_pending = reply->next;
-      GNUNET_free (reply);
-    }
-  GNUNET_PEER_change_rc (pr->source_pid, -1);
-  GNUNET_PEER_change_rc (pr->target_pid, -1);
-  GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
-  GNUNET_free_non_null (pr->used_pids);
-  GNUNET_free_non_null (pr->replies_seen);
-  GNUNET_free_non_null (pr->namespace);
-  GNUNET_free (pr);
-}
-
-
-/**
- * A client disconnected.  Remove all of its pending queries.
- *
- * @param cls closure, NULL
- * @param client identification of the client
- */
-static void
-handle_client_disconnect (void *cls,
-                         struct GNUNET_SERVER_Client
-                         * client)
-{
-  struct LocalGetContext *lgc;
-  struct ClientList *cpos;
-  struct ClientList *cprev;
-  struct ClientRequestList *rl;
-
-  lgc = lgc_head;
-  while ( (NULL != lgc) &&
-         (lgc->client != client) )
-    lgc = lgc->next;
-  if (lgc != NULL)
-    local_get_context_free (lgc);
-  cprev = NULL;
-  cpos = clients;
-  while ( (NULL != cpos) &&
-         (clients->client != client) )
-    {
-      cprev = cpos;
-      cpos = cpos->next;
-    }
-  if (cpos != NULL)
-    {
-      if (cprev == NULL)
-       clients = cpos->next;
-      else
-       cprev->next = cpos->next;
-      while (NULL != (rl = cpos->head))
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# replies received for local clients"),
+                               1,
+                               GNUNET_NO);
+      cl = pr->client_request_list->client_list;
+      msize = sizeof (struct PutMessage) + prq->size;
+      creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
+      creply->msize = msize;
+      creply->client_list = cl;
+      GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
+                                        cl->res_tail,
+                                        cl->res_tail,
+                                        creply);      
+      pm = (struct PutMessage*) &creply[1];
+      pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+      pm->header.size = htons (msize);
+      pm->type = htonl (prq->type);
+      pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
+      memcpy (&pm[1], prq->data, prq->size);      
+      if (NULL == cl->th)
        {
-         cpos->head = rl->next;
-         destroy_pending_request (rl->req);
-         GNUNET_free (rl);
+#if DEBUG_FS
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Transmitting result for query `%s' to client\n",
+                     GNUNET_h2s (key));
+#endif  
+         cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
+                                                       msize,
+                                                       GNUNET_TIME_UNIT_FOREVER_REL,
+                                                       &transmit_to_client,
+                                                       cl);
        }
-      GNUNET_free (cpos);
+      GNUNET_break (cl->th != NULL);
+      if (pr->do_remove)               
+       {
+         prq->finished = GNUNET_YES;
+         destroy_pending_request (pr);         
+       }
+    }
+  else
+    {
+      cp = pr->cp;
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Transmitting result for query `%s' to other peer (PID=%u)\n",
+                 GNUNET_h2s (key),
+                 (unsigned int) cp->pid);
+#endif  
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# replies received for other peers"),
+                               1,
+                               GNUNET_NO);
+      msize = sizeof (struct PutMessage) + prq->size;
+      reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
+      reply->cont = &transmit_reply_continuation;
+      reply->cont_cls = pr;
+      reply->msize = msize;
+      reply->priority = UINT32_MAX; /* send replies first! */
+      pm = (struct PutMessage*) &reply[1];
+      pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+      pm->header.size = htons (msize);
+      pm->type = htonl (prq->type);
+      pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
+      memcpy (&pm[1], prq->data, prq->size);
+      add_to_pending_messages_for_peer (cp, reply, pr);
     }
+  return GNUNET_YES;
 }
 
 
 /**
- * Iterator over entries in the "requests_by_query" map
- * that frees all the entries.
+ * Continuation called to notify client about result of the
+ * operation.
  *
- * @param cls closure, NULL
- * @param key current key code (the query, unused) 
- * @param value value in the hash map, of type "struct PendingRequest*"
- * @return GNUNET_YES (we should continue to  iterate)
+ * @param cls closure
+ * @param success GNUNET_SYSERR on failure
+ * @param msg NULL on success, otherwise an error message
  */
-static int 
-destroy_pending_request_cb (void *cls,
-                           const GNUNET_HashCode * key,
-                           void *value)
+static void 
+put_migration_continuation (void *cls,
+                           int success,
+                           const char *msg)
 {
-  struct PendingRequest *pr = value;
-
-  destroy_pending_request (pr);
-  return GNUNET_YES;
+  /* FIXME */
 }
 
 
 /**
- * Task run during shutdown.
+ * Handle P2P "PUT" message.
  *
- * @param cls unused
- * @param tc unused
+ * @param cls closure, always NULL
+ * @param other the other peer involved (sender or receiver, NULL
+ *        for loopback messages where we are both sender and receiver)
+ * @param message the actual message
+ * @param latency reported latency of the connection with 'other'
+ * @param distance reported distance (DV) to 'other' 
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
  */
-static void
-shutdown_task (void *cls,
-              const struct GNUNET_SCHEDULER_TaskContext *tc)
+static int
+handle_p2p_put (void *cls,
+               const struct GNUNET_PeerIdentity *other,
+               const struct GNUNET_MessageHeader *message,
+               struct GNUNET_TIME_Relative latency,
+               uint32_t distance)
 {
-  struct IndexInfo *pos;  
+  const struct PutMessage *put;
+  uint16_t msize;
+  size_t dsize;
+  enum GNUNET_BLOCK_Type type;
+  struct GNUNET_TIME_Absolute expiration;
+  GNUNET_HashCode query;
+  struct ProcessReplyClosure prq;
+  const struct SBlock *sb;
 
-  if (NULL != core)
+  msize = ntohs (message->size);
+  if (msize < sizeof (struct PutMessage))
     {
-      GNUNET_CORE_disconnect (core);
-      core = NULL;
+      GNUNET_break_op(0);
+      return GNUNET_SYSERR;
     }
-  if (NULL != dsh)
+  put = (const struct PutMessage*) message;
+  dsize = msize - sizeof (struct PutMessage);
+  type = ntohl (put->type);
+  expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
+
+  if (GNUNET_OK !=
+      GNUNET_BLOCK_check_block (type,
+                               &put[1],
+                               dsize,
+                               &query))
     {
-      GNUNET_DATASTORE_disconnect (dsh,
-                                  GNUNET_NO);
-      dsh = NULL;
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
     }
-  GNUNET_CONTAINER_multihashmap_iterate (requests_by_query,
-                                        &destroy_pending_request_cb,
-                                        NULL);
-  while (clients != NULL)
-    handle_client_disconnect (NULL,
-                             clients->client);
-  GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
-  requests_by_query = NULL;
-  GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
-  requests_by_peer = NULL;
-  GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
-  requests_by_expiration = NULL;
-  // FIXME: iterate over entries and free individually?
-  // (or do we get disconnect notifications?)
-  GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
-  connected_peers = NULL;
-  GNUNET_CONTAINER_multihashmap_destroy (ifm);
-  ifm = NULL;
-  while (NULL != (pos = indexed_files))
-    {
-      indexed_files = pos->next;
-      GNUNET_free (pos);
+  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+    return GNUNET_SYSERR;
+  if (GNUNET_BLOCK_TYPE_SBLOCK == type)
+    { 
+      sb = (const struct SBlock*) &put[1];
+      GNUNET_CRYPTO_hash (&sb->subspace,
+                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+                         &prq.namespace);
     }
-}
-
-
-/**
- * Free (each) request made by the peer.
- *
- * @param cls closure, points to peer that the request belongs to
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES (we should continue to iterate)
- */
-static int
-destroy_request (void *cls,
-                const GNUNET_HashCode * key,
-                void *value)
-{
-  const struct GNUNET_PeerIdentity * peer = cls;
-  struct PendingRequest *pr = value;
-  
-  GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
-                                       &peer->hashPubKey,
-                                       pr);
-  destroy_pending_request (pr);
-  return GNUNET_YES;
-}
 
-
-
-/**
- * Method called whenever a given peer connects.
- *
- * @param cls closure, not used
- * @param peer peer identity this notification is about
- */
-static void 
-peer_connect_handler (void *cls,
-                     const struct
-                     GNUNET_PeerIdentity * peer)
-{
-  struct ConnectedPeer *cp;
-
-  cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
-  cp->pid = GNUNET_PEER_intern (peer);
-  GNUNET_CONTAINER_multihashmap_put (connected_peers,
-                                    &peer->hashPubKey,
-                                    cp,
-                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received result for query `%s' from peer `%4s'\n",
+             GNUNET_h2s (&query),
+             GNUNET_i2s (other));
+#endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# replies received (overall)"),
+                           1,
+                           GNUNET_NO);
+  /* now, lookup 'query' */
+  prq.data = (const void*) &put[1];
+  if (other != NULL)
+    prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                                   &other->hashPubKey);
+  prq.size = dsize;
+  prq.type = type;
+  prq.expiration = expiration;
+  prq.priority = 0;
+  prq.finished = GNUNET_NO;
+  GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+                                             &query,
+                                             &process_reply,
+                                             &prq);
+  if (GNUNET_YES == active_migration)
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Replicating result for query `%s' with priority %u\n",
+                 GNUNET_h2s (&query),
+                 prq.priority);
+#endif
+      GNUNET_DATASTORE_put (dsh,
+                           0, &query, dsize, &put[1],
+                           type, prq.priority, 1 /* anonymity */, 
+                           expiration, 
+                           1 + prq.priority, MAX_DATASTORE_QUEUE,
+                           GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+                           &put_migration_continuation, 
+                           NULL);
+    }
+  return GNUNET_OK;
 }
 
 
-/**
- * Method called whenever a peer disconnects.
- *
- * @param cls closure, not used
- * @param peer peer identity this notification is about
- */
-static void
-peer_disconnect_handler (void *cls,
-                        const struct
-                        GNUNET_PeerIdentity * peer)
-{
-  struct ConnectedPeer *cp;
-
-  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
-                                         &peer->hashPubKey);
-  GNUNET_PEER_change_rc (cp->pid, -1);
-  GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
-  GNUNET_free (cp);
-  GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
-                                             &peer->hashPubKey,
-                                             &destroy_request,
-                                             (void*) peer);
-}
+/* **************************** P2P GET Handling ************************ */
 
 
 /**
- * We're processing a GET request from
- * another peer and have decided to forward
- * it to other peers.
- *
- * @param cls our "struct ProcessGetContext *"
- * @param tc unused
+ * Closure for 'check_duplicate_request_{peer,client}'.
  */
-static void
-forward_get_request (void *cls,
-                    const struct GNUNET_SCHEDULER_TaskContext *tc)
+struct CheckDuplicateRequestClosure
 {
-  struct ProcessGetContext *pgc = cls;
-  struct PendingRequest *pr;
-  struct PendingRequest *eer;
-  struct GNUNET_PeerIdentity target;
-
-  pr = GNUNET_malloc (sizeof (struct PendingRequest));
-  if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
-    {
-      pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
-      *pr->namespace = pgc->namespace;
-    }
-  pr->bf = pgc->bf;
-  pr->bf_size = pgc->bf_size;
-  pgc->bf = NULL;
-  pr->start_time = pgc->start_time;
-  pr->query = pgc->query;
-  pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
-  if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
-    pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
-  pr->anonymity_level = 1; /* default */
-  pr->priority = pgc->priority;
-  pr->remaining_priority = pr->priority;
-  pr->mingle = pgc->mingle;
-  pr->ttl = pgc->ttl; 
-  pr->type = pgc->type;
-  GNUNET_CONTAINER_multihashmap_put (requests_by_query,
-                                    &pr->query,
-                                    pr,
-                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
-                                    &pgc->reply_to.hashPubKey,
-                                    pr,
-                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  GNUNET_CONTAINER_heap_insert (requests_by_expiration,
-                               pr,
-                               pr->start_time.value + pr->ttl);
-  if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
-    {
-      /* expire oldest request! */
-      eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
-      GNUNET_PEER_resolve (eer->source_pid,
-                          &target);    
-      GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
-                                           &target.hashPubKey,
-                                           eer);
-      destroy_pending_request (eer);     
-    }
-  pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                          get_processing_delay (),
-                                          &forward_request_task,
-                                          pr);
-  GNUNET_free (pgc); 
-}
-/**
- * Transmit the given message by copying it to
- * the target buffer "buf".  "buf" will be
- * NULL and "size" zero if the socket was closed for
- * writing in the meantime.  In that case, only
+  /**
+   * The new request we should check if it already exists.
+   */
+  const struct PendingRequest *pr;
 
- * free the message
- *
- * @param cls closure, pointer to the message
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_message (void *cls,
-                 size_t size, void *buf)
-{
-  struct GNUNET_MessageHeader *msg = cls;
-  uint16_t msize;
-  
-  if (NULL == buf)
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Dropping reply, core too busy.\n");
-#endif
-      GNUNET_free (msg);
-      return 0;
-    }
-  msize = ntohs (msg->size);
-  GNUNET_assert (size >= msize);
-  memcpy (buf, msg, msize);
-  GNUNET_free (msg);
-  return msize;
-}
+  /**
+   * Existing request found by the checker, NULL if none.
+   */
+  struct PendingRequest *have;
+};
 
 
 /**
- * Test if the load on this peer is too high
- * to even consider processing the query at
- * all.
- * 
- * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
+ * Iterator over entries in the 'query_request_map' that
+ * tries to see if we have the same request pending from
+ * the same client already.
+ *
+ * @param cls closure (our 'struct CheckDuplicateRequestClosure')
+ * @param key current key code (query, ignored, must match)
+ * @param value value in the hash map (a 'struct PendingRequest' 
+ *              that already exists)
+ * @return GNUNET_YES if we should continue to
+ *         iterate (no match yet)
+ *         GNUNET_NO if not (match found).
  */
 static int
-test_load_too_high ()
+check_duplicate_request_client (void *cls,
+                               const GNUNET_HashCode * key,
+                               void *value)
 {
-  return GNUNET_NO; // FIXME
+  struct CheckDuplicateRequestClosure *cdc = cls;
+  struct PendingRequest *have = value;
+
+  if (have->client_request_list == NULL)
+    return GNUNET_YES;
+  if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
+       (cdc->pr != have) )
+    {
+      cdc->have = have;
+      return GNUNET_NO;
+    }
+  return GNUNET_YES;
 }
 
 
@@ -2569,743 +2721,740 @@ test_load_too_high ()
  *        maybe 0 if no unique identifier is available
  */
 static void
-process_p2p_get_result (void *cls,
-                       const GNUNET_HashCode * key,
-                       uint32_t size,
-                       const void *data,
-                       uint32_t type,
-                       uint32_t priority,
-                       uint32_t anonymity,
-                       struct GNUNET_TIME_Absolute
-                       expiration, 
-                       uint64_t uid)
+process_local_reply (void *cls,
+                    const GNUNET_HashCode * key,
+                    uint32_t size,
+                    const void *data,
+                    enum GNUNET_BLOCK_Type type,
+                    uint32_t priority,
+                    uint32_t anonymity,
+                    struct GNUNET_TIME_Absolute
+                    expiration, 
+                    uint64_t uid)
 {
-  struct ProcessGetContext *pgc = cls;
+  struct PendingRequest *pr = cls;
+  struct ProcessReplyClosure prq;
+  struct CheckDuplicateRequestClosure cdrc;
+  const struct SBlock *sb;
   GNUNET_HashCode dhash;
   GNUNET_HashCode mhash;
-  struct PutMessage *reply;
+  GNUNET_HashCode query;
   
   if (NULL == key)
     {
-      /* no more results */
-      if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
-          ( (0 == pgc->results_found) ||
-            (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
-            (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
-            (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
-       {
-         GNUNET_SCHEDULER_add_continuation (sched,
-                                            &forward_get_request,
-                                            pgc,
-                                            GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-       }
-      else
+#if DEBUG_FS > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Done processing local replies, forwarding request to other peers.\n");
+#endif
+      pr->qe = NULL;
+      if (pr->client_request_list != NULL)
        {
-         if (pgc->bf != NULL)
-           GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
-         GNUNET_free (pgc); 
+         GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
+                                     GNUNET_YES);
+         /* Figure out if this is a duplicate request and possibly
+            merge 'struct PendingRequest' entries */
+         cdrc.have = NULL;
+         cdrc.pr = pr;
+         GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+                                                     &pr->query,
+                                                     &check_duplicate_request_client,
+                                                     &cdrc);
+         if (cdrc.have != NULL)
+           {
+#if DEBUG_FS
+             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                         "Received request for block `%s' twice from client, will only request once.\n",
+                         GNUNET_h2s (&pr->query));
+#endif
+             
+             destroy_pending_request (pr);
+             return;
+           }
        }
-      next_ds_request ();
+
+      /* no more results */
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_now (sched,
+                                            &forward_request_task,
+                                            pr);      
       return;
     }
-  if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "New local response to `%s' of type %u.\n",
+             GNUNET_h2s (key),
+             type);
+#endif
+  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
     {
-      handle_on_demand_block (key, size, data, type, priority, 
-                             anonymity, expiration, uid,
-                             &process_p2p_get_result,
-                             pgc);
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Found ONDEMAND block, performing on-demand encoding\n");
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# on-demand blocks matched requests"),
+                               1,
+                               GNUNET_NO);
+      if (GNUNET_OK != 
+         GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
+                                           anonymity, expiration, uid, 
+                                           &process_local_reply,
+                                           pr))
+      if (pr->qe != NULL)
+       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
       return;
     }
   /* check for duplicates */
   GNUNET_CRYPTO_hash (data, size, &dhash);
   mingle_hash (&dhash, 
-              pgc->mingle,
+              pr->mingle,
               &mhash);
-  if ( (pgc->bf != NULL) &&
+  if ( (pr->bf != NULL) &&
        (GNUNET_YES ==
-       GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
+       GNUNET_CONTAINER_bloomfilter_test (pr->bf,
                                           &mhash)) )
     {      
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Result from datastore filtered by bloomfilter.\n");
+                 "Result from datastore filtered by bloomfilter (duplicate).\n");
 #endif
-      GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# results filtered by query bloomfilter"),
+                               1,
+                               GNUNET_NO);
+      if (pr->qe != NULL)
+       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
       return;
     }
-  pgc->results_found++;
-  if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
-       (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
-       (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Found result for query `%s' in local datastore\n",
+             GNUNET_h2s (key));
+#endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# results found locally"),
+                           1,
+                           GNUNET_NO);
+  pr->results_found++;
+  memset (&prq, 0, sizeof (prq));
+  prq.data = data;
+  prq.expiration = expiration;
+  prq.size = size;  
+  if (GNUNET_BLOCK_TYPE_SBLOCK == type)
+    { 
+      sb = (const struct SBlock*) data;
+      GNUNET_CRYPTO_hash (&sb->subspace,
+                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+                         &prq.namespace);
+    }
+  if (GNUNET_OK != GNUNET_BLOCK_check_block (type,
+                                            data,
+                                            size,
+                                            &query))
     {
-      if (pgc->bf == NULL)
-       {
-         pgc->bf_size = 32;
-         pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
-                                                      pgc->bf_size
-                                                      BLOOMFILTER_K);
-       }
-      GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
-                                       &mhash);
+      GNUNET_break (0);
+      GNUNET_DATASTORE_remove (dsh,
+                              key,
+                              size, data,
+                              -1, -1
+                              GNUNET_TIME_UNIT_FOREVER_REL,
+                              NULL, NULL);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+      return;
     }
-
-  reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
-  reply->header.size = htons (sizeof (struct PutMessage) + size);
-  reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
-  reply->type = htonl (type);
-  reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
-  memcpy (&reply[1], data, size);
-  GNUNET_CORE_notify_transmit_ready (core,
-                                    pgc->priority,
-                                    ACCEPTABLE_REPLY_DELAY,
-                                    &pgc->reply_to,
-                                    sizeof (struct PutMessage) + size,
-                                    &transmit_message,
-                                    reply);
-  if ( (GNUNET_YES == test_load_too_high()) ||
-       (pgc->results_found > 5 + 2 * pgc->priority) )
+  prq.type = type;
+  prq.priority = priority;  
+  prq.finished = GNUNET_NO;
+  process_reply (&prq, key, pr);
+  if (prq.finished == GNUNET_YES)
+    return;
+  if (pr->qe == NULL)
+    return; /* done here */
+  if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
+       (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
     {
       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
-      pgc->policy &= ~ ROUTING_POLICY_FORWARD;
       return;
     }
-  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-}
-  
-
-/**
- * We're processing a GET request from another peer.  Give it to our
- * local datastore.
- *
- * @param cls our "struct ProcessGetContext"
- * @param ok did we get a datastore slice or not?
- */
-static void
-ds_get_request (void *cls, 
-               int ok)
-{
-  struct ProcessGetContext *pgc = cls;
-  uint32_t type;
-  struct GNUNET_TIME_Relative timeout;
-
-  if (GNUNET_OK != ok)
+  if ( (pr->client_request_list == NULL) &&
+       ( (GNUNET_YES == test_load_too_high()) ||
+        (pr->results_found > 5 + 2 * pr->priority) ) )
     {
-      /* no point in doing P2P stuff if we can't even do local */
-      GNUNET_free (dsh);
+#if DEBUG_FS > 2
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Load too high, done with request\n");
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# processing result set cut short due to load"),
+                               1,
+                               GNUNET_NO);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
       return;
     }
-  type = pgc->type;
-  if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
-    type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
-  timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
-                                          (pgc->priority + 1));
-  GNUNET_DATASTORE_get (dsh,
-                       &pgc->query,
-                       type,
-                       &process_p2p_get_result,
-                       pgc,
-                       timeout);
-}
-
-
-/**
- * The priority level imposes a bound on the maximum
- * value for the ttl that can be requested.
- *
- * @param ttl_in requested ttl
- * @param prio given priority
- * @return ttl_in if ttl_in is below the limit,
- *         otherwise the ttl-limit for the given priority
- */
-static int32_t
-bound_ttl (int32_t ttl_in, uint32_t prio)
-{
-  unsigned long long allowed;
-
-  if (ttl_in <= 0)
-    return ttl_in;
-  allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
-  if (ttl_in > allowed)      
-    {
-      if (allowed >= (1 << 30))
-        return 1 << 30;
-      return allowed;
-    }
-  return ttl_in;
+  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
 }
 
 
 /**
- * We've received a request with the specified
- * priority.  Bound it according to how much
- * we trust the given peer.
+ * We've received a request with the specified priority.  Bound it
+ * according to how much we trust the given peer.
  * 
  * @param prio_in requested priority
- * @param peer the peer making the request
+ * @param cp the peer making the request
  * @return effective priority
  */
 static uint32_t
 bound_priority (uint32_t prio_in,
-               const struct GNUNET_PeerIdentity *peer)
+               struct ConnectedPeer *cp)
 {
   return 0; // FIXME!
 }
 
 
-/**
- * Handle P2P "GET" request.
- *
- * @param cls closure, always NULL
- * @param other the other peer involved (sender or receiver, NULL
- *        for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
- */
-static int
-handle_p2p_get (void *cls,
-               const struct GNUNET_PeerIdentity *other,
-               const struct GNUNET_MessageHeader *message)
-{
-  uint16_t msize;
-  const struct GetMessage *gm;
-  unsigned int bits;
-  const GNUNET_HashCode *opt;
-  struct ProcessGetContext *pgc;
-  uint32_t bm;
-  size_t bfsize;
-  uint32_t ttl_decrement;
-  double preference;
-  int net_load_up;
-  int net_load_down;
-
-  msize = ntohs(message->size);
-  if (msize < sizeof (struct GetMessage))
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-  gm = (const struct GetMessage*) message;
-  bm = ntohl (gm->hash_bitmap);
-  bits = 0;
-  while (bm > 0)
-    {
-      if (1 == (bm & 1))
-       bits++;
-      bm >>= 1;
-    }
-  if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }  
-  opt = (const GNUNET_HashCode*) &gm[1];
-  bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
-  pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
-  if (bfsize > 0)
-    {
-      pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
-                                                  bfsize,
-                                                  BLOOMFILTER_K);
-      pgc->bf_size = bfsize;
-    }
-  pgc->type = ntohl (gm->type);
-  pgc->bm = ntohl (gm->hash_bitmap);
-  pgc->mingle = gm->filter_mutator;
-  bits = 0;
-  if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
-    pgc->reply_to.hashPubKey = opt[bits++];
-  else
-    pgc->reply_to = *other;
-  if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
-    pgc->namespace = opt[bits++];
-  else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
-    {
-      GNUNET_break_op (0);
-      GNUNET_free (pgc);
-      return GNUNET_SYSERR;
-    }
-  if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
-    pgc->prime_target.hashPubKey = opt[bits++];
-  /* note that we can really only check load here since otherwise
-     peers could find out that we are overloaded by being disconnected
-     after sending us a malformed query... */
-  if (GNUNET_YES == test_load_too_high ())
-    {
-      if (NULL != pgc->bf)
-       GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
-      GNUNET_free (pgc);
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Dropping query from `%s', this peer is too busy.\n",
-                 GNUNET_i2s (other));
-#endif
-      return GNUNET_OK;
-    }
-  net_load_up = 50; // FIXME
-  net_load_down = 50; // FIXME
-  pgc->policy = ROUTING_POLICY_NONE;
-  if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
-       (net_load_down < IDLE_LOAD_THRESHOLD) )
-    {
-      pgc->policy |= ROUTING_POLICY_ALL;
-      pgc->priority = 0; /* no charge */
-    }
-  else
-    {
-      pgc->priority = bound_priority (ntohl (gm->priority), other);
-      if ( (net_load_up < 
-           IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
-          (net_load_down < 
-           IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
-       {
-         pgc->policy |= ROUTING_POLICY_ALL;
-       }
-      else
-       {
-         // FIXME: is this sound?
-         if (net_load_up < 90 + 10 * pgc->priority)
-           pgc->policy |= ROUTING_POLICY_FORWARD;
-         if (net_load_down < 90 + 10 * pgc->priority)
-           pgc->policy |= ROUTING_POLICY_ANSWER;
-        }
-    }
-  if (pgc->policy == ROUTING_POLICY_NONE)
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Dropping query from `%s', network saturated.\n",
-                 GNUNET_i2s (other));
-#endif
-      if (NULL != pgc->bf)
-       GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
-      GNUNET_free (pgc);
-      return GNUNET_OK;     /* drop */
-    }
-  if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
-    pgc->priority = 0;  /* kill the priority (we cannot benefit) */
-  pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
-  /* decrement ttl (always) */
-  ttl_decrement = 2 * TTL_DECREMENT +
-    GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                             TTL_DECREMENT);
-  if ( (pgc->ttl < 0) &&
-       (pgc->ttl - ttl_decrement > 0) )
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Dropping query from `%s' due to TTL underflow.\n",
-                 GNUNET_i2s (other));
-#endif
-      /* integer underflow => drop (should be very rare)! */
-      if (NULL != pgc->bf)
-       GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
-      GNUNET_free (pgc);
-      return GNUNET_OK;
-    }
-  pgc->ttl -= ttl_decrement;
-  pgc->start_time = GNUNET_TIME_absolute_get ();
-  preference = (double) pgc->priority;
-  if (preference < QUERY_BANDWIDTH_VALUE)
-    preference = QUERY_BANDWIDTH_VALUE;
-  // FIXME: also reserve bandwidth for reply?
-  GNUNET_CORE_peer_configure (core,
-                             other,
-                             GNUNET_TIME_UNIT_FOREVER_REL,
-                             0, 0, preference, NULL, NULL);
-  if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
-    pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
-                                &ds_get_request,
-                                pgc);
-  else
-    GNUNET_SCHEDULER_add_continuation (sched,
-                                      &forward_get_request,
-                                      pgc,
-                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-  return GNUNET_OK;
-}
-
-
-/**
- * Function called to notify us that we can now transmit a reply to a
- * client or peer.  "buf" will be NULL and "size" zero if the socket was
- * closed for writing in the meantime.
- *
- * @param cls closure, points to a "struct PendingRequest*" with
- *            one or more pending replies
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_result (void *cls,
-                size_t size, 
-                void *buf)
-{
-  struct PendingRequest *pr = cls;
-  char *cbuf = buf;
-  struct PendingReply *reply;
-  size_t ret;
-
-  ret = 0;
-  while (NULL != (reply = pr->replies_pending))
-    {
-      if ( (reply->msize + ret < ret) ||
-          (reply->msize + ret > size) )
-       break;
-      pr->replies_pending = reply->next;
-      memcpy (&cbuf[ret], &reply[1], reply->msize);
-      ret += reply->msize;
-      GNUNET_free (reply);
-    }
-  return ret;
-}
-
-
-/**
- * Iterator over pending requests.
- *
- * @param cls response (struct ProcessReplyClosure)
- * @param key our query
- * @param value value in the hash map (meta-info about the query)
- * @return GNUNET_YES (we should continue to iterate)
- */
-static int
-process_reply (void *cls,
-              const GNUNET_HashCode * key,
-              void *value)
-{
-  struct ProcessReplyClosure *prq = cls;
-  struct PendingRequest *pr = value;
-  struct PendingRequest *eer;
-  struct PendingReply *reply;
-  struct PutMessage *pm;
-  struct ContentMessage *cm;
-  GNUNET_HashCode chash;
-  GNUNET_HashCode mhash;
-  struct GNUNET_PeerIdentity target;
-  size_t msize;
-  uint32_t prio;
-  struct GNUNET_TIME_Relative max_delay;
-  
-  GNUNET_CRYPTO_hash (prq->data,
-                     prq->size,
-                     &chash);
-  switch (prq->type)
-    {
-    case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
-    case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
-      break;
-    case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
-      /* FIXME: does prq->namespace match our expectations? */
-      /* then: fall-through??? */
-    case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
-      if (pr->bf != NULL) 
-       {
-         mingle_hash (&chash, pr->mingle, &mhash);
-         if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
-                                                              &mhash))
-           return GNUNET_YES; /* duplicate */
-         GNUNET_CONTAINER_bloomfilter_add (pr->bf,
-                                           &mhash);
-       }
-      break;
-    case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
-      // FIXME: any checks against duplicates for SKBlocks?
-      break;
-    }
-  prio = pr->priority;
-  prq->priority += pr->remaining_priority;
-  pr->remaining_priority = 0;
-  if (pr->client != NULL)
-    {
-      if (pr->replies_seen_size == pr->replies_seen_off)
-       GNUNET_array_grow (pr->replies_seen,
-                          pr->replies_seen_size,
-                          pr->replies_seen_size * 2 + 4);
-      pr->replies_seen[pr->replies_seen_off++] = chash;
-      // FIXME: possibly recalculate BF!
-    }
-  if (pr->client == NULL)
-    {
-      msize = sizeof (struct ContentMessage) + prq->size;
-      reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
-      reply->msize = msize;
-      cm = (struct ContentMessage*) &reply[1];
-      cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
-      cm->header.size = htons (msize);
-      cm->type = htonl (prq->type);
-      cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
-      reply->next = pr->replies_pending;
-      pr->replies_pending = reply;
-      memcpy (&reply[1], prq->data, prq->size);
-      if (pr->cth != NULL)
-       return GNUNET_YES;
-      max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
-      if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
-       {
-         /* estimate expiration time from time difference between
-            first request that will be discarded and this request */
-         eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
-         max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
-                                                          eer->start_time);
-       }
-      GNUNET_PEER_resolve (pr->source_pid,
-                          &target);
-      pr->cth = GNUNET_CORE_notify_transmit_ready (core,
-                                                  prio,
-                                                  max_delay,
-                                                  &target,
-                                                  msize,
-                                                  &transmit_result,
-                                                  pr);
-      if (NULL == pr->cth)
-       {
-         // FIXME: now what? discard?
-       }
-    }
-  else
+/**
+ * Iterator over entries in the 'query_request_map' that
+ * tries to see if we have the same request pending from
+ * the same peer already.
+ *
+ * @param cls closure (our 'struct CheckDuplicateRequestClosure')
+ * @param key current key code (query, ignored, must match)
+ * @param value value in the hash map (a 'struct PendingRequest' 
+ *              that already exists)
+ * @return GNUNET_YES if we should continue to
+ *         iterate (no match yet)
+ *         GNUNET_NO if not (match found).
+ */
+static int
+check_duplicate_request_peer (void *cls,
+                             const GNUNET_HashCode * key,
+                             void *value)
+{
+  struct CheckDuplicateRequestClosure *cdc = cls;
+  struct PendingRequest *have = value;
+
+  if (cdc->pr->target_pid == have->target_pid)
     {
-      msize = sizeof (struct PutMessage) + prq->size;
-      reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
-      reply->msize = msize;
-      reply->next = pr->replies_pending;
-      pm = (struct PutMessage*) &reply[1];
-      pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
-      pm->header.size = htons (msize);
-      pm->type = htonl (prq->type);
-      pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
-      pr->replies_pending = reply;
-      memcpy (&reply[1], prq->data, prq->size);
-      if (pr->th != NULL)
-       return GNUNET_YES;
-      pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
-                                                   msize,
-                                                   GNUNET_TIME_UNIT_FOREVER_REL,
-                                                   &transmit_result,
-                                                   pr);
-      if (pr->th == NULL)
-       {
-         // FIXME: need to try again later (not much
-         // to do here specifically, but we need to
-         // check somewhere else to handle this case!)
-       }
+      cdc->have = have;
+      return GNUNET_NO;
     }
-  // FIXME: implement hot-path routing statistics keeping!
   return GNUNET_YES;
 }
 
 
 /**
- * Check if the given KBlock is well-formed.
+ * Handle P2P "GET" request.
  *
- * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
- * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
- * @param query where to store the query that this block answers
- * @return GNUNET_OK if this is actually a well-formed KBlock
+ * @param cls closure, always NULL
+ * @param other the other peer involved (sender or receiver, NULL
+ *        for loopback messages where we are both sender and receiver)
+ * @param message the actual message
+ * @param latency reported latency of the connection with 'other'
+ * @param distance reported distance (DV) to 'other' 
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
  */
 static int
-check_kblock (const struct KBlock *kb,
-             size_t dsize,
-             GNUNET_HashCode *query)
+handle_p2p_get (void *cls,
+               const struct GNUNET_PeerIdentity *other,
+               const struct GNUNET_MessageHeader *message,
+               struct GNUNET_TIME_Relative latency,
+               uint32_t distance)
 {
-  if (dsize < sizeof (struct KBlock))
+  struct PendingRequest *pr;
+  struct ConnectedPeer *cp;
+  struct ConnectedPeer *cps;
+  struct CheckDuplicateRequestClosure cdc;
+  struct GNUNET_TIME_Relative timeout;
+  uint16_t msize;
+  const struct GetMessage *gm;
+  unsigned int bits;
+  const GNUNET_HashCode *opt;
+  uint32_t bm;
+  size_t bfsize;
+  uint32_t ttl_decrement;
+  enum GNUNET_BLOCK_Type type;
+  double preference;
+  int have_ns;
+
+  msize = ntohs(message->size);
+  if (msize < sizeof (struct GetMessage))
     {
       GNUNET_break_op (0);
       return GNUNET_SYSERR;
     }
-  if (dsize - sizeof (struct KBlock) !=
-      ntohs (kb->purpose.size) 
-      - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) 
-      - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) 
+  gm = (const struct GetMessage*) message;
+  type = ntohl (gm->type);
+  switch (type)
     {
+    case GNUNET_BLOCK_TYPE_ANY:
+    case GNUNET_BLOCK_TYPE_DBLOCK:
+    case GNUNET_BLOCK_TYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_KBLOCK:
+    case GNUNET_BLOCK_TYPE_SBLOCK:
+      break;
+    default:
       GNUNET_break_op (0);
       return GNUNET_SYSERR;
     }
-  if (GNUNET_OK !=
-      GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
-                               &kb->purpose,
-                               &kb->signature,
-                               &kb->keyspace)) 
+  bm = ntohl (gm->hash_bitmap);
+  bits = 0;
+  while (bm > 0)
+    {
+      if (1 == (bm & 1))
+       bits++;
+      bm >>= 1;
+    }
+  if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
+    {
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }  
+  opt = (const GNUNET_HashCode*) &gm[1];
+  bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
+  bm = ntohl (gm->hash_bitmap);
+  if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
+       (type != GNUNET_BLOCK_TYPE_SBLOCK) )
     {
       GNUNET_break_op (0);
+      return GNUNET_SYSERR;      
+    }
+  bits = 0;
+  cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                          &other->hashPubKey);
+  if (NULL == cps)
+    {
+      /* peer must have just disconnected */
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests dropped due to initiator not being connected"),
+                               1,
+                               GNUNET_NO);
       return GNUNET_SYSERR;
     }
-  if (query != NULL)
-    GNUNET_CRYPTO_hash (&kb->keyspace,
-                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
-                       query);
-  return GNUNET_OK;
-}
+  if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
+    cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                           &opt[bits++]);
+  else
+    cp = cps;
+  if (cp == NULL)
+    {
+#if DEBUG_FS
+      if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
+       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                   "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
+                   GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
+      
+      else
+       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                   "Failed to find peer `%4s' in connection set. Dropping query.\n",
+                   GNUNET_i2s (other));
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests dropped due to missing reverse route"),
+                               1,
+                               GNUNET_NO);
+     /* FIXME: try connect? */
+      return GNUNET_OK;
+    }
+  /* note that we can really only check load here since otherwise
+     peers could find out that we are overloaded by not being
+     disconnected after sending us a malformed query... */
+  if (GNUNET_YES == test_load_too_high ())
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Dropping query from `%s', this peer is too busy.\n",
+                 GNUNET_i2s (other));
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests dropped due to high load"),
+                               1,
+                               GNUNET_NO);
+      return GNUNET_OK;
+    }
 
+#if DEBUG_FS 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
+             GNUNET_h2s (&gm->query),
+             (unsigned int) type,
+             GNUNET_i2s (other),
+             (unsigned int) bm);
+#endif
+  have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
+  pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
+                     (have_ns ? sizeof(GNUNET_HashCode) : 0));
+  if (have_ns)
+    {
+      pr->namespace = (GNUNET_HashCode*) &pr[1];
+      memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
+    }
+  pr->type = type;
+  pr->mingle = ntohl (gm->filter_mutator);
+  if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
+    pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
+
+  pr->anonymity_level = 1;
+  pr->priority = bound_priority (ntohl (gm->priority), cps);
+  pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
+  pr->query = gm->query;
+  /* decrement ttl (always) */
+  ttl_decrement = 2 * TTL_DECREMENT +
+    GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                             TTL_DECREMENT);
+  if ( (pr->ttl < 0) &&
+       (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
+                 GNUNET_i2s (other),
+                 pr->ttl,
+                 ttl_decrement);
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests dropped due TTL underflow"),
+                               1,
+                               GNUNET_NO);
+      /* integer underflow => drop (should be very rare)! */      
+      GNUNET_free (pr);
+      return GNUNET_OK;
+    } 
+  pr->ttl -= ttl_decrement;
+  pr->start_time = GNUNET_TIME_absolute_get ();
 
-/**
- * Check if the given SBlock is well-formed.
- *
- * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
- * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
- * @param query where to store the query that this block answers
- * @param namespace where to store the namespace that this block belongs to
- * @return GNUNET_OK if this is actually a well-formed SBlock
- */
-static int
-check_sblock (const struct SBlock *sb,
-             size_t dsize,
-             GNUNET_HashCode *query,   
-             GNUNET_HashCode *namespace)
-{
-  if (dsize < sizeof (struct SBlock))
+  /* get bloom filter */
+  if (bfsize > 0)
     {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
+      pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
+                                                 bfsize,
+                                                 BLOOMFILTER_K);
+      pr->bf_size = bfsize;
     }
-  if (dsize !=
-      ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
+
+  cdc.have = NULL;
+  cdc.pr = pr;
+  GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+                                             &gm->query,
+                                             &check_duplicate_request_peer,
+                                             &cdc);
+  if (cdc.have != NULL)
     {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
+      if (cdc.have->start_time.value + cdc.have->ttl >=
+         pr->start_time.value + pr->ttl)
+       {
+         /* existing request has higher TTL, drop new one! */
+         cdc.have->priority += pr->priority;
+         destroy_pending_request (pr);
+#if DEBUG_FS
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Have existing request with higher TTL, dropping new request.\n",
+                     GNUNET_i2s (other));
+#endif
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# requests dropped due to higher-TTL request"),
+                                   1,
+                                   GNUNET_NO);
+         return GNUNET_OK;
+       }
+      else
+       {
+         /* existing request has lower TTL, drop old one! */
+         pr->priority += cdc.have->priority;
+         /* Possible optimization: if we have applicable pending
+            replies in 'cdc.have', we might want to move those over
+            (this is a really rare special-case, so it is not clear
+            that this would be worth it) */
+         destroy_pending_request (cdc.have);
+         /* keep processing 'pr'! */
+       }
     }
-  if (GNUNET_OK !=
-      GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
-                               &sb->purpose,
-                               &sb->signature,
-                               &sb->subspace)) 
+
+  pr->cp = cp;
+  GNUNET_break (GNUNET_OK ==
+               GNUNET_CONTAINER_multihashmap_put (query_request_map,
+                                                  &gm->query,
+                                                  pr,
+                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+  GNUNET_break (GNUNET_OK ==
+               GNUNET_CONTAINER_multihashmap_put (peer_request_map,
+                                                  &other->hashPubKey,
+                                                  pr,
+                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+  
+  pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
+                                           pr,
+                                           pr->start_time.value + pr->ttl);
+
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# P2P searches received"),
+                           1,
+                           GNUNET_NO);
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# P2P searches active"),
+                           1,
+                           GNUNET_NO);
+
+  /* calculate change in traffic preference */
+  preference = (double) pr->priority;
+  if (preference < QUERY_BANDWIDTH_VALUE)
+    preference = QUERY_BANDWIDTH_VALUE;
+  cps->inc_preference += preference;
+
+  /* process locally */
+  if (type == GNUNET_BLOCK_TYPE_DBLOCK)
+    type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
+  timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
+                                          (pr->priority + 1)); 
+  pr->qe = GNUNET_DATASTORE_get (dsh,
+                                &gm->query,
+                                type,                         
+                                pr->priority + 1,
+                                MAX_DATASTORE_QUEUE,                            
+                                timeout,
+                                &process_local_reply,
+                                pr);
+
+  /* Are multiple results possible?  If so, start processing remotely now! */
+  switch (pr->type)
+    {
+    case GNUNET_BLOCK_TYPE_DBLOCK:
+    case GNUNET_BLOCK_TYPE_IBLOCK:
+      /* only one result, wait for datastore */
+      break;
+    default:
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_now (sched,
+                                            &forward_request_task,
+                                            pr);
+    }
+
+  /* make sure we don't track too many requests */
+  if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
     {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
+      pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
+      destroy_pending_request (pr);
     }
-  if (query != NULL)
-    *query = sb->identifier;
-  if (namespace != NULL)
-    GNUNET_CRYPTO_hash (&sb->subspace,
-                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
-                       namespace);
   return GNUNET_OK;
 }
 
 
+/* **************************** CS GET Handling ************************ */
+
+
 /**
- * Handle P2P "PUT" request.
+ * Handle START_SEARCH-message (search request from client).
  *
- * @param cls closure, always NULL
- * @param other the other peer involved (sender or receiver, NULL
- *        for loopback messages where we are both sender and receiver)
+ * @param cls closure
+ * @param client identification of the client
  * @param message the actual message
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
  */
-static int
-handle_p2p_put (void *cls,
-               const struct GNUNET_PeerIdentity *other,
-               const struct GNUNET_MessageHeader *message)
+static void
+handle_start_search (void *cls,
+                    struct GNUNET_SERVER_Client *client,
+                    const struct GNUNET_MessageHeader *message)
 {
-  const struct PutMessage *put;
+  static GNUNET_HashCode all_zeros;
+  const struct SearchMessage *sm;
+  struct ClientList *cl;
+  struct ClientRequestList *crl;
+  struct PendingRequest *pr;
   uint16_t msize;
-  size_t dsize;
-  uint32_t type;
-  struct GNUNET_TIME_Absolute expiration;
-  GNUNET_HashCode query;
-  struct ProcessReplyClosure prq;
+  unsigned int sc;
+  enum GNUNET_BLOCK_Type type;
 
   msize = ntohs (message->size);
-  if (msize < sizeof (struct PutMessage))
+  if ( (msize < sizeof (struct SearchMessage)) ||
+       (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
     {
-      GNUNET_break_op(0);
-      return GNUNET_SYSERR;
+      GNUNET_break (0);
+      GNUNET_SERVER_receive_done (client,
+                                 GNUNET_SYSERR);
+      return;
     }
-  put = (const struct PutMessage*) message;
-  dsize = msize - sizeof (struct PutMessage);
-  type = ntohl (put->type);
-  expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
-
-  /* first, validate! */
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# client searches received"),
+                           1,
+                           GNUNET_NO);
+  sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
+  sm = (const struct SearchMessage*) message;
+  type = ntohl (sm->type);
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received request for `%s' of type %u from local client\n",
+             GNUNET_h2s (&sm->query),
+             (unsigned int) type);
+#endif
   switch (type)
     {
-    case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
-    case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
-      GNUNET_CRYPTO_hash (&put[1], dsize, &query);
+    case GNUNET_BLOCK_TYPE_ANY:
+    case GNUNET_BLOCK_TYPE_DBLOCK:
+    case GNUNET_BLOCK_TYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_KBLOCK:
+    case GNUNET_BLOCK_TYPE_SBLOCK:
+    case GNUNET_BLOCK_TYPE_NBLOCK:
       break;
-    case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
-      if (GNUNET_OK !=
-         check_kblock ((const struct KBlock*) &put[1],
-                       dsize,
-                       &query))
-       return GNUNET_SYSERR;
+    default:
+      GNUNET_break (0);
+      GNUNET_SERVER_receive_done (client,
+                                 GNUNET_SYSERR);
+      return;
+    }  
+
+  cl = client_list;
+  while ( (cl != NULL) &&
+         (cl->client != client) )
+    cl = cl->next;
+  if (cl == NULL)
+    {
+      cl = GNUNET_malloc (sizeof (struct ClientList));
+      cl->client = client;
+      GNUNET_SERVER_client_keep (client);
+      cl->next = client_list;
+      client_list = cl;
+    }
+  /* detect duplicate KBLOCK requests */
+  if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
+       (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
+       (type == GNUNET_BLOCK_TYPE_ANY) )
+    {
+      crl = cl->rl_head;
+      while ( (crl != NULL) &&
+             ( (0 != memcmp (&crl->req->query,
+                             &sm->query,
+                             sizeof (GNUNET_HashCode))) ||
+               (crl->req->type != type) ) )
+       crl = crl->next;
+      if (crl != NULL)         
+       { 
+#if DEBUG_FS
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Have existing request, merging content-seen lists.\n");
+#endif
+         pr = crl->req;
+         /* Duplicate request (used to send long list of
+            known/blocked results); merge 'pr->replies_seen'
+            and update bloom filter */
+         GNUNET_array_grow (pr->replies_seen,
+                            pr->replies_seen_size,
+                            pr->replies_seen_off + sc);
+         memcpy (&pr->replies_seen[pr->replies_seen_off],
+                 &sm[1],
+                 sc * sizeof (GNUNET_HashCode));
+         pr->replies_seen_off += sc;
+         refresh_bloomfilter (pr);
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# client searches updated (merged content seen list)"),
+                                   1,
+                                   GNUNET_NO);
+         GNUNET_SERVER_receive_done (client,
+                                     GNUNET_OK);
+         return;
+       }
+    }
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# client searches active"),
+                           1,
+                           GNUNET_NO);
+  pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
+                     ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
+  crl = GNUNET_malloc (sizeof (struct ClientRequestList));
+  memset (crl, 0, sizeof (struct ClientRequestList));
+  crl->client_list = cl;
+  GNUNET_CONTAINER_DLL_insert (cl->rl_head,
+                              cl->rl_tail,
+                              crl);  
+  crl->req = pr;
+  pr->type = type;
+  pr->client_request_list = crl;
+  GNUNET_array_grow (pr->replies_seen,
+                    pr->replies_seen_size,
+                    sc);
+  memcpy (pr->replies_seen,
+         &sm[1],
+         sc * sizeof (GNUNET_HashCode));
+  pr->replies_seen_off = sc;
+  pr->anonymity_level = ntohl (sm->anonymity_level); 
+  refresh_bloomfilter (pr);
+  pr->query = sm->query;
+  if (0 == (1 & ntohl (sm->options)))
+    pr->local_only = GNUNET_NO;
+  else
+    pr->local_only = GNUNET_YES;
+  switch (type)
+    {
+    case GNUNET_BLOCK_TYPE_DBLOCK:
+    case GNUNET_BLOCK_TYPE_IBLOCK:
+      if (0 != memcmp (&sm->target,
+                      &all_zeros,
+                      sizeof (GNUNET_HashCode)))
+       pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
       break;
-    case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
-      if (GNUNET_OK !=
-         check_sblock ((const struct SBlock*) &put[1],
-                       dsize,
-                       &query,
-                       &prq.namespace))
-       return GNUNET_SYSERR;
+    case GNUNET_BLOCK_TYPE_SBLOCK:
+      pr->namespace = (GNUNET_HashCode*) &pr[1];
+      memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
       break;
-    case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
-      // FIXME -- validate SKBLOCK!
-      GNUNET_break (0);
-      return GNUNET_OK;
     default:
-      /* unknown block type */
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
+      break;
     }
-
-  /* now, lookup 'query' */
-  prq.data = (const void*) &put[1];
-  prq.size = dsize;
-  prq.type = type;
-  prq.expiration = expiration;
-  prq.priority = 0;
-  GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
-                                             &query,
-                                             &process_reply,
-                                             &prq);
-  // FIXME: if migration is on and load is low,
-  // queue to store data in datastore;
-  // use "prq.priority" for that!
-  return GNUNET_OK;
+  GNUNET_break (GNUNET_OK ==
+               GNUNET_CONTAINER_multihashmap_put (query_request_map,
+                                                  &sm->query,
+                                                  pr,
+                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+  if (type == GNUNET_BLOCK_TYPE_DBLOCK)
+    type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
+  pr->qe = GNUNET_DATASTORE_get (dsh,
+                                &sm->query,
+                                type,
+                                -3, -1,
+                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
+                                &process_local_reply,
+                                pr);
 }
 
 
-/**
- * List of handlers for P2P messages
- * that we care about.
- */
-static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
-  {
-    { &handle_p2p_get, 
-      GNUNET_MESSAGE_TYPE_FS_GET, 0 },
-    { &handle_p2p_put, 
-      GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
-    { NULL, 0, 0 }
-  };
-
+/* **************************** Startup ************************ */
 
 /**
  * Process fs requests.
  *
- * @param cls closure
  * @param s scheduler to use
  * @param server the initialized server
  * @param c configuration to use
  */
-static void
-run (void *cls,
-     struct GNUNET_SCHEDULER_Handle *s,
-     struct GNUNET_SERVER_Handle *server,
-     const struct GNUNET_CONFIGURATION_Handle *c)
+static int
+main_init (struct GNUNET_SCHEDULER_Handle *s,
+          struct GNUNET_SERVER_Handle *server,
+          const struct GNUNET_CONFIGURATION_Handle *c)
 {
+  static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
+    {
+      { &handle_p2p_get, 
+       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
+      { &handle_p2p_put, 
+       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
+      { NULL, 0, 0 }
+    };
+  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+    {&GNUNET_FS_handle_index_start, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
+    {&GNUNET_FS_handle_index_list_get, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
+    {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
+     sizeof (struct UnindexMessage) },
+    {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
+     0 },
+    {NULL, NULL, 0, 0}
+  };
+
   sched = s;
   cfg = c;
-
-  ifm = GNUNET_CONTAINER_multihashmap_create (128);
-  requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
-  requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
-  connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
-  requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
-  read_index_list ();
-  dsh = GNUNET_DATASTORE_connect (cfg,
-                                 sched);
+  stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
+  min_migration_delay = GNUNET_TIME_UNIT_SECONDS; // FIXME: get from config
+  connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+  query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+  peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+  requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
   core = GNUNET_CORE_connect (sched,
                              cfg,
                              GNUNET_TIME_UNIT_FOREVER_REL,
@@ -3313,11 +3462,36 @@ run (void *cls,
                              NULL,
                              &peer_connect_handler,
                              &peer_disconnect_handler,
-                             NULL, 
                              NULL, GNUNET_NO,
                              NULL, GNUNET_NO,
                              p2p_handlers);
-
+  if (NULL == core)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                 _("Failed to connect to `%s' service.\n"),
+                 "core");
+      GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
+      connected_peers = NULL;
+      GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
+      query_request_map = NULL;
+      GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
+      requests_by_expiration_heap = NULL;
+      GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
+      peer_request_map = NULL;
+      if (dsh != NULL)
+       {
+         GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
+         dsh = NULL;
+       }
+      return GNUNET_SYSERR;
+    }
+  /* FIXME: distinguish between sending and storing in options? */
+  if (active_migration) 
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 _("Content migration is enabled, will start to gather data\n"));
+      consider_migration_gathering ();
+    }
   GNUNET_SERVER_disconnect_notify (server, 
                                   &handle_client_disconnect,
                                   NULL);
@@ -3326,21 +3500,41 @@ run (void *cls,
                                GNUNET_TIME_UNIT_FOREVER_REL,
                                &shutdown_task,
                                NULL);
-  if (NULL == dsh)
+  return GNUNET_OK;
+}
+
+
+/**
+ * Process fs requests.
+ *
+ * @param cls closure
+ * @param sched scheduler to use
+ * @param server the initialized server
+ * @param cfg configuration to use
+ */
+static void
+run (void *cls,
+     struct GNUNET_SCHEDULER_Handle *sched,
+     struct GNUNET_SERVER_Handle *server,
+     const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                                          "FS",
+                                                          "ACTIVEMIGRATION");
+  dsh = GNUNET_DATASTORE_connect (cfg,
+                                 sched);
+  if (dsh == NULL)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                 _("Failed to connect to `%s' service.\n"),
-                 "datastore");
       GNUNET_SCHEDULER_shutdown (sched);
       return;
     }
-  if (NULL == core)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                 _("Failed to connect to `%s' service.\n"),
-                 "core");
+  if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
+       (GNUNET_OK != main_init (sched, server, cfg)) )
+    {    
       GNUNET_SCHEDULER_shutdown (sched);
-      return;
+      GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
+      dsh = NULL;
+      return;   
     }
 }
 
@@ -3358,7 +3552,9 @@ main (int argc, char *const *argv)
   return (GNUNET_OK ==
           GNUNET_SERVICE_run (argc,
                               argv,
-                              "fs", &run, NULL)) ? 0 : 1;
+                              "fs",
+                             GNUNET_SERVICE_OPTION_NONE,
+                             &run, NULL)) ? 0 : 1;
 }
 
 /* end of gnunet-service-fs.c */