-ensure stats queues do not grow too big
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
index e10a924d138eaa9f1b05c740d662a49f3ff1da61..a999897cc00dca618a8c0b867663ff1c22a4cf38 100644 (file)
@@ -1,6 +1,6 @@
 /*
  This file is part of GNUnet
- Copyright (C) 2010-2015 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2010-2015 GNUnet e.V.
 
  GNUnet is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published
@@ -175,8 +175,8 @@ struct GNUNET_ATS_Session
 
   /**
    * Desired delay for transmissions we received from other peer.
-   * Adjusted to be per fragment (UDP_MTU), even though on the
-   * wire it was for "full messages".
+   * This is for full messages, the value needs to be adjusted for
+   * fragmented messages.
    */
   struct GNUNET_TIME_Relative flow_delay_from_other_peer;
 
@@ -343,11 +343,18 @@ struct UDP_FragmentationContext
 
   /**
    * Transmission time for the next fragment.  Incremented by
-   * the "flow_delay_from_other_peer" for each fragment when
+   * the @e flow_delay_from_other_peer for each fragment when
    * we setup the fragments.
    */
   struct GNUNET_TIME_Absolute next_frag_time;
 
+  /**
+   * Desired delay for transmissions we received from other peer.
+   * Adjusted to be per fragment (UDP_MTU), even though on the
+   * wire it was for "full messages".
+   */
+  struct GNUNET_TIME_Relative flow_delay_from_other_peer;
+
   /**
    * Message timeout
    */
@@ -480,6 +487,8 @@ struct UDP_ACK_Message
 
   /**
    * Desired delay for flow control, in us (in NBO).
+   * A value of UINT32_MAX indicates that the other
+   * peer wants us to disconnect.
    */
   uint32_t delay GNUNET_PACKED;
 
@@ -719,11 +728,9 @@ udp_plugin_get_network_for_address (void *cls,
  * Then reschedule this function to be called again once more is available.
  *
  * @param cls the plugin handle
- * @param tc the scheduling context (for rescheduling this function again)
  */
 static void
-udp_plugin_select_v4 (void *cls,
-                      const struct GNUNET_SCHEDULER_TaskContext *tc);
+udp_plugin_select_v4 (void *cls);
 
 
 /**
@@ -732,11 +739,9 @@ udp_plugin_select_v4 (void *cls,
  * Then reschedule this function to be called again once more is available.
  *
  * @param cls the plugin handle
- * @param tc the scheduling context (for rescheduling this function again)
  */
 static void
-udp_plugin_select_v6 (void *cls,
-                      const struct GNUNET_SCHEDULER_TaskContext *tc);
+udp_plugin_select_v6 (void *cls);
 
 
 /**
@@ -748,7 +753,9 @@ static void
 schedule_select_v4 (struct Plugin *plugin)
 {
   struct GNUNET_TIME_Relative min_delay;
+  struct GNUNET_TIME_Relative delay;
   struct UDP_MessageWrapper *udpw;
+  struct UDP_MessageWrapper *min_udpw;
 
   if ( (GNUNET_YES == plugin->enable_ipv4) &&
        (NULL != plugin->sockv4) )
@@ -756,26 +763,35 @@ schedule_select_v4 (struct Plugin *plugin)
     /* Find a message ready to send:
      * Flow delay from other peer is expired or not set (0) */
     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+    min_udpw = NULL;
     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
-      min_delay = GNUNET_TIME_relative_min (min_delay,
-                                            GNUNET_TIME_absolute_get_remaining (udpw->transmission_time));
+    {
+      delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
+      if (delay.rel_value_us < min_delay.rel_value_us)
+      {
+        min_delay = delay;
+        min_udpw = udpw;
+      }
+    }
     if (NULL != plugin->select_task_v4)
       GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
-    if (NULL != plugin->ipv4_queue_head)
+    if (NULL != min_udpw)
     {
       if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
       {
         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                    "Calculated flow delay for UDPv4 at %s\n",
+                    "Calculated flow delay for UDPv4 at %s for %s\n",
                     GNUNET_STRINGS_relative_time_to_string (min_delay,
-                                                            GNUNET_YES));
+                                                            GNUNET_YES),
+                    GNUNET_i2s (&min_udpw->session->target));
       }
       else
       {
         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "Calculated flow delay for UDPv4 at %s\n",
+                    "Calculated flow delay for UDPv4 at %s for %s\n",
                     GNUNET_STRINGS_relative_time_to_string (min_delay,
-                                                            GNUNET_YES));
+                                                            GNUNET_YES),
+                    GNUNET_i2s (&min_udpw->session->target));
       }
     }
     plugin->select_task_v4
