fix
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index 2e1c574c8ec34395471d83c7a99684bde0b6f2b7..0f603651ea4bfc0680831763f5814c65edae62ac 100644 (file)
@@ -24,9 +24,6 @@
  * @author Christian Grothoff
  *
  * TODO:
- * - track per-peer request latency (using new load API)
- * - consider more precise latency estimation (per-peer & request) -- again load API?
- * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
  * - introduce random latency in processing
  * - more statistics
  */
 
 #define DEBUG_FS GNUNET_NO
 
+/**
+ * Should we introduce random latency in processing?  Required for proper
+ * implementation of GAP, but can be disabled for performance evaluation of
+ * the basic routing algorithm.
+ */
+#define SUPPORT_DELAYS GNUNET_NO
+
 /**
  * Maximum number of outgoing messages we queue per peer.
  */
@@ -142,6 +146,11 @@ struct PendingMessage
    */
   void *cont_cls;
 
+  /**
+   * Do not transmit this pending message until this deadline.
+   */
+  struct GNUNET_TIME_Absolute delay_until;
+
   /**
    * Size of the reply; actual reply message follows
    * at the end of this struct.
@@ -216,6 +225,23 @@ struct ConnectedPeer
    */
   struct PendingMessage *pending_messages_tail;
 
+  /**
+   * How long does it typically take for us to transmit a message
+   * to this peer?  (delay between the request being issued and
+   * the callback being invoked).
+   */
+  struct GNUNET_LOAD_Value *transmission_delay;
+
+  /**
+   * Time when the last transmission request was issued.
+   */
+  struct GNUNET_TIME_Absolute last_transmission_request_start;
+
+  /**
+   * ID of delay task for scheduling transmission.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
+
   /**
    * Average priority of successful replies.  Calculated
    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
@@ -582,12 +608,17 @@ struct PendingRequest
   /**
    * Remove this request after transmission of the current response.
    */
-  int16_t do_remove;
+  int8_t do_remove;
+
+  /**
+   * GNUNET_YES if we should not forward this request to other peers.
+   */
+  int8_t local_only;
 
   /**
    * GNUNET_YES if we should not forward this request to other peers.
    */
-  int16_t local_only;
+  int8_t forward_only;
 
 };
 
@@ -796,6 +827,10 @@ static struct GNUNET_LOAD_Value *datastore_get_load;
  */
 static struct GNUNET_LOAD_Value *datastore_put_load;
 
+/**
+ * How long do requests typically stay in the routing table?
+ */
+static struct GNUNET_LOAD_Value *rt_entry_lifetime;
 
 /**
  * We've just now completed a datastore request.  Update our
@@ -958,7 +993,7 @@ consider_migration (void *cls,
     }
 
   /* consider scheduling transmission to cp for content migration */
-  if (cp->cth != NULL)
+  if (cp->cth != NULL)        
     return GNUNET_YES; 
   msize = 0;
   pos = mig_head;
@@ -986,6 +1021,11 @@ consider_migration (void *cls,
              msize,
              GNUNET_h2s (key));
 #endif
+  if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+      cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   cp->cth 
     = GNUNET_CORE_notify_transmit_ready (core,
                                         0, GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1318,12 +1358,12 @@ destroy_pending_message (struct PendingMessage *pm,
   TransmissionContinuation cont;
   void *cont_cls;
 
+  cont = pm->cont;
+  cont_cls = pm->cont_cls;
   if (pml != NULL)
     {
       GNUNET_assert (pml->pm == pm);
       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
-      cont = pm->cont;
-      cont_cls = pm->cont_cls;
       destroy_pending_message_list_entry (pml);
     }
   else
@@ -1366,12 +1406,14 @@ destroy_pending_request (struct PendingRequest *pr)
                                -1,
                                GNUNET_NO);
     }
