fix misscalculation of per-session flow delays and apply flow delays properly per...
authorChristian Grothoff <christian@grothoff.org>
Sat, 17 Oct 2015 20:28:16 +0000 (20:28 +0000)
committerChristian Grothoff <christian@grothoff.org>
Sat, 17 Oct 2015 20:28:16 +0000 (20:28 +0000)
src/transport/plugin_transport_udp.c

index ca5166600807bd841e1a0cee902636d25fbcbdc3..66843ed977693859023033f5584e24b273c92164 100644 (file)
@@ -174,9 +174,11 @@ struct Session
   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
 
   /**
-   * Desired delay for next sending we received from other peer
+   * Desired delay for transmissions we received from other peer.
+   * Adjusted to be per fragment (UDP_MTU), even though on the
+   * wire it was for "full messages".
    */
-  struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
+  struct GNUNET_TIME_Relative flow_delay_from_other_peer;
 
   /**
    * Session timeout task
@@ -188,6 +190,11 @@ struct Session
    */
   struct GNUNET_TIME_Absolute timeout;
 
+  /**
+   * What time did we last transmit?
+   */
+  struct GNUNET_TIME_Absolute last_transmit_time;
+
   /**
    * expected delay for ACKs
    */
@@ -329,6 +336,18 @@ struct UDP_FragmentationContext
    */
   void *cont_cls;
 
+  /**
+   * Start time.
+   */
+  struct GNUNET_TIME_Absolute start_time;
+
+  /**
+   * Transmission time for the next fragment.  Incremented by
+   * the "flow_delay_from_other_peer" for each fragment when
+   * we setup the fragments.
+   */
+  struct GNUNET_TIME_Absolute next_frag_time;
+
   /**
    * Message timeout
    */
@@ -418,6 +437,17 @@ struct UDP_MessageWrapper
    */
   struct UDP_FragmentationContext *frag_ctx;
 
+  /**
+   * Message enqueue time.
+   */
+  struct GNUNET_TIME_Absolute start_time;
+
+  /**
+   * Desired transmission time for this message, based on the
+   * flow limiting information we got from the other peer.
+   */
+  struct GNUNET_TIME_Absolute transmission_time;
+
   /**
    * Message timeout.
    */
@@ -728,9 +758,26 @@ schedule_select_v4 (struct Plugin *plugin)
     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
       min_delay = GNUNET_TIME_relative_min (min_delay,
-                                            GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
+                                            GNUNET_TIME_absolute_get_remaining (udpw->transmission_time));
     if (NULL != plugin->select_task_v4)
       GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