@@ -796,32 +812,43 @@ static void
 schedule_select_v6 (struct Plugin *plugin)
 {
   struct GNUNET_TIME_Relative min_delay;
+  struct GNUNET_TIME_Relative delay;
   struct UDP_MessageWrapper *udpw;
+  struct UDP_MessageWrapper *min_udpw;
 
   if ( (GNUNET_YES == plugin->enable_ipv6) &&
        (NULL != plugin->sockv6) )
   {
     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+    min_udpw = NULL;
     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
-      min_delay = GNUNET_TIME_relative_min (min_delay,
-                                            GNUNET_TIME_absolute_get_remaining (udpw->transmission_time));
+    {
+      delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
+      if (delay.rel_value_us < min_delay.rel_value_us)
+      {
+        min_delay = delay;
+        min_udpw = udpw;
+      }
+    }
     if (NULL != plugin->select_task_v6)
       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
-    if (NULL != plugin->ipv6_queue_head)
+    if (NULL != min_udpw)
     {
       if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
       {
         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                    "Calculated flow delay for UDPv6 at %s\n",
+                    "Calculated flow delay for UDPv6 at %s for %s\n",
                     GNUNET_STRINGS_relative_time_to_string (min_delay,
-                                                            GNUNET_YES));
+                                                            GNUNET_YES),
+                    GNUNET_i2s (&min_udpw->session->target));
       }
       else
       {
         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "Calculated flow delay for UDPv6 at %s\n",
+                    "Calculated flow delay for UDPv6 at %s for %s\n",
                     GNUNET_STRINGS_relative_time_to_string (min_delay,
-                                                            GNUNET_YES));
+                                                            GNUNET_YES),
+                    GNUNET_i2s (&min_udpw->session->target));
       }
     }
     plugin->select_task_v6
@@ -1662,17 +1689,19 @@ fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
   delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time);
   if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Fragmented message acknowledged after %s\n",
-                GNUNET_STRINGS_relative_time_to_string (delay,
-                                                        GNUNET_YES));
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Fragmented message acknowledged after %s (expected at %s)\n",
+         GNUNET_STRINGS_relative_time_to_string (delay,
+                                                 GNUNET_YES),
+         GNUNET_STRINGS_absolute_time_to_string (frag_ctx->next_frag_time));
   }
   else
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Fragmented message acknowledged after %s\n",
-                GNUNET_STRINGS_relative_time_to_string (delay,
-                                                        GNUNET_YES));
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Fragmented message acknowledged after %s (expected at %s)\n",
+         GNUNET_STRINGS_relative_time_to_string (delay,
+                                                 GNUNET_YES),
+         GNUNET_STRINGS_absolute_time_to_string (frag_ctx->next_frag_time));
   }
 
   if (NULL != frag_ctx->cont)
@@ -1792,6 +1821,10 @@ qc_fragment_sent (void *cls,
   GNUNET_assert (NULL != udpw->frag_ctx);
   if (GNUNET_OK == result)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Fragment of message with %u bytes transmitted to %s\n",
+                (unsigned int) udpw->payload_size,
+                GNUNET_i2s (&udpw->session->target));
     GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
     GNUNET_STATISTICS_update (plugin->env->stats,
                               "# UDP, fragmented msgs, fragments, sent, success",
@@ -1804,6 +1837,10 @@ qc_fragment_sent (void *cls,
   }
   else
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Failed to transmit fragment of message with %u bytes to %s\n",
+                (unsigned int) udpw->payload_size,
+                GNUNET_i2s (&udpw->session->target));
     fragmented_message_done (udpw->frag_ctx,
                              GNUNET_SYSERR);
     GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1850,7 +1887,7 @@ enqueue_fragment (void *cls,
   udpw->transmission_time = frag_ctx->next_frag_time;
   frag_ctx->next_frag_time
     = GNUNET_TIME_absolute_add (frag_ctx->next_frag_time,
-                                session->flow_delay_from_other_peer);
+                                frag_ctx->flow_delay_from_other_peer);
   udpw->frag_ctx = frag_ctx;
   udpw->qc = &qc_fragment_sent;
   udpw->qc_cls = plugin;
