-check return values, fix leak
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
index 41dd61a644db1a280fb0a2660a253422fb365b6c..71b0fc091571ece75c9234c176ede7fa6305aae4 100644 (file)
@@ -251,6 +251,12 @@ schedule_peer_transmission (void *cls,
 static void
 plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
 {
+#define N ((double)128.0)
+  /**
+   * Running average delay we currently impose.
+   */
+  static double avg_delay;
+
   struct GSF_PendingRequestData *prd;
   struct GNUNET_TIME_Relative delay;
 
@@ -259,7 +265,7 @@ plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
                          gettext_noop ("# average retransmission delay (ms)"),
                          total_delay * 1000LL / plan_count, GNUNET_NO);
   prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
-  // FIXME: calculate 'rp->priority'!
+
   if (rp->transmission_counter < 8)
     delay =
         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
@@ -276,6 +282,35 @@ plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
   delay.rel_value =
       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                                 delay.rel_value + 1);
+  /* Add 0.01 to avg_delay to avoid division-by-zero later */
+  avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value) / N) + 0.01;
+
+  /*
+   * For the priority, we need to consider a few basic rules:
+   * 1) if we just started requesting (delay is small), we should
+   * virtually always have a priority of zero.
+   * 2) for requests with average latency, our priority should match
+   * the average priority observed on the network
+   * 3) even the longest-running requests should not be WAY out of
+   * the observed average (thus we bound by a factor of 2)
+   * 4) we add +1 to the observed average priority to avoid everyone
+   * staying put at zero (2 * 0 = 0...).
+   *
+   * Using the specific calculation below, we get:
+   *
+   * delay = 0 => priority = 0;
+   * delay = avg delay => priority = running-average-observed-priority;
+   * delay >> avg_delay => priority = 2 * running-average-observed-priority;
+   *
+   * which satisfies all of the rules above.
+   *
+   * Note: M_PI_4 = PI/4 = arctan(1)
+   */
+  rp->priority =
+      round ((GSF_current_priorities +
+              1.0) * atan (delay.rel_value / avg_delay)) / M_PI_4;
+  /* Note: usage of 'round' and 'atan' requires -lm */
+
   if (rp->transmission_counter != 0)
     delay.rel_value += TTL_DECREMENT;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -283,12 +318,9 @@ plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
               (unsigned int) rp->transmission_counter,
               (unsigned long long) delay.rel_value);
   rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
-#if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Earliest (re)transmission for `%s' in %us\n",
               GNUNET_h2s (&prd->query), rp->transmission_counter);
-#endif
-
   GNUNET_assert (rp->hn == NULL);
   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value
       == 0)
@@ -304,6 +336,7 @@ plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
     GNUNET_SCHEDULER_cancel (pp->task);
   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
+#undef N
 }
 
 
@@ -378,11 +411,9 @@ transmit_message_callback (void *cls, size_t buf_size, void *buf)
   rp->last_transmission = GNUNET_TIME_absolute_get ();
   rp->transmission_counter++;
   total_delay++;
-#if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Executing plan %p executed %u times, planning retransmission\n",
               rp, rp->transmission_counter);
-#endif
   plan (pp, rp);
   GNUNET_STATISTICS_update (GSF_stats,
                             gettext_noop
@@ -427,18 +458,14 @@ schedule_peer_transmission (void *cls,
     rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
     if (NULL == rp)
     {
-#if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No active requests for plan %p.\n",
                   pp);
-#endif
       return;                   /* both queues empty */
     }
     delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
-#if DEBUG_FS
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Sleeping for %llu ms before retrying requests on plan %p.\n",
                 (unsigned long long) delay.rel_value, pp);
-#endif
     GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# delay heap timeout"),
                            delay.rel_value, GNUNET_NO);
 
@@ -450,9 +477,7 @@ schedule_peer_transmission (void *cls,
                             1, GNUNET_NO);
   /* process from priority heap */
   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
-#if DEBUG_FS > 1
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp);
-#endif
   GNUNET_assert (NULL != rp);
   msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
   pp->pth =
@@ -572,11 +597,9 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp, struct GSF_PendingRequest *pr)
   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plan entries"), 1,
                             GNUNET_NO);
   prd = GSF_pending_request_get_data_ (pr);
-#if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Planning transmission of query `%s' to peer `%s'\n",
               GNUNET_h2s (&prd->query), GNUNET_i2s (&id));
-#endif
   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
@@ -663,6 +686,31 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
   GNUNET_free (pp);
 }
 
+/**
+ * Get the last transmission attempt time for the request plan list
+ * referenced by 'rpr_head', that was sent to 'sender'
+ *
+ * @param rpr_head request plan reference list to check.
+ * @param sender the peer that we've sent the request to.
+ * @param result the timestamp to fill.
+ * @return GNUNET_YES if 'result' was changed, GNUNET_NO otherwise.
+ */
+int
+GSF_request_plan_reference_get_last_transmission_ (
+    struct GSF_RequestPlanReference *rpr_head, struct GSF_ConnectedPeer *sender,
+    struct GNUNET_TIME_Absolute *result)
+{
+  struct GSF_RequestPlanReference *rpr;
+  for (rpr = rpr_head; rpr; rpr = rpr->next)
+  {
+    if (rpr->rp->pp->cp == sender)
+    {
+      *result = rpr->rp->last_transmission;
+      return GNUNET_YES;
+    }
+  }
+  return GNUNET_NO;
+}
 
 /**
  * Notify the plan about a request being done; destroy all entries