types
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index 5237cee042d38fd239e2445b50351f4e34e1b5b0..b9a2dfa79e3d13b8ce65ab8593288799cb4e758b 100644 (file)
  * @author Christian Grothoff
  *
  * TODO:
- * - tracking of PendingRequests (and defining that struct...)
- * - setup P2P search on CS request
- * - setup P2P search on P2P GET
- * - forward replies based on tracked requests
  * - validation of KBLOCKS (almost done)
  * - validation of SBLOCKS
  * - validation of KSBLOCKS
- * - content migration (put in local DS)
- * - possible major issue: we may
- *   queue "gazillions" of (K|S)Blocks for the
- *   core to transmit to another peer; need
- *   to make sure this is bounded overall...
+ * - actually DO P2P search upon P2P/CS requests (pass appropriate handler to core
+ *   and work through request list there; also update ttl/priority for our client's requests)
+ * - randomly delay processing for improved anonymity (can wait)
+ * - content migration (put in local DS) (can wait)
+ * - check that we decrement PIDs always where necessary (can wait)
+ * - handle some special cases when forwarding replies based on tracked requests (can wait)
+ * - tracking of success correlations for hot-path routing (can wait)
+ * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
+ *   core to transmit to another peer; need to make sure this is bounded overall...
  * - various load-based actions (can wait)
  * - remove on-demand blocks if they keep failing (can wait)
  */
@@ -202,13 +202,14 @@ struct LocalGetContext
    * level is >0 we may happen to know the responder's identity;
    * nevertheless, we should probably not use it for a DHT-lookup
    * or similar blunt actions in order to avoid exposing ourselves).
-   * <p>
+   */
+  struct GNUNET_PeerIdentity target;
+
+  /**
    * If the request is for an SBLOCK, this is the identity of the
    * pseudonym to which the SBLOCK belongs. 
-   * <p>
-   * If the request is for a KBLOCK, "target" must be all zeros.
    */
-  GNUNET_HashCode target;
+  GNUNET_HashCode namespace;
 
   /**
    * Hash of the keyword (aka query) for KBLOCKs; Hash of
@@ -341,6 +342,32 @@ struct ProcessGetContext
 };
 
 
+/**
+ * Information we keep for each pending reply.
+ */
+struct PendingReply
+{
+  /**
+   * This is a linked list.
+   */
+  struct PendingReply *next;
+
+  /**
+   * Size of the reply; actual reply message follows
+   * at the end of this struct.
+   */
+  size_t msize;
+
+};
+
+
+/**
+ * All requests from a client are 
+ * kept in a doubly-linked list.
+ */
+struct ClientRequestList;
+
+
 /**
  * Information we keep for each pending request.  We should try to
  * keep this struct as small as possible since its memory consumption
@@ -355,6 +382,13 @@ struct PendingRequest
    */
   struct GNUNET_SERVER_Client *client;
 
+  /**
+   * If this request was made by a client,
+   * this is our entry in the client request
+   * list; otherwise NULL.
+   */
+  struct ClientRequestList *crl_entry;
+
   /**
    * If this is a namespace query, pointer to the hash of the public
    * key of the namespace; otherwise NULL.
@@ -367,6 +401,25 @@ struct PendingRequest
    */
   struct GNUNET_CONTAINER_BloomFilter *bf;
 
+  /**
+   * Replies that we have received but were unable to forward yet
+   * (typically non-null only if we have a pending transmission
+   * request with the client or the respective peer).
+   */
+  struct PendingReply *replies_pending;
+
+  /**
+   * Pending transmission request with the core service for the target
+   * peer (for processing of 'replies_pending').
+   */
+  struct GNUNET_CORE_TransmitHandle *cth;
+
+  /**
+   * Pending transmission request for the target client (for processing of
+   * 'replies_pending').
+   */
+  struct GNUNET_CONNECTION_TransmitHandle *th;
+
   /**
    * Hash code of all replies that we have seen so far (only valid
    * if client is not NULL since we only track replies like this for
@@ -403,6 +456,11 @@ struct PendingRequest
    */
   GNUNET_PEER_Id *used_pids;
 
+  /**
+   * Desired anonymity level; only valid for requests from a local client.
+   */
+  uint32_t anonymity_level;
+
   /**
    * How many entries in "used_pids" are actually valid?
    */