@@ -1859,6 +1896,10 @@ enqueue_fragment (void *cls,
           msg_len);
   enqueue (plugin,
            udpw);
+  if (session->address->address_length == sizeof (struct IPv4UdpAddress))
+    schedule_select_v4 (plugin);
+  else
+    schedule_select_v6 (plugin);
 }
 
 
@@ -1889,17 +1930,17 @@ qc_message_sent (void *cls,
     delay = GNUNET_TIME_absolute_get_duration (udpw->start_time);
     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "Message sent via UDP with delay of %s\n",
-                  GNUNET_STRINGS_relative_time_to_string (delay,
-                                                          GNUNET_YES));
+      LOG (GNUNET_ERROR_TYPE_WARNING,
+           "Message sent via UDP with delay of %s\n",
+           GNUNET_STRINGS_relative_time_to_string (delay,
+                                                   GNUNET_YES));
     }
     else
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Message sent via UDP with delay of %s\n",
-                  GNUNET_STRINGS_relative_time_to_string (delay,
-                                                          GNUNET_YES));
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Message sent via UDP with delay of %s\n",
+           GNUNET_STRINGS_relative_time_to_string (delay,
+                                                   GNUNET_YES));
     }
     udpw->cont (udpw->cont_cls,
                 &udpw->session->target,
@@ -1991,6 +2032,7 @@ udp_plugin_send (void *cls,
   struct UDP_MessageWrapper *udpw;
   struct UDPMessage *udp;
   char mbuf[udpmlen] GNUNET_ALIGN;
+  struct GNUNET_TIME_Relative latency;
 
   if ( (sizeof(struct IPv6UdpAddress) == s->address->address_length) &&
        (NULL == plugin->sockv6) )
@@ -2068,6 +2110,10 @@ udp_plugin_send (void *cls,
                               "# UDP, unfragmented bytes payload queued total",
                               msgbuf_size,
                               GNUNET_NO);
+    if (s->address->address_length == sizeof (struct IPv4UdpAddress))
+      schedule_select_v4 (plugin);
+    else
+      schedule_select_v6 (plugin);
   }
   else
   {
@@ -2084,6 +2130,10 @@ udp_plugin_send (void *cls,
     frag_ctx->cont_cls = cont_cls;
     frag_ctx->start_time = GNUNET_TIME_absolute_get ();
     frag_ctx->next_frag_time = s->last_transmit_time;
+    frag_ctx->flow_delay_from_other_peer
+      = GNUNET_TIME_relative_divide (s->flow_delay_from_other_peer,
+                                     1 + (msgbuf_size /
+                                          UDP_MTU));
     frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
@@ -2097,10 +2147,22 @@ udp_plugin_send (void *cls,
                                                      frag_ctx);
     s->frag_ctx = frag_ctx;
     s->last_transmit_time = frag_ctx->next_frag_time;
-    if (sizeof (struct IPv4UdpAddress) == s->address->address_length)
-      schedule_select_v4 (plugin);
+    latency = GNUNET_TIME_absolute_get_remaining (s->last_transmit_time);
+    if (latency.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+      LOG (GNUNET_ERROR_TYPE_WARNING,
+           "Enqueued fragments will take %s for transmission to %s (queue size: %u)\n",
+           GNUNET_STRINGS_relative_time_to_string (latency,
+                                                   GNUNET_YES),
+           GNUNET_i2s (&s->target),
+           (unsigned int) s->msgs_in_queue);
     else
-      schedule_select_v6 (plugin);
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Enqueued fragments will take %s for transmission to %s (queue size: %u)\n",
+           GNUNET_STRINGS_relative_time_to_string (latency,
+                                                   GNUNET_YES),
+           GNUNET_i2s (&s->target),
+           (unsigned int) s->msgs_in_queue);
+
     GNUNET_STATISTICS_update (plugin->env->stats,
                               "# UDP, fragmented messages active",
                               1,
@@ -2117,119 +2179,10 @@ udp_plugin_send (void *cls,
   notify_session_monitor (s->plugin,
                           s,
                           GNUNET_TRANSPORT_SS_UPDATE);
-  if (s->address->address_length == sizeof (struct IPv4UdpAddress))
-    schedule_select_v4 (plugin);
-  else
-    schedule_select_v6 (plugin);
   return udpmlen;
 }
 
 
-/**
- * Handle an ACK message.
- *
- * @param plugin the UDP plugin
- * @param msg the (presumed) UDP ACK message
- * @param udp_addr sender address
- * @param udp_addr_len number of bytes in @a udp_addr
- */
-static void
-read_process_ack (struct Plugin *plugin,
-                  const struct GNUNET_MessageHeader *msg,
-                  const union UdpAddress *udp_addr,
-                  socklen_t udp_addr_len)
-{
-  const struct GNUNET_MessageHeader *ack;
-  const struct UDP_ACK_Message *udp_ack;
-  struct GNUNET_HELLO_Address *address;
-  struct GNUNET_ATS_Session *s;
-  struct GNUNET_TIME_Relative flow_delay;
-
-  if (ntohs (msg->size)
-      < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
-  {
-    GNUNET_break_op (0);
-    return;
-  }
-  udp_ack = (const struct UDP_ACK_Message *) msg;
-  ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
-  if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
-  {
-    GNUNET_break_op(0);
-    return;
-  }
-  address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
-                                           PLUGIN_NAME,
-                                           udp_addr,
-                                           udp_addr_len,
-                                           GNUNET_HELLO_ADDRESS_INFO_NONE);
-  s = udp_plugin_lookup_session (plugin,
-                                 address);
-  if (NULL == s)
-  {
-    LOG (GNUNET_ERROR_TYPE_WARNING,
-         "UDP session of address %s for ACK not found\n",
-         udp_address_to_string (plugin,
-                                address->address,
-                                address->address_length));
-    GNUNET_HELLO_address_free (address);
-    return;
-  }
-  if (NULL == s->frag_ctx)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-         "Fragmentation context of address %s for ACK (%s) not found\n",
-         udp_address_to_string (plugin,
-                                address->address,
-                                address->address_length),
-         GNUNET_FRAGMENT_print_ack (ack));
-    GNUNET_HELLO_address_free (address);
-    return;
-  }
-  GNUNET_HELLO_address_free (address);
-
-  flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "We received a sending delay of %s for %s\n",
-       GNUNET_STRINGS_relative_time_to_string (flow_delay,
-                                               GNUNET_YES),
-       GNUNET_i2s (&udp_ack->sender));
-  /* Flow delay is for the reassembled packet, however, our delay
-     is per packet, so we need to adjust: */
-  flow_delay = GNUNET_TIME_relative_divide (flow_delay,
-                                            1 + (s->frag_ctx->payload_size /
-                                                 UDP_MTU));
-  s->flow_delay_from_other_peer = flow_delay;
-
-
-  if (GNUNET_OK !=
-      GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
-                                   ack))
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
-         (unsigned int) ntohs (msg->size),
-         GNUNET_i2s (&udp_ack->sender),
-         udp_address_to_string (plugin,
-                                udp_addr,
-                                udp_addr_len));
-    /* Expect more ACKs to arrive */
-    return;
-  }
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Message from %s at %s full ACK'ed\n",
-       GNUNET_i2s (&udp_ack->sender),
-       udp_address_to_string (plugin,
-                              udp_addr,
-                              udp_addr_len));
-
-  /* Remove fragmented message after successful sending */
-  fragmented_message_done (s->frag_ctx,
-                           GNUNET_OK);
-}
-
-
 /* ********************** Receiving ********************** */
 
 
