-help seaspider some more
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index 1eb491877ef81c11c8174ca6f88f74aaf9eead85..06ac91c73d5f1a21f9b79d59f6454801106b0df7 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet.
 /*
      This file is part of GNUnet.
-     (C) 2009, 2010 Christian Grothoff (and other contributing authors)
+     (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
  * @brief gnunet anonymity protocol implementation
  * @author Christian Grothoff
  *
  * @brief gnunet anonymity protocol implementation
  * @author Christian Grothoff
  *
- * TODO:
- * - trust not properly received and pushed back to peerinfo!
- * - bound_priority by priorities used by other peers
- * - have a way to drop queries based on load
- * - introduce random latency in processing
- * - consider more precise latency estimation (per-peer & request)
- * - better algorithm for priority selection for requests we initiate?
- * - tell other peers to stop migration if our PUTs fail (or if
- *   we don't support migration per configuration?)
- * - more statistics
+ * To use:
+ * - consider re-issue GSF_dht_lookup_ after non-DHT reply received
  */
 #include "platform.h"
 #include <float.h>
 #include "gnunet_constants.h"
 #include "gnunet_core_service.h"
  */
 #include "platform.h"
 #include <float.h>
 #include "gnunet_constants.h"
 #include "gnunet_core_service.h"
+#include "gnunet_dht_service.h"
 #include "gnunet_datastore_service.h"
 #include "gnunet_datastore_service.h"
+#include "gnunet_load_lib.h"
 #include "gnunet_peer_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_signatures.h"
 #include "gnunet_statistics_service.h"
 #include "gnunet_peer_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_signatures.h"
 #include "gnunet_statistics_service.h"
+#include "gnunet_transport_service.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_util_lib.h"
+#include "gnunet-service-fs_cp.h"
 #include "gnunet-service-fs_indexing.h"
 #include "gnunet-service-fs_indexing.h"
+#include "gnunet-service-fs_lc.h"
+#include "gnunet-service-fs_pe.h"
+#include "gnunet-service-fs_pr.h"
+#include "gnunet-service-fs_push.h"
+#include "gnunet-service-fs_put.h"
 #include "fs.h"
 
 #include "fs.h"
 
-#define DEBUG_FS GNUNET_NO
-
-/**
- * Maximum number of outgoing messages we queue per peer.
- */
-#define MAX_QUEUE_PER_PEER 16
-
 /**
 /**
- * Inverse of the probability that we will submit the same query
- * to the same peer again.  If the same peer already got the query
- * repeatedly recently, the probability is multiplied by the inverse
- * of this number each time.  Note that we only try about every TTL_DECREMENT/2
- * plus MAX_CORK_DELAY (so roughly every 3.5s).
+ * Size for the hash map for DHT requests from the FS
+ * service.  Should be about the number of concurrent
+ * DHT requests we plan to make.
  */
  */
-#define RETRY_PROBABILITY_INV 3
-
-/**
- * What is the maximum delay for a P2P FS message (in our interaction
- * with core)?  FS-internal delays are another story.  The value is
- * chosen based on the 32k block size.  Given that peers typcially
- * have at least 1 kb/s bandwidth, 45s waits give us a chance to
- * transmit one message even to the lowest-bandwidth peers.
- */
-#define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
-
+#define FS_DHT_HT_SIZE 1024
 
 
 /**
 
 
 /**
- * Maximum number of requests (from other peers) that we're
- * willing to have pending at any given point in time.
- * FIXME: set from configuration.
+ * How quickly do we age cover traffic?  At the given
+ * time interval, remaining cover traffic counters are
+ * decremented by 1/16th.
  */
  */
-static uint64_t max_pending_requests = (32 * 1024);
+#define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
 
 
 
-/**
- * Information we keep for each pending reply.  The
- * actual message follows at the end of this struct.
- */
-struct PendingMessage;
+/* ****************************** globals ****************************** */
 
 /**
  * Our connection to the datastore.
  */
 
 /**
  * Our connection to the datastore.
  */
-static struct GNUNET_DATASTORE_Handle *dsh;
-
-
-/**
- * Function called upon completion of a transmission.
- *
- * @param cls closure
- * @param pid ID of receiving peer, 0 on transmission error
- */
-typedef void (*TransmissionContinuation)(void * cls, 
-                                        GNUNET_PEER_Id tpid);
-
-
-/**
- * Information we keep for each pending message (GET/PUT).  The
- * actual message follows at the end of this struct.
- */
-struct PendingMessage
-{
-  /**
-   * This is a doubly-linked list of messages to the same peer.
-   */
-  struct PendingMessage *next;
-
-  /**
-   * This is a doubly-linked list of messages to the same peer.
-   */
-  struct PendingMessage *prev;
-
-  /**
-   * Entry in pending message list for this pending message.
-   */ 
-  struct PendingMessageList *pml;  
-
-  /**
-   * Function to call immediately once we have transmitted this
-   * message.
-   */
-  TransmissionContinuation cont;
-
-  /**
-   * Closure for cont.
-   */
-  void *cont_cls;
-
-  /**
-   * Size of the reply; actual reply message follows
-   * at the end of this struct.
-   */
-  size_t msize;
-  
-  /**
-   * How important is this message for us?
-   */
-  uint32_t priority;
-};
-
-
-/**
- * Information about a peer that we are connected to.
- * We track data that is useful for determining which
- * peers should receive our requests.  We also keep
- * a list of messages to transmit to this peer.
- */
-struct ConnectedPeer
-{
-
-  /**
-   * List of the last clients for which this peer successfully
-   * answered a query.
-   */
-  struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
-
-  /**
-   * List of the last PIDs for which
-   * this peer successfully answered a query;
-   * We use 0 to indicate no successful reply.
-   */
-  GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
-
-  /**
-   * Average delay between sending the peer a request and
-   * getting a reply (only calculated over the requests for
-   * which we actually got a reply).   Calculated
-   * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
-   */ 
-  struct GNUNET_TIME_Relative avg_delay;
-
-  /**
-   * Handle for an active request for transmission to this
-   * peer, or NULL.
-   */
-  struct GNUNET_CORE_TransmitHandle *cth;
-
-  /**
-   * Messages (replies, queries, content migration) we would like to
-   * send to this peer in the near future.  Sorted by priority, head.
-   */
-  struct PendingMessage *pending_messages_head;
-
-  /**
-   * Messages (replies, queries, content migration) we would like to
-   * send to this peer in the near future.  Sorted by priority, tail.
-   */
-  struct PendingMessage *pending_messages_tail;
-
-  /**
-   * Average priority of successful replies.  Calculated
-   * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
-   */
-  double avg_priority;
-
-  /**
-   * Increase in traffic preference still to be submitted
-   * to the core service for this peer.
-   */
-  uint64_t inc_preference;
-
-  /**
-   * Trust delta to still commit to the system.
-   */
-  uint32_t trust_delta;
-
-  /**
-   * The peer's identity.
-   */
-  GNUNET_PEER_Id pid;  
-
-  /**
-   * Size of the linked list of 'pending_messages'.
-   */
-  unsigned int pending_requests;
-
-  /**
-   * Which offset in "last_p2p_replies" will be updated next?
-   * (we go round-robin).
-   */
-  unsigned int last_p2p_replies_woff;
-
-  /**
-   * Which offset in "last_client_replies" will be updated next?
-   * (we go round-robin).
-   */
-  unsigned int last_client_replies_woff;
-
-};
-
-
-/**
- * Information we keep for each pending request.  We should try to
- * keep this struct as small as possible since its memory consumption
- * is key to how many requests we can have pending at once.
- */
-struct PendingRequest;
-
-
-/**
- * Doubly-linked list of requests we are performing
- * on behalf of the same client.
- */
-struct ClientRequestList
-{
-
-  /**
-   * This is a doubly-linked list.
-   */
-  struct ClientRequestList *next;
-
-  /**
-   * This is a doubly-linked list.
-   */
-  struct ClientRequestList *prev;
-
-  /**
-   * Request this entry represents.
-   */
-  struct PendingRequest *req;
-
-  /**
-   * Client list this request belongs to.
-   */
-  struct ClientList *client_list;
-
-};
-
-
-/**
- * Replies to be transmitted to the client.  The actual
- * response message is allocated after this struct.
- */
-struct ClientResponseMessage
-{
-  /**
-   * This is a doubly-linked list.
-   */
-  struct ClientResponseMessage *next;
-
-  /**
-   * This is a doubly-linked list.
-   */
-  struct ClientResponseMessage *prev;
-
-  /**
-   * Client list entry this response belongs to.
-   */
-  struct ClientList *client_list;
-
-  /**
-   * Number of bytes in the response.
-   */
-  size_t msize;
-};
-
-
-/**
- * Linked list of clients we are performing requests
- * for right now.
- */
-struct ClientList
-{
-  /**
-   * This is a linked list.
-   */
-  struct ClientList *next;
-
-  /**
-   * ID of a client making a request, NULL if this entry is for a
-   * peer.
-   */
-  struct GNUNET_SERVER_Client *client;
-
-  /**
-   * Head of list of requests performed on behalf
-   * of this client right now.
-   */
-  struct ClientRequestList *rl_head;
-
-  /**
-   * Tail of list of requests performed on behalf
-   * of this client right now.
-   */
-  struct ClientRequestList *rl_tail;
-
-  /**
-   * Head of linked list of responses.
-   */
-  struct ClientResponseMessage *res_head;
-
-  /**
-   * Tail of linked list of responses.
-   */
-  struct ClientResponseMessage *res_tail;
-
-  /**
-   * Context for sending replies.
-   */
-  struct GNUNET_CONNECTION_TransmitHandle *th;
-
-};
-
-
-/**
- * Doubly-linked list of messages we are performing
- * due to a pending request.
- */
-struct PendingMessageList
-{
-
-  /**
-   * This is a doubly-linked list of messages on behalf of the same request.
-   */
-  struct PendingMessageList *next;
-
-  /**
-   * This is a doubly-linked list of messages on behalf of the same request.
-   */
-  struct PendingMessageList *prev;
-
-  /**
-   * Message this entry represents.
-   */
-  struct PendingMessage *pm;
-
-  /**
-   * Request this entry belongs to.
-   */
-  struct PendingRequest *req;
-
-  /**
-   * Peer this message is targeted for.
-   */
-  struct ConnectedPeer *target;
-
-};
-
-
-/**
- * Information we keep for each pending request.  We should try to
- * keep this struct as small as possible since its memory consumption
- * is key to how many requests we can have pending at once.
- */
-struct PendingRequest
-{
-
-  /**
-   * If this request was made by a client, this is our entry in the
-   * client request list; otherwise NULL.
-   */
-  struct ClientRequestList *client_request_list;
-
-  /**
-   * Entry of peer responsible for this entry (if this request
-   * was made by a peer).
-   */
-  struct ConnectedPeer *cp;
-
-  /**
-   * If this is a namespace query, pointer to the hash of the public
-   * key of the namespace; otherwise NULL.  Pointer will be to the 
-   * end of this struct (so no need to free it).
-   */
-  const GNUNET_HashCode *namespace;
-
-  /**
-   * Bloomfilter we use to filter out replies that we don't care about
-   * (anymore).  NULL as long as we are interested in all replies.
-   */
-  struct GNUNET_CONTAINER_BloomFilter *bf;
-
-  /**
-   * Context of our GNUNET_CORE_peer_change_preference call.
-   */
-  struct GNUNET_CORE_InformationRequestContext *irc;
-
-  /**
-   * Hash code of all replies that we have seen so far (only valid
-   * if client is not NULL since we only track replies like this for
-   * our own clients).
-   */
-  GNUNET_HashCode *replies_seen;
-
-  /**
-   * Node in the heap representing this entry; NULL
-   * if we have no heap node.
-   */
-  struct GNUNET_CONTAINER_HeapNode *hnode;
-
-  /**
-   * Head of list of messages being performed on behalf of this
-   * request.
-   */
-  struct PendingMessageList *pending_head;
-
-  /**
-   * Tail of list of messages being performed on behalf of this
-   * request.
-   */
-  struct PendingMessageList *pending_tail;
-
-  /**
-   * When did we first see this request (form this peer), or, if our
-   * client is initiating, when did we last initiate a search?
-   */
-  struct GNUNET_TIME_Absolute start_time;
-
-  /**
-   * The query that this request is for.
-   */
-  GNUNET_HashCode query;
-
-  /**
-   * The task responsible for transmitting queries
-   * for this request.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier task;
-
-  /**
-   * (Interned) Peer identifier that identifies a preferred target
-   * for requests.
-   */
-  GNUNET_PEER_Id target_pid;
-
-  /**
-   * (Interned) Peer identifiers of peers that have already
-   * received our query for this content.
-   */
-  GNUNET_PEER_Id *used_pids;
-  
-  /**
-   * Our entry in the queue (non-NULL while we wait for our
-   * turn to interact with the local database).
-   */
-  struct GNUNET_DATASTORE_QueueEntry *qe;
-
-  /**
-   * Size of the 'bf' (in bytes).
-   */
-  size_t bf_size;
-
-  /**
-   * Desired anonymity level; only valid for requests from a local client.
-   */
-  uint32_t anonymity_level;
-
-  /**
-   * How many entries in "used_pids" are actually valid?
-   */
-  unsigned int used_pids_off;
-
-  /**
-   * How long is the "used_pids" array?
-   */
-  unsigned int used_pids_size;
-
-  /**
-   * Number of results found for this request.
-   */
-  unsigned int results_found;
-
-  /**
-   * How many entries in "replies_seen" are actually valid?
-   */
-  unsigned int replies_seen_off;
-
-  /**
-   * How long is the "replies_seen" array?
-   */
-  unsigned int replies_seen_size;
-  
-  /**
-   * Priority with which this request was made.  If one of our clients
-   * made the request, then this is the current priority that we are
-   * using when initiating the request.  This value is used when
-   * we decide to reward other peers with trust for providing a reply.
-   */
-  uint32_t priority;
-
-  /**
-   * Priority points left for us to spend when forwarding this request
-   * to other peers.
-   */
-  uint32_t remaining_priority;
-
-  /**
-   * Number to mingle hashes for bloom-filter tests with.
-   */
-  int32_t mingle;
-
-  /**
-   * TTL with which we saw this request (or, if we initiated, TTL that
-   * we used for the request).
-   */
-  int32_t ttl;
-  
-  /**
-   * Type of the content that this request is for.
-   */
-  enum GNUNET_BLOCK_Type type;
-
-  /**
-   * Remove this request after transmission of the current response.
-   */
-  int16_t do_remove;
-
-  /**
-   * GNUNET_YES if we should not forward this request to other peers.
-   */
-  int16_t local_only;
-
-};
-
+struct GNUNET_DATASTORE_Handle *GSF_dsh;
 
 /**
 
 /**
- * Block that is ready for migration to other peers.  Actual data is at the end of the block.
+ * Our configuration.
  */
  */