@@ -437,6 +495,12 @@ struct PendingRequest
    */
   uint32_t remaining_priority;
 
+  /**
+   * Number to mingle hashes for bloom-filter
+   * tests with.
+   */
+  int32_t mingle;
+
   /**
    * TTL with which we saw this request (or, if we initiated, TTL that
    * we used for the request).
@@ -451,6 +515,60 @@ struct PendingRequest
 };
 
 
+/**
+ * All requests from a client are 
+ * kept in a doubly-linked list.
+ */
+struct ClientRequestList
+{
+  /**
+   * This is a doubly-linked list.
+   */
+  struct ClientRequestList *next;
+
+  /**
+   * This is a doubly-linked list.
+   */ 
+  struct ClientRequestList *prev;
+
+  /**
+   * A request from this client.
+   */
+  struct PendingRequest *req;
+
+};
+
+
+/**
+ * Linked list of all clients that we are 
+ * currently processing requests for.
+ */
+struct ClientList
+{
+
+  /**
+   * This is a linked list.
+   */
+  struct ClientList *next;
+
+  /**
+   * What client is this entry for?
+   */
+  struct GNUNET_SERVER_Client* client;
+
+  /**
+   * Head of the DLL of requests from this client.
+   */
+  struct ClientRequestList *head;
+
+  /**
+   * Tail of the DLL of requests from this client.
+   */
+  struct ClientRequestList *tail;
+
+};
+
+
 /**
  * Closure for "process_reply" function.
  */
@@ -483,12 +601,6 @@ struct ProcessReplyClosure
 };
 
 
-/**
- * Map from queries to pending requests ("struct PendingRequest") for
- * this query.
- */
-static struct GNUNET_CONTAINER_MultiHashMap *request_map;
-
 /**
  * Our connection to the datastore.
  */
@@ -505,8 +617,7 @@ static struct GNUNET_SCHEDULER_Handle *sched;
 const struct GNUNET_CONFIGURATION_Handle *cfg;
 
 /**
- * Handle to the core service (NULL until we've
- * connected to it).
+ * Handle to the core service (NULL until we've connected to it).
  */
 struct GNUNET_CORE_Handle *core;
 
@@ -536,13 +647,42 @@ static struct DatastoreRequestQueue *drq_tail;
 static struct IndexInfo *indexed_files;
 
 /**
- * Maps hash over content of indexed files
- * to the respective filename.  The filenames
- * are pointers into the indexed_files linked
- * list and do not need to be freed.
+ * Maps hash over content of indexed files to the respective filename.
+ * The filenames are pointers into the indexed_files linked list and
+ * do not need to be freed.
  */
 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
 