@@ -2292,35 +2245,6 @@ find_receive_context (void *cls,
 }
 
 
-/**
- * Message tokenizer has broken up an incomming message. Pass it on
- * to the service.
- *
- * @param cls the `struct Plugin *`
- * @param client the `struct GNUNET_ATS_Session *`
- * @param hdr the actual message
- * @return #GNUNET_OK (always)
- */
-static int
-process_inbound_tokenized_messages (void *cls,
-                                    void *client,
-                                    const struct GNUNET_MessageHeader *hdr)
-{
-  struct Plugin *plugin = cls;
-  struct GNUNET_ATS_Session *session = client;
-
-  if (GNUNET_YES == session->in_destroy)
-    return GNUNET_OK;
-  reschedule_session_timeout (session);
-  session->flow_delay_for_other_peer
-    = plugin->env->receive (plugin->env->cls,
-                            session->address,
-                            session,
-                            hdr);
-  return GNUNET_OK;
-}
-
-
 /**
  * Functions with this signature are called whenever we need to close
  * a session due to a disconnect or failure to establish a connection.
@@ -2438,6 +2362,157 @@ udp_disconnect_session (void *cls,
 }
 
 
+/**
+ * Handle a #GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK message.
+ *
+ * @param plugin the UDP plugin
+ * @param msg the (presumed) UDP ACK message
+ * @param udp_addr sender address
+ * @param udp_addr_len number of bytes in @a udp_addr
+ */
+static void
+read_process_ack (struct Plugin *plugin,
+                  const struct GNUNET_MessageHeader *msg,
+                  const union UdpAddress *udp_addr,
+                  socklen_t udp_addr_len)
+{
+  const struct GNUNET_MessageHeader *ack;
+  const struct UDP_ACK_Message *udp_ack;
+  struct GNUNET_HELLO_Address *address;
+  struct GNUNET_ATS_Session *s;
+  struct GNUNET_TIME_Relative flow_delay;
+
+  /* check message format */
+  if (ntohs (msg->size)
+      < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  udp_ack = (const struct UDP_ACK_Message *) msg;
+  ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
+  if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
+  {
+    GNUNET_break_op(0);
+    return;
+  }
+
+  /* Locate session */
+  address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
+                                           PLUGIN_NAME,
+                                           udp_addr,
+                                           udp_addr_len,
+                                           GNUNET_HELLO_ADDRESS_INFO_NONE);
+  s = udp_plugin_lookup_session (plugin,
+                                 address);
+  if (NULL == s)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "UDP session of address %s for ACK not found\n",
+         udp_address_to_string (plugin,
+                                address->address,
+                                address->address_length));
+    GNUNET_HELLO_address_free (address);
+    return;
+  }
+  if (NULL == s->frag_ctx)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+         "Fragmentation context of address %s for ACK (%s) not found\n",
+         udp_address_to_string (plugin,
+                                address->address,
+                                address->address_length),
+         GNUNET_FRAGMENT_print_ack (ack));
+    GNUNET_HELLO_address_free (address);
+    return;
+  }
+  GNUNET_HELLO_address_free (address);
+
+  /* evaluate flow delay: how long should we wait between messages? */
+  if (UINT32_MAX == ntohl (udp_ack->delay))
+  {
+    /* Other peer asked for us to terminate the session */
+    LOG (GNUNET_ERROR_TYPE_INFO,
+         "Asked to disconnect UDP session of %s\n",
+         GNUNET_i2s (&udp_ack->sender));
+    udp_disconnect_session (plugin,
+                            s);
+    return;
+  }
+  flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
+  if (flow_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "We received a sending delay of %s for %s\n",
+         GNUNET_STRINGS_relative_time_to_string (flow_delay,
+                                                 GNUNET_YES),
+         GNUNET_i2s (&udp_ack->sender));
+  else
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "We received a sending delay of %s for %s\n",
+         GNUNET_STRINGS_relative_time_to_string (flow_delay,
+                                                 GNUNET_YES),
+         GNUNET_i2s (&udp_ack->sender));
+  /* Flow delay is for the reassembled packet, however, our delay
+     is per packet, so we need to adjust: */
+  s->flow_delay_from_other_peer = flow_delay;
+
+  /* Handle ACK */
+  if (GNUNET_OK !=
+      GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
+                                   ack))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
+         (unsigned int) ntohs (msg->size),
+         GNUNET_i2s (&udp_ack->sender),
+         udp_address_to_string (plugin,
+                                udp_addr,
+                                udp_addr_len));
+    /* Expect more ACKs to arrive */
+    return;
+  }
+
+  /* Remove fragmented message after successful sending */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Message from %s at %s full ACK'ed\n",
+       GNUNET_i2s (&udp_ack->sender),
+       udp_address_to_string (plugin,
+                              udp_addr,
+                              udp_addr_len));
+  fragmented_message_done (s->frag_ctx,
+                           GNUNET_OK);
+}
+
+
+/**
+ * Message tokenizer has broken up an incomming message. Pass it on
+ * to the service.
+ *
+ * @param cls the `struct Plugin *`
+ * @param client the `struct GNUNET_ATS_Session *`
+ * @param hdr the actual message
+ * @return #GNUNET_OK (always)
+ */
+static int
+process_inbound_tokenized_messages (void *cls,
+                                    void *client,
+                                    const struct GNUNET_MessageHeader *hdr)
+{
+  struct Plugin *plugin = cls;
+  struct GNUNET_ATS_Session *session = client;
+
+  if (GNUNET_YES == session->in_destroy)
+    return GNUNET_OK;
+  reschedule_session_timeout (session);
+  session->flow_delay_for_other_peer
+    = plugin->env->receive (plugin->env->cls,
+                            session->address,
+                            session,
+                            hdr);
+  return GNUNET_OK;
+}
+
+
 /**
  * Destroy a session, plugin is being unloaded.
  *
@@ -2487,11 +2562,9 @@ udp_disconnect (void *cls,
  * Session was idle, so disconnect it.
  *
  * @param cls the `struct GNUNET_ATS_Session` to time out
- * @param tc scheduler context
  */
 static void