-  /* might have already been removed from map in 'process_reply' (if
-     there was a unique reply) or never inserted if it was a
-     duplicate; hence ignore the return value here */
-  (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
-                                              &pr->query,
-                                              pr);
+  if (GNUNET_YES == 
+      GNUNET_CONTAINER_multihashmap_remove (query_request_map,
+                                           &pr->query,
+                                           pr))
+    {
+      GNUNET_LOAD_update (rt_entry_lifetime,
+                         GNUNET_TIME_absolute_get_duration (pr->start_time).value);
+    }
   if (pr->qe != NULL)
      {
       GNUNET_DATASTORE_cancel (pr->qe);
@@ -1456,6 +1498,7 @@ peer_connect_handler (void *cls,
   uint32_t trust;
   
   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
+  cp->transmission_delay = GNUNET_LOAD_value_init ();
   cp->pid = GNUNET_PEER_intern (peer);
 
   fn = get_trust_filename (peer);
@@ -1668,9 +1711,18 @@ peer_disconnect_handler (void *cls,
   GNUNET_PEER_change_rc (cp->pid, -1);
   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
   if (NULL != cp->cth)
-    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+    {
+      GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+      cp->cth = NULL;
+    }
+  if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+      cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   while (NULL != (pm = cp->pending_messages_head))
     destroy_pending_message (pm, 0 /* delivery failed */);
+  GNUNET_LOAD_value_free (cp->transmission_delay);
   GNUNET_break (0 == cp->pending_requests);
   GNUNET_free (cp);
 }
@@ -1829,6 +1881,8 @@ shutdown_task (void *cls,
   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
   query_request_map = NULL;
+  GNUNET_LOAD_value_free (rt_entry_lifetime);
+  rt_entry_lifetime = NULL;
   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
   peer_request_map = NULL;
@@ -1869,6 +1923,39 @@ shutdown_task (void *cls,
 /* ******************* Utility functions  ******************** */
 
 
+/**
+ * We've had to delay a request for transmission to core, but now
+ * we should be ready.  Run it.
+ *
+ * @param cls the 'struct ConnectedPeer' for which a request was delayed
+ * @param tc task context (unused)
+ */
+static void
+delayed_transmission_request (void *cls,
+                             const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct ConnectedPeer *cp = cls;
+  struct GNUNET_PeerIdentity pid;
+  struct PendingMessage *pm;
+
+  pm = cp->pending_messages_head;
+  cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_assert (cp->cth == NULL);
+  if (pm == NULL)
+    return;
+  GNUNET_PEER_resolve (cp->pid,
+                      &pid);
+  cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
+  cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                              pm->priority,
+                                              GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+                                              &pid,
+                                              pm->msize,
+                                              &transmit_to_peer,
+                                              cp);
+}
+
+
 /**
  * Transmit messages by copying it to the target buffer
  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
@@ -1888,13 +1975,16 @@ transmit_to_peer (void *cls,
 {
   struct ConnectedPeer *cp = cls;
   char *cbuf = buf;
-  struct GNUNET_PeerIdentity pid;
   struct PendingMessage *pm;
+  struct PendingMessage *next_pm;
+  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_TIME_Relative min_delay;
   struct MigrationReadyBlock *mb;
   struct MigrationReadyBlock *next;
   struct PutMessage migm;
   size_t msize;
   unsigned int i;
+  struct GNUNET_PeerIdentity pid;
  
   cp->cth = NULL;
   if (NULL == buf)
@@ -1903,31 +1993,53 @@ transmit_to_peer (void *cls,
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Dropping message, core too busy.\n");
 #endif
+      GNUNET_LOAD_update (cp->transmission_delay,
+                         UINT64_MAX);
       return 0;
-    }
+    }  
+  GNUNET_LOAD_update (cp->transmission_delay,
+                     GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
+  now = GNUNET_TIME_absolute_get ();
   msize = 0;
-  while ( (NULL != (pm = cp->pending_messages_head) ) &&
+  min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+  next_pm = cp->pending_messages_head;
+  while ( (NULL != (pm = next_pm) ) &&
          (pm->msize <= size) )
     {
+      next_pm = pm->next;
+      if (pm->delay_until.value > now.value)
+       {
+         min_delay = GNUNET_TIME_relative_min (min_delay,
+                                               GNUNET_TIME_absolute_get_remaining (pm->delay_until));
+         continue;
+       }
       memcpy (&cbuf[msize], &pm[1], pm->msize);
       msize += pm->msize;
       size -= pm->msize;
+      if (NULL == pm->pml)
+       {
+         GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
+                                      cp->pending_messages_tail,
+                                      pm);
+         cp->pending_requests--;
+       }
       destroy_pending_message (pm, cp->pid);
     }
-  if (NULL != pm)
-    {
-      GNUNET_PEER_resolve (cp->pid,
-                          &pid);
-      cp->cth = GNUNET_CORE_notify_transmit_ready (core,
-                                                  pm->priority,
-                                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,
-                                                  &pid,
-                                                  pm->msize,
-                                                  &transmit_to_peer,
-                                                  cp);
+  if (pm != NULL)
+    min_delay = GNUNET_TIME_UNIT_ZERO;
+  if (NULL != cp->pending_messages_head)
+    {     
+      GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
+      cp->delayed_transmission_request_task
+       = GNUNET_SCHEDULER_add_delayed (sched,
+                                       min_delay,
+                                       &delayed_transmission_request,
+                                       cp);
     }
-  else
+  if (pm == NULL)
     {      
+      GNUNET_PEER_resolve (cp->pid,
+                          &pid);
       next = mig_head;
       while (NULL != (mb = next))
        {
@@ -1955,7 +2067,7 @@ transmit_to_peer (void *cls,
                  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                              "Pushing migration block `%s' (%u bytes) to `%s'\n",
                              GNUNET_h2s (&mb->query),
-                             mb->size,
+                             (unsigned int) mb->size,
                              GNUNET_i2s (&pid));
 #endif   
                  break;
@@ -1966,7 +2078,7 @@ transmit_to_peer (void *cls,
                  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                              "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
                              GNUNET_h2s (&mb->query),
-                             mb->size,
+                             (unsigned int) mb->size,
                              GNUNET_i2s (&pid));
 #endif   
                }
@@ -1984,9 +2096,9 @@ transmit_to_peer (void *cls,
     }
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitting %u bytes to peer %u\n",
-             msize,
-             cp->pid);
+             "Transmitting %u bytes to peer with PID %u\n",
+             (unsigned int) msize,
+             (unsigned int) cp->pid);
 #endif
   return msize;
 }
@@ -2034,8 +2146,17 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
     destroy_pending_message (cp->pending_messages_tail, 0);  
   GNUNET_PEER_resolve (cp->pid, &pid);
   if (NULL != cp->cth)
-    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+    {
+      GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+      cp->cth = NULL;
+    }
+  if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+      cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   /* need to schedule transmission */
+  cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
                                               cp->pending_messages_head->priority,
                                               MAX_TRANSMIT_DELAY,
@@ -2058,16 +2179,69 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
 
 
 /**
- * Test if the load on this peer is too high
+ * Test if the DATABASE (GET) load on this peer is too high
+ * to even consider processing the query at
+ * all.  
+ * 
+ * @return GNUNET_YES if the load is too high to do anything (load high)
+ *         GNUNET_NO to process normally (load normal)
+ *         GNUNET_SYSERR to process for free (load low)
+ */
+static int
+test_get_load_too_high (uint32_t priority)
+{
+  double ld;
+
+  ld = GNUNET_LOAD_get_load (datastore_get_load);
+  if (ld < 1)
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests done for free (low load)"),
+                               1,
+                               GNUNET_NO);
+      return GNUNET_SYSERR;
+    }
+  if (ld <= priority)
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests done for a price (normal load)"),
+                               1,
+                               GNUNET_NO);
+      return GNUNET_NO;
+    }
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# requests dropped due to high load"),
+                           1,
+                           GNUNET_NO);
+  return GNUNET_YES;
+}
+
+
+
+
+/**
+ * Test if the DATABASE (PUT) load on this peer is too high
  * to even consider processing the query at
- * all.
+ * all.  
  * 
- * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
+ * @return GNUNET_YES if the load is too high to do anything (load high)
+ *         GNUNET_NO to process normally (load normal or low)
  */
 static int