-struct MigrationReadyBlock
-{
-
-  /**
-   * This is a doubly-linked list.
-   */
-  struct MigrationReadyBlock *next;
-
-  /**
-   * This is a doubly-linked list.
-   */
-  struct MigrationReadyBlock *prev;
-
-  /**
-   * Query for the block.
-   */
-  GNUNET_HashCode query;
-
-  /**
-   * When does this block expire? 
-   */
-  struct GNUNET_TIME_Absolute expiration;
-
-  /**
-   * Peers we would consider forwarding this
-   * block to.  Zero for empty entries.
-   */
-  GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
-
-  /**
-   * Size of the block.
-   */
-  size_t size;
-
-  /**
-   *  Number of targets already used.
-   */
-  unsigned int used_targets;
-
-  /**
-   * Type of the block.
-   */
-  enum GNUNET_BLOCK_Type type;
-};
-
+const struct GNUNET_CONFIGURATION_Handle *GSF_cfg;
 
 /**
 
 /**
- * Our scheduler.
+ * Handle for reporting statistics.
  */
  */
-static struct GNUNET_SCHEDULER_Handle *sched;
+struct GNUNET_STATISTICS_Handle *GSF_stats;
 
 /**
 
 /**
- * Our configuration.
+ * Handle for DHT operations.
  */
  */
-static const struct GNUNET_CONFIGURATION_Handle *cfg;
+struct GNUNET_DHT_Handle *GSF_dht;
 
 /**
 
 /**
- * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
+ * How long do requests typically stay in the routing table?
  */
  */
-static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
+struct GNUNET_LOAD_Value *GSF_rt_entry_lifetime;
 
 /**
 
 /**
- * Map of peer identifiers to "struct PendingRequest" (for that peer).
+ * Running average of the observed latency to other peers (round trip).
+ * Initialized to 5s as the initial default.
  */
  */
-static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
+struct GNUNET_TIME_Relative GSF_avg_latency = { 500 };
 
 /**
 
 /**
- * Map of query identifiers to "struct PendingRequest" (for that query).
+ * Typical priorities we're seeing from other peers right now.  Since
+ * most priorities will be zero, this value is the weighted average of
+ * non-zero priorities seen "recently".  In order to ensure that new
+ * values do not dramatically change the ratio, values are first
+ * "capped" to a reasonable range (+N of the current value) and then
+ * averaged into the existing value by a ratio of 1:N.  Hence
+ * receiving the largest possible priority can still only raise our
+ * "current_priorities" by at most 1.
  */
  */
-static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
+double GSF_current_priorities;
 
 /**
 
 /**
- * Heap with the request that will expire next at the top.  Contains
- * pointers of type "struct PendingRequest*"; these will *also* be
- * aliased from the "requests_by_peer" data structures and the
- * "requests_by_query" table.  Note that requests from our clients
- * don't expire and are thus NOT in the "requests_by_expiration"
- * (or the "requests_by_peer" tables).
+ * How many query messages have we received 'recently' that
+ * have not yet been claimed as cover traffic?
  */
  */
-static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
+unsigned int GSF_cover_query_count;
 
 /**
 
 /**
- * Handle for reporting statistics.
+ * How many content messages have we received 'recently' that
+ * have not yet been claimed as cover traffic?
  */
  */
-static struct GNUNET_STATISTICS_Handle *stats;
+unsigned int GSF_cover_content_count;
 
 /**
 
 /**
- * Linked list of clients we are currently processing requests for.
+ * Our block context.
  */
  */
-static struct ClientList *client_list;
+struct GNUNET_BLOCK_Context *GSF_block_ctx;
 
 /**
  * Pointer to handle to the core service (points to NULL until we've
  * connected to it).
  */
 
 /**
  * Pointer to handle to the core service (points to NULL until we've
  * connected to it).
  */
-static struct GNUNET_CORE_Handle *core;
-
-/**
- * Head of linked list of blocks that can be migrated.
- */
-static struct MigrationReadyBlock *mig_head;
-
-/**
- * Tail of linked list of blocks that can be migrated.
- */
-static struct MigrationReadyBlock *mig_tail;
+struct GNUNET_CORE_Handle *GSF_core;
 
 /**
 
 /**
- * Request to datastore for migration (or NULL).
+ * Are we introducing randomized delays for better anonymity?
  */
  */
-static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
+int GSF_enable_randomized_delays;
 
 
-/**
- * ID of task that collects blocks for migration.
- */
-static GNUNET_SCHEDULER_TaskIdentifier mig_task;
+/* ***************************** locals ******************************* */
 
 /**
 
 /**
- * What is the maximum frequency at which we are allowed to
- * poll the datastore for migration content?
+ * Configuration for block library.
  */
  */
-static struct GNUNET_TIME_Relative min_migration_delay;
+static struct GNUNET_CONFIGURATION_Handle *block_cfg;
 
 /**
 
 /**
- * Size of the doubly-linked list of migration blocks.
+ * ID of our task that we use to age the cover counters.
  */
  */
-static unsigned int mig_size;
+static GNUNET_SCHEDULER_TaskIdentifier cover_age_task;
 
 /**
 
 /**
- * Are we allowed to migrate content to this peer.
+ * Datastore 'GET' load tracking.
  */
  */
-static int active_migration;
-
+static struct GNUNET_LOAD_Value *datastore_get_load;
 
 /**
 
 /**
- * Transmit messages by copying it to the target buffer
- * "buf".  "buf" will be NULL and "size" zero if the socket was closed
- * for writing in the meantime.  In that case, do nothing
- * (the disconnect or shutdown handler will take care of the rest).
- * If we were able to transmit messages and there are still more
- * pending, ask core again for further calls to this function.
- *
- * @param cls closure, pointer to the 'struct ConnectedPeer*'
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * Identity of this peer.
  */
  */
-static size_t
-transmit_to_peer (void *cls,
-                 size_t size, void *buf);
-
-
-/* ******************* clean up functions ************************ */
-
+static struct GNUNET_PeerIdentity my_id;
 
 /**
 
 /**
- * Delete the given migration block.
+ * Task that periodically ages our cover traffic statistics.
  *
  *
- * @param mb block to delete
+ * @param cls unused closure
+ * @param tc task context
  */
 static void
  */
 static void
-delete_migration_block (struct MigrationReadyBlock *mb)
+age_cover_counters (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
 {
-  GNUNET_CONTAINER_DLL_remove (mig_head,
-                              mig_tail,
-                              mb);
-  GNUNET_PEER_decrement_rcs (mb->target_list,
-                            MIGRATION_LIST_SIZE);
-  mig_size--;
-  GNUNET_free (mb);
+  GSF_cover_content_count = (GSF_cover_content_count * 15) / 16;
+  GSF_cover_query_count = (GSF_cover_query_count * 15) / 16;
+  cover_age_task =
+      GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters,
+                                    NULL);
 }
 
 
 /**
 }
 
 
 /**
- * Compare the distance of two peers to a key.
+ * We've just now completed a datastore request.  Update our
+ * datastore load calculations.
  *
  *
- * @param key key
- * @param p1 first peer
- * @param p2 second peer
- * @return GNUNET_YES if P1 is closer to key than P2
+ * @param start time when the datastore request was issued
  */
  */
-static int
-is_closer (const GNUNET_HashCode *key,
-          const struct GNUNET_PeerIdentity *p1,
-          const struct GNUNET_PeerIdentity *p2)
+void
+GSF_update_datastore_delay_ (struct GNUNET_TIME_Absolute start)
 {
 {
-  return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
-                                   &p2->hashPubKey,
-                                   key);
+  struct GNUNET_TIME_Relative delay;
+
+  delay = GNUNET_TIME_absolute_get_duration (start);
+  GNUNET_LOAD_update (datastore_get_load, delay.rel_value);
 }
 
 
 /**
 }
 
 
 /**
- * Consider migrating content to a given peer.
+ * Test if the DATABASE (GET) load on this peer is too high
+ * to even consider processing the query at
+ * all.
  *
  *
- * @param cls 'struct MigrationReadyBlock*' to select
- *            targets for (or NULL for none)
- * @param key ID of the peer 
- * @param value 'struct ConnectedPeer' of the peer
- * @return GNUNET_YES (always continue iteration)
+ * @return GNUNET_YES if the load is too high to do anything (load high)
+ *         GNUNET_NO to process normally (load normal)
+ *         GNUNET_SYSERR to process for free (load low)
  */
  */
-static int
-consider_migration (void *cls,
-                   const GNUNET_HashCode *key,
-                   void *value)
+int
+GSF_test_get_load_too_high_ (uint32_t priority)
 {
 {
-  struct MigrationReadyBlock *mb = cls;
-  struct ConnectedPeer *cp = value;
-  struct MigrationReadyBlock *pos;
-  struct GNUNET_PeerIdentity cppid;
-  struct GNUNET_PeerIdentity otherpid;
-  struct GNUNET_PeerIdentity worstpid;
-  size_t msize;
-  unsigned int i;
-  unsigned int repl;
-  
-  /* consider 'cp' as a migration target for mb */
-  if (mb != NULL)
-    {
-      GNUNET_PEER_resolve (cp->pid,
-                          &cppid);
-      repl = MIGRATION_LIST_SIZE;
-      for (i=0;i<MIGRATION_LIST_SIZE;i++)
-       {
-         if (mb->target_list[i] == 0)
-           {
-             mb->target_list[i] = cp->pid;
-             GNUNET_PEER_change_rc (mb->target_list[i], 1);
-             repl = MIGRATION_LIST_SIZE;
-             break;
-           }
-         GNUNET_PEER_resolve (mb->target_list[i],
-                              &otherpid);
-         if ( (repl == MIGRATION_LIST_SIZE) &&
-              is_closer (&mb->query,
-                         &cppid,
-                         &otherpid)) 
-           {
-             repl = i;
-             worstpid = otherpid;
-           }
-         else if ( (repl != MIGRATION_LIST_SIZE) &&
-                   (is_closer (&mb->query,
-                               &worstpid,
-                               &otherpid) ) )
-           {
-             repl = i;
-             worstpid = otherpid;
-           }       
-       }
-      if (repl != MIGRATION_LIST_SIZE) 
-       {
-         GNUNET_PEER_change_rc (mb->target_list[repl], -1);
-         mb->target_list[repl] = cp->pid;
-         GNUNET_PEER_change_rc (mb->target_list[repl], 1);
-       }
-    }
+  double ld;
 
 
-  /* consider scheduling transmission to cp for content migration */
-  if (cp->cth != NULL)
-    return GNUNET_YES; 
-  msize = 0;
-  pos = mig_head;
-  while (pos != NULL)
-    {
-      for (i=0;i<MIGRATION_LIST_SIZE;i++)
-       {
-         if (cp->pid == pos->target_list[i])
-           {
-             if (msize == 0)
-               msize = pos->size;
-             else
-               msize = GNUNET_MIN (msize,
-                                   pos->size);
-             break;
-           }
-       }
-      pos = pos->next;
-    }
-  if (msize == 0)
-    return GNUNET_YES; /* no content available */
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Trying to migrate at least %u bytes to peer `%s'\n",
-             msize,
-             GNUNET_h2s (key));
-#endif
-  cp->cth 
-    = GNUNET_CORE_notify_transmit_ready (core,
-                                        0, GNUNET_TIME_UNIT_FOREVER_REL,
-                                        (const struct GNUNET_PeerIdentity*) key,
-                                        msize + sizeof (struct PutMessage),
-                                        &transmit_to_peer,
-                                        cp);
+  ld = GNUNET_LOAD_get_load (datastore_get_load);
+  if (ld < 1)
+    return GNUNET_SYSERR;
+  if (ld <= priority)
+    return GNUNET_NO;
   return GNUNET_YES;
 }
 
 
 /**
   return GNUNET_YES;
 }
 
 
 /**
- * Task that is run periodically to obtain blocks for content
- * migration
- * 
- * @param cls unused
- * @param tc scheduler context (also unused)
- */
-static void
-gather_migration_blocks (void *cls,
-                        const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
-/**
- * If the migration task is not currently running, consider
- * (re)scheduling it with the appropriate delay.
+ * We've received peer performance information. Update
+ * our running average for the P2P latency.
+ *
+ * @param atsi performance information
+ * @param atsi_count number of 'atsi' records
  */
 static void
  */
 static void
-consider_migration_gathering ()
+update_latencies (const struct GNUNET_ATS_Information *atsi,
+                  unsigned int atsi_count)
 {
 {
-  struct GNUNET_TIME_Relative delay;
+  unsigned int i;
 
 
-  if (mig_qe != NULL)
-    return;
-  if (mig_task != GNUNET_SCHEDULER_NO_TASK)
-    return;
-  delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
-                                        mig_size);
-  delay = GNUNET_TIME_relative_divide (delay,
-                                      MAX_MIGRATION_QUEUE);
-  delay = GNUNET_TIME_relative_max (delay,
-                                   min_migration_delay);
-  mig_task = GNUNET_SCHEDULER_add_delayed (sched,
-                                          delay,
-                                          &gather_migration_blocks,
-                                          NULL);
+  for (i = 0; i < atsi_count; i++)
+  {
+    if (ntohl (atsi[i].type) == GNUNET_ATS_QUALITY_NET_DELAY)
+    {
+      GSF_avg_latency.rel_value =
+          (GSF_avg_latency.rel_value * 31 +
+           GNUNET_MIN (5000, ntohl (atsi[i].value))) / 32;
+      GNUNET_STATISTICS_set (GSF_stats,
+                             gettext_noop
+                             ("# running average P2P latency (ms)"),
+                             GSF_avg_latency.rel_value, GNUNET_NO);
+      break;
+    }
+  }
 }
 
 
 /**
 }
 
 
 /**
- * Process content offered for migration.
+ * Handle P2P "PUT" message.
  *
  *
- * @param cls closure
- * @param key key for the content
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param expiration expiration time for the content
- * @param uid unique identifier for the datum;
- *        maybe 0 if no unique identifier is available
+ * @param cls closure, always NULL
+ * @param other the other peer involved (sender or receiver, NULL
+ *        for loopback messages where we are both sender and receiver)
+ * @param message the actual message
+ * @param atsi performance information
+ * @param atsi_count number of records in 'atsi'
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
  */
  */
-static void
-process_migration_content (void *cls,
-                          const GNUNET_HashCode * key,
-                          uint32_t size,
-                          const void *data,
-                          enum GNUNET_BLOCK_Type type,
-                          uint32_t priority,
-                          uint32_t anonymity,
-                          struct GNUNET_TIME_Absolute
-                          expiration, uint64_t uid)
+static int
+handle_p2p_put (void *cls, const struct GNUNET_PeerIdentity *other,
+                const struct GNUNET_MessageHeader *message,
+                const struct GNUNET_ATS_Information *atsi,
+                unsigned int atsi_count)
 {
 {
-  struct MigrationReadyBlock *mb;
-  
-  if (key == NULL)
-    {
-      mig_qe = NULL;
-      if (mig_size < MAX_MIGRATION_QUEUE)  
-       consider_migration_gathering ();
-      return;
-    }
-  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
-    {
-      if (GNUNET_OK !=
-         GNUNET_FS_handle_on_demand_block (key, size, data,
-                                           type, priority, anonymity,
-                                           expiration, uid, 
-                                           &process_migration_content,
-                                           NULL))
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-      return;
-    }
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Retrieved block `%s' of type %u for migration\n",
-             GNUNET_h2s (key),
-             type);
-#endif
-  mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
-  mb->query = *key;
-  mb->expiration = expiration;
-  mb->size = size;
-  mb->type = type;
-  memcpy (&mb[1], data, size);
-  GNUNET_CONTAINER_DLL_insert_after (mig_head,
-                                    mig_tail,
-                                    mig_tail,
-                                    mb);
-  mig_size++;
-  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
-                                        &consider_migration,
-                                        mb);
-  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+  struct GSF_ConnectedPeer *cp;
+
+  cp = GSF_peer_get_ (other);
+  if (NULL == cp)
+  {
+    GNUNET_break (0);
+    return GNUNET_OK;
+  }
+  GSF_cover_content_count++;
+  update_latencies (atsi, atsi_count);
+  return GSF_handle_p2p_content_ (cp, message);
 }
 
 
 /**
 }
 
 
 /**
- * Task that is run periodically to obtain blocks for content
- * migration
- * 
- * @param cls unused
- * @param tc scheduler context (also unused)
+ * We have a new request, consider forwarding it to the given
+ * peer.
+ *
+ * @param cls the 'struct GSF_PendingRequest'
+ * @param peer identity of the peer
+ * @param cp handle to the connected peer record
+ * @param ppd peer performance data
  */
 static void
  */
 static void