-session_timeout (void *cls,
-                 const struct GNUNET_SCHEDULER_TaskContext *tc)
+session_timeout (void *cls)
 {
   struct GNUNET_ATS_Session *s = cls;
   struct Plugin *plugin = s->plugin;
@@ -2843,10 +2916,13 @@ ack_proc (void *cls,
                               GNUNET_NO);
     return;
   }
-  if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
+  if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us ==
+      s->flow_delay_for_other_peer.rel_value_us)
+    delay = UINT32_MAX;
+  else if (s->flow_delay_for_other_peer.rel_value_us < UINT32_MAX)
     delay = s->flow_delay_for_other_peer.rel_value_us;
   else
-    delay = UINT32_MAX;
+    delay = UINT32_MAX - 1; /* largest value we can communicate */
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Sending ACK to `%s' including delay of %s\n",
        udp_address_to_string (plugin,
@@ -3053,9 +3129,6 @@ udp_select_read (struct Plugin *plugin,
     return;
   }
 
-
-
-
   msg = (const struct GNUNET_MessageHeader *) buf;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "UDP received %u-byte message from `%s' type %u\n",
@@ -3066,7 +3139,7 @@ udp_select_read (struct Plugin *plugin,
   if (size != ntohs (msg->size))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
-         "UDP malformed message header from %s\n",
+         "UDP malformed message (size %u) header from %s\n",
          (unsigned int) size,
          GNUNET_a2s (sa,
                      fromlen));