+/**
+ * Map of query hash codes to requests.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
+
+/**
+ * Map of peer IDs to requests (for those requests coming
+ * from other peers).
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
+
+/**
+ * Linked list of all of our clients and their requests.
+ */
+static struct ClientList *clients;
+
+/**
+ * Heap with the request that will expire next at the top.  Contains
+ * pointers of type "struct PendingRequest*"; these will *also* be
+ * aliased from the "requests_by_peer" data structures and the
+ * "requests_by_query" table.  Note that requests from our clients
+ * don't expire and are thus NOT in the "requests_by_expiration"
+ * (or the "requests_by_peer" tables).
+ */
+static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
+
+/**
+ * FIXME: set from configuration.
+ */
+static uint64_t max_pending_requests = 32;
 
 /**
  * Write the current index information list to disk.
@@ -1244,6 +1384,70 @@ handle_on_demand_block (const GNUNET_HashCode * key,
 }
 
 
+/**
+ * How many bytes should a bloomfilter be if we have already seen
+ * entry_count responses?  Note that BLOOMFILTER_K gives us the number
+ * of bits set per entry.  Furthermore, we should not re-size the
+ * filter too often (to keep it cheap).
+ *
+ * Since other peers will also add entries but not resize the filter,
+ * we should generally pick a slightly larger size than what the
+ * strict math would suggest.
+ *
+ * @return must be a power of two and smaller or equal to 2^15.
+ */
+static unsigned int
+compute_bloomfilter_size (unsigned int entry_count)
+{
+  unsigned int size;
+  unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
+  uint16_t max = 1 << 15;
+
+  if (entry_count > max)
+    return max;
+  size = 8;
+  while ((size < max) && (size < ideal))
+    size *= 2;
+  if (size > max)
+    return max;
+  return size;
+}
+
+
+/**
+ * Recalculate our bloom filter for filtering replies.
+ *
+ * @param count number of entries we are filtering right now
+ * @param mingle set to our new mingling value
+ * @param entries the entries to filter
+ * @return updated bloomfilter, NULL for none
+ */
+static struct GNUNET_CONTAINER_BloomFilter *
+refresh_bloomfilter (unsigned int count,
+                    int32_t * mingle,
+                    const GNUNET_HashCode *entries)
+{
+  struct GNUNET_CONTAINER_BloomFilter *bf;
+  unsigned int nsize;
+  unsigned int i;
+  GNUNET_HashCode mhash;
+
+  if (0 == count)
+    return NULL;
+  nsize = compute_bloomfilter_size (count);
+  *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
+  bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
+                                         nsize,
+                                         BLOOMFILTER_K);
+  for (i=0;i<count;i++)
+    {
+      mingle_hash (&entries[i], *mingle, &mhash);
+      GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
+    }
+  return bf;
+}
+
+
 /**
  * We're processing (local) results for a search request
  * from a (local) client.  Pass applicable results to the
@@ -1274,6 +1478,9 @@ process_local_get_result (void *cls,
                          uint64_t uid)
 {
   struct LocalGetContext *lgc = cls;
+  struct PendingRequest *pr;
+  struct ClientRequestList *crl;
+  struct ClientList *cl;
   size_t msize;
   unsigned int i;
 
@@ -1291,8 +1498,50 @@ process_local_get_result (void *cls,
           (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
           (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
        {
-         // FIXME: initiate P2P search
-         return;
+         cl = clients;
+         while ( (NULL != cl) &&
+                 (cl->client != lgc->client) )
+           cl = cl->next;
+         if (cl == NULL)
+           {
+             cl = GNUNET_malloc (sizeof (struct ClientList));
+             cl->client = lgc->client;
+             cl->next = clients;
+             clients = cl;
+           }
+         crl = GNUNET_malloc (sizeof (struct ClientRequestList));
+         GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
+         pr = GNUNET_malloc (sizeof (struct PendingRequest));
+         pr->client = lgc->client;
+         GNUNET_SERVER_client_keep (pr->client);
+         pr->crl_entry = crl;
+         crl->req = pr;
+         if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
+           {
+             pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
+             *pr->namespace = lgc->namespace;
+           }
+         pr->replies_seen = lgc->results;
+         lgc->results = NULL;
+         pr->start_time = GNUNET_TIME_absolute_get ();
+         pr->query = lgc->query;
+         pr->target_pid = GNUNET_PEER_intern (&lgc->target);
+         pr->replies_seen_off = lgc->results_used;
+         pr->replies_seen_size = lgc->results_size;
+         lgc->results_size = 0;
+         pr->type = lgc->type;
+         pr->anonymity_level = lgc->anonymity_level;
+         pr->bf = refresh_bloomfilter (pr->replies_seen_off,
+                                       &pr->mingle,
+                                       pr->replies_seen);
+         GNUNET_CONTAINER_multihashmap_put (requests_by_query,
+                                            &pr->query,
+                                            pr,
+                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+         
+         // FIXME: trigger some processing NOW!
+         local_get_context_free (lgc);
+         return;
        }
       /* got all possible results, clean up! */
       local_get_context_free (lgc);