+    if (NULL != plugin->ipv4_queue_head)
+    {
+      if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                    "Calculated flow delay for UDPv4 at %s\n",
+                    GNUNET_STRINGS_relative_time_to_string (min_delay,
+                                                            GNUNET_YES));
+      }
+      else
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "Calculated flow delay for UDPv4 at %s\n",
+                    GNUNET_STRINGS_relative_time_to_string (min_delay,
+                                                            GNUNET_YES));
+      }
+    }
     plugin->select_task_v4
       = GNUNET_SCHEDULER_add_read_net (min_delay,
                                        plugin->sockv4,
@@ -757,9 +804,26 @@ schedule_select_v6 (struct Plugin *plugin)
     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
       min_delay = GNUNET_TIME_relative_min (min_delay,
-                                            GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
+                                            GNUNET_TIME_absolute_get_remaining (udpw->transmission_time));
     if (NULL != plugin->select_task_v6)
       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
+    if (NULL != plugin->ipv6_queue_head)
+    {
+      if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                    "Calculated flow delay for UDPv6 at %s\n",
+                    GNUNET_STRINGS_relative_time_to_string (min_delay,
+                                                            GNUNET_YES));
+      }
+      else
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "Calculated flow delay for UDPv6 at %s\n",
+                    GNUNET_STRINGS_relative_time_to_string (min_delay,
+                                                            GNUNET_YES));
+      }
+    }
     plugin->select_task_v6
       = GNUNET_SCHEDULER_add_read_net (min_delay,
                                        plugin->sockv6,
@@ -1584,6 +1648,7 @@ fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
   struct UDP_MessageWrapper *udpw;
   struct UDP_MessageWrapper *tmp;
   size_t overhead;
+  struct GNUNET_TIME_Relative delay;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%p: Fragmented message removed with result %s\n",
@@ -1594,6 +1659,22 @@ fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
     overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
   else
     overhead = frag_ctx->on_wire_size;
+  delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time);
+  if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Fragmented message acknowledged after %s\n",
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES));
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Fragmented message acknowledged after %s\n",
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES));
+  }
+
   if (NULL != frag_ctx->cont)
     frag_ctx->cont (frag_ctx->cont_cls,
                     &s->target,
@@ -1765,6 +1846,11 @@ enqueue_fragment (void *cls,
   udpw->msg_size = msg_len;
   udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
   udpw->timeout = frag_ctx->timeout;
+  udpw->start_time = frag_ctx->start_time;
+  udpw->transmission_time = frag_ctx->next_frag_time;
+  frag_ctx->next_frag_time
+    = GNUNET_TIME_absolute_add (frag_ctx->next_frag_time,
+                                session->flow_delay_from_other_peer);
   udpw->frag_ctx = frag_ctx;
   udpw->qc = &qc_fragment_sent;
   udpw->qc_cls = plugin;
@@ -1795,6 +1881,7 @@ qc_message_sent (void *cls,
 {
   struct Plugin *plugin = cls;
   size_t overhead;
+  struct GNUNET_TIME_Relative delay;
 
   if (udpw->msg_size >= udpw->payload_size)
     overhead = udpw->msg_size - udpw->payload_size;
@@ -1802,11 +1889,28 @@ qc_message_sent (void *cls,
     overhead = udpw->msg_size;
 
   if (NULL != udpw->cont)
+  {
+    delay = GNUNET_TIME_absolute_get_duration (udpw->start_time);
+    if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "Message sent via UDP with delay of %s\n",
+                  GNUNET_STRINGS_relative_time_to_string (delay,
+                                                          GNUNET_YES));
+    }
+    else
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Message sent via UDP with delay of %s\n",
+                  GNUNET_STRINGS_relative_time_to_string (delay,
+                                                          GNUNET_YES));
+    }
     udpw->cont (udpw->cont_cls,
                 &udpw->session->target,
                 result,
                 udpw->payload_size,
                 overhead);
+  }
   if (GNUNET_OK == result)
   {
     GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1941,7 +2045,12 @@ udp_plugin_send (void *cls,
     udpw->msg_buf = (char *) &udpw[1];
     udpw->msg_size = udpmlen; /* message size with UDP overhead */
     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
+    udpw->start_time = GNUNET_TIME_absolute_get ();
     udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
+    udpw->transmission_time = s->last_transmit_time;
+    s->last_transmit_time
+      = GNUNET_TIME_absolute_add (s->last_transmit_time,
+                                  s->flow_delay_from_other_peer);
     udpw->cont = cont;
     udpw->cont_cls = cont_cls;
     udpw->frag_ctx = NULL;
@@ -1977,6 +2086,8 @@ udp_plugin_send (void *cls,
     frag_ctx->session = s;
     frag_ctx->cont = cont;
     frag_ctx->cont_cls = cont_cls;
+    frag_ctx->start_time = GNUNET_TIME_absolute_get ();
+    frag_ctx->next_frag_time = s->last_transmit_time;
     frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
@@ -1989,6 +2100,7 @@ udp_plugin_send (void *cls,
                                                      &enqueue_fragment,
                                                      frag_ctx);
     s->frag_ctx = frag_ctx;
+    s->last_transmit_time = frag_ctx->next_frag_time;
     GNUNET_STATISTICS_update (plugin->env->stats,
                               "# UDP, fragmented messages active",
                               1,
@@ -2082,7 +2194,12 @@ read_process_ack (struct Plugin *plugin,
        GNUNET_STRINGS_relative_time_to_string (flow_delay,
                                                GNUNET_YES),
        GNUNET_i2s (&udp_ack->sender));
-  s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
+  /* Flow delay is for the reassembled packet, however, our delay
+     is per packet, so we need to adjust: */
+  flow_delay = GNUNET_TIME_relative_divide (flow_delay,
+                                            1 + (s->frag_ctx->payload_size /
+                                                 UDP_MTU));
+  s->flow_delay_from_other_peer = flow_delay;
 
 
   if (GNUNET_OK !=
@@ -2428,10 +2545,11 @@ udp_plugin_create_session (void *cls,
   s->plugin = plugin;
   s->address = GNUNET_HELLO_address_copy (address);
   s->target = address->peer;
+  s->last_transmit_time = GNUNET_TIME_absolute_get ();
   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
                                                               250);
   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
-  s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
+  s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO;
   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
@@ -2740,6 +2858,7 @@ ack_proc (void *cls,
   udpw->msg_size = msize;
   udpw->payload_size = 0;
   udpw->session = s;
+  udpw->start_time = GNUNET_TIME_absolute_get ();
   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
   udpw->msg_buf = (char *) &udpw[1];
   udpw->qc = &ack_message_sent;
@@ -3085,8 +3204,8 @@ remove_timeout_messages_and_select (struct Plugin *plugin,
     }
     else
     {
-      /* Message did not time out, check flow delay */
-      remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
+      /* Message did not time out, check transmission time */
+      remaining = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
       if (0 == remaining.rel_value_us)
       {
         /* this message is not delayed */
@@ -3252,6 +3371,9 @@ udp_select_send (struct Plugin *plugin,
                                          udpw->msg_size,
                                          a,
                                          slen);
+    udpw->session->last_transmit_time
+      = GNUNET_TIME_absolute_max (GNUNET_TIME_absolute_get (),
+                                  udpw->session->last_transmit_time);
     dequeue (plugin,
              udpw);
     if (GNUNET_SYSERR == sent)