@@ -3437,19 +3510,17 @@ udp_select_send (struct Plugin *plugin,
  * Then reschedule this function to be called again once more is available.
  *
  * @param cls the plugin handle
- * @param tc the scheduling context
  */
 static void
-udp_plugin_select_v4 (void *cls,
-                      const struct GNUNET_SCHEDULER_TaskContext *tc)
+udp_plugin_select_v4 (void *cls)
 {
   struct Plugin *plugin = cls;
-
+  const struct GNUNET_SCHEDULER_TaskContext *tc;
+  
   plugin->select_task_v4 = NULL;
-  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
-    return;
   if (NULL == plugin->sockv4)
     return;
+  tc = GNUNET_SCHEDULER_get_task_context ();
   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
       (GNUNET_NETWORK_fdset_isset (tc->read_ready,
                                    plugin->sockv4)))
@@ -3467,19 +3538,17 @@ udp_plugin_select_v4 (void *cls,
  * Then reschedule this function to be called again once more is available.
  *
  * @param cls the plugin handle
- * @param tc the scheduling context
  */
 static void
-udp_plugin_select_v6 (void *cls,
-                      const struct GNUNET_SCHEDULER_TaskContext *tc)
+udp_plugin_select_v6 (void *cls)
 {
   struct Plugin *plugin = cls;
+  const struct GNUNET_SCHEDULER_TaskContext *tc;
 
   plugin->select_task_v6 = NULL;
-  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
-    return;
   if (NULL == plugin->sockv6)
     return;
+  tc = GNUNET_SCHEDULER_get_task_context ();
   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
        (GNUNET_NETWORK_fdset_isset (tc->read_ready,
                                     plugin->sockv6)) )