-gather_migration_blocks (void *cls,
-                        const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  mig_task = GNUNET_SCHEDULER_NO_TASK;
-  mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
-                                       GNUNET_TIME_UNIT_FOREVER_REL,
-                                       &process_migration_content, NULL);
-  GNUNET_assert (mig_qe != NULL);
+consider_request_for_forwarding (void *cls,
+                                 const struct GNUNET_PeerIdentity *peer,
+                                 struct GSF_ConnectedPeer *cp,
+                                 const struct GSF_PeerPerformanceData *ppd)
+{
+  struct GSF_PendingRequest *pr = cls;
+
+  if (GNUNET_YES != GSF_pending_request_test_target_ (pr, peer))
+  {
+    GNUNET_STATISTICS_update (GSF_stats,
+                              gettext_noop ("# Loopback routes suppressed"), 1,
+                              GNUNET_NO);
+    return;
+  }
+  GSF_plan_add_ (cp, pr);
 }
 
 
 /**
 }
 
 
 /**
- * We're done with a particular message list entry.
- * Free all associated resources.
- * 
- * @param pml entry to destroy
+ * Function to be called after we're done processing
+ * replies from the local lookup.  If the result status
+ * code indicates that there may be more replies, plan
+ * forwarding the request.
+ *
+ * @param cls closure (NULL)
+ * @param pr the pending request we were processing
+ * @param result final datastore lookup result
  */
 static void
  */
 static void
-destroy_pending_message_list_entry (struct PendingMessageList *pml)
+consider_forwarding (void *cls, struct GSF_PendingRequest *pr,
+                     enum GNUNET_BLOCK_EvaluationResult result)
 {
 {
-  GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
-                              pml->req->pending_tail,
-                              pml);
-  GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
-                              pml->target->pending_messages_tail,
-                              pml->pm);
-  pml->target->pending_requests--;
-  GNUNET_free (pml->pm);
-  GNUNET_free (pml);
+  if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
+    return;                     /* we're done... */
+  GSF_iterate_connected_peers_ (&consider_request_for_forwarding, pr);
 }
 
 
 /**
 }
 
 
 /**
- * Destroy the given pending message (and call the respective
- * continuation).
+ * Handle P2P "GET" request.
  *
  *
- * @param pm message to destroy
- * @param tpid id of peer that the message was delivered to, or 0 for none
+ * @param cls closure, always NULL
+ * @param other the other peer involved (sender or receiver, NULL
+ *        for loopback messages where we are both sender and receiver)
+ * @param message the actual message
+ * @param atsi performance information
+ * @param atsi_count number of records in 'atsi'
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
  */
  */
-static void
-destroy_pending_message (struct PendingMessage *pm,
-                        GNUNET_PEER_Id tpid)
+static int
+handle_p2p_get (void *cls, const struct GNUNET_PeerIdentity *other,
+                const struct GNUNET_MessageHeader *message,
+                const struct GNUNET_ATS_Information *atsi,
+                unsigned int atsi_count)
 {
 {
-  struct PendingMessageList *pml = pm->pml;
-  TransmissionContinuation cont;
-  void *cont_cls;
-
-  GNUNET_assert (pml->pm == pm);
-  GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
-  cont = pm->cont;
-  cont_cls = pm->cont_cls;
-  destroy_pending_message_list_entry (pml);
-  cont (cont_cls, tpid);  
+  struct GSF_PendingRequest *pr;
+
+  pr = GSF_handle_p2p_query_ (other, message);
+  if (NULL == pr)
+    return GNUNET_SYSERR;
+  GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
+  GSF_local_lookup_ (pr, &consider_forwarding, NULL);
+  update_latencies (atsi, atsi_count);
+  return GNUNET_OK;
 }
 
 
 /**
 }
 
 
 /**
- * We're done processing a particular request.
- * Free all associated resources.
+ * We're done with the local lookup, now consider
+ * P2P processing (depending on request options and
+ * result status).  Also signal that we can now
+ * receive more request information from the client.
  *
  *
- * @param pr request to destroy
+ * @param cls the client doing the request ('struct GNUNET_SERVER_Client')
+ * @param pr the pending request we were processing
+ * @param result final datastore lookup result
  */
 static void
  */
 static void
-destroy_pending_request (struct PendingRequest *pr)
+start_p2p_processing (void *cls, struct GSF_PendingRequest *pr,
+                      enum GNUNET_BLOCK_EvaluationResult result)
 {
 {
-  struct GNUNET_PeerIdentity pid;
+  struct GNUNET_SERVER_Client *client = cls;
+  struct GSF_PendingRequestData *prd;
 
 
-  if (pr->hnode != NULL)
-    {
-      GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
-                                        pr->hnode);
-      pr->hnode = NULL;
-    }
-  if (NULL == pr->client_request_list)
-    {
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# P2P searches active"),
-                               -1,
-                               GNUNET_NO);
-    }
-  else
-    {
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# client searches active"),
-                               -1,
-                               GNUNET_NO);
-    }
-  /* might have already been removed from map in 'process_reply' (if
-     there was a unique reply) or never inserted if it was a
-     duplicate; hence ignore the return value here */
-  (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
-                                              &pr->query,
-                                              pr);
-  if (pr->qe != NULL)
-     {
-      GNUNET_DATASTORE_cancel (pr->qe);
-      pr->qe = NULL;
-    }
-  if (pr->client_request_list != NULL)
-    {
-      GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
-                                  pr->client_request_list->client_list->rl_tail,
-                                  pr->client_request_list);
-      GNUNET_free (pr->client_request_list);
-      pr->client_request_list = NULL;
-    }
-  if (pr->cp != NULL)
-    {
-      GNUNET_PEER_resolve (pr->cp->pid,
-                          &pid);
-      (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
-                                                  &pid.hashPubKey,
-                                                  pr);
-      pr->cp = NULL;
-    }
-  if (pr->bf != NULL)
-    {
-      GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                       
-      pr->bf = NULL;
-    }
-  if (pr->irc != NULL)
-    {
-      GNUNET_CORE_peer_change_preference_cancel (pr->irc);
-      pr->irc = NULL;
-    }
-  if (pr->replies_seen != NULL)
-    {
-      GNUNET_free (pr->replies_seen);
-      pr->replies_seen = NULL;
-    }
-  if (pr->task != GNUNET_SCHEDULER_NO_TASK)
-    {
-      GNUNET_SCHEDULER_cancel (sched,
-                              pr->task);
-      pr->task = GNUNET_SCHEDULER_NO_TASK;
-    }
-  while (NULL != pr->pending_head)    
-    destroy_pending_message_list_entry (pr->pending_head);
-  GNUNET_PEER_change_rc (pr->target_pid, -1);
-  if (pr->used_pids != NULL)
-    {
-      GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
-      GNUNET_free (pr->used_pids);
-      pr->used_pids_off = 0;
-      pr->used_pids_size = 0;
-      pr->used_pids = NULL;
-    }
-  GNUNET_free (pr);
+  prd = GSF_pending_request_get_data_ (pr);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Finished database lookup for local request `%s' with result %d\n",
+              GNUNET_h2s (&prd->query), result);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
+    return;                     /* we're done, 'pr' was already destroyed... */
+  if (0 != (GSF_PRO_LOCAL_ONLY & prd->options))
+  {
+    GSF_pending_request_cancel_ (pr, GNUNET_YES);
+    return;
+  }
+  GSF_dht_lookup_ (pr);
+  consider_forwarding (NULL, pr, result);
 }
 
 
 /**
 }
 
 
 /**
- * Method called whenever a given peer connects.
+ * Handle START_SEARCH-message (search request from client).
  *
  *
- * @param cls closure, not used
- * @param peer peer identity this notification is about
- * @param latency reported latency of the connection with 'other'
- * @param distance reported distance (DV) to 'other' 
+ * @param cls closure
+ * @param client identification of the client
+ * @param message the actual message
  */
  */
-static void 
-peer_connect_handler (void *cls,
-                     const struct
-                     GNUNET_PeerIdentity * peer,
-                     struct GNUNET_TIME_Relative latency,
-                     uint32_t distance)
+static void
+handle_start_search (void *cls, struct GNUNET_SERVER_Client *client,
+                     const struct GNUNET_MessageHeader *message)
 {
 {
-  struct ConnectedPeer *cp;
-  struct MigrationReadyBlock *pos;
-  
-  cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
-  cp->pid = GNUNET_PEER_intern (peer);
-  GNUNET_break (GNUNET_OK ==
-               GNUNET_CONTAINER_multihashmap_put (connected_peers,
-                                                  &peer->hashPubKey,
-                                                  cp,
-                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-
-  pos = mig_head;
-  while (NULL != pos)
-    {
-      (void) consider_migration (pos, &peer->hashPubKey, cp);
-      pos = pos->next;
-    }
-}
+  struct GSF_PendingRequest *pr;
+  int ret;
 
 
+  pr = NULL;
+  ret = GSF_local_client_start_search_handler_ (client, message, &pr);
+  switch (ret)
+  {
+  case GNUNET_SYSERR:
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    break;
+  case GNUNET_NO:
+    GNUNET_SERVER_receive_done (client, GNUNET_OK);
+    break;
+  case GNUNET_YES:
+    GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
+    GSF_local_lookup_ (pr, &start_p2p_processing, client);
+    break;
+  default:
+    GNUNET_assert (0);
+  }
+}
 
 
 /**
 
 
 /**
- * Free (each) request made by the peer.
+ * Task run during shutdown.
  *
  *
- * @param cls closure, points to peer that the request belongs to
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES (we should continue to iterate)
+ * @param cls unused
+ * @param tc unused
+ */
+static void
+shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  if (NULL != GSF_core)
+  {
+    GNUNET_CORE_disconnect (GSF_core);
+    GSF_core = NULL;
+  }
+  GSF_put_done_ ();
+  GSF_push_done_ ();
+  GSF_pending_request_done_ ();
+  GSF_plan_done ();
+  GSF_connected_peer_done_ ();
+  GNUNET_DATASTORE_disconnect (GSF_dsh, GNUNET_NO);
+  GSF_dsh = NULL;
+  GNUNET_DHT_disconnect (GSF_dht);
+  GSF_dht = NULL;
+  GNUNET_BLOCK_context_destroy (GSF_block_ctx);
+  GSF_block_ctx = NULL;
+  GNUNET_CONFIGURATION_destroy (block_cfg);
+  block_cfg = NULL;
+  GNUNET_STATISTICS_destroy (GSF_stats, GNUNET_NO);
+  GSF_stats = NULL;
+  if (GNUNET_SCHEDULER_NO_TASK != cover_age_task)
+  {
+    GNUNET_SCHEDULER_cancel (cover_age_task);
+    cover_age_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  GNUNET_FS_indexing_done ();
+  GNUNET_LOAD_value_free (datastore_get_load);
+  datastore_get_load = NULL;
+  GNUNET_LOAD_value_free (GSF_rt_entry_lifetime);
+  GSF_rt_entry_lifetime = NULL;
+}
+
+
+/**
+ * Function called for each pending request whenever a new
+ * peer connects, giving us a chance to decide about submitting
+ * the existing request to the new peer.
+ *
+ * @param cls the 'struct GSF_ConnectedPeer' of the new peer
+ * @param key query for the request
+ * @param pr handle to the pending request
+ * @return GNUNET_YES to continue to iterate
  */
 static int
  */
 static int
-destroy_request (void *cls,
-                const GNUNET_HashCode * key,
-                void *value)
+consider_peer_for_forwarding (void *cls, const GNUNET_HashCode * key,
+                              struct GSF_PendingRequest *pr)
 {
 {
-  const struct GNUNET_PeerIdentity * peer = cls;
-  struct PendingRequest *pr = value;
-  
-  GNUNET_break (GNUNET_YES ==
-               GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
-                                                     &peer->hashPubKey,
-                                                     pr));
-  destroy_pending_request (pr);
+  struct GSF_ConnectedPeer *cp = cls;
+  struct GNUNET_PeerIdentity pid;
+
+  GSF_connected_peer_get_identity_ (cp, &pid);
+  if (GNUNET_YES != GSF_pending_request_test_target_ (pr, &pid))
+  {
+    GNUNET_STATISTICS_update (GSF_stats,
+                              gettext_noop ("# Loopback routes suppressed"), 1,
+                              GNUNET_NO);
+    return GNUNET_YES;
+  }
+  GSF_plan_add_ (cp, pr);
   return GNUNET_YES;
 }
 
 
 /**
   return GNUNET_YES;
 }
 
 
 /**
- * Method called whenever a peer disconnects.
+ * Method called whenever a given peer connects.
  *
  * @param cls closure, not used
  * @param peer peer identity this notification is about
  *
  * @param cls closure, not used
  * @param peer peer identity this notification is about
+ * @param atsi performance information
+ * @param atsi_count number of records in 'atsi'
  */
 static void
  */
 static void
-peer_disconnect_handler (void *cls,
-                        const struct
-                        GNUNET_PeerIdentity * peer)
-{
-  struct ConnectedPeer *cp;
-  struct PendingMessage *pm;
-  unsigned int i;
-  struct MigrationReadyBlock *pos;
-  struct MigrationReadyBlock *next;
-
-  GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
-                                             &peer->hashPubKey,
-                                             &destroy_request,
-                                             (void*) peer);
-  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
-                                         &peer->hashPubKey);
-  if (cp == NULL)
-    return;
-  for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
-    {
-      if (NULL != cp->last_client_replies[i])
-       {
-         GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
-         cp->last_client_replies[i] = NULL;
-       }
-    }
-  GNUNET_break (GNUNET_YES ==
-               GNUNET_CONTAINER_multihashmap_remove (connected_peers,
-                                                     &peer->hashPubKey,
-                                                     cp));
-  /* remove this peer from migration considerations; schedule
-     alternatives */
-  next = mig_head;
-  while (NULL != (pos = next))
-    {
-      next = pos->next;
-      for (i=0;i<MIGRATION_LIST_SIZE;i++)
-       {
-         if (pos->target_list[i] == cp->pid)
-           {
-             GNUNET_PEER_change_rc (pos->target_list[i], -1);
-             pos->target_list[i] = 0;
-            }
-         }
-      if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
-       {
-         delete_migration_block (pos);
-         consider_migration_gathering ();
-          continue;
-       }
-      GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
-                                            &consider_migration,
-                                            pos);
-    }
-  if (cp->trust_delta > 0)
-    {
-      /* FIXME: push trust back to peerinfo! 
-        (need better peerinfo API!) */
-    }
-  GNUNET_PEER_change_rc (cp->pid, -1);
-  GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
-  if (NULL != cp->cth)
-    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
-  while (NULL != (pm = cp->pending_messages_head))
-    destroy_pending_message (pm, 0 /* delivery failed */);
-  GNUNET_break (0 == cp->pending_requests);
-  GNUNET_free (cp);
-}
-
-
-/**
- * Iterator over hash map entries that removes all occurences
- * of the given 'client' from the 'last_client_replies' of the
- * given connected peer.
- *
- * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
- * @param key current key code (unused)
- * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
- * @return GNUNET_YES (we should continue to iterate)
- */
-static int
-remove_client_from_last_client_replies (void *cls,
-                                       const GNUNET_HashCode * key,
-                                       void *value)
+peer_connect_handler (void *cls, const struct GNUNET_PeerIdentity *peer,
+                      const struct GNUNET_ATS_Information *atsi,
+                      unsigned int atsi_count)
 {
 {
-  struct GNUNET_SERVER_Client *client = cls;
-  struct ConnectedPeer *cp = value;
-  unsigned int i;
+  struct GSF_ConnectedPeer *cp;
 
 
-  for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
-    {
-      if (cp->last_client_replies[i] == client)
-       {
-         GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
-         cp->last_client_replies[i] = NULL;
-       }
-    }  
-  return GNUNET_YES;
+  if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity)))
+    return;
+  cp = GSF_peer_connect_handler_ (peer, atsi, atsi_count);
+  if (NULL == cp)
+    return;
+  GSF_iterate_pending_requests_ (&consider_peer_for_forwarding, cp);
 }
 
 
 /**
 }
 
 
 /**
- * A client disconnected.  Remove all of its pending queries.
+ * Function called after GNUNET_CORE_connect has succeeded
+ * (or failed for good).  Note that the private key of the
+ * peer is intentionally not exposed here; if you need it,
+ * your process should try to read the private key file
+ * directly (which should work if you are authorized...).
  *
  *
- * @param cls closure, NULL
- * @param client identification of the client
+ * @param cls closure
+ * @param server handle to the server, NULL if we failed
+ * @param my_identity ID of this peer, NULL if we failed
  */
 static void
  */
 static void
-handle_client_disconnect (void *cls,
-                         struct GNUNET_SERVER_Client
-                         * client)
+peer_init_handler (void *cls, struct GNUNET_CORE_Handle *server,
+                   const struct GNUNET_PeerIdentity *my_identity)
 {
 {
-  struct ClientList *pos;
-  struct ClientList *prev;
-  struct ClientRequestList *rcl;
-  struct ClientResponseMessage *creply;
-
-  if (client == NULL)
-    return;
-  prev = NULL;
-  pos = client_list;
-  while ( (NULL != pos) &&
-         (pos->client != client) )
-    {
-      prev = pos;
-      pos = pos->next;
-    }
-  if (pos == NULL)
-    return; /* no requests pending for this client */
-  while (NULL != (rcl = pos->rl_head))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                 "Destroying pending request `%s' on disconnect\n",
-                 GNUNET_h2s (&rcl->req->query));
-      destroy_pending_request (rcl->req);
-    }
-  if (prev == NULL)
-    client_list = pos->next;
-  else
-    prev->next = pos->next;
-  if (pos->th != NULL)
-    {
-      GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
-      pos->th = NULL;
-    }
-  while (NULL != (creply = pos->res_head))
-    {
-      GNUNET_CONTAINER_DLL_remove (pos->res_head,
-                                  pos->res_tail,
-                                  creply);
-      GNUNET_free (creply);
-    }    
-  GNUNET_SERVER_client_drop (pos->client);
-  GNUNET_free (pos);
-  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
-                                        &remove_client_from_last_client_replies,
-                                        client);
+  my_id = *my_identity;
 }
 
 
 /**
 }
 
 
 /**
- * Iterator to free peer entries.
+ * Process fs requests.
  *
  *
- * @param cls closure, unused
- * @param key current key code
- * @param value value in the hash map (peer entry)
- * @return GNUNET_YES (we should continue to iterate)
+ * @param server the initialized server
+ * @param c configuration to use
  */
  */