-test_load_too_high ()
+test_put_load_too_high (uint32_t priority)
 {
-  return GNUNET_SYSERR; // FIXME
+  double ld;
+
+  if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
+    return GNUNET_NO; /* very fast */
+  ld = GNUNET_LOAD_get_load (datastore_put_load);
+  if ( (ld < 1) || (ld < priority) )
+    return GNUNET_NO;
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# storage requests dropped due to high load"),
+                           1,
+                           GNUNET_NO);
+  return GNUNET_YES;
 }
 
 
@@ -2309,8 +2483,6 @@ target_reservation_cb (void *cls,
                                                     pr);
          return;  /* this target round failed */
        }
-      /* FIXME: if we are "quite" busy, we may still want to skip
-        this round; need more load detection code! */
       no_route = GNUNET_YES;
     }
   
@@ -2466,7 +2638,7 @@ target_peer_select_cb (void *cls,
                                        P2P_SUCCESS_LIST_SIZE);
       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
        if (cp->last_p2p_replies[i] == pr->cp->pid)
-         score += 1; /* likely successful based on hot path */
+         score += 1.0; /* likely successful based on hot path */
     }
   else
     {
@@ -2474,14 +2646,14 @@ target_peer_select_cb (void *cls,
                                        CS2P_SUCCESS_LIST_SIZE);
       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
        if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
-         score += 1; /* likely successful based on hot path */
+         score += 1.0; /* likely successful based on hot path */
     }
   /* 3b) include latency */
   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
-    score += 1; /* likely fast based on latency */
+    score += 1.0; /* likely fast based on latency */
   /* 3c) include priorities */
   if (cp->avg_priority <= pr->remaining_priority / 2.0)
