REST/NAMESTORE: rework API
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
index ce4d3027778462b3953fc64ba64cc0a65f8d8081..b05192e063a52324f78585174d2bd7c357bda3f1 100644 (file)
@@ -1,21 +1,21 @@
 /*
  This file is part of GNUnet
- Copyright (C) 2010-2015 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2010-2017 GNUnet e.V.
 
- GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
+ GNUnet is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
 
  GNUnet is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- General Public License for more details.
Affero General Public License for more details.
 
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING.  If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ You should have received a copy of the GNU Affero General Public License
+ along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
  */
 
 /**
@@ -30,7 +30,7 @@
 #include "gnunet_hello_lib.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_fragmentation_lib.h"
-#include "gnunet_nat_lib.h"
+#include "gnunet_nat_service.h"
 #include "gnunet_protocols.h"
 #include "gnunet_resolver_service.h"
 #include "gnunet_signatures.h"
@@ -151,13 +151,18 @@ struct PrettyPrinterContext
 /**
  * Session with another peer.
  */
-struct Session
+struct GNUNET_ATS_Session
 {
   /**
    * Which peer is this session for?
    */
   struct GNUNET_PeerIdentity target;
 
+  /**
+   * Tokenizer for inbound messages.
+   */
+  struct GNUNET_MessageStreamTokenizer *mst;
+
   /**
    * Plugin this session belongs to.
    */
@@ -174,9 +179,11 @@ struct Session
   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
 
   /**
-   * Desired delay for next sending we received from other peer
+   * Desired delay for transmissions we received from other peer.
+   * This is for full messages, the value needs to be adjusted for
+   * fragmented messages.
    */
-  struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
+  struct GNUNET_TIME_Relative flow_delay_from_other_peer;
 
   /**
    * Session timeout task
@@ -188,6 +195,11 @@ struct Session
    */
   struct GNUNET_TIME_Absolute timeout;
 
+  /**
+   * What time did we last transmit?
+   */
+  struct GNUNET_TIME_Absolute last_transmit_time;
+
   /**
    * expected delay for ACKs
    */
@@ -224,7 +236,7 @@ struct Session
   /**
    * Network type of the address.
    */
-  enum GNUNET_ATS_Network_Type scope;
+  enum GNUNET_NetworkType scope;
 
   /**
    * Is this session about to be destroyed (sometimes we cannot
@@ -280,7 +292,7 @@ struct DefragContext
   /**
    * Network type the address belongs to.
    */
-  enum GNUNET_ATS_Network_Type network_type;
+  enum GNUNET_NetworkType network_type;
 
   /**
    * Has the @e sender field been initialized yet?
@@ -317,7 +329,7 @@ struct UDP_FragmentationContext
   /**
    * The session this fragmentation context belongs to
    */
-  struct Session *session;
+  struct GNUNET_ATS_Session *session;
 
   /**
    * Function to call upon completion of the transmission.
@@ -329,6 +341,25 @@ struct UDP_FragmentationContext
    */
   void *cont_cls;
 
+  /**
+   * Start time.
+   */
+  struct GNUNET_TIME_Absolute start_time;
+
+  /**
+   * Transmission time for the next fragment.  Incremented by
+   * the @e flow_delay_from_other_peer for each fragment when
+   * we setup the fragments.
+   */
+  struct GNUNET_TIME_Absolute next_frag_time;
+
+  /**
+   * Desired delay for transmissions we received from other peer.
+   * Adjusted to be per fragment (UDP_MTU), even though on the
+   * wire it was for "full messages".
+   */
+  struct GNUNET_TIME_Relative flow_delay_from_other_peer;
+
   /**
    * Message timeout
    */
@@ -371,7 +402,7 @@ struct UDP_MessageWrapper
   /**
    * Session this message belongs to
    */
-  struct Session *session;
+  struct GNUNET_ATS_Session *session;
 
   /**
    * DLL of messages, previous element
@@ -418,6 +449,17 @@ struct UDP_MessageWrapper
    */
   struct UDP_FragmentationContext *frag_ctx;
 
+  /**
+   * Message enqueue time.
+   */
+  struct GNUNET_TIME_Absolute start_time;
+
+  /**
+   * Desired transmission time for this message, based on the
+   * flow limiting information we got from the other peer.
+   */
+  struct GNUNET_TIME_Absolute transmission_time;
+
   /**
    * Message timeout.
    */
@@ -450,6 +492,8 @@ struct UDP_ACK_Message
 
   /**
    * Desired delay for flow control, in us (in NBO).
+   * A value of UINT32_MAX indicates that the other
+   * peer wants us to disconnect.
    */
   uint32_t delay GNUNET_PACKED;
 
@@ -476,7 +520,7 @@ GNUNET_NETWORK_STRUCT_END
  */
 static void
 notify_session_monitor (struct Plugin *plugin,
-                        struct Session *session,
+                        struct GNUNET_ATS_Session *session,
                         enum GNUNET_TRANSPORT_SessionState state)
 {
   struct GNUNET_TRANSPORT_SessionInfo info;
@@ -508,7 +552,7 @@ notify_session_monitor (struct Plugin *plugin,
  *
  * @param cls the `struct Plugin` with the monitor callback (`sic`)
  * @param peer peer we send information about
- * @param value our `struct Session` to send information about
+ * @param value our `struct GNUNET_ATS_Session` to send information about
  * @return #GNUNET_OK (continue to iterate)
  */
 static int
@@ -517,7 +561,7 @@ send_session_info_iter (void *cls,
                         void *value)
 {
   struct Plugin *plugin = cls;
-  struct Session *session = value;
+  struct GNUNET_ATS_Session *session = value;
 
   notify_session_monitor (plugin,
                           session,
@@ -572,7 +616,7 @@ udp_plugin_setup_monitor (void *cls,
  * @param s session to free
  */
 static void
-free_session (struct Session *s)
+free_session (struct GNUNET_ATS_Session *s)
 {
   if (NULL != s->address)
   {
@@ -587,6 +631,11 @@ free_session (struct Session *s)
     GNUNET_free (s->frag_ctx);
     s->frag_ctx = NULL;
   }
+  if (NULL != s->mst)
+  {
+    GNUNET_MST_destroy (s->mst);
+    s->mst = NULL;
+  }
   GNUNET_free (s);
 }
 
@@ -613,14 +662,74 @@ udp_query_keepalive_factor (void *cls)
  * @param session the session
  * @return the network type
  */
-static enum GNUNET_ATS_Network_Type
-udp_get_network (void *cls,
-                 struct Session *session)
+static enum GNUNET_NetworkType
+udp_plugin_get_network (void *cls,
+                        struct GNUNET_ATS_Session *session)
 {
   return session->scope;
 }
 
 
+/**
+ * Function obtain the network type for an address.
+ *
+ * @param cls closure (`struct Plugin *`)
+ * @param address the address
+ * @return the network type
+ */
+static enum GNUNET_NetworkType
+udp_plugin_get_network_for_address (void *cls,
+                                    const struct GNUNET_HELLO_Address *address)
+{
+  struct Plugin *plugin = cls;
+  size_t addrlen;
+  struct sockaddr_in a4;
+  struct sockaddr_in6 a6;
+  const struct IPv4UdpAddress *u4;
+  const struct IPv6UdpAddress *u6;
+  const void *sb;
+  size_t sbs;
+
+  addrlen = address->address_length;
+  if (addrlen == sizeof(struct IPv6UdpAddress))
+  {
+    GNUNET_assert (NULL != address->address); /* make static analysis happy */
+    u6 = address->address;
+    memset (&a6, 0, sizeof(a6));
+#if HAVE_SOCKADDR_IN_SIN_LEN
+    a6.sin6_len = sizeof (a6);
+#endif
+    a6.sin6_family = AF_INET6;
+    a6.sin6_port = u6->u6_port;
+    GNUNET_memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
+    sb = &a6;
+    sbs = sizeof(a6);
+  }
+  else if (addrlen == sizeof(struct IPv4UdpAddress))
+  {
+    GNUNET_assert (NULL != address->address); /* make static analysis happy */
+    u4 = address->address;
+    memset (&a4, 0, sizeof(a4));
+#if HAVE_SOCKADDR_IN_SIN_LEN
+    a4.sin_len = sizeof (a4);
+#endif
+    a4.sin_family = AF_INET;
+    a4.sin_port = u4->u4_port;
+    a4.sin_addr.s_addr = u4->ipv4_addr;
+    sb = &a4;
+    sbs = sizeof(a4);
+  }
+  else
+  {
+    GNUNET_break (0);
+    return GNUNET_NT_UNSPECIFIED;
+  }
+  return plugin->env->get_address_type (plugin->env->cls,
+                                        sb,
+                                        sbs);
+}
+
+
 /* ******************* Event loop ******************** */
 
 /**
@@ -629,11 +738,9 @@ udp_get_network (void *cls,
  * Then reschedule this function to be called again once more is available.
  *
  * @param cls the plugin handle
- * @param tc the scheduling context (for rescheduling this function again)
  */
 static void
-udp_plugin_select_v4 (void *cls,
-                      const struct GNUNET_SCHEDULER_TaskContext *tc);
+udp_plugin_select_v4 (void *cls);
 
 
 /**
@@ -642,11 +749,9 @@ udp_plugin_select_v4 (void *cls,
  * Then reschedule this function to be called again once more is available.
  *
  * @param cls the plugin handle
- * @param tc the scheduling context (for rescheduling this function again)
  */
 static void
-udp_plugin_select_v6 (void *cls,
-                      const struct GNUNET_SCHEDULER_TaskContext *tc);
+udp_plugin_select_v6 (void *cls);
 
 
 /**
@@ -658,7 +763,9 @@ static void
 schedule_select_v4 (struct Plugin *plugin)
 {
   struct GNUNET_TIME_Relative min_delay;
+  struct GNUNET_TIME_Relative delay;
   struct UDP_MessageWrapper *udpw;
+  struct UDP_MessageWrapper *min_udpw;
 
   if ( (GNUNET_YES == plugin->enable_ipv4) &&
        (NULL != plugin->sockv4) )
@@ -666,11 +773,37 @@ schedule_select_v4 (struct Plugin *plugin)
     /* Find a message ready to send:
      * Flow delay from other peer is expired or not set (0) */
     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+    min_udpw = NULL;
     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
-      min_delay = GNUNET_TIME_relative_min (min_delay,
-                                            GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
+    {
+      delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
+      if (delay.rel_value_us < min_delay.rel_value_us)
+      {
+        min_delay = delay;
+        min_udpw = udpw;
+      }
+    }
     if (NULL != plugin->select_task_v4)
       GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
+    if (NULL != min_udpw)
+    {
+      if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                    "Calculated flow delay for UDPv4 at %s for %s\n",
+                    GNUNET_STRINGS_relative_time_to_string (min_delay,
+                                                            GNUNET_YES),
+                    GNUNET_i2s (&min_udpw->session->target));
+      }
+      else
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "Calculated flow delay for UDPv4 at %s for %s\n",
+                    GNUNET_STRINGS_relative_time_to_string (min_delay,
+                                                            GNUNET_YES),
+                    GNUNET_i2s (&min_udpw->session->target));
+      }
+    }
     plugin->select_task_v4
       = GNUNET_SCHEDULER_add_read_net (min_delay,
                                        plugin->sockv4,
@@ -689,17 +822,45 @@ static void
 schedule_select_v6 (struct Plugin *plugin)
 {
   struct GNUNET_TIME_Relative min_delay;
+  struct GNUNET_TIME_Relative delay;
   struct UDP_MessageWrapper *udpw;
+  struct UDP_MessageWrapper *min_udpw;
 
   if ( (GNUNET_YES == plugin->enable_ipv6) &&
        (NULL != plugin->sockv6) )
   {
     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+    min_udpw = NULL;
     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
-      min_delay = GNUNET_TIME_relative_min (min_delay,
-                                            GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
+    {
+      delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
+      if (delay.rel_value_us < min_delay.rel_value_us)
+      {
+        min_delay = delay;
+        min_udpw = udpw;
+      }
+    }
     if (NULL != plugin->select_task_v6)
       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
+    if (NULL != min_udpw)
+    {
+      if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                    "Calculated flow delay for UDPv6 at %s for %s\n",
+                    GNUNET_STRINGS_relative_time_to_string (min_delay,
+                                                            GNUNET_YES),
+                    GNUNET_i2s (&min_udpw->session->target));
+      }
+      else
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "Calculated flow delay for UDPv6 at %s for %s\n",
+                    GNUNET_STRINGS_relative_time_to_string (min_delay,
+                                                            GNUNET_YES),
+                    GNUNET_i2s (&min_udpw->session->target));
+      }
+    }
     plugin->select_task_v6
       = GNUNET_SCHEDULER_add_read_net (min_delay,
                                        plugin->sockv6,
@@ -1094,31 +1255,45 @@ udp_plugin_check_address (void *cls,
 
   if (sizeof(struct IPv4UdpAddress) == addrlen)
   {
+    struct sockaddr_in s4;
+
     v4 = (const struct IPv4UdpAddress *) addr;
     if (GNUNET_OK != check_port (plugin,
                                  ntohs (v4->u4_port)))
       return GNUNET_SYSERR;
+    memset (&s4, 0, sizeof (s4));
+    s4.sin_family = AF_INET;
+#if HAVE_SOCKADDR_IN_SIN_LEN
+    s4.sin_len = sizeof (s4);
+#endif
+    s4.sin_port = v4->u4_port;
+    s4.sin_addr.s_addr = v4->ipv4_addr;
+
     if (GNUNET_OK !=
-        GNUNET_NAT_test_address (plugin->nat,
-                                 &v4->ipv4_addr,
-                                 sizeof (struct in_addr)))
+       GNUNET_NAT_test_address (plugin->nat,
+                                &s4,
+                                sizeof (struct sockaddr_in)))
       return GNUNET_SYSERR;
   }
   else if (sizeof(struct IPv6UdpAddress) == addrlen)
   {
+    struct sockaddr_in6 s6;
+
     v6 = (const struct IPv6UdpAddress *) addr;
     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-    if (GNUNET_OK != check_port (plugin,
-                                 ntohs (v6->u6_port)))
-      return GNUNET_SYSERR;
+      return GNUNET_OK; /* plausible, if unlikely... */
+    memset (&s6, 0, sizeof (s6));
+    s6.sin6_family = AF_INET6;
+#if HAVE_SOCKADDR_IN_SIN_LEN
+    s6.sin6_len = sizeof (s6);
+#endif
+    s6.sin6_port = v6->u6_port;
+    s6.sin6_addr = v6->ipv6_addr;
+
     if (GNUNET_OK !=
-        GNUNET_NAT_test_address (plugin->nat,
-                                 &v6->ipv6_addr,
-                                 sizeof (struct in6_addr)))
+       GNUNET_NAT_test_address (plugin->nat,
+                                &s6,
+                                sizeof(struct sockaddr_in6)))
       return GNUNET_SYSERR;
   }
   else
@@ -1134,14 +1309,19 @@ udp_plugin_check_address (void *cls,
  * Our external IP address/port mapping has changed.
  *
  * @param cls closure, the `struct Plugin`
+ * @param app_ctx[in,out] location where the app can store stuff
+ *                  on add and retrieve it on remove
  * @param add_remove #GNUNET_YES to mean the new public IP address,
  *                   #GNUNET_NO to mean the previous (now invalid) one
+ * @param ac address class the address belongs to
  * @param addr either the previous or the new public IP address
  * @param addrlen actual length of the @a addr
  */
 static void
 udp_nat_port_map_callback (void *cls,
+                          void **app_ctx,
                            int add_remove,
+                          enum GNUNET_NAT_AddressClass ac,
                            const struct sockaddr *addr,
                            socklen_t addrlen)
 {
@@ -1152,6 +1332,7 @@ udp_nat_port_map_callback (void *cls,
   void *arg;
   size_t args;
 
+  (void) app_ctx;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        (GNUNET_YES == add_remove)
        ? "NAT notification to add address `%s'\n"
@@ -1168,10 +1349,7 @@ udp_nat_port_map_callback (void *cls,
       GNUNET_assert (sizeof(struct sockaddr_in) == addrlen);
       i4 = (const struct sockaddr_in *) addr;
       if (0 == ntohs (i4->sin_port))
-      {
-        GNUNET_break (0);
-        return;
-      }
+        return; /* Port = 0 means unmapped, ignore these for UDP. */
       memset (&u4,
               0,
               sizeof(u4));
@@ -1189,10 +1367,7 @@ udp_nat_port_map_callback (void *cls,
       GNUNET_assert (sizeof(struct sockaddr_in6) == addrlen);
       i6 = (const struct sockaddr_in6 *) addr;
       if (0 == ntohs (i6->sin6_port))
-      {
-        GNUNET_break (0);
-        return;
-      }
+        return; /* Port = 0 means unmapped, ignore these for UDP. */
       memset (&u6,
               0,
               sizeof(u6));
@@ -1208,6 +1383,7 @@ udp_nat_port_map_callback (void *cls,
     return;
   }
   /* modify our published address list */
+  /* TODO: use 'ac' here in the future... */
   address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
                                            PLUGIN_NAME,
                                            arg,
@@ -1226,12 +1402,12 @@ udp_nat_port_map_callback (void *cls,
 /**
  * Closure for #session_cmp_it().
  */
-struct SessionCompareContext
+struct GNUNET_ATS_SessionCompareContext
 {
   /**
    * Set to session matching the address.
    */
-  struct Session *res;
+  struct GNUNET_ATS_Session *res;
 
   /**
    * Address we are looking for.
@@ -1243,9 +1419,9 @@ struct SessionCompareContext
 /**
  * Find a session with a matching address.
  *
- * @param cls the `struct SessionCompareContext *`
+ * @param cls the `struct GNUNET_ATS_SessionCompareContext *`
  * @param key peer identity (unused)
- * @param value the `struct Session *`
+ * @param value the `struct GNUNET_ATS_Session *`
  * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
  */
 static int
@@ -1253,8 +1429,8 @@ session_cmp_it (void *cls,
                 const struct GNUNET_PeerIdentity *key,
                 void *value)
 {
-  struct SessionCompareContext *cctx = cls;
-  struct Session *s = value;
+  struct GNUNET_ATS_SessionCompareContext *cctx = cls;
+  struct GNUNET_ATS_Session *s = value;
 
   if (0 == GNUNET_HELLO_address_cmp (s->address,
                                      cctx->address))
@@ -1276,14 +1452,14 @@ session_cmp_it (void *cls,
  * @param address the address we should locate the session by
  * @return the session if it exists, or NULL if it is not found
  */
-static struct Session *
+static struct GNUNET_ATS_Session *
 udp_plugin_lookup_session (void *cls,
                            const struct GNUNET_HELLO_Address *address)
 {
   struct Plugin *plugin = cls;
   const struct IPv6UdpAddress *udp_a6;
   const struct IPv4UdpAddress *udp_a4;
-  struct SessionCompareContext cctx;
+  struct GNUNET_ATS_SessionCompareContext cctx;
 
   if (NULL == address->address)
   {
@@ -1349,7 +1525,7 @@ udp_plugin_lookup_session (void *cls,
  * @param s session to reschedule timeout activity for
  */
 static void
-reschedule_session_timeout (struct Session *s)
+reschedule_session_timeout (struct GNUNET_ATS_Session *s)
 {
   if (GNUNET_YES == s->in_destroy)
     return;
@@ -1371,7 +1547,7 @@ reschedule_session_timeout (struct Session *s)
 static void
 udp_plugin_update_session_timeout (void *cls,
                                    const struct GNUNET_PeerIdentity *peer,
-                                   struct Session *session)
+                                   struct GNUNET_ATS_Session *session)
 {
   struct Plugin *plugin = cls;
 
@@ -1402,7 +1578,7 @@ static void
 dequeue (struct Plugin *plugin,
          struct UDP_MessageWrapper *udpw)
 {
-  struct Session *session = udpw->session;
+  struct GNUNET_ATS_Session *session = udpw->session;
 
   if (plugin->bytes_in_buffer < udpw->msg_size)
   {
@@ -1454,14 +1630,15 @@ static void
 enqueue (struct Plugin *plugin,
          struct UDP_MessageWrapper *udpw)
 {
-  struct Session *session = udpw->session;
+  struct GNUNET_ATS_Session *session = udpw->session;
 
   if (GNUNET_YES == session->in_destroy)
   {
     GNUNET_break (0);
+    GNUNET_free (udpw);
     return;
   }
-  if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
+  if (plugin->bytes_in_buffer > INT64_MAX - udpw->msg_size)
   {
     GNUNET_break (0);
   }
@@ -1520,10 +1697,11 @@ fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
                          int result)
 {
   struct Plugin *plugin = frag_ctx->plugin;
-  struct Session *s = frag_ctx->session;
+  struct GNUNET_ATS_Session *s = frag_ctx->session;
   struct UDP_MessageWrapper *udpw;
   struct UDP_MessageWrapper *tmp;
   size_t overhead;
+  struct GNUNET_TIME_Relative delay;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%p: Fragmented message removed with result %s\n",
@@ -1534,6 +1712,24 @@ fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
     overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
   else
     overhead = frag_ctx->on_wire_size;
+  delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time);
+  if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Fragmented message acknowledged after %s (expected at %s)\n",
+         GNUNET_STRINGS_relative_time_to_string (delay,
+                                                 GNUNET_YES),
+         GNUNET_STRINGS_absolute_time_to_string (frag_ctx->next_frag_time));
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Fragmented message acknowledged after %s (expected at %s)\n",
+         GNUNET_STRINGS_relative_time_to_string (delay,
+                                                 GNUNET_YES),
+         GNUNET_STRINGS_absolute_time_to_string (frag_ctx->next_frag_time));
+  }
+
   if (NULL != frag_ctx->cont)
     frag_ctx->cont (frag_ctx->cont_cls,
                     &s->target,
@@ -1651,6 +1847,10 @@ qc_fragment_sent (void *cls,
   GNUNET_assert (NULL != udpw->frag_ctx);
   if (GNUNET_OK == result)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Fragment of message with %u bytes transmitted to %s\n",
+                (unsigned int) udpw->payload_size,
+                GNUNET_i2s (&udpw->session->target));
     GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
     GNUNET_STATISTICS_update (plugin->env->stats,
                               "# UDP, fragmented msgs, fragments, sent, success",
@@ -1663,6 +1863,10 @@ qc_fragment_sent (void *cls,
   }
   else
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Failed to transmit fragment of message with %u bytes to %s\n",
+                (unsigned int) udpw->payload_size,
+                GNUNET_i2s (&udpw->session->target));
     fragmented_message_done (udpw->frag_ctx,
                              GNUNET_SYSERR);
     GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1693,7 +1897,7 @@ enqueue_fragment (void *cls,
   struct UDP_FragmentationContext *frag_ctx = cls;
   struct Plugin *plugin = frag_ctx->plugin;
   struct UDP_MessageWrapper *udpw;
-  struct Session *session = frag_ctx->session;
+  struct GNUNET_ATS_Session *session = frag_ctx->session;
   size_t msg_len = ntohs (msg->size);
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1705,15 +1909,20 @@ enqueue_fragment (void *cls,
   udpw->msg_size = msg_len;
   udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
   udpw->timeout = frag_ctx->timeout;
+  udpw->start_time = frag_ctx->start_time;
+  udpw->transmission_time = frag_ctx->next_frag_time;
+  frag_ctx->next_frag_time
+    = GNUNET_TIME_absolute_add (frag_ctx->next_frag_time,
+                                frag_ctx->flow_delay_from_other_peer);
   udpw->frag_ctx = frag_ctx;
   udpw->qc = &qc_fragment_sent;
   udpw->qc_cls = plugin;
-  memcpy (udpw->msg_buf,
-          msg,
-          msg_len);
+  GNUNET_memcpy (udpw->msg_buf,
+                msg,
+                msg_len);
   enqueue (plugin,
            udpw);
-  if (sizeof (struct IPv4UdpAddress) == session->address->address_length)
+  if (session->address->address_length == sizeof (struct IPv4UdpAddress))
     schedule_select_v4 (plugin);
   else
     schedule_select_v6 (plugin);
@@ -1735,6 +1944,7 @@ qc_message_sent (void *cls,
 {
   struct Plugin *plugin = cls;
   size_t overhead;
+  struct GNUNET_TIME_Relative delay;
 
   if (udpw->msg_size >= udpw->payload_size)
     overhead = udpw->msg_size - udpw->payload_size;
@@ -1742,11 +1952,28 @@ qc_message_sent (void *cls,
     overhead = udpw->msg_size;
 
   if (NULL != udpw->cont)
+  {
+    delay = GNUNET_TIME_absolute_get_duration (udpw->start_time);
+    if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+    {
+      LOG (GNUNET_ERROR_TYPE_WARNING,
+           "Message sent via UDP with delay of %s\n",
+           GNUNET_STRINGS_relative_time_to_string (delay,
+                                                   GNUNET_YES));
+    }
+    else
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Message sent via UDP with delay of %s\n",
+           GNUNET_STRINGS_relative_time_to_string (delay,
+                                                   GNUNET_YES));
+    }
     udpw->cont (udpw->cont_cls,
                 &udpw->session->target,
                 result,
                 udpw->payload_size,
                 overhead);
+  }
   if (GNUNET_OK == result)
   {
     GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1817,7 +2044,7 @@ qc_message_sent (void *cls,
  */
 static ssize_t
 udp_plugin_send (void *cls,
-                 struct Session *s,
+                 struct GNUNET_ATS_Session *s,
                  const char *msgbuf,
                  size_t msgbuf_size,
                  unsigned int priority,
@@ -1831,6 +2058,7 @@ udp_plugin_send (void *cls,
   struct UDP_MessageWrapper *udpw;
   struct UDPMessage *udp;
   char mbuf[udpmlen] GNUNET_ALIGN;
+  struct GNUNET_TIME_Relative latency;
 
   if ( (sizeof(struct IPv6UdpAddress) == s->address->address_length) &&
        (NULL == plugin->sockv6) )
@@ -1838,7 +2066,7 @@ udp_plugin_send (void *cls,
   if ( (sizeof(struct IPv4UdpAddress) == s->address->address_length) &&
        (NULL == plugin->sockv4) )
     return GNUNET_SYSERR;
-  if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  if (udpmlen >= GNUNET_MAX_MESSAGE_SIZE)
   {
     GNUNET_break (0);
     return GNUNET_SYSERR;
@@ -1881,16 +2109,21 @@ udp_plugin_send (void *cls,
     udpw->msg_buf = (char *) &udpw[1];
     udpw->msg_size = udpmlen; /* message size with UDP overhead */
     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
+    udpw->start_time = GNUNET_TIME_absolute_get ();
     udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
+    udpw->transmission_time = s->last_transmit_time;
+    s->last_transmit_time
+      = GNUNET_TIME_absolute_add (s->last_transmit_time,
+                                  s->flow_delay_from_other_peer);
     udpw->cont = cont;
     udpw->cont_cls = cont_cls;
     udpw->frag_ctx = NULL;
     udpw->qc = &qc_message_sent;
     udpw->qc_cls = plugin;
-    memcpy (udpw->msg_buf,
+    GNUNET_memcpy (udpw->msg_buf,
             udp,
             sizeof (struct UDPMessage));
-    memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)],
+    GNUNET_memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)],
             msgbuf,
             msgbuf_size);
     enqueue (plugin,
@@ -1901,15 +2134,19 @@ udp_plugin_send (void *cls,
                               GNUNET_NO);
     GNUNET_STATISTICS_update (plugin->env->stats,
                               "# UDP, unfragmented bytes payload queued total",
-                              udpw->payload_size,
+                              msgbuf_size,
                               GNUNET_NO);
+    if (s->address->address_length == sizeof (struct IPv4UdpAddress))
+      schedule_select_v4 (plugin);
+    else
+      schedule_select_v6 (plugin);
   }
   else
   {
     /* fragmented message */
     if (NULL != s->frag_ctx)
       return GNUNET_SYSERR;
-    memcpy (&udp[1],
+    GNUNET_memcpy (&udp[1],
             msgbuf,
             msgbuf_size);
     frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
@@ -1917,6 +2154,12 @@ udp_plugin_send (void *cls,
     frag_ctx->session = s;
     frag_ctx->cont = cont;
     frag_ctx->cont_cls = cont_cls;
+    frag_ctx->start_time = GNUNET_TIME_absolute_get ();
+    frag_ctx->next_frag_time = s->last_transmit_time;
+    frag_ctx->flow_delay_from_other_peer
+      = GNUNET_TIME_relative_divide (s->flow_delay_from_other_peer,
+                                     1 + (msgbuf_size /
+                                          UDP_MTU));
     frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
@@ -1929,6 +2172,23 @@ udp_plugin_send (void *cls,
                                                      &enqueue_fragment,
                                                      frag_ctx);
     s->frag_ctx = frag_ctx;
+    s->last_transmit_time = frag_ctx->next_frag_time;
+    latency = GNUNET_TIME_absolute_get_remaining (s->last_transmit_time);
+    if (latency.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+      LOG (GNUNET_ERROR_TYPE_WARNING,
+           "Enqueued fragments will take %s for transmission to %s (queue size: %u)\n",
+           GNUNET_STRINGS_relative_time_to_string (latency,
+                                                   GNUNET_YES),
+           GNUNET_i2s (&s->target),
+           (unsigned int) s->msgs_in_queue);
+    else
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Enqueued fragments will take %s for transmission to %s (queue size: %u)\n",
+           GNUNET_STRINGS_relative_time_to_string (latency,
+                                                   GNUNET_YES),
+           GNUNET_i2s (&s->target),
+           (unsigned int) s->msgs_in_queue);
+
     GNUNET_STATISTICS_update (plugin->env->stats,
                               "# UDP, fragmented messages active",
                               1,
@@ -1945,114 +2205,10 @@ udp_plugin_send (void *cls,
   notify_session_monitor (s->plugin,
                           s,
                           GNUNET_TRANSPORT_SS_UPDATE);
-  if (s->address->address_length == sizeof (struct IPv4UdpAddress))
-    schedule_select_v4 (plugin);
-  else
-    schedule_select_v6 (plugin);
   return udpmlen;
 }
 
 
-/**
- * Handle an ACK message.
- *
- * @param plugin the UDP plugin
- * @param msg the (presumed) UDP ACK message
- * @param udp_addr sender address
- * @param udp_addr_len number of bytes in @a udp_addr
- */
-static void
-read_process_ack (struct Plugin *plugin,
-                  const struct GNUNET_MessageHeader *msg,
-                  const union UdpAddress *udp_addr,
-                  socklen_t udp_addr_len)
-{
-  const struct GNUNET_MessageHeader *ack;
-  const struct UDP_ACK_Message *udp_ack;
-  struct GNUNET_HELLO_Address *address;
-  struct Session *s;
-  struct GNUNET_TIME_Relative flow_delay;
-
-  if (ntohs (msg->size)
-      < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
-  {
-    GNUNET_break_op (0);
-    return;
-  }
-  udp_ack = (const struct UDP_ACK_Message *) msg;
-  ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
-  if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
-  {
-    GNUNET_break_op(0);
-    return;
-  }
-  address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
-                                           PLUGIN_NAME,
-                                           udp_addr,
-                                           udp_addr_len,
-                                           GNUNET_HELLO_ADDRESS_INFO_NONE);
-  s = udp_plugin_lookup_session (plugin,
-                                 address);
-  if (NULL == s)
-  {
-    LOG (GNUNET_ERROR_TYPE_WARNING,
-         "UDP session of address %s for ACK not found\n",
-         udp_address_to_string (plugin,
-                                address->address,
-                                address->address_length));
-    GNUNET_HELLO_address_free (address);
-    return;
-  }
-  if (NULL == s->frag_ctx)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-         "Fragmentation context of address %s for ACK (%s) not found\n",
-         udp_address_to_string (plugin,
-                                address->address,
-                                address->address_length),
-         GNUNET_FRAGMENT_print_ack (ack));
-    GNUNET_HELLO_address_free (address);
-    return;
-  }
-  GNUNET_HELLO_address_free (address);
-
-  flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "We received a sending delay of %s for %s\n",
-       GNUNET_STRINGS_relative_time_to_string (flow_delay,
-                                               GNUNET_YES),
-       GNUNET_i2s (&udp_ack->sender));
-  s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
-
-
-  if (GNUNET_OK !=
-      GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
-                                   ack))
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
-         (unsigned int) ntohs (msg->size),
-         GNUNET_i2s (&udp_ack->sender),
-         udp_address_to_string (plugin,
-                                udp_addr,
-                                udp_addr_len));
-    /* Expect more ACKs to arrive */
-    return;
-  }
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Message from %s at %s full ACK'ed\n",
-       GNUNET_i2s (&udp_ack->sender),
-       udp_address_to_string (plugin,
-                              udp_addr,
-                              udp_addr_len));
-
-  /* Remove fragmented message after successful sending */
-  fragmented_message_done (s->frag_ctx,
-                           GNUNET_OK);
-}
-
-
 /* ********************** Receiving ********************** */
 
 
@@ -2069,7 +2225,7 @@ struct FindReceiveContext
   /**
    * Session associated with this context.
    */
-  struct Session *session;
+  struct GNUNET_ATS_Session *session;
 
   /**
    * Address to find.
@@ -2115,35 +2271,6 @@ find_receive_context (void *cls,
 }
 
 
-/**
- * Message tokenizer has broken up an incomming message. Pass it on
- * to the service.
- *
- * @param cls the `struct Plugin *`
- * @param client the `struct Session *`
- * @param hdr the actual message
- * @return #GNUNET_OK (always)
- */
-static int
-process_inbound_tokenized_messages (void *cls,
-                                    void *client,
-                                    const struct GNUNET_MessageHeader *hdr)
-{
-  struct Plugin *plugin = cls;
-  struct Session *session = client;
-
-  if (GNUNET_YES == session->in_destroy)
-    return GNUNET_OK;
-  reschedule_session_timeout (session);
-  session->flow_delay_for_other_peer
-    = plugin->env->receive (plugin->env->cls,
-                            session->address,
-                            session,
-                            hdr);
-  return GNUNET_OK;
-}
-
-
 /**
  * Functions with this signature are called whenever we need to close
  * a session due to a disconnect or failure to establish a connection.
@@ -2154,7 +2281,7 @@ process_inbound_tokenized_messages (void *cls,
  */
 static int
 udp_disconnect_session (void *cls,
-                        struct Session *s)
+                        struct GNUNET_ATS_Session *s)
 {
   struct Plugin *plugin = cls;
   struct UDP_MessageWrapper *udpw;
@@ -2261,6 +2388,155 @@ udp_disconnect_session (void *cls,
 }
 
 
+/**
+ * Handle a #GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK message.
+ *
+ * @param plugin the UDP plugin
+ * @param msg the (presumed) UDP ACK message
+ * @param udp_addr sender address
+ * @param udp_addr_len number of bytes in @a udp_addr
+ */
+static void
+read_process_ack (struct Plugin *plugin,
+                  const struct GNUNET_MessageHeader *msg,
+                  const union UdpAddress *udp_addr,
+                  socklen_t udp_addr_len)
+{
+  const struct GNUNET_MessageHeader *ack;
+  const struct UDP_ACK_Message *udp_ack;
+  struct GNUNET_HELLO_Address *address;
+  struct GNUNET_ATS_Session *s;
+  struct GNUNET_TIME_Relative flow_delay;
+
+  /* check message format */
+  if (ntohs (msg->size)
+      < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  udp_ack = (const struct UDP_ACK_Message *) msg;
+  ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
+  if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
+  {
+    GNUNET_break_op(0);
+    return;
+  }
+
+  /* Locate session */
+  address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
+                                           PLUGIN_NAME,
+                                           udp_addr,
+                                           udp_addr_len,
+                                           GNUNET_HELLO_ADDRESS_INFO_NONE);
+  s = udp_plugin_lookup_session (plugin,
+                                 address);
+  if (NULL == s)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "UDP session of address %s for ACK not found\n",
+         udp_address_to_string (plugin,
+                                address->address,
+                                address->address_length));
+    GNUNET_HELLO_address_free (address);
+    return;
+  }
+  if (NULL == s->frag_ctx)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+         "Fragmentation context of address %s for ACK (%s) not found\n",
+         udp_address_to_string (plugin,
+                                address->address,
+                                address->address_length),
+         GNUNET_FRAGMENT_print_ack (ack));
+    GNUNET_HELLO_address_free (address);
+    return;
+  }
+  GNUNET_HELLO_address_free (address);
+
+  /* evaluate flow delay: how long should we wait between messages? */
+  if (UINT32_MAX == ntohl (udp_ack->delay))
+  {
+    /* Other peer asked for us to terminate the session */
+    LOG (GNUNET_ERROR_TYPE_INFO,
+         "Asked to disconnect UDP session of %s\n",
+         GNUNET_i2s (&udp_ack->sender));
+    udp_disconnect_session (plugin,
+                            s);
+    return;
+  }
+  flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
+  if (flow_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "We received a sending delay of %s for %s\n",
+         GNUNET_STRINGS_relative_time_to_string (flow_delay,
+                                                 GNUNET_YES),
+         GNUNET_i2s (&udp_ack->sender));
+  else
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "We received a sending delay of %s for %s\n",
+         GNUNET_STRINGS_relative_time_to_string (flow_delay,
+                                                 GNUNET_YES),
+         GNUNET_i2s (&udp_ack->sender));
+  /* Flow delay is for the reassembled packet, however, our delay
+     is per packet, so we need to adjust: */
+  s->flow_delay_from_other_peer = flow_delay;
+
+  /* Handle ACK */
+  if (GNUNET_OK !=
+      GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
+                                   ack))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
+         (unsigned int) ntohs (msg->size),
+         GNUNET_i2s (&udp_ack->sender),
+         udp_address_to_string (plugin,
+                                udp_addr,
+                                udp_addr_len));
+    /* Expect more ACKs to arrive */
+    return;
+  }
+
+  /* Remove fragmented message after successful sending */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Message from %s at %s full ACK'ed\n",
+       GNUNET_i2s (&udp_ack->sender),
+       udp_address_to_string (plugin,
+                              udp_addr,
+                              udp_addr_len));
+  fragmented_message_done (s->frag_ctx,
+                           GNUNET_OK);
+}
+
+
+/**
+ * Message tokenizer has broken up an incomming message. Pass it on
+ * to the service.
+ *
+ * @param cls the `struct GNUNET_ATS_Session *`
+ * @param hdr the actual message
+ * @return #GNUNET_OK (always)
+ */
+static int
+process_inbound_tokenized_messages (void *cls,
+                                    const struct GNUNET_MessageHeader *hdr)
+{
+  struct GNUNET_ATS_Session *session = cls;
+  struct Plugin *plugin = session->plugin;
+
+  if (GNUNET_YES == session->in_destroy)
+    return GNUNET_OK;
+  reschedule_session_timeout (session);
+  session->flow_delay_for_other_peer
+    = plugin->env->receive (plugin->env->cls,
+                            session->address,
+                            session,
+                            hdr);
+  return GNUNET_OK;
+}
+
+
 /**
  * Destroy a session, plugin is being unloaded.
  *
@@ -2309,14 +2585,12 @@ udp_disconnect (void *cls,
 /**
  * Session was idle, so disconnect it.
  *
- * @param cls the `struct Session` to time out
- * @param tc scheduler context
+ * @param cls the `struct GNUNET_ATS_Session` to time out
  */
 static void
-session_timeout (void *cls,
-                 const struct GNUNET_SCHEDULER_TaskContext *tc)
+session_timeout (void *cls)
 {
-  struct Session *s = cls;
+  struct GNUNET_ATS_Session *s = cls;
   struct Plugin *plugin = s->plugin;
   struct GNUNET_TIME_Relative left;
 
@@ -2356,22 +2630,25 @@ session_timeout (void *cls,
  * @param network_type network type the address belongs to
  * @return NULL on error, otherwise session handle
  */
-static struct Session *
+static struct GNUNET_ATS_Session *
 udp_plugin_create_session (void *cls,
                            const struct GNUNET_HELLO_Address *address,
-                           enum GNUNET_ATS_Network_Type network_type)
+                           enum GNUNET_NetworkType network_type)
 {
   struct Plugin *plugin = cls;
-  struct Session *s;
+  struct GNUNET_ATS_Session *s;
 
-  s = GNUNET_new (struct Session);
+  s = GNUNET_new (struct GNUNET_ATS_Session);
+  s->mst = GNUNET_MST_create (&process_inbound_tokenized_messages,
+                              s);
   s->plugin = plugin;
   s->address = GNUNET_HELLO_address_copy (address);
   s->target = address->peer;
+  s->last_transmit_time = GNUNET_TIME_absolute_get ();
   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
                                                               250);
   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
-  s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
+  s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO;
   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
@@ -2410,13 +2687,13 @@ udp_plugin_create_session (void *cls,
  * @param address the address
  * @return the session or NULL of max connections exceeded
  */
-static struct Session *
+static struct GNUNET_ATS_Session *
 udp_plugin_get_session (void *cls,
                         const struct GNUNET_HELLO_Address *address)
 {
   struct Plugin *plugin = cls;
-  struct Session *s;
-  enum GNUNET_ATS_Network_Type network_type = GNUNET_ATS_NET_UNSPECIFIED;
+  struct GNUNET_ATS_Session *s;
+  enum GNUNET_NetworkType network_type = GNUNET_NT_UNSPECIFIED;
   const struct IPv4UdpAddress *udp_v4;
   const struct IPv6UdpAddress *udp_v6;
 
@@ -2468,7 +2745,7 @@ udp_plugin_get_session (void *cls,
                                                   (const struct sockaddr *) &v6,
                                                   sizeof (v6));
   }
-  GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
+  GNUNET_break (GNUNET_NT_UNSPECIFIED != network_type);
   return udp_plugin_create_session (cls,
                                    address,
                                    network_type);
@@ -2489,12 +2766,12 @@ process_udp_message (struct Plugin *plugin,
                      const struct UDPMessage *msg,
                      const union UdpAddress *udp_addr,
                      size_t udp_addr_len,
-                     enum GNUNET_ATS_Network_Type network_type)
+                     enum GNUNET_NetworkType network_type)
 {
-  struct Session *s;
+  struct GNUNET_ATS_Session *s;
   struct GNUNET_HELLO_Address *address;
 
-  GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
+  GNUNET_break (GNUNET_NT_UNSPECIFIED != network_type);
   if (0 != ntohl (msg->reserved))
   {
     GNUNET_break_op(0);
@@ -2530,12 +2807,11 @@ process_udp_message (struct Plugin *plugin,
   GNUNET_free (address);
 
   s->rc++;
-  GNUNET_SERVER_mst_receive (plugin->mst,
-                             s,
-                             (const char *) &msg[1],
-                             ntohs (msg->header.size) - sizeof(struct UDPMessage),
-                             GNUNET_YES,
-                             GNUNET_NO);
+  GNUNET_MST_from_buffer (s->mst,
+                          (const char *) &msg[1],
+                          ntohs (msg->header.size) - sizeof(struct UDPMessage),
+                          GNUNET_YES,
+                          GNUNET_NO);
   s->rc--;
   if ( (0 == s->rc) &&
        (GNUNET_YES == s->in_destroy) )
@@ -2628,7 +2904,7 @@ ack_proc (void *cls,
   struct UDP_ACK_Message *udp_ack;
   uint32_t delay;
   struct UDP_MessageWrapper *udpw;
-  struct Session *s;
+  struct GNUNET_ATS_Session *s;
   struct GNUNET_HELLO_Address *address;
 
   if (GNUNET_NO == rc->have_sender)
@@ -2665,10 +2941,13 @@ ack_proc (void *cls,
                               GNUNET_NO);
     return;
   }
-  if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
+  if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us ==
+      s->flow_delay_for_other_peer.rel_value_us)
+    delay = UINT32_MAX;
+  else if (s->flow_delay_for_other_peer.rel_value_us < UINT32_MAX)
     delay = s->flow_delay_for_other_peer.rel_value_us;
   else
-    delay = UINT32_MAX;
+    delay = UINT32_MAX - 1; /* largest value we can communicate */
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Sending ACK to `%s' including delay of %s\n",
        udp_address_to_string (plugin,
@@ -2680,6 +2959,7 @@ ack_proc (void *cls,
   udpw->msg_size = msize;
   udpw->payload_size = 0;
   udpw->session = s;
+  udpw->start_time = GNUNET_TIME_absolute_get ();
   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
   udpw->msg_buf = (char *) &udpw[1];
   udpw->qc = &ack_message_sent;
@@ -2689,7 +2969,7 @@ ack_proc (void *cls,
   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
   udp_ack->delay = htonl (delay);
   udp_ack->sender = *plugin->env->my_identity;
-  memcpy (&udp_ack[1],
+  GNUNET_memcpy (&udp_ack[1],
           msg,
           ntohs (msg->size));
   enqueue (plugin,
@@ -2718,7 +2998,7 @@ read_process_fragment (struct Plugin *plugin,
                        const struct GNUNET_MessageHeader *msg,
                        const union UdpAddress *udp_addr,
                        size_t udp_addr_len,
-                       enum GNUNET_ATS_Network_Type network_type)
+                       enum GNUNET_NetworkType network_type)
 {
   struct DefragContext *d_ctx;
   struct GNUNET_TIME_Absolute now;
@@ -2739,7 +3019,7 @@ read_process_fragment (struct Plugin *plugin,
   {
     /* Create a new defragmentation context */
     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + udp_addr_len);
-    memcpy (&d_ctx[1],
+    GNUNET_memcpy (&d_ctx[1],
             udp_addr,
             udp_addr_len);
     d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
@@ -2777,8 +3057,7 @@ read_process_fragment (struct Plugin *plugin,
                                           msg))
   {
     /* keep this 'rc' from expiring */
-    GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs,
-                                       d_ctx->hnode,
+    GNUNET_CONTAINER_heap_update_cost (d_ctx->hnode,
                                        (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
   }
   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
@@ -2819,7 +3098,7 @@ udp_select_read (struct Plugin *plugin,
   const struct sockaddr_in6 *sa6;
   const union UdpAddress *int_addr;
   size_t int_addr_len;
-  enum GNUNET_ATS_Network_Type network_type;
+  enum GNUNET_NetworkType network_type;
 
   fromlen = sizeof (addr);
   memset (&addr,
@@ -2827,7 +3106,7 @@ udp_select_read (struct Plugin *plugin,
           sizeof(addr));
   size = GNUNET_NETWORK_socket_recvfrom (rsock,
                                          buf,
-                                         sizeof(buf),
+                                         sizeof (buf),
                                          (struct sockaddr *) &addr,
                                          &fromlen);
   sa = (const struct sockaddr *) &addr;
@@ -2854,6 +3133,16 @@ udp_select_read (struct Plugin *plugin,
     /* Connection failure or something. Not a protocol violation. */
     return;
   }
+
+  /* Check if this is a STUN packet */
+  if (GNUNET_NO !=
+      GNUNET_NAT_stun_handle_packet (plugin->nat,
+                                    (const struct sockaddr *) &addr,
+                                    fromlen,
+                                    buf,
+                                    size))
+    return; /* was STUN, do not process further */
+
   if (size < sizeof(struct GNUNET_MessageHeader))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
@@ -2866,6 +3155,7 @@ udp_select_read (struct Plugin *plugin,
     GNUNET_break_op (0);
     return;
   }
+
   msg = (const struct GNUNET_MessageHeader *) buf;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "UDP received %u-byte message from `%s' type %u\n",
@@ -2876,7 +3166,7 @@ udp_select_read (struct Plugin *plugin,
   if (size != ntohs (msg->size))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
-         "UDP malformed message header from %s\n",
+         "UDP malformed message (size %u) header from %s\n",
          (unsigned int) size,
          GNUNET_a2s (sa,
                      fromlen));
@@ -2971,7 +3261,7 @@ remove_timeout_messages_and_select (struct Plugin *plugin,
 {
   struct UDP_MessageWrapper *udpw;
   struct GNUNET_TIME_Relative remaining;
-  struct Session *session;
+  struct GNUNET_ATS_Session *session;
   int removed;
 
   removed = GNUNET_NO;
@@ -3014,8 +3304,8 @@ remove_timeout_messages_and_select (struct Plugin *plugin,
     }
     else
     {
-      /* Message did not time out, check flow delay */
-      remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
+      /* Message did not time out, check transmission time */
+      remaining = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
       if (0 == remaining.rel_value_us)
       {
         /* this message is not delayed */
@@ -3061,13 +3351,13 @@ analyze_send_error (struct Plugin *plugin,
                     socklen_t slen,
                     int error)
 {
-  enum GNUNET_ATS_Network_Type type;
+  enum GNUNET_NetworkType type;
 
   type = plugin->env->get_address_type (plugin->env->cls,
                                         sa,
                                         slen);
-  if ( ( (GNUNET_ATS_NET_LAN == type) ||
-         (GNUNET_ATS_NET_WAN == type) ) &&
+  if ( ( (GNUNET_NT_LAN == type) ||
+         (GNUNET_NT_WAN == type) ) &&
        ( (ENETUNREACH == errno) ||
          (ENETDOWN == errno) ) )
   {
@@ -3181,6 +3471,9 @@ udp_select_send (struct Plugin *plugin,
                                          udpw->msg_size,
                                          a,
                                          slen);
+    udpw->session->last_transmit_time
+      = GNUNET_TIME_absolute_max (GNUNET_TIME_absolute_get (),
+                                  udpw->session->last_transmit_time);
     dequeue (plugin,
              udpw);
     if (GNUNET_SYSERR == sent)
@@ -3244,19 +3537,17 @@ udp_select_send (struct Plugin *plugin,
  * Then reschedule this function to be called again once more is available.
  *
  * @param cls the plugin handle
- * @param tc the scheduling context
  */
 static void
-udp_plugin_select_v4 (void *cls,
-                      const struct GNUNET_SCHEDULER_TaskContext *tc)
+udp_plugin_select_v4 (void *cls)
 {
   struct Plugin *plugin = cls;
+  const struct GNUNET_SCHEDULER_TaskContext *tc;
 
   plugin->select_task_v4 = NULL;
-  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
-    return;
   if (NULL == plugin->sockv4)
     return;
+  tc = GNUNET_SCHEDULER_get_task_context ();
   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
       (GNUNET_NETWORK_fdset_isset (tc->read_ready,
                                    plugin->sockv4)))
@@ -3274,19 +3565,17 @@ udp_plugin_select_v4 (void *cls,
  * Then reschedule this function to be called again once more is available.
  *
  * @param cls the plugin handle
- * @param tc the scheduling context
  */
 static void
-udp_plugin_select_v6 (void *cls,
-                      const struct GNUNET_SCHEDULER_TaskContext *tc)
+udp_plugin_select_v6 (void *cls)
 {
   struct Plugin *plugin = cls;
+  const struct GNUNET_SCHEDULER_TaskContext *tc;
 
   plugin->select_task_v6 = NULL;
-  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
-    return;
   if (NULL == plugin->sockv6)
     return;
+  tc = GNUNET_SCHEDULER_get_task_context ();
   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
        (GNUNET_NETWORK_fdset_isset (tc->read_ready,
                                     plugin->sockv6)) )
@@ -3310,13 +3599,13 @@ udp_plugin_select_v6 (void *cls,
  * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
  * @return number of sockets that were successfully bound
  */
-static int
+static unsigned int
 setup_sockets (struct Plugin *plugin,
                const struct sockaddr_in6 *bind_v6,
                const struct sockaddr_in *bind_v4)
 {
   int tries;
-  int sockets_created = 0;
+  unsigned int sockets_created = 0;
   struct sockaddr_in6 server_addrv6;
   struct sockaddr_in server_addrv4;
   const struct sockaddr *server_addr;
@@ -3526,8 +3815,8 @@ setup_sockets (struct Plugin *plugin,
   schedule_select_v4 (plugin);
   schedule_select_v6 (plugin);
   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
-                                     GNUNET_NO,
-                                     plugin->port,
+                                    "transport-udp",
+                                    IPPROTO_UDP,
                                      sockets_created,
                                      addrs,
                                      addrlens,
@@ -3554,15 +3843,15 @@ libgnunet_plugin_transport_udp_init (void *cls)
   unsigned long long port;
   unsigned long long aport;
   unsigned long long udp_max_bps;
-  unsigned long long enable_v6;
-  unsigned long long enable_broadcasting;
-  unsigned long long enable_broadcasting_recv;
+  int enable_v6;
+  int enable_broadcasting;
+  int enable_broadcasting_recv;
   char *bind4_address;
   char *bind6_address;
   struct GNUNET_TIME_Relative interval;
   struct sockaddr_in server_addrv4;
   struct sockaddr_in6 server_addrv6;
-  int res;
+  unsigned int res;
   int have_bind4;
   int have_bind6;
 
@@ -3715,8 +4004,6 @@ libgnunet_plugin_transport_udp_init (void *cls)
   p->sessions = GNUNET_CONTAINER_multipeermap_create (16,
                                                       GNUNET_NO);
   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
-  p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages,
-                                     p);
   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
                                  NULL,
                                  NULL,
@@ -3733,7 +4020,8 @@ libgnunet_plugin_transport_udp_init (void *cls)
         _("Failed to create UDP network sockets\n"));
     GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
-    GNUNET_SERVER_mst_destroy (p->mst);
+    if (NULL != p->nat)
+      GNUNET_NAT_unregister (p->nat);
     GNUNET_free (p);
     return NULL;
   }
@@ -3754,7 +4042,8 @@ libgnunet_plugin_transport_udp_init (void *cls)
   api->check_address = &udp_plugin_check_address;
   api->get_session = &udp_plugin_get_session;
   api->send = &udp_plugin_send;
-  api->get_network = &udp_get_network;
+  api->get_network = &udp_plugin_get_network;
+  api->get_network_for_address = &udp_plugin_get_network_for_address;
   api->update_session_timeout = &udp_plugin_update_session_timeout;
   api->setup_monitor = &udp_plugin_setup_monitor;
   return api;
@@ -3842,11 +4131,6 @@ libgnunet_plugin_transport_udp_done (void *cls)
     GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
     plugin->defrag_ctxs = NULL;
   }
-  if (NULL != plugin->mst)
-  {
-    GNUNET_SERVER_mst_destroy (plugin->mst);
-    plugin->mst = NULL;
-  }
   while (NULL != (udpw = plugin->ipv4_queue_head))
   {
     dequeue (plugin,
@@ -3877,6 +4161,11 @@ libgnunet_plugin_transport_udp_done (void *cls)
                                  plugin->ppc_dll_tail,
                                  cur);
     GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
+    if (NULL != cur->timeout_task)
+    {
+      GNUNET_SCHEDULER_cancel (cur->timeout_task);
+      cur->timeout_task = NULL;
+    }
     GNUNET_free (cur);
   }
   GNUNET_free (plugin);