@@ -1441,7 +1690,18 @@ handle_start_search (void *cls,
   lgc->client = client;
   lgc->type = ntohl (sm->type);
   lgc->anonymity_level = ntohl (sm->anonymity_level);
-  lgc->target = sm->target;
+  switch (lgc->type)
+    {
+    case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
+    case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+      lgc->target.hashPubKey = sm->target;
+      break;
+    case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
+      lgc->namespace = sm->target;
+      break;
+    default:
+      break;
+    }
   lgc->query = sm->query;
   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1467,6 +1727,47 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = {
 };
 
 
+/**
+ * Clean up the memory used by the PendingRequest
+ * structure (except for the client or peer list
+ * that the request may be part of).
+ *
+ * @param pr request to clean up
+ */
+static void
+destroy_pending_request (struct PendingRequest *pr)
+{
+  struct PendingReply *reply;
+
+  GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
+                                       &pr->query,
+                                       pr);
+  // FIXME: not sure how this can work (efficiently)
+  // also, what does the return value mean?
+  if (pr->client != NULL)
+    GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
+                                      pr);
+  if (NULL != pr->bf)
+    GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+  if (NULL != pr->cth)
+    GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
+  if (NULL != pr->th)
+    GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
+  while (NULL != (reply = pr->replies_pending))
+    {
+      pr->replies_pending = reply->next;
+      GNUNET_free (reply);
+    }
+  GNUNET_PEER_change_rc (pr->source_pid, -1);
+  GNUNET_PEER_change_rc (pr->target_pid, -1);
+  GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
+  GNUNET_free_non_null (pr->used_pids);
+  GNUNET_free_non_null (pr->replies_seen);
+  GNUNET_free_non_null (pr->namespace);
+  GNUNET_free (pr);
+}
+
+
 /**
  * A client disconnected.  Remove all of its pending queries.
  *
@@ -1479,14 +1780,38 @@ handle_client_disconnect (void *cls,
                          * client)
 {
   struct LocalGetContext *lgc;
+  struct ClientList *cpos;
+  struct ClientList *cprev;
+  struct ClientRequestList *rl;
 
   lgc = lgc_head;
   while ( (NULL != lgc) &&
          (lgc->client != client) )
     lgc = lgc->next;
-  if (lgc == NULL)
-    return; /* not one of our clients */
-  local_get_context_free (lgc);
+  if (lgc != NULL)
+    local_get_context_free (lgc);
+  cprev = NULL;
+  cpos = clients;
+  while ( (NULL != cpos) &&
+         (clients->client != client) )
+    {
+      cprev = cpos;
+      cpos = cpos->next;
+    }
+  if (cpos != NULL)
+    {
+      if (cprev == NULL)
+       clients = cpos->next;
+      else
+       cprev->next = cpos->next;
+      while (NULL != (rl = cpos->head))
+       {
+         cpos->head = rl->next;
+         destroy_pending_request (rl->req);
+         GNUNET_free (rl);
+       }
+      GNUNET_free (cpos);
+    }
 }
 
 
@@ -1507,9 +1832,13 @@ shutdown_task (void *cls,
   GNUNET_DATASTORE_disconnect (dsh,
                               GNUNET_NO);
   dsh = NULL;
-  // FIXME: iterate over 'request_map' to free entries!
-  GNUNET_CONTAINER_multihashmap_destroy (request_map);
-  request_map = NULL;
+  // FIXME: iterate over maps to free entries!
+  GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
+  requests_by_query = NULL;
+  GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
+  requests_by_peer = NULL;
+  GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
+  requests_by_expiration = NULL;
   GNUNET_CONTAINER_multihashmap_destroy (ifm);
   ifm = NULL;
   while (NULL != (pos = indexed_files))
@@ -1520,6 +1849,30 @@ shutdown_task (void *cls,
 }
 
 
+/**
+ * Free (each) request made by the peer.
+ *
+ * @param cls closure, points to peer that the request belongs to
+ * @param key current key code
+ * @param value value in the hash map
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int
+destroy_request (void *cls,
+                const GNUNET_HashCode * key,
+                void *value)
+{
+  const struct GNUNET_PeerIdentity * peer = cls;
+  struct PendingRequest *pr = value;
+  
+  GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
+                                       &peer->hashPubKey,
+                                       pr);
+  destroy_pending_request (pr);
+  return GNUNET_YES;
+}
+
+
 /**
  * Method called whenever a peer disconnects.
  *
@@ -1531,9 +1884,10 @@ peer_disconnect_handler (void *cls,
                         const struct
                         GNUNET_PeerIdentity * peer)
 {
-  // FIXME: remove all pending requests from this
-  // peer from our memory
-  // (iterate over request_map)
+  GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
+                                             &peer->hashPubKey,
+                                             &destroy_request,
+                                             (void*) peer);
 }
 
 
@@ -1550,12 +1904,52 @@ forward_get_request (void *cls,
                     const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct ProcessGetContext *pgc = cls;
+  struct PendingRequest *pr;
+  struct PendingRequest *eer;
+  struct GNUNET_PeerIdentity target;
 
-  // FIXME: install entry in
-  // 'request_map' and do actual
-  // forwarding...
-  if (pgc->bf != NULL)
-    GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
+  pr = GNUNET_malloc (sizeof (struct PendingRequest));
+  if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
+    {
+      pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
+      *pr->namespace = pgc->namespace;
+    }
+  pr->bf = pgc->bf;
+  pgc->bf = NULL;
+  pr->start_time = pgc->start_time;
+  pr->query = pgc->query;
+  pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
+  if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
+    pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
+  pr->anonymity_level = 1; /* default */
+  pr->priority = pgc->priority;
+  pr->remaining_priority = pr->priority;
+  pr->mingle = pgc->mingle;
+  pr->ttl = pgc->ttl; 
+  pr->type = pgc->type;
+  GNUNET_CONTAINER_multihashmap_put (requests_by_query,
+                                    &pr->query,
+                                    pr,
+                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
+                                    &pgc->reply_to.hashPubKey,
+                                    pr,
+                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  GNUNET_CONTAINER_heap_insert (requests_by_expiration,
+                               pr,
+                               pr->start_time.value + pr->ttl);
+  if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
+    {
+      /* expire oldest request! */
+      eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
+      GNUNET_PEER_resolve (eer->source_pid,
+                          &target);    
+      GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
+                                           &target.hashPubKey,
+                                           eer);
+      destroy_pending_request (eer);     
+    }
+  // FIXME: trigger actual forwarding NOW!
   GNUNET_free (pgc); 
 }
 