-    score += 1; /* likely successful based on priorities */
+    score += 1.0; /* likely successful based on priorities */
   /* 3d) penalize for queue size */  
   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
   /* 3e) include peer proximity */
@@ -2596,6 +2768,7 @@ forward_request_task (void *cls,
     return; /* configured to not do P2P search */
   /* (0) try DHT */
   if ( (0 == pr->anonymity_level) &&
+       (GNUNET_YES != pr->forward_only) &&
        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
     {
@@ -2657,18 +2830,28 @@ forward_request_task (void *cls,
     }
 
   /* (3) reserve reply bandwidth */
-  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
-                                         &psc.target.hashPubKey);
-  GNUNET_assert (NULL != cp);
-  pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
-                                               &psc.target,
-                                               GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
-                                               GNUNET_BANDWIDTH_value_init (UINT32_MAX),
-                                               DBLOCK_SIZE * 2, 
-                                               cp->inc_preference,
-                                               &target_reservation_cb,
-                                               pr);
-  cp->inc_preference = 0;
+  if (GNUNET_NO == pr->forward_only)
+    {
+      cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                             &psc.target.hashPubKey);
+      GNUNET_assert (NULL != cp);
+      pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
+                                                   &psc.target,
+                                                   GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
+                                                   GNUNET_BANDWIDTH_value_init (UINT32_MAX),
+                                                   DBLOCK_SIZE * 2, 
+                                                   cp->inc_preference,
+                                                   &target_reservation_cb,
+                                                   pr);
+      cp->inc_preference = 0;
+    }
+  else
+    {
+      /* force forwarding */
+      static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
+      target_reservation_cb (pr, &psc.target,
+                            zerobw, zerobw, 0, 0.0);
+    }
 }
 
 