-static int 
-clean_peer (void *cls,
-           const GNUNET_HashCode * key,
-           void *value)
-{
-  peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
-  return GNUNET_YES;
+static int
+main_init (struct GNUNET_SERVER_Handle *server,
+           const struct GNUNET_CONFIGURATION_Handle *c)
+{
+  static const struct GNUNET_CORE_MessageHandler p2p_handlers[] = {
+    {&handle_p2p_get,
+     GNUNET_MESSAGE_TYPE_FS_GET, 0},
+    {&handle_p2p_put,
+     GNUNET_MESSAGE_TYPE_FS_PUT, 0},
+    {&GSF_handle_p2p_migration_stop_,
+     GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
+     sizeof (struct MigrationStopMessage)},
+    {NULL, 0, 0}
+  };
+  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+    {&GNUNET_FS_handle_index_start, NULL,
+     GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
+    {&GNUNET_FS_handle_index_list_get, NULL,
+     GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET,
+     sizeof (struct GNUNET_MessageHeader)},
+    {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
+     sizeof (struct UnindexMessage)},
+    {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
+     0},
+    {NULL, NULL, 0, 0}
+  };
+
+  GSF_core =
+      GNUNET_CORE_connect (GSF_cfg, 1, NULL, &peer_init_handler,
+                           &peer_connect_handler, &GSF_peer_disconnect_handler_,
+                           NULL, GNUNET_NO, NULL, GNUNET_NO, p2p_handlers);
+  if (NULL == GSF_core)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                _("Failed to connect to `%s' service.\n"), "core");
+    return GNUNET_SYSERR;
+  }
+  GNUNET_SERVER_disconnect_notify (server, &GSF_client_disconnect_handler_,
+                                   NULL);
+  GNUNET_SERVER_add_handlers (server, handlers);
+  cover_age_task =
+      GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters,
+                                    NULL);
+  datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
+                                NULL);
+  return GNUNET_OK;
 }
 
 
 /**
 }
 
 
 /**
- * Task run during shutdown.
+ * Process fs requests.
  *
  *
- * @param cls unused
- * @param tc unused
+ * @param cls closure
+ * @param server the initialized server
+ * @param cfg configuration to use
  */
 static void
  */
 static void