@@ -1565,6 +1959,7 @@ forward_get_request (void *cls,
  * the target buffer "buf".  "buf" will be
  * NULL and "size" zero if the socket was closed for
  * writing in the meantime.  In that case, only
+
  * free the message
  *
  * @param cls closure, pointer to the message
@@ -1731,9 +2126,8 @@ process_p2p_get_result (void *cls,
   
 
 /**
- * We're processing a GET request from
- * another peer.  Give it to our local
- * datastore.
+ * We're processing a GET request from another peer.  Give it to our
+ * local datastore.
  *
  * @param cls our "struct ProcessGetContext"
  * @param ok did we get a datastore slice or not?
@@ -1982,6 +2376,42 @@ handle_p2p_get (void *cls,
 }
 
 
+/**
+ * Function called to notify us that we can now transmit a reply to a
+ * client or peer.  "buf" will be NULL and "size" zero if the socket was
+ * closed for writing in the meantime.
+ *
+ * @param cls closure, points to a "struct PendingRequest*" with
+ *            one or more pending replies
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_result (void *cls,
+                size_t size, 
+                void *buf)
+{
+  struct PendingRequest *pr = cls;
+  char *cbuf = buf;
+  struct PendingReply *reply;
+  size_t ret;
+
+  ret = 0;
+  while (NULL != (reply = pr->replies_pending))
+    {
+      if ( (reply->msize + ret < ret) ||
+          (reply->msize + ret > size) )
+       break;
+      pr->replies_pending = reply->next;
+      memcpy (&cbuf[ret], &reply[1], reply->msize);
+      ret += reply->msize;
+      GNUNET_free (pr);
+    }
+  return 0;
+}
+
+
 /**
  * Iterator over pending requests.
  *
@@ -1997,10 +2427,138 @@ process_reply (void *cls,
 {
   struct ProcessReplyClosure *prq = cls;
   struct PendingRequest *pr = value;
-
-  fprintf (stderr, "FIXME %p %p\n", prq, pr);
-  // FIXME: forward reply to client
-  // or other peers (depending on pr...)
+  struct PendingRequest *eer;
+  struct PendingReply *reply;
+  struct PutMessage *pm;
+  struct ContentMessage *cm;
+  GNUNET_HashCode chash;
+  GNUNET_HashCode mhash;
+  struct GNUNET_PeerIdentity target;
+  size_t msize;
+  uint32_t prio;
+  struct GNUNET_TIME_Relative max_delay;
+  
+  GNUNET_CRYPTO_hash (prq->data,
+                     prq->size,
+                     &chash);
+  switch (prq->type)
+    {
+    case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
+    case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+      if (0 != memcmp (&chash,
+                      key,
+                      sizeof (GNUNET_HashCode)))
+       {
+         GNUNET_break_op (0);
+         return GNUNET_YES;
+       }
+      break;
+    case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
+      // FIXME: validate KBlock!
+      if (pr->bf != NULL) 
+       {
+         mingle_hash (&chash, pr->mingle, &mhash);
+         if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
+                                                              &mhash))
+           return GNUNET_YES; /* duplicate */
+         GNUNET_CONTAINER_bloomfilter_add (pr->bf,
+                                           &mhash);
+       }      
+      break;
+    case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
+      // FIXME: validate SBlock!
+      if (pr->bf != NULL) 
+       {
+         mingle_hash (&chash, pr->mingle, &mhash);
+         if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
+                                                              &mhash))
+           return GNUNET_YES; /* duplicate */
+         GNUNET_CONTAINER_bloomfilter_add (pr->bf,
+                                           &mhash);
+       }
+      break;
+    case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
+      // FIXME: validate SKBlock!
+      break;
+    }
+  prio = pr->priority;
+  prq->priority += pr->remaining_priority;
+  pr->remaining_priority = 0;
+  if (pr->client != NULL)
+    {
+      if (pr->replies_seen_size == pr->replies_seen_off)
+       GNUNET_array_grow (pr->replies_seen,
+                          pr->replies_seen_size,
+                          pr->replies_seen_size * 2 + 4);
+      pr->replies_seen[pr->replies_seen_off++] = chash;
+      // FIXME: possibly recalculate BF!
+    }
+  if (pr->client == NULL)
+    {
+      msize = sizeof (struct ContentMessage) + prq->size;
+      reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
+      reply->msize = msize;
+      cm = (struct ContentMessage*) &reply[1];
+      cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
+      cm->header.size = htons (msize);
+      cm->type = htonl (prq->type);
+      cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
+      reply->next = pr->replies_pending;
+      pr->replies_pending = reply;
+      memcpy (&reply[1], prq->data, prq->size);
+      if (pr->cth != NULL)
+       return GNUNET_YES;
+      max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+      if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
+       {
+         /* estimate expiration time from time difference between
+            first request that will be discarded and this request */
+         eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
+         max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
+                                                          eer->start_time);
+       }
+      GNUNET_PEER_resolve (pr->source_pid,
+                          &target);
+      pr->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                                  prio,
+                                                  max_delay,
+                                                  &target,
+                                                  msize,
+                                                  &transmit_result,
+                                                  pr);
+      if (NULL == pr->cth)
+       {
+         // FIXME: now what? discard?
+       }
+    }
+  else
+    {
+      msize = sizeof (struct PutMessage) + prq->size;
+      reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
+      reply->msize = msize;
+      reply->next = pr->replies_pending;
+      pm = (struct PutMessage*) &reply[1];
+      pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+      pm->header.size = htons (msize);
+      pm->type = htonl (prq->type);
+      pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
+      pr->replies_pending = reply;
+      memcpy (&reply[1], prq->data, prq->size);
+      if (pr->th != NULL)
+       return GNUNET_YES;
+      pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
+                                                   msize,
+                                                   GNUNET_TIME_UNIT_FOREVER_REL,
+                                                   &transmit_result,
+                                                   pr);
+      if (pr->th == NULL)
+       {
+         // FIXME: need to try again later (not much
+         // to do here specifically, but we need to
+         // check somewhere else to handle this case!)
+       }
+    }
+  // FIXME: implement hot-path routing statistics keeping!
   return GNUNET_YES;
 }
 
@@ -2089,7 +2647,7 @@ handle_p2p_put (void *cls,
   prq.type = type;
   prq.expiration = expiration;
   prq.priority = 0;
-  GNUNET_CONTAINER_multihashmap_get_multiple (request_map,
+  GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
                                              &query,
                                              &process_reply,
                                              &prq);
@@ -2197,7 +2755,9 @@ run (void *cls,
   cfg = c;
 
   ifm = GNUNET_CONTAINER_multihashmap_create (128);
-  request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+  requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+  requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+  requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
   read_index_list ();
   dsh = GNUNET_DATASTORE_connect (cfg,
                                  sched);