@@ -2851,8 +3034,6 @@ process_reply (void *cls,
                            GNUNET_NO);
   if (prq->sender != NULL)
     {
-      /* FIXME: should we be more precise here and not use
-        "start_time" but a peer-specific time stamp? */
       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
       prq->sender->avg_delay.value
        = (prq->sender->avg_delay.value * 
@@ -2916,6 +3097,8 @@ process_reply (void *cls,
                    GNUNET_CONTAINER_multihashmap_remove (query_request_map,
                                                          key,
                                                          pr));
+      GNUNET_LOAD_update (rt_entry_lifetime,
+                         GNUNET_TIME_absolute_get_duration (pr->start_time).value);
       break;
     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
       GNUNET_STATISTICS_update (stats,
@@ -3027,6 +3210,12 @@ process_reply (void *cls,
       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
       reply->cont = &transmit_reply_continuation;
       reply->cont_cls = pr;
+#if SUPPORT_DELAYS
+      reply->delay_until 
+       = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                                                          GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                                                                    TTL_DECREMENT)));
+#endif
       reply->msize = msize;
       reply->priority = UINT32_MAX; /* send replies first! */
       pm = (struct PutMessage*) &reply[1];
@@ -3196,8 +3385,9 @@ handle_p2p_put (void *cls,
       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
       prq.sender->trust += prq.priority;
     }
-  if (GNUNET_YES == active_migration)
-    {
+  if ( (GNUNET_YES == active_migration) &&
+       (GNUNET_NO == test_put_load_too_high (prq.priority)) )
+    {      
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Replicating result for query `%s' with priority %u\n",
@@ -3464,10 +3654,10 @@ process_local_reply (void *cls,
   prq.priority = priority;  
   prq.finished = GNUNET_NO;
   prq.request_found = GNUNET_NO;
-  process_reply (&prq, key, pr);
   if ( (old_rf == 0) &&
-       (pr->results_found == 1) )
+       (pr->results_found == 0) )
     update_datastore_delays (pr->start_time);
+  process_reply (&prq, key, pr);
   if (prq.finished == GNUNET_YES)
     return;
   if (pr->qe == NULL)
@@ -3478,7 +3668,7 @@ process_local_reply (void *cls,
       return;
     }
   if ( (pr->client_request_list == NULL) &&
-       ( (GNUNET_YES == test_load_too_high()) ||
+       ( (GNUNET_YES == test_get_load_too_high (0)) ||
         (pr->results_found > 5 + 2 * pr->priority) ) )
     {
 #if DEBUG_FS > 2
@@ -3504,7 +3694,7 @@ process_local_reply (void *cls,
  * @param cp the peer making the request
  * @return effective priority
  */
-static uint32_t
+static int32_t
 bound_priority (uint32_t prio_in,
                struct ConnectedPeer *cp)
 {
@@ -3513,7 +3703,7 @@ bound_priority (uint32_t prio_in,
   double rret;
   int ld;
 
-  ld = test_load_too_high ();
+  ld = test_get_load_too_high (0);
   if (ld == GNUNET_SYSERR)
     return 0; /* excess resources */
   ret = change_host_trust (cp, prio_in);
@@ -3526,6 +3716,18 @@ bound_priority (uint32_t prio_in,
       current_priorities 
        = (current_priorities * (N-1) + rret)/N;
     }
+  if ( (ld == GNUNET_YES) && (ret > 0) )
+    {
+      /* try with charging */
+      ld = test_get_load_too_high (ret);
+    }
+  if (ld == GNUNET_YES)
+    {
+      /* undo charge */
+      if (ret != 0)
+       change_host_trust (cp, -ret);
+      return -1; /* not enough resources */
+    }
 #undef N
   return ret;
 }
@@ -3592,9 +3794,9 @@ handle_p2p_get (void *cls,
   uint32_t bm;
   size_t bfsize;
   uint32_t ttl_decrement;
+  int32_t priority;
   enum GNUNET_BLOCK_Type type;
   int have_ns;
-  int ld;
 
   msize = ntohs(message->size);
   if (msize < sizeof (struct GetMessage))
@@ -3660,11 +3862,8 @@ handle_p2p_get (void *cls,
   /* note that we can really only check load here since otherwise
      peers could find out that we are overloaded by not being
      disconnected after sending us a malformed query... */
-
-  /* FIXME: query priority should play
-     a major role here! */
-  ld = test_load_too_high ();
-  if (GNUNET_YES == ld)
+  priority = bound_priority (ntohl (gm->priority), cps);
+  if (priority < 0)
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -3677,9 +3876,6 @@ handle_p2p_get (void *cls,
                                GNUNET_NO);
       return GNUNET_OK;
     }
-  /* FIXME: if ld == GNUNET_NO, forward
-     instead of indirecting! */
-
 #if DEBUG_FS 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
@@ -3696,12 +3892,21 @@ handle_p2p_get (void *cls,
       pr->namespace = (GNUNET_HashCode*) &pr[1];
       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
     }
+  if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3) ||
+       (GNUNET_LOAD_get_average (cp->transmission_delay) > 
+       GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
+    {
+      /* don't have BW to send to peer, or would likely take longer than we have for it,
+        so at best indirect the query */
+      priority = 0;
+      pr->forward_only = GNUNET_YES;
+    }
   pr->type = type;
   pr->mingle = ntohl (gm->filter_mutator);
   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
   pr->anonymity_level = 1;
-  pr->priority = bound_priority (ntohl (gm->priority), cps);
+  pr->priority = (uint32_t) priority;
   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
   pr->query = gm->query;
   /* decrement ttl (always) */
@@ -3808,22 +4013,29 @@ handle_p2p_get (void *cls,
     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
                                           (pr->priority + 1)); 
-  pr->qe = GNUNET_DATASTORE_get (dsh,
-                                &gm->query,
-                                type,                         
-                                pr->priority + 1,
-                                MAX_DATASTORE_QUEUE,                            
-                                timeout,
-                                &process_local_reply,
-                                pr);
+  if (GNUNET_YES != pr->forward_only)
+    pr->qe = GNUNET_DATASTORE_get (dsh,
+                                  &gm->query,
+                                  type,                               
+                                  pr->priority + 1,
+                                  MAX_DATASTORE_QUEUE,                          
+                                  timeout,
+                                  &process_local_reply,
+                                  pr);
+  else
+    GNUNET_STATISTICS_update (stats,
+                             gettext_noop ("# requests forwarded due to high load"),
+                             1,
+                             GNUNET_NO);
 
-  /* Are multiple results possible?  If so, start processing remotely now! */
+  /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
   switch (pr->type)
     {
     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
       /* only one result, wait for datastore */
-      break;
+      if (GNUNET_YES != pr->forward_only)
+       break;
     default:
       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
        pr->task = GNUNET_SCHEDULER_add_now (sched,
@@ -4065,6 +4277,7 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
     }
   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
+  rt_entry_lifetime = GNUNET_LOAD_value_init ();
   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
   core = GNUNET_CORE_connect (sched,
@@ -4087,6 +4300,8 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
       connected_peers = NULL;
       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
       query_request_map = NULL;
+      GNUNET_LOAD_value_free (rt_entry_lifetime);
+      rt_entry_lifetime = NULL;
       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
       requests_by_expiration_heap = NULL;
       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);