-shutdown_task (void *cls,
-              const struct GNUNET_SCHEDULER_TaskContext *tc)
+run (void *cls, struct GNUNET_SERVER_Handle *server,
+     const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
 {
-  if (mig_qe != NULL)
-    {
-      GNUNET_DATASTORE_cancel (mig_qe);
-      mig_qe = NULL;
-    }
-  if (GNUNET_SCHEDULER_NO_TASK != mig_task)
-    {
-      GNUNET_SCHEDULER_cancel (sched, mig_task);
-      mig_task = GNUNET_SCHEDULER_NO_TASK;
-    }
-  while (client_list != NULL)
-    handle_client_disconnect (NULL,
-                             client_list->client);
-  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
-                                        &clean_peer,
-                                        NULL);
-  GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
-  GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
-  requests_by_expiration_heap = 0;
-  GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
-  connected_peers = NULL;
-  GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
-  GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
-  query_request_map = NULL;
-  GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
-  GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
-  peer_request_map = NULL;
-  GNUNET_assert (NULL != core);
-  GNUNET_CORE_disconnect (core);
-  core = NULL;
-  if (stats != NULL)
-    {
-      GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
-      stats = NULL;
-    }
-  GNUNET_DATASTORE_disconnect (dsh,
-                              GNUNET_NO);
-  while (mig_head != NULL)
-    delete_migration_block (mig_head);
-  GNUNET_assert (0 == mig_size);
-  dsh = NULL;
-  sched = NULL;
-  cfg = NULL;  
-}
-
-
-/* ******************* Utility functions  ******************** */
-
-
-/**
- * Transmit messages by copying it to the target buffer
- * "buf".  "buf" will be NULL and "size" zero if the socket was closed
- * for writing in the meantime.  In that case, do nothing
- * (the disconnect or shutdown handler will take care of the rest).
- * If we were able to transmit messages and there are still more
- * pending, ask core again for further calls to this function.
- *
- * @param cls closure, pointer to the 'struct ConnectedPeer*'
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_to_peer (void *cls,
-                 size_t size, void *buf)
-{
-  struct ConnectedPeer *cp = cls;
-  char *cbuf = buf;
-  struct GNUNET_PeerIdentity pid;
-  struct PendingMessage *pm;
-  struct MigrationReadyBlock *mb;
-  struct MigrationReadyBlock *next;
-  struct PutMessage migm;
-  size_t msize;
-  unsigned int i;
-  cp->cth = NULL;
-  if (NULL == buf)
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Dropping message, core too busy.\n");
-#endif
-      return 0;
-    }
-  msize = 0;
-  while ( (NULL != (pm = cp->pending_messages_head) ) &&
-         (pm->msize <= size) )
-    {
-      memcpy (&cbuf[msize], &pm[1], pm->msize);
-      msize += pm->msize;
-      size -= pm->msize;
-      destroy_pending_message (pm, cp->pid);
-    }
-  if (NULL != pm)
-    {
-      GNUNET_PEER_resolve (cp->pid,
-                          &pid);
-      cp->cth = GNUNET_CORE_notify_transmit_ready (core,
-                                                  pm->priority,
-                                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,
-                                                  &pid,
-                                                  pm->msize,
-                                                  &transmit_to_peer,
-                                                  cp);
-    }
-  else
-    {      
-      next = mig_head;
-      while (NULL != (mb = next))
-       {
-         next = mb->next;
-         for (i=0;i<MIGRATION_LIST_SIZE;i++)
-           {
-             if ( (cp->pid == mb->target_list[i]) &&
-                  (mb->size + sizeof (migm) <= size) )
-               {
-                 GNUNET_PEER_change_rc (mb->target_list[i], -1);
-                 mb->target_list[i] = 0;
-                 mb->used_targets++;
-                 migm.header.size = htons (sizeof (migm) + mb->size);
-                 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
-                 migm.type = htonl (mb->type);
-                 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
-                 memcpy (&cbuf[msize], &migm, sizeof (migm));
-                 msize += sizeof (migm);
-                 size -= sizeof (migm);
-                 memcpy (&cbuf[msize], &mb[1], mb->size);
-                 msize += mb->size;
-                 size -= mb->size;
-#if DEBUG_FS
-                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                             "Pushing migration block `%s' (%u bytes) to `%s'\n",
-                             GNUNET_h2s (&mb->query),
-                             mb->size,
-                             GNUNET_i2s (&pid));
-#endif   
-                 break;
-               }
-             else
-               {
-#if DEBUG_FS
-                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                             "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
-                             GNUNET_h2s (&mb->query),
-                             mb->size,
-                             GNUNET_i2s (&pid));
-#endif   
-               }
-           }
-         if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
-              (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
-           {
-             delete_migration_block (mb);
-             consider_migration_gathering ();
-           }
-       }
-      consider_migration (NULL, 
-                         &pid.hashPubKey,
-                         cp);
-    }
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitting %u bytes to peer %u\n",
-             msize,
-             cp->pid);
-#endif
-  return msize;
-}
-
-
-/**
- * Add a message to the set of pending messages for the given peer.
- *
- * @param cp peer to send message to
- * @param pm message to queue
- * @param pr request on which behalf this message is being queued
- */
-static void
-add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
-                                 struct PendingMessage *pm,
-                                 struct PendingRequest *pr)
-{
-  struct PendingMessage *pos;
-  struct PendingMessageList *pml;
-  struct GNUNET_PeerIdentity pid;
-
-  GNUNET_assert (pm->next == NULL);
-  GNUNET_assert (pm->pml == NULL);    
-  pml = GNUNET_malloc (sizeof (struct PendingMessageList));
-  pml->req = pr;
-  pml->target = cp;
-  pml->pm = pm;
-  pm->pml = pml;  
-  GNUNET_CONTAINER_DLL_insert (pr->pending_head,
-                              pr->pending_tail,
-                              pml);
-  pos = cp->pending_messages_head;
-  while ( (pos != NULL) &&
-         (pm->priority < pos->priority) )
-    pos = pos->next;    
-  GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
-                                    cp->pending_messages_tail,
-                                    pos,
-                                    pm);
-  cp->pending_requests++;
-  if (cp->pending_requests > MAX_QUEUE_PER_PEER)
-    destroy_pending_message (cp->pending_messages_tail, 0);  
-  GNUNET_PEER_resolve (cp->pid, &pid);
-  if (NULL != cp->cth)
-    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
-  /* need to schedule transmission */
-  cp->cth = GNUNET_CORE_notify_transmit_ready (core,
-                                              cp->pending_messages_head->priority,
-                                              MAX_TRANSMIT_DELAY,
-                                              &pid,
-                                              cp->pending_messages_head->msize,
-                                              &transmit_to_peer,
-                                              cp);
-  if (cp->cth == NULL)
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Failed to schedule transmission with core!\n");
-#endif
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# CORE transmission failures"),
-                               1,
-                               GNUNET_NO);
-    }
-}
-
-
-/**
- * Mingle hash with the mingle_number to produce different bits.
- */
-static void
-mingle_hash (const GNUNET_HashCode * in,
-            int32_t mingle_number, 
-            GNUNET_HashCode * hc)
-{
-  GNUNET_HashCode m;
-
-  GNUNET_CRYPTO_hash (&mingle_number, 
-                     sizeof (int32_t), 
-                     &m);
-  GNUNET_CRYPTO_hash_xor (&m, in, hc);
-}
-
-
-/**
- * Test if the load on this peer is too high
- * to even consider processing the query at
- * all.
- * 
- * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
- */
-static int
-test_load_too_high ()
-{
-  return GNUNET_NO; // FIXME
-}
-
-
-/* ******************* Pending Request Refresh Task ******************** */
-
-
-
-/**
- * We use a random delay to make the timing of requests less
- * predictable.  This function returns such a random delay.  We add a base
- * delay of MAX_CORK_DELAY (1s).
- *
- * FIXME: make schedule dependent on the specifics of the request?
- * Or bandwidth and number of connected peers and load?
- *
- * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
- */
-static struct GNUNET_TIME_Relative
-get_processing_delay ()
-{
-  return 
-    GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
-                             GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
-                                                            GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                                                                      TTL_DECREMENT)));
-}
-
-
-/**
- * We're processing a GET request from another peer and have decided
- * to forward it to other peers.  This function is called periodically
- * and should forward the request to other peers until we have all
- * possible replies.  If we have transmitted the *only* reply to
- * the initiator we should destroy the pending request.  If we have
- * many replies in the queue to the initiator, we should delay sending
- * out more queries until the reply queue has shrunk some.
- *
- * @param cls our "struct ProcessGetContext *"
- * @param tc unused
- */
-static void
-forward_request_task (void *cls,
-                     const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
-/**
- * Function called after we either failed or succeeded
- * at transmitting a query to a peer.  
- *
- * @param cls the requests "struct PendingRequest*"
- * @param tpid ID of receiving peer, 0 on transmission error
- */
-static void
-transmit_query_continuation (void *cls,
-                            GNUNET_PEER_Id tpid)
-{
-  struct PendingRequest *pr = cls;
-
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# queries scheduled for forwarding"),
-                           -1,
-                           GNUNET_NO);
-  if (tpid == 0)   
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Transmission of request failed, will try again later.\n");
-#endif
-      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                get_processing_delay (),
-                                                &forward_request_task,
-                                                pr); 
-      return;    
-    }
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# queries forwarded"),
-                           1,
-                           GNUNET_NO);
-  GNUNET_PEER_change_rc (tpid, 1);
-  if (pr->used_pids_off == pr->used_pids_size)
-    GNUNET_array_grow (pr->used_pids,
-                      pr->used_pids_size,
-                      pr->used_pids_size * 2 + 2);
-  pr->used_pids[pr->used_pids_off++] = tpid;
-  if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-    pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                            get_processing_delay (),
-                                            &forward_request_task,
-                                            pr);
-}
-
-
-/**
- * How many bytes should a bloomfilter be if we have already seen
- * entry_count responses?  Note that BLOOMFILTER_K gives us the number
- * of bits set per entry.  Furthermore, we should not re-size the
- * filter too often (to keep it cheap).
- *
- * Since other peers will also add entries but not resize the filter,
- * we should generally pick a slightly larger size than what the
- * strict math would suggest.
- *
- * @return must be a power of two and smaller or equal to 2^15.
- */
-static size_t
-compute_bloomfilter_size (unsigned int entry_count)
-{
-  size_t size;
-  unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
-  uint16_t max = 1 << 15;
-
-  if (entry_count > max)
-    return max;
-  size = 8;
-  while ((size < max) && (size < ideal))
-    size *= 2;
-  if (size > max)
-    return max;
-  return size;
-}
-
-
-/**
- * Recalculate our bloom filter for filtering replies.  This function
- * will create a new bloom filter from scratch, so it should only be
- * called if we have no bloomfilter at all (and hence can create a
- * fresh one of minimal size without problems) OR if our peer is the
- * initiator (in which case we may resize to larger than mimimum size).
- *
- * @param pr request for which the BF is to be recomputed
- */
-static void
-refresh_bloomfilter (struct PendingRequest *pr)
-{
-  unsigned int i;
-  size_t nsize;
-  GNUNET_HashCode mhash;
-
-  nsize = compute_bloomfilter_size (pr->replies_seen_off);
-  if (nsize == pr->bf_size)
-    return; /* size not changed */
-  if (pr->bf != NULL)
-    GNUNET_CONTAINER_bloomfilter_free (pr->bf);
-  pr->bf_size = nsize;
-  pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
-  pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
-                                             pr->bf_size,
-                                             BLOOMFILTER_K);
-  for (i=0;i<pr->replies_seen_off;i++)
-    {
-      mingle_hash (&pr->replies_seen[i], pr->mingle, &mhash);
-      GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
-    }
-}
-
-
-/**
- * Function called after we've tried to reserve a certain amount of
- * bandwidth for a reply.  Check if we succeeded and if so send our
- * query.
- *
- * @param cls the requests "struct PendingRequest*"
- * @param peer identifies the peer
- * @param bpm_in set to the current bandwidth limit (receiving) for this peer
- * @param bpm_out set to the current bandwidth limit (sending) for this peer
- * @param amount set to the amount that was actually reserved or unreserved
- * @param preference current traffic preference for the given peer
- */
-static void
-target_reservation_cb (void *cls,
-                      const struct
-                      GNUNET_PeerIdentity * peer,
-                      struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
-                      struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
-                      int amount,
-                      uint64_t preference)
-{
-  struct PendingRequest *pr = cls;
-  struct ConnectedPeer *cp;
-  struct PendingMessage *pm;
-  struct GetMessage *gm;
-  GNUNET_HashCode *ext;
-  char *bfdata;
-  size_t msize;
-  unsigned int k;
-  int no_route;
-  uint32_t bm;
-
-  pr->irc = NULL;
-  if (peer == NULL)
-    {
-      /* error in communication with core, try again later */
-      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                get_processing_delay (),
-                                                &forward_request_task,
-                                                pr);
-      return;
-    }
-  // (3) transmit, update ttl/priority
-  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
-                                         &peer->hashPubKey);
-  if (cp == NULL)
-    {
-      /* Peer must have just left */
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Selected peer disconnected!\n");
-#endif
-      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                get_processing_delay (),
-                                                &forward_request_task,
-                                                pr);
-      return;
-    }
-  no_route = GNUNET_NO;
-  if (amount == 0)
-    {
-      if (pr->cp == NULL)
-       {
-#if DEBUG_FS > 1
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
-                     amount,
-                     DBLOCK_SIZE);
-#endif
-         GNUNET_STATISTICS_update (stats,
-                                   gettext_noop ("# reply bandwidth reservation requests failed"),
-                                   1,
-                                   GNUNET_NO);
-         if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                    get_processing_delay (),
-                                                    &forward_request_task,
-                                                    pr);
-         return;  /* this target round failed */
-       }
-      /* FIXME: if we are "quite" busy, we may still want to skip
-        this round; need more load detection code! */
-      no_route = GNUNET_YES;
-    }
-  
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# queries scheduled for forwarding"),
-                           1,
-                           GNUNET_NO);
-  /* build message and insert message into priority queue */
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Forwarding request `%s' to `%4s'!\n",
-             GNUNET_h2s (&pr->query),
-             GNUNET_i2s (peer));
-#endif
-  k = 0;
-  bm = 0;
-  if (GNUNET_YES == no_route)
-    {
-      bm |= GET_MESSAGE_BIT_RETURN_TO;
-      k++;      
-    }
-  if (pr->namespace != NULL)
-    {
-      bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
-      k++;
-    }
-  if (pr->target_pid != 0)
-    {
-      bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
-      k++;
-    }
-  msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
-  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
-  pm->msize = msize;
-  gm = (struct GetMessage*) &pm[1];
-  gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
-  gm->header.size = htons (msize);
-  gm->type = htonl (pr->type);
-  pr->remaining_priority /= 2;
-  gm->priority = htonl (pr->remaining_priority);
-  gm->ttl = htonl (pr->ttl);
-  gm->filter_mutator = htonl(pr->mingle); 
-  gm->hash_bitmap = htonl (bm);
-  gm->query = pr->query;
-  ext = (GNUNET_HashCode*) &gm[1];
-  k = 0;
-  if (GNUNET_YES == no_route)
-    GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
-  if (pr->namespace != NULL)
-    memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
-  if (pr->target_pid != 0)
-    GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
-  bfdata = (char *) &ext[k];
-  if (pr->bf != NULL)
-    GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
-                                              bfdata,
-                                              pr->bf_size);
-  pm->cont = &transmit_query_continuation;
-  pm->cont_cls = pr;
-  add_to_pending_messages_for_peer (cp, pm, pr);
-}
-
-
-/**
- * Closure used for "target_peer_select_cb".
- */
-struct PeerSelectionContext 
-{
-  /**
-   * The request for which we are selecting
-   * peers.
-   */
-  struct PendingRequest *pr;
-
-  /**
-   * Current "prime" target.
-   */
-  struct GNUNET_PeerIdentity target;
-
-  /**
-   * How much do we like this target?
-   */
-  double target_score;
-
-};
-
-
-/**
- * Function called for each connected peer to determine
- * which one(s) would make good targets for forwarding.
- *
- * @param cls closure (struct PeerSelectionContext)
- * @param key current key code (peer identity)
- * @param value value in the hash map (struct ConnectedPeer)
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int
-target_peer_select_cb (void *cls,
-                      const GNUNET_HashCode * key,
-                      void *value)
-{
-  struct PeerSelectionContext *psc = cls;
-  struct ConnectedPeer *cp = value;
-  struct PendingRequest *pr = psc->pr;
-  double score;
-  unsigned int i;
-  unsigned int pc;
-
-  /* 1) check that this peer is not the initiator */
-  if (cp == pr->cp)
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Skipping initiator in forwarding selection\n");
-#endif
-      return GNUNET_YES; /* skip */       
-    }
-
-  /* 2) check if we have already (recently) forwarded to this peer */
-  pc = 0;
-  for (i=0;i<pr->used_pids_off;i++)
-    if (pr->used_pids[i] == cp->pid) 
-      {
-       pc++;
-       if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                          RETRY_PROBABILITY_INV))
-         {
-#if DEBUG_FS
-           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                       "NOT re-trying query that was previously transmitted %u times\n",
-                       (unsigned int) pr->used_pids_off);
-#endif
-           return GNUNET_YES; /* skip */
-         }
-      }
-#if DEBUG_FS
-  if (0 < pc)
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-               "Re-trying query that was previously transmitted %u times to this peer\n",
-               (unsigned int) pc);
-#endif
-  /* 3) calculate how much we'd like to forward to this peer,
-     starting with a random value that is strong enough
-     to at least give any peer a chance sometimes 
-     (compared to the other factors that come later) */
-  /* 3a) count successful (recent) routes from cp for same source */
-  if (pr->cp != NULL)
-    {
-      score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                       P2P_SUCCESS_LIST_SIZE);
-      for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
-       if (cp->last_p2p_replies[i] == pr->cp->pid)
-         score += 1; /* likely successful based on hot path */
-    }
-  else
-    {
-      score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                       CS2P_SUCCESS_LIST_SIZE);
-      for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
-       if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
-         score += 1; /* likely successful based on hot path */
-    }
-  /* 3b) include latency */
-  if (cp->avg_delay.value < 4 * TTL_DECREMENT)
-    score += 1; /* likely fast based on latency */
-  /* 3c) include priorities */
-  if (cp->avg_priority <= pr->remaining_priority / 2.0)
-    score += 1; /* likely successful based on priorities */
-  /* 3d) penalize for queue size */  
-  score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
-  /* 3e) include peer proximity */
-  score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
-                                                   &pr->query)) / (double) UINT32_MAX);
-  /* store best-fit in closure */
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Peer `%s' gets score %f for forwarding query, max is %f\n",
-             GNUNET_h2s (key),
-             score,
-             psc->target_score);
-#endif  
-  score++; /* avoid zero */
-  if (score > psc->target_score)
-    {
-      psc->target_score = score;
-      psc->target.hashPubKey = *key; 
-    }
-  return GNUNET_YES;
-}
-  
-
-/**
- * The priority level imposes a bound on the maximum
- * value for the ttl that can be requested.
- *
- * @param ttl_in requested ttl
- * @param prio given priority
- * @return ttl_in if ttl_in is below the limit,
- *         otherwise the ttl-limit for the given priority
- */
-static int32_t
-bound_ttl (int32_t ttl_in, uint32_t prio)
-{
-  unsigned long long allowed;
-
-  if (ttl_in <= 0)
-    return ttl_in;
-  allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
-  if (ttl_in > allowed)      
-    {
-      if (allowed >= (1 << 30))
-        return 1 << 30;
-      return allowed;
-    }
-  return ttl_in;
-}
-
-
-/**
- * We're processing a GET request and have decided
- * to forward it to other peers.  This function is called periodically
- * and should forward the request to other peers until we have all
- * possible replies.  If we have transmitted the *only* reply to
- * the initiator we should destroy the pending request.  If we have
- * many replies in the queue to the initiator, we should delay sending
- * out more queries until the reply queue has shrunk some.
- *
- * @param cls our "struct ProcessGetContext *"
- * @param tc unused
- */
-static void
-forward_request_task (void *cls,
-                    const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct PendingRequest *pr = cls;
-  struct PeerSelectionContext psc;
-  struct ConnectedPeer *cp; 
-  struct GNUNET_TIME_Relative delay;
-
-  pr->task = GNUNET_SCHEDULER_NO_TASK;
-  if (pr->irc != NULL)
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Forwarding of query `%s' not attempted due to pending local lookup!\n",
-                 GNUNET_h2s (&pr->query));
-#endif
-      return; /* already pending */
-    }
-  if (GNUNET_YES == pr->local_only)
-    return; /* configured to not do P2P search */
-  /* (1) select target */
-  psc.pr = pr;
-  psc.target_score = -DBL_MAX;
-  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
-                                        &target_peer_select_cb,
-                                        &psc);  
-  if (psc.target_score == -DBL_MAX)
-    {
-      delay = get_processing_delay ();
-#if DEBUG_FS 
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
-                 GNUNET_h2s (&pr->query),
-                 delay.value);
-#endif
-      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                              delay,
-                                              &forward_request_task,
-                                              pr);
-      return; /* nobody selected */
-    }
-  /* (3) update TTL/priority */
-  if (pr->client_request_list != NULL)
-    {
-      /* FIXME: use better algorithm!? */
-      if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                        4))
-       pr->priority++;
-      /* FIXME: bound priority by "customary" priority used by other peers
-        at this time! */
-      pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
-                          pr->priority);
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Trying query `%s' with priority %u and TTL %d.\n",
-                 GNUNET_h2s (&pr->query),
-                 pr->priority,
-                 pr->ttl);
-#endif
-    }
-
-  /* (3) reserve reply bandwidth */
-  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
-                                         &psc.target.hashPubKey);
-  GNUNET_assert (NULL != cp);
-  pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
-                                               &psc.target,
-                                               GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
-                                               GNUNET_BANDWIDTH_value_init (UINT32_MAX),
-                                               DBLOCK_SIZE * 2, 
-                                               cp->inc_preference,
-                                               &target_reservation_cb,
-                                               pr);
-  cp->inc_preference = 0;
-}
-
-
-/* **************************** P2P PUT Handling ************************ */
-
-
-/**
- * Function called after we either failed or succeeded
- * at transmitting a reply to a peer.  
- *
- * @param cls the requests "struct PendingRequest*"
- * @param tpid ID of receiving peer, 0 on transmission error
- */
-static void
-transmit_reply_continuation (void *cls,
-                            GNUNET_PEER_Id tpid)
-{
-  struct PendingRequest *pr = cls;
-  
-  switch (pr->type)
-    {
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
-      /* only one reply expected, done with the request! */
-      destroy_pending_request (pr);
-      break;
-    case GNUNET_BLOCK_TYPE_ANY:
-    case GNUNET_BLOCK_TYPE_KBLOCK:
-    case GNUNET_BLOCK_TYPE_SBLOCK:
-      break;
-    default:
-      GNUNET_break (0);
-      break;
-    }
-}
-
-
-/**
- * Transmit the given message by copying it to the target buffer
- * "buf".  "buf" will be NULL and "size" zero if the socket was closed
- * for writing in the meantime.  In that case, do nothing
- * (the disconnect or shutdown handler will take care of the rest).
- * If we were able to transmit messages and there are still more
- * pending, ask core again for further calls to this function.
- *
- * @param cls closure, pointer to the 'struct ClientList*'
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_to_client (void *cls,
-                 size_t size, void *buf)
-{
-  struct ClientList *cl = cls;
-  char *cbuf = buf;
-  struct ClientResponseMessage *creply;
-  size_t msize;
-  
-  cl->th = NULL;
-  if (NULL == buf)
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Not sending reply, client communication problem.\n");
-#endif
-      return 0;
-    }
-  msize = 0;
-  while ( (NULL != (creply = cl->res_head) ) &&
-         (creply->msize <= size) )
-    {
-      memcpy (&cbuf[msize], &creply[1], creply->msize);
-      msize += creply->msize;
-      size -= creply->msize;
-      GNUNET_CONTAINER_DLL_remove (cl->res_head,
-                                  cl->res_tail,
-                                  creply);
-      GNUNET_free (creply);
-    }
-  if (NULL != creply)
-    cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
-                                                 creply->msize,
-                                                 GNUNET_TIME_UNIT_FOREVER_REL,
-                                                 &transmit_to_client,
-                                                 cl);
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitted %u bytes to client\n",
-             (unsigned int) msize);
-#endif
-  return msize;
-}
-
-
-/**
- * Closure for "process_reply" function.
- */
-struct ProcessReplyClosure
-{
-  /**
-   * The data for the reply.
-   */
-  const void *data;
-
-  /**
-   * Who gave us this reply? NULL for local host.
-   */
-  struct ConnectedPeer *sender;
-
-  /**
-   * When the reply expires.
-   */
-  struct GNUNET_TIME_Absolute expiration;
-
-  /**
-   * Size of data.
-   */
-  size_t size;
-
-  /**
-   * Namespace that this reply belongs to
-   * (if it is of type SBLOCK).
-   */
-  GNUNET_HashCode namespace;
-
-  /**
-   * Type of the block.
-   */
-  enum GNUNET_BLOCK_Type type;
-
-  /**
-   * How much was this reply worth to us?
-   */
-  uint32_t priority;
-
-  /**
-   * Did we finish processing the associated request?
-   */ 
-  int finished;
-};
-
-
-/**
- * We have received a reply; handle it!
- *
- * @param cls response (struct ProcessReplyClosure)
- * @param key our query
- * @param value value in the hash map (info about the query)
- * @return GNUNET_YES (we should continue to iterate)
- */
-static int
-process_reply (void *cls,
-              const GNUNET_HashCode * key,
-              void *value)
-{
-  struct ProcessReplyClosure *prq = cls;
-  struct PendingRequest *pr = value;
-  struct PendingMessage *reply;
-  struct ClientResponseMessage *creply;
-  struct ClientList *cl;
-  struct PutMessage *pm;
-  struct ConnectedPeer *cp;
-  struct GNUNET_TIME_Relative cur_delay;
-  GNUNET_HashCode chash;
-  GNUNET_HashCode mhash;
-  size_t msize;
-
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Matched result (type %u) for query `%s' with pending request\n",
-             (unsigned int) prq->type,
-             GNUNET_h2s (key));
-#endif  
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# replies received and matched"),
-                           1,
-                           GNUNET_NO);
-  if (prq->sender != NULL)
-    {
-      /* FIXME: should we be more precise here and not use
-        "start_time" but a peer-specific time stamp? */
-      cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
-      prq->sender->avg_delay.value
-       = (prq->sender->avg_delay.value * 
-          (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
-      prq->sender->avg_priority
-       = (prq->sender->avg_priority * 
-          (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
-      if (pr->cp != NULL)
-       {
-         GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
-                                [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
-                                -1);
-         GNUNET_PEER_change_rc (pr->cp->pid, 1);
-         prq->sender->last_p2p_replies
-           [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
-           = pr->cp->pid;
-       }
-      else
-       {
-         if (NULL != prq->sender->last_client_replies
-             [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
-           GNUNET_SERVER_client_drop (prq->sender->last_client_replies
-                                      [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
-         prq->sender->last_client_replies
-           [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
-           = pr->client_request_list->client_list->client;
-         GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
-       }
-    }
-  GNUNET_CRYPTO_hash (prq->data,
-                     prq->size,
-                     &chash);
-  switch (prq->type)
-    {
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
-      /* only possible reply, stop requesting! */
-      while (NULL != pr->pending_head)
-       destroy_pending_message_list_entry (pr->pending_head);
-      if (pr->qe != NULL)
-       {
-         if (pr->client_request_list != NULL)
-           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
-                                       GNUNET_YES);
-         GNUNET_DATASTORE_cancel (pr->qe);
-         pr->qe = NULL;
-       }
-      pr->do_remove = GNUNET_YES;
-      if (pr->task != GNUNET_SCHEDULER_NO_TASK)
-       {
-         GNUNET_SCHEDULER_cancel (sched,
-                                  pr->task);
-         pr->task = GNUNET_SCHEDULER_NO_TASK;
-       }
-      GNUNET_break (GNUNET_YES ==
-                   GNUNET_CONTAINER_multihashmap_remove (query_request_map,
-                                                         key,
-                                                         pr));
-      break;
-    case GNUNET_BLOCK_TYPE_SBLOCK:
-      if (pr->namespace == NULL)
-       {
-         GNUNET_break (0);
-         return GNUNET_YES;
-       }
-      if (0 != memcmp (pr->namespace,
-                      &prq->namespace,
-                      sizeof (GNUNET_HashCode)))
-       {
-         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                     _("Reply mismatched in terms of namespace.  Discarded.\n"));
-         return GNUNET_YES; /* wrong namespace */      
-       }
-      /* then: fall-through! */
-    case GNUNET_BLOCK_TYPE_KBLOCK:
-    case GNUNET_BLOCK_TYPE_NBLOCK:
-      if (pr->bf != NULL) 
-       {
-         mingle_hash (&chash, pr->mingle, &mhash);
-         if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
-                                                              &mhash))
-           {
-             GNUNET_STATISTICS_update (stats,
-                                       gettext_noop ("# duplicate replies discarded (bloomfilter)"),
-                                       1,
-                                       GNUNET_NO);
-#if DEBUG_FS
-             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                         "Duplicate response `%s', discarding.\n",
-                         GNUNET_h2s (&mhash));
-#endif
-             return GNUNET_YES; /* duplicate */
-           }
-#if DEBUG_FS
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "New response `%s', adding to filter.\n",
-                     GNUNET_h2s (&mhash));
-#endif
-       }
-      if (pr->client_request_list != NULL)
-       {
-         if (pr->replies_seen_size == pr->replies_seen_off)
-           GNUNET_array_grow (pr->replies_seen,
-                              pr->replies_seen_size,
-                              pr->replies_seen_size * 2 + 4);  
-           pr->replies_seen[pr->replies_seen_off++] = chash;         
-       }
-      if ( (pr->bf == NULL) ||
-          (pr->client_request_list != NULL) )
-       refresh_bloomfilter (pr);
-      GNUNET_CONTAINER_bloomfilter_add (pr->bf,
-                                       &mhash);
-      break;
-    default:
-      GNUNET_break (0);
-      return GNUNET_YES;
-    }
-  prq->priority += pr->remaining_priority;
-  pr->remaining_priority = 0;
-  if (NULL != pr->client_request_list)
-    {
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# replies received for local clients"),
-                               1,
-                               GNUNET_NO);
-      cl = pr->client_request_list->client_list;
-      msize = sizeof (struct PutMessage) + prq->size;
-      creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
-      creply->msize = msize;
-      creply->client_list = cl;
-      GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
-                                        cl->res_tail,
-                                        cl->res_tail,
-                                        creply);      
-      pm = (struct PutMessage*) &creply[1];
-      pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
-      pm->header.size = htons (msize);
-      pm->type = htonl (prq->type);
-      pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
-      memcpy (&pm[1], prq->data, prq->size);      
-      if (NULL == cl->th)
-       {
-#if DEBUG_FS
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Transmitting result for query `%s' to client\n",
-                     GNUNET_h2s (key));
-#endif  
-         cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
-                                                       msize,
-                                                       GNUNET_TIME_UNIT_FOREVER_REL,
-                                                       &transmit_to_client,
-                                                       cl);
-       }
-      GNUNET_break (cl->th != NULL);
-      if (pr->do_remove)               
-       {
-         prq->finished = GNUNET_YES;
-         destroy_pending_request (pr);         
-       }
-    }
-  else
-    {
-      cp = pr->cp;
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Transmitting result for query `%s' to other peer (PID=%u)\n",
-                 GNUNET_h2s (key),
-                 (unsigned int) cp->pid);
-#endif  
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# replies received for other peers"),
-                               1,
-                               GNUNET_NO);
-      msize = sizeof (struct PutMessage) + prq->size;
-      reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
-      reply->cont = &transmit_reply_continuation;
-      reply->cont_cls = pr;
-      reply->msize = msize;
-      reply->priority = UINT32_MAX; /* send replies first! */
-      pm = (struct PutMessage*) &reply[1];
-      pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
-      pm->header.size = htons (msize);
-      pm->type = htonl (prq->type);
-      pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
-      memcpy (&pm[1], prq->data, prq->size);
-      add_to_pending_messages_for_peer (cp, reply, pr);
-    }
-  return GNUNET_YES;
-}
-
-
-/**
- * Continuation called to notify client about result of the
- * operation.
- *
- * @param cls closure
- * @param success GNUNET_SYSERR on failure
- * @param msg NULL on success, otherwise an error message
- */
-static void 
-put_migration_continuation (void *cls,
-                           int success,
-                           const char *msg)
-{
-  /* FIXME */
-}
-
-
-/**
- * Handle P2P "PUT" message.
- *
- * @param cls closure, always NULL
- * @param other the other peer involved (sender or receiver, NULL
- *        for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @param latency reported latency of the connection with 'other'
- * @param distance reported distance (DV) to 'other' 
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
- */
-static int
-handle_p2p_put (void *cls,
-               const struct GNUNET_PeerIdentity *other,
-               const struct GNUNET_MessageHeader *message,
-               struct GNUNET_TIME_Relative latency,
-               uint32_t distance)
-{
-  const struct PutMessage *put;
-  uint16_t msize;
-  size_t dsize;
-  enum GNUNET_BLOCK_Type type;
-  struct GNUNET_TIME_Absolute expiration;
-  GNUNET_HashCode query;
-  struct ProcessReplyClosure prq;
-  const struct SBlock *sb;
-  struct ConnectedPeer *cps;
-
-  msize = ntohs (message->size);
-  if (msize < sizeof (struct PutMessage))
-    {
-      GNUNET_break_op(0);
-      return GNUNET_SYSERR;
-    }
-  put = (const struct PutMessage*) message;
-  dsize = msize - sizeof (struct PutMessage);
-  type = ntohl (put->type);
-  expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
-
-  if (GNUNET_OK !=
-      GNUNET_BLOCK_check_block (type,
-                               &put[1],
-                               dsize,
-                               &query))
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
-    return GNUNET_SYSERR;
-  if (GNUNET_BLOCK_TYPE_SBLOCK == type)
-    { 
-      sb = (const struct SBlock*) &put[1];
-      GNUNET_CRYPTO_hash (&sb->subspace,
-                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
-                         &prq.namespace);
-    }
-
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received result for query `%s' from peer `%4s'\n",
-             GNUNET_h2s (&query),
-             GNUNET_i2s (other));
-#endif
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# replies received (overall)"),
-                           1,
-                           GNUNET_NO);
-  /* now, lookup 'query' */
-  prq.data = (const void*) &put[1];
-  if (other != NULL)
-    prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
-                                                   &other->hashPubKey);
-  prq.size = dsize;
-  prq.type = type;
-  prq.expiration = expiration;
-  prq.priority = 0;
-  prq.finished = GNUNET_NO;
-  GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
-                                             &query,
-                                             &process_reply,
-                                             &prq);
-  cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
-                                          &other->hashPubKey);
-  if (cps != NULL)
-    {
-      cps->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
-      cps->trust_delta += prq.priority;
-    }
-  else
-    {
-      GNUNET_break (0);
-    }
-  if (GNUNET_YES == active_migration)
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Replicating result for query `%s' with priority %u\n",
-                 GNUNET_h2s (&query),
-                 prq.priority);
-#endif
-      GNUNET_DATASTORE_put (dsh,
-                           0, &query, dsize, &put[1],
-                           type, prq.priority, 1 /* anonymity */, 
-                           expiration, 
-                           1 + prq.priority, MAX_DATASTORE_QUEUE,
-                           GNUNET_CONSTANTS_SERVICE_TIMEOUT,
-                           &put_migration_continuation, 
-                           NULL);
-    }
-  return GNUNET_OK;
-}
-
-
-/* **************************** P2P GET Handling ************************ */
-
-
-/**
- * Closure for 'check_duplicate_request_{peer,client}'.
- */
-struct CheckDuplicateRequestClosure
-{
-  /**
-   * The new request we should check if it already exists.
-   */
-  const struct PendingRequest *pr;
-
-  /**
-   * Existing request found by the checker, NULL if none.
-   */
-  struct PendingRequest *have;
-};
-
-
-/**
- * Iterator over entries in the 'query_request_map' that
- * tries to see if we have the same request pending from
- * the same client already.
- *
- * @param cls closure (our 'struct CheckDuplicateRequestClosure')
- * @param key current key code (query, ignored, must match)
- * @param value value in the hash map (a 'struct PendingRequest' 
- *              that already exists)
- * @return GNUNET_YES if we should continue to
- *         iterate (no match yet)
- *         GNUNET_NO if not (match found).
- */
-static int
-check_duplicate_request_client (void *cls,
-                               const GNUNET_HashCode * key,
-                               void *value)
-{
-  struct CheckDuplicateRequestClosure *cdc = cls;
-  struct PendingRequest *have = value;
-
-  if (have->client_request_list == NULL)
-    return GNUNET_YES;
-  if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
-       (cdc->pr != have) )
-    {
-      cdc->have = have;
-      return GNUNET_NO;
-    }
-  return GNUNET_YES;
-}
-
-
-/**
- * We're processing (local) results for a search request
- * from another peer.  Pass applicable results to the
- * peer and if we are done either clean up (operation
- * complete) or forward to other peers (more results possible).
- *
- * @param cls our closure (struct LocalGetContext)
- * @param key key for the content
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param expiration expiration time for the content
- * @param uid unique identifier for the datum;
- *        maybe 0 if no unique identifier is available
- */
-static void
-process_local_reply (void *cls,
-                    const GNUNET_HashCode * key,
-                    uint32_t size,
-                    const void *data,
-                    enum GNUNET_BLOCK_Type type,
-                    uint32_t priority,
-                    uint32_t anonymity,
-                    struct GNUNET_TIME_Absolute
-                    expiration, 
-                    uint64_t uid)
-{
-  struct PendingRequest *pr = cls;
-  struct ProcessReplyClosure prq;
-  struct CheckDuplicateRequestClosure cdrc;
-  const struct SBlock *sb;
-  GNUNET_HashCode dhash;
-  GNUNET_HashCode mhash;
-  GNUNET_HashCode query;
-  
-  if (NULL == key)
-    {
-#if DEBUG_FS > 1
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Done processing local replies, forwarding request to other peers.\n");
-#endif
-      pr->qe = NULL;
-      if (pr->client_request_list != NULL)
-       {
-         GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
-                                     GNUNET_YES);
-         /* Figure out if this is a duplicate request and possibly
-            merge 'struct PendingRequest' entries */
-         cdrc.have = NULL;
-         cdrc.pr = pr;
-         GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
-                                                     &pr->query,
-                                                     &check_duplicate_request_client,
-                                                     &cdrc);
-         if (cdrc.have != NULL)
-           {
-#if DEBUG_FS
-             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                         "Received request for block `%s' twice from client, will only request once.\n",
-                         GNUNET_h2s (&pr->query));
-#endif
-             
-             destroy_pending_request (pr);
-             return;
-           }
-       }
-
-      /* no more results */
-      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_now (sched,
-                                            &forward_request_task,
-                                            pr);      
-      return;
-    }
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "New local response to `%s' of type %u.\n",
-             GNUNET_h2s (key),
-             type);
-#endif
-  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Found ONDEMAND block, performing on-demand encoding\n");
-#endif
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# on-demand blocks matched requests"),
-                               1,
-                               GNUNET_NO);
-      if (GNUNET_OK != 
-         GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
-                                           anonymity, expiration, uid, 
-                                           &process_local_reply,
-                                           pr))
-      if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-      return;
-    }
-  /* check for duplicates */
-  GNUNET_CRYPTO_hash (data, size, &dhash);
-  mingle_hash (&dhash, 
-              pr->mingle,
-              &mhash);
-  if ( (pr->bf != NULL) &&
-       (GNUNET_YES ==
-       GNUNET_CONTAINER_bloomfilter_test (pr->bf,
-                                          &mhash)) )
-    {      
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Result from datastore filtered by bloomfilter (duplicate).\n");
-#endif
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# results filtered by query bloomfilter"),
-                               1,
-                               GNUNET_NO);
-      if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-      return;
-    }
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Found result for query `%s' in local datastore\n",
-             GNUNET_h2s (key));
-#endif
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# results found locally"),
-                           1,
-                           GNUNET_NO);
-  pr->results_found++;
-  memset (&prq, 0, sizeof (prq));
-  prq.data = data;
-  prq.expiration = expiration;
-  prq.size = size;  
-  if (GNUNET_BLOCK_TYPE_SBLOCK == type)
-    { 
-      sb = (const struct SBlock*) data;
-      GNUNET_CRYPTO_hash (&sb->subspace,
-                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
-                         &prq.namespace);
-    }
-  if (GNUNET_OK != GNUNET_BLOCK_check_block (type,
-                                            data,
-                                            size,
-                                            &query))
-    {
-      GNUNET_break (0);
-      GNUNET_DATASTORE_remove (dsh,
-                              key,
-                              size, data,
-                              -1, -1, 
-                              GNUNET_TIME_UNIT_FOREVER_REL,
-                              NULL, NULL);
-      GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-      return;
-    }
-  prq.type = type;
-  prq.priority = priority;  
-  prq.finished = GNUNET_NO;
-  process_reply (&prq, key, pr);
-  if (prq.finished == GNUNET_YES)
+  GSF_cfg = cfg;
+  GSF_enable_randomized_delays =
+      GNUNET_CONFIGURATION_get_value_yesno (cfg, "fs", "DELAY");
+  GSF_dsh = GNUNET_DATASTORE_connect (cfg);
+  if (NULL == GSF_dsh)
+  {
+    GNUNET_SCHEDULER_shutdown ();
     return;
     return;
-  if (pr->qe == NULL)
-    return; /* done here */
-  if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
-       (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
-    {
-      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
-      return;
-    }
-  if ( (pr->client_request_list == NULL) &&
-       ( (GNUNET_YES == test_load_too_high()) ||
-        (pr->results_found > 5 + 2 * pr->priority) ) )
-    {
-#if DEBUG_FS > 2
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Load too high, done with request\n");
-#endif
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# processing result set cut short due to load"),
-                               1,
-                               GNUNET_NO);
-      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
-      return;
-    }
-  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-}
-
-
-/**
- * We've received a request with the specified priority.  Bound it
- * according to how much we trust the given peer.
- * 
- * @param prio_in requested priority
- * @param cp the peer making the request
- * @return effective priority
- */
-static uint32_t
-bound_priority (uint32_t prio_in,
-               struct ConnectedPeer *cp)
-{
-  if (cp->trust_delta > prio_in)
-    {
-      cp->trust_delta -= prio_in;
-      return prio_in;
-    }
-  // FIXME: get out trust in the target peer from peerinfo!
-  return 0; 
-}
-
-
-/**
- * Iterator over entries in the 'query_request_map' that
- * tries to see if we have the same request pending from
- * the same peer already.
- *
- * @param cls closure (our 'struct CheckDuplicateRequestClosure')
- * @param key current key code (query, ignored, must match)
- * @param value value in the hash map (a 'struct PendingRequest' 
- *              that already exists)
- * @return GNUNET_YES if we should continue to
- *         iterate (no match yet)
- *         GNUNET_NO if not (match found).
- */
-static int
-check_duplicate_request_peer (void *cls,
-                             const GNUNET_HashCode * key,
-                             void *value)
-{
-  struct CheckDuplicateRequestClosure *cdc = cls;
-  struct PendingRequest *have = value;
-
-  if (cdc->pr->target_pid == have->target_pid)
-    {
-      cdc->have = have;
-      return GNUNET_NO;
-    }
-  return GNUNET_YES;
-}
-
-
-/**
- * Handle P2P "GET" request.
- *
- * @param cls closure, always NULL
- * @param other the other peer involved (sender or receiver, NULL
- *        for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @param latency reported latency of the connection with 'other'
- * @param distance reported distance (DV) to 'other' 
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
- */
-static int
-handle_p2p_get (void *cls,
-               const struct GNUNET_PeerIdentity *other,
-               const struct GNUNET_MessageHeader *message,
-               struct GNUNET_TIME_Relative latency,
-               uint32_t distance)
-{
-  struct PendingRequest *pr;
-  struct ConnectedPeer *cp;
-  struct ConnectedPeer *cps;
-  struct CheckDuplicateRequestClosure cdc;
-  struct GNUNET_TIME_Relative timeout;
-  uint16_t msize;
-  const struct GetMessage *gm;
-  unsigned int bits;
-  const GNUNET_HashCode *opt;
-  uint32_t bm;
-  size_t bfsize;
-  uint32_t ttl_decrement;
-  enum GNUNET_BLOCK_Type type;
-  int have_ns;
-
-  msize = ntohs(message->size);
-  if (msize < sizeof (struct GetMessage))
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-  gm = (const struct GetMessage*) message;
-  type = ntohl (gm->type);
-  switch (type)
-    {
-    case GNUNET_BLOCK_TYPE_ANY:
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
-    case GNUNET_BLOCK_TYPE_KBLOCK:
-    case GNUNET_BLOCK_TYPE_SBLOCK:
-      break;
-    default:
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-  bm = ntohl (gm->hash_bitmap);
-  bits = 0;
-  while (bm > 0)
-    {
-      if (1 == (bm & 1))
-       bits++;
-      bm >>= 1;
-    }
-  if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }  
-  opt = (const GNUNET_HashCode*) &gm[1];
-  bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
-  bm = ntohl (gm->hash_bitmap);
-  if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
-       (type != GNUNET_BLOCK_TYPE_SBLOCK) )
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;      
-    }
-  bits = 0;
-  cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
-                                          &other->hashPubKey);
-  if (NULL == cps)
-    {
-      /* peer must have just disconnected */
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# requests dropped due to initiator not being connected"),
-                               1,
-                               GNUNET_NO);
-      return GNUNET_SYSERR;
-    }
-  if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
-    cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
-                                           &opt[bits++]);
-  else
-    cp = cps;
-  if (cp == NULL)
-    {
-#if DEBUG_FS
-      if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
-       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                   "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
-                   GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
-      
-      else
-       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                   "Failed to find peer `%4s' in connection set. Dropping query.\n",
-                   GNUNET_i2s (other));
-#endif
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# requests dropped due to missing reverse route"),
-                               1,
-                               GNUNET_NO);
-     /* FIXME: try connect? */
-      return GNUNET_OK;
-    }
-  /* note that we can really only check load here since otherwise
-     peers could find out that we are overloaded by not being
-     disconnected after sending us a malformed query... */
-  if (GNUNET_YES == test_load_too_high ())
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Dropping query from `%s', this peer is too busy.\n",
-                 GNUNET_i2s (other));
-#endif
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# requests dropped due to high load"),
-                               1,
-                               GNUNET_NO);
-      return GNUNET_OK;
-    }
-
-#if DEBUG_FS 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
-             GNUNET_h2s (&gm->query),
-             (unsigned int) type,
-             GNUNET_i2s (other),
-             (unsigned int) bm);
-#endif
-  have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
-  pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
-                     (have_ns ? sizeof(GNUNET_HashCode) : 0));
-  if (have_ns)
-    {
-      pr->namespace = (GNUNET_HashCode*) &pr[1];
-      memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
-    }
-  pr->type = type;
-  pr->mingle = ntohl (gm->filter_mutator);
-  if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
-    pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
-
-  pr->anonymity_level = 1;
-  pr->priority = bound_priority (ntohl (gm->priority), cps);
-  pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
-  pr->query = gm->query;
-  /* decrement ttl (always) */
-  ttl_decrement = 2 * TTL_DECREMENT +
-    GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                             TTL_DECREMENT);
-  if ( (pr->ttl < 0) &&
-       (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
-                 GNUNET_i2s (other),
-                 pr->ttl,
-                 ttl_decrement);
-#endif
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# requests dropped due TTL underflow"),
-                               1,
-                               GNUNET_NO);
-      /* integer underflow => drop (should be very rare)! */      
-      GNUNET_free (pr);
-      return GNUNET_OK;
-    } 
-  pr->ttl -= ttl_decrement;
-  pr->start_time = GNUNET_TIME_absolute_get ();
-
-  /* get bloom filter */
-  if (bfsize > 0)
-    {
-      pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
-                                                 bfsize,
-                                                 BLOOMFILTER_K);
-      pr->bf_size = bfsize;
-    }
-
-  cdc.have = NULL;
-  cdc.pr = pr;
-  GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
-                                             &gm->query,
-                                             &check_duplicate_request_peer,
-                                             &cdc);
-  if (cdc.have != NULL)
-    {
-      if (cdc.have->start_time.value + cdc.have->ttl >=
-         pr->start_time.value + pr->ttl)
-       {
-         /* existing request has higher TTL, drop new one! */
-         cdc.have->priority += pr->priority;
-         destroy_pending_request (pr);
-#if DEBUG_FS
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Have existing request with higher TTL, dropping new request.\n",
-                     GNUNET_i2s (other));
-#endif
-         GNUNET_STATISTICS_update (stats,
-                                   gettext_noop ("# requests dropped due to higher-TTL request"),
-                                   1,
-                                   GNUNET_NO);
-         return GNUNET_OK;
-       }
-      else
-       {
-         /* existing request has lower TTL, drop old one! */
-         pr->priority += cdc.have->priority;
-         /* Possible optimization: if we have applicable pending
-            replies in 'cdc.have', we might want to move those over
-            (this is a really rare special-case, so it is not clear
-            that this would be worth it) */
-         destroy_pending_request (cdc.have);
-         /* keep processing 'pr'! */
-       }
-    }
-
-  pr->cp = cp;
-  GNUNET_break (GNUNET_OK ==
-               GNUNET_CONTAINER_multihashmap_put (query_request_map,
-                                                  &gm->query,
-                                                  pr,
-                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-  GNUNET_break (GNUNET_OK ==
-               GNUNET_CONTAINER_multihashmap_put (peer_request_map,
-                                                  &other->hashPubKey,
-                                                  pr,
-                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-  
-  pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
-                                           pr,
-                                           pr->start_time.value + pr->ttl);
-
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# P2P searches received"),
-                           1,
-                           GNUNET_NO);
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# P2P searches active"),
-                           1,
-                           GNUNET_NO);
-
-  /* calculate change in traffic preference */
-  cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
-  /* process locally */
-  if (type == GNUNET_BLOCK_TYPE_DBLOCK)
-    type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
-  timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
-                                          (pr->priority + 1)); 
-  pr->qe = GNUNET_DATASTORE_get (dsh,
-                                &gm->query,
-                                type,                         
-                                pr->priority + 1,
-                                MAX_DATASTORE_QUEUE,                            
-                                timeout,
-                                &process_local_reply,
-                                pr);
-
-  /* Are multiple results possible?  If so, start processing remotely now! */
-  switch (pr->type)
-    {
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
-      /* only one result, wait for datastore */
-      break;
-    default:
-      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_now (sched,
-                                            &forward_request_task,
-                                            pr);
-    }
-
-  /* make sure we don't track too many requests */
-  if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
-    {
-      pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
-      destroy_pending_request (pr);
-    }
-  return GNUNET_OK;
-}
-
-
-/* **************************** CS GET Handling ************************ */
-
-
-/**
- * Handle START_SEARCH-message (search request from client).
- *
- * @param cls closure
- * @param client identification of the client
- * @param message the actual message
- */
-static void
-handle_start_search (void *cls,
-                    struct GNUNET_SERVER_Client *client,
-                    const struct GNUNET_MessageHeader *message)
-{
-  static GNUNET_HashCode all_zeros;
-  const struct SearchMessage *sm;
-  struct ClientList *cl;
-  struct ClientRequestList *crl;
-  struct PendingRequest *pr;
-  uint16_t msize;
-  unsigned int sc;
-  enum GNUNET_BLOCK_Type type;
-
-  msize = ntohs (message->size);
-  if ( (msize < sizeof (struct SearchMessage)) ||
-       (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
-    {
-      GNUNET_break (0);
-      GNUNET_SERVER_receive_done (client,
-                                 GNUNET_SYSERR);
-      return;
-    }
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# client searches received"),
-                           1,
-                           GNUNET_NO);
-  sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
-  sm = (const struct SearchMessage*) message;
-  type = ntohl (sm->type);
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received request for `%s' of type %u from local client\n",
-             GNUNET_h2s (&sm->query),
-             (unsigned int) type);
-#endif
-  switch (type)
-    {
-    case GNUNET_BLOCK_TYPE_ANY:
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
-    case GNUNET_BLOCK_TYPE_KBLOCK:
-    case GNUNET_BLOCK_TYPE_SBLOCK:
-    case GNUNET_BLOCK_TYPE_NBLOCK:
-      break;
-    default:
-      GNUNET_break (0);
-      GNUNET_SERVER_receive_done (client,
-                                 GNUNET_SYSERR);
-      return;
-    }  
-
-  cl = client_list;
-  while ( (cl != NULL) &&
-         (cl->client != client) )
-    cl = cl->next;
-  if (cl == NULL)
-    {
-      cl = GNUNET_malloc (sizeof (struct ClientList));
-      cl->client = client;
-      GNUNET_SERVER_client_keep (client);
-      cl->next = client_list;
-      client_list = cl;
-    }
-  /* detect duplicate KBLOCK requests */
-  if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
-       (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
-       (type == GNUNET_BLOCK_TYPE_ANY) )
-    {
-      crl = cl->rl_head;
-      while ( (crl != NULL) &&
-             ( (0 != memcmp (&crl->req->query,
-                             &sm->query,
-                             sizeof (GNUNET_HashCode))) ||
-               (crl->req->type != type) ) )
-       crl = crl->next;
-      if (crl != NULL)         
-       { 
-#if DEBUG_FS
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Have existing request, merging content-seen lists.\n");
-#endif
-         pr = crl->req;
-         /* Duplicate request (used to send long list of
-            known/blocked results); merge 'pr->replies_seen'
-            and update bloom filter */
-         GNUNET_array_grow (pr->replies_seen,
-                            pr->replies_seen_size,
-                            pr->replies_seen_off + sc);
-         memcpy (&pr->replies_seen[pr->replies_seen_off],
-                 &sm[1],
-                 sc * sizeof (GNUNET_HashCode));
-         pr->replies_seen_off += sc;
-         refresh_bloomfilter (pr);
-         GNUNET_STATISTICS_update (stats,
-                                   gettext_noop ("# client searches updated (merged content seen list)"),
-                                   1,
-                                   GNUNET_NO);
-         GNUNET_SERVER_receive_done (client,
-                                     GNUNET_OK);
-         return;
-       }
-    }
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# client searches active"),
-                           1,
-                           GNUNET_NO);
-  pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
-                     ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
-  crl = GNUNET_malloc (sizeof (struct ClientRequestList));
-  memset (crl, 0, sizeof (struct ClientRequestList));
-  crl->client_list = cl;
-  GNUNET_CONTAINER_DLL_insert (cl->rl_head,
-                              cl->rl_tail,
-                              crl);  
-  crl->req = pr;
-  pr->type = type;
-  pr->client_request_list = crl;
-  GNUNET_array_grow (pr->replies_seen,
-                    pr->replies_seen_size,
-                    sc);
-  memcpy (pr->replies_seen,
-         &sm[1],
-         sc * sizeof (GNUNET_HashCode));
-  pr->replies_seen_off = sc;
-  pr->anonymity_level = ntohl (sm->anonymity_level); 
-  refresh_bloomfilter (pr);
-  pr->query = sm->query;
-  if (0 == (1 & ntohl (sm->options)))
-    pr->local_only = GNUNET_NO;
-  else
-    pr->local_only = GNUNET_YES;
-  switch (type)
-    {
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
-      if (0 != memcmp (&sm->target,
-                      &all_zeros,
-                      sizeof (GNUNET_HashCode)))
-       pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
-      break;
-    case GNUNET_BLOCK_TYPE_SBLOCK:
-      pr->namespace = (GNUNET_HashCode*) &pr[1];
-      memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
-      break;
-    default:
-      break;
-    }
-  GNUNET_break (GNUNET_OK ==
-               GNUNET_CONTAINER_multihashmap_put (query_request_map,
-                                                  &sm->query,
-                                                  pr,
-                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-  if (type == GNUNET_BLOCK_TYPE_DBLOCK)
-    type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
-  pr->qe = GNUNET_DATASTORE_get (dsh,
-                                &sm->query,
-                                type,
-                                -3, -1,
-                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
-                                &process_local_reply,
-                                pr);
-}
-
-
-/* **************************** Startup ************************ */
-
-/**
- * Process fs requests.
- *
- * @param s scheduler to use
- * @param server the initialized server
- * @param c configuration to use
- */
-static int
-main_init (struct GNUNET_SCHEDULER_Handle *s,
-          struct GNUNET_SERVER_Handle *server,
-          const struct GNUNET_CONFIGURATION_Handle *c)
-{
-  static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
-    {
-      { &handle_p2p_get, 
-       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
-      { &handle_p2p_put, 
-       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
-      { NULL, 0, 0 }
-    };
-  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
-    {&GNUNET_FS_handle_index_start, NULL, 
-     GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
-    {&GNUNET_FS_handle_index_list_get, NULL, 
-     GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
-    {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
-     sizeof (struct UnindexMessage) },
-    {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
-     0 },
-    {NULL, NULL, 0, 0}
-  };
-
-  sched = s;
-  cfg = c;
-  stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
-  min_migration_delay = GNUNET_TIME_UNIT_SECONDS; // FIXME: get from config
-  connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
-  query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
-  peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
-  requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
-  core = GNUNET_CORE_connect (sched,
-                             cfg,
-                             GNUNET_TIME_UNIT_FOREVER_REL,
-                             NULL,
-                             NULL,
-                             &peer_connect_handler,
-                             &peer_disconnect_handler,
-                             NULL, GNUNET_NO,
-                             NULL, GNUNET_NO,
-                             p2p_handlers);
-  if (NULL == core)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                 _("Failed to connect to `%s' service.\n"),
-                 "core");
-      GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
-      connected_peers = NULL;
-      GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
-      query_request_map = NULL;
-      GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
-      requests_by_expiration_heap = NULL;
-      GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
-      peer_request_map = NULL;
-      if (dsh != NULL)
-       {
-         GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
-         dsh = NULL;
-       }
-      return GNUNET_SYSERR;
-    }
-  /* FIXME: distinguish between sending and storing in options? */
-  if (active_migration) 
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                 _("Content migration is enabled, will start to gather data\n"));
-      consider_migration_gathering ();
-    }
-  GNUNET_SERVER_disconnect_notify (server, 
-                                  &handle_client_disconnect,
-                                  NULL);
-  GNUNET_SERVER_add_handlers (server, handlers);
-  GNUNET_SCHEDULER_add_delayed (sched,
-                               GNUNET_TIME_UNIT_FOREVER_REL,
-                               &shutdown_task,
-                               NULL);
-  return GNUNET_OK;
-}
-
-
-/**
- * Process fs requests.
- *
- * @param cls closure
- * @param sched scheduler to use
- * @param server the initialized server
- * @param cfg configuration to use
- */
-static void
-run (void *cls,
-     struct GNUNET_SCHEDULER_Handle *sched,
-     struct GNUNET_SERVER_Handle *server,
-     const struct GNUNET_CONFIGURATION_Handle *cfg)
-{
-  active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
-                                                          "FS",
-                                                          "ACTIVEMIGRATION");
-  dsh = GNUNET_DATASTORE_connect (cfg,
-                                 sched);
-  if (dsh == NULL)
-    {
-      GNUNET_SCHEDULER_shutdown (sched);
-      return;
-    }
-  if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
-       (GNUNET_OK != main_init (sched, server, cfg)) )
-    {    
-      GNUNET_SCHEDULER_shutdown (sched);
-      GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
-      dsh = NULL;
-      return;   
-    }
+  }
+  GSF_rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
+  GSF_stats = GNUNET_STATISTICS_create ("fs", cfg);
+  block_cfg = GNUNET_CONFIGURATION_create ();
+  GNUNET_CONFIGURATION_set_value_string (block_cfg, "block", "PLUGINS", "fs");
+  GSF_block_ctx = GNUNET_BLOCK_context_create (block_cfg);
+  GNUNET_assert (NULL != GSF_block_ctx);
+  GSF_dht = GNUNET_DHT_connect (cfg, FS_DHT_HT_SIZE);
+  GSF_plan_init ();
+  GSF_pending_request_init_ ();
+  GSF_connected_peer_init_ ();
+  GSF_push_init_ ();
+  GSF_put_init_ ();
+  if ((GNUNET_OK != GNUNET_FS_indexing_init (cfg, GSF_dsh)) ||
+      (GNUNET_OK != main_init (server, cfg)))
+  {
+    GNUNET_SCHEDULER_shutdown ();
+    shutdown_task (NULL, NULL);
+    return;
+  }
 }
 
 
 }
 
 
@@ -3603,11 +647,8 @@ int
 main (int argc, char *const *argv)
 {
   return (GNUNET_OK ==
 main (int argc, char *const *argv)
 {
   return (GNUNET_OK ==
-          GNUNET_SERVICE_run (argc,
-                              argv,
-                              "fs",
-                             GNUNET_SERVICE_OPTION_NONE,
-                             &run, NULL)) ? 0 : 1;
+          GNUNET_SERVICE_run (argc, argv, "fs", GNUNET_SERVICE_OPTION_NONE,
+                              &run, NULL)) ? 0 : 1;
 }
 
 /* end of gnunet-service-fs.c */
 }
 
 /* end of gnunet-service-fs.c */