- measure overhead
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
index d1a4b821c96ba0b6aac54c6d7b7a6c37c61c0592..b6b13eab2ec33bd110c658dd9d4fb2be94b7165f 100644 (file)
@@ -86,6 +86,16 @@ struct PrettyPrinterContext
 };
 
 
+enum UDP_MessageType
+{
+  UNDEFINED = 0,
+  MSG_FRAGMENTED = 1,
+  MSG_FRAGMENTED_COMPLETE = 2,
+  MSG_UNFRAGMENTED = 3,
+  MSG_ACK = 4,
+  MSG_BEACON = 5
+};
+
 struct Session
 {
   /**
@@ -278,6 +288,9 @@ struct UDP_FragmentationContext
    * Bytes used to send all fragments on wire including UDP overhead
    */
   size_t on_wire_size;
+
+  unsigned int fragments_used;
+  struct GNUNET_TIME_Relative exp_delay;
 };
 
 
@@ -300,6 +313,12 @@ struct UDP_MessageWrapper
    */
   struct UDP_MessageWrapper *next;
 
+  /**
+   * Message type
+   * According to UDP_MessageType
+   */
+  int msg_type;
+
   /**
    * Message with size msg_size including UDP specific overhead
    */
@@ -692,40 +711,178 @@ udp_plugin_address_pretty_printer (void *cls, const char *type,
 static void
 call_continuation (struct UDP_MessageWrapper *udpw, int result)
 {
+  size_t overhead;
+  struct UDP_MessageWrapper dummy;
+
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Calling continuation for %u byte message to `%s' with result %s\n",
       udpw->payload_size, GNUNET_i2s (&udpw->session->target),
       (GNUNET_OK == result) ? "OK" : "SYSERR");
-  if (NULL == udpw->cont)
-    return;
 
-  if (NULL == udpw->frag_ctx)
-  {
-      /* Not fragmented message */
-      if (GNUNET_OK == result)
-      {
-        if (udpw->msg_size >= udpw->payload_size)
-        {
-            GNUNET_STATISTICS_update (plugin->env->stats,
-                                  "# bytes overhead transmitted via UDP",
-                                  udpw->msg_size - udpw->payload_size, GNUNET_NO);
-        }
-        GNUNET_STATISTICS_update (plugin->env->stats,
-                              "# bytes payload transmitted via UDP",
-                              udpw->payload_size, GNUNET_NO);
-      }
-      udpw->cont (udpw->cont_cls, &udpw->session->target, result,
-                  udpw->payload_size, udpw->msg_size);
-  }
+  if ((udpw->msg_size - udpw->payload_size) >= 0)
+    overhead = udpw->msg_size - udpw->payload_size;
   else
-  {
-      /* Fragmented message */
-      if (GNUNET_OK == result)
-      {
-          /* Fragmented message: only call next_fragment continuation on success */
-          udpw->cont (udpw->cont_cls, &udpw->session->target, result,
+    overhead = udpw->msg_size;
+
+  switch (result) {
+    case GNUNET_OK:
+      switch (udpw->msg_type) {
+        case MSG_UNFRAGMENTED:
+          if (NULL != udpw->cont)
+          {
+            /* Transport continuation */
+            udpw->cont (udpw->cont_cls, &udpw->session->target, result,
+                      udpw->payload_size, overhead);
+          }
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, unfragmented msgs, messages, sent, success",
+                                    1, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, unfragmented msgs, bytes payload, sent, success",
+                                    udpw->payload_size, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, unfragmented msgs, bytes overhead, sent, success",
+                                    overhead, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, total, bytes overhead, sent",
+                                    overhead, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, total, bytes payload, sent",
+                                    udpw->payload_size, GNUNET_NO);
+          break;
+        case MSG_FRAGMENTED_COMPLETE:
+          GNUNET_assert (NULL != udpw->frag_ctx);
+          if (udpw->frag_ctx->cont != NULL)
+            udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, GNUNET_OK,
+                               udpw->frag_ctx->payload_size, udpw->frag_ctx->on_wire_size);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, fragmented msgs, messages, sent, success",
+                                    1, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, fragmented msgs, bytes payload, sent, success",
+                                    udpw->payload_size, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, fragmented msgs, bytes overhead, sent, success",
+                                    overhead, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, total, bytes overhead, sent",
+                                    overhead, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, total, bytes payload, sent",
+                                    udpw->payload_size, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, fragmented msgs, messages, pending",
+                                    -1, GNUNET_NO);
+          break;
+        case MSG_FRAGMENTED:
+          /* Fragmented message: enqueue next fragment */
+          if (NULL != udpw->cont)
+            udpw->cont (udpw->cont_cls, &udpw->session->target, result,
                       udpw->payload_size, udpw->msg_size);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, fragmented msgs, fragments, sent, success",
+                                    1, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, fragmented msgs, fragments bytes, sent, success",
+                                    udpw->msg_size, GNUNET_NO);
+          break;
+        case MSG_ACK:
+          /* No continuation */
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, ACK msgs, messages, sent, success",
+                                    1, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, ACK msgs, bytes overhead, sent, success",
+                                    overhead, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, total, bytes overhead, sent",
+                                    overhead, GNUNET_NO);
+          break;
+        case MSG_BEACON:
+          GNUNET_break (0);
+          break;
+        default:
+          LOG (GNUNET_ERROR_TYPE_ERROR,
+              "ERROR: %u\n", udpw->msg_type);
+          GNUNET_break (0);
+          break;
+      }
+      break;
+    case GNUNET_SYSERR:
+      switch (udpw->msg_type) {
+        case MSG_UNFRAGMENTED:
+          /* Unfragmented message: failed to send */
+          if (NULL != udpw->cont)
+            udpw->cont (udpw->cont_cls, &udpw->session->target, result,
+                      udpw->payload_size, overhead);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                  "# UDP, unfragmented msgs, messages, sent, failure",
+                                  1, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, unfragmented msgs, bytes payload, sent, failure",
+                                    udpw->payload_size, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, unfragmented msgs, bytes overhead, sent, failure",
+                                    overhead, GNUNET_NO);
+          break;
+        case MSG_FRAGMENTED_COMPLETE:
+          GNUNET_assert (NULL != udpw->frag_ctx);
+          if (udpw->frag_ctx->cont != NULL)
+            udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, GNUNET_SYSERR,
+                               udpw->frag_ctx->payload_size, udpw->frag_ctx->on_wire_size);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, fragmented msgs, messages, sent, failure",
+                                    1, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, fragmented msgs, bytes payload, sent, failure",
+                                    udpw->payload_size, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, fragmented msgs, bytes payload, sent, failure",
+                                    overhead, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, fragmented msgs, bytes payload, sent, failure",
+                                    overhead, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, fragmented msgs, messages, pending",
+                                    -1, GNUNET_NO);
+          break;
+        case MSG_FRAGMENTED:
+          GNUNET_assert (NULL != udpw->frag_ctx);
+          /* Fragmented message: failed to send */
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, fragmented msgs, fragments, sent, failure",
+                                    1, GNUNET_NO);
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, fragmented msgs, fragments bytes, sent, failure",
+                                    udpw->msg_size, GNUNET_NO);
+
+          dummy.msg_type = MSG_FRAGMENTED_COMPLETE;
+          dummy.msg_buf = NULL;
+          dummy.msg_size = udpw->frag_ctx->on_wire_size;
+          dummy.payload_size = udpw->frag_ctx->payload_size;
+          dummy.frag_ctx = udpw->frag_ctx;
+          dummy.session = udpw->session;
+          call_continuation (&dummy, GNUNET_SYSERR);
+
+          break;
+        case MSG_ACK:
+          /* ACK message: failed to send */
+          GNUNET_STATISTICS_update (plugin->env->stats,
+                                    "# UDP, ACK msgs, messages, sent, failure",
+                                    1, GNUNET_NO);
+          break;
+        case MSG_BEACON:
+          /* Beacon message: failed to send */
+          GNUNET_break (0);
+          break;
+        default:
+          GNUNET_break (0);
+          break;
       }
+      break;
+    default:
+      GNUNET_break (0);
+      break;
   }
 }
 
@@ -824,6 +981,23 @@ free_session (struct Session *s)
 }
 
 
+static void
+dequeue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
+{
+  GNUNET_STATISTICS_update (plugin->env->stats,
+                            "# UDP, total, bytes in buffers",
+                            -udpw->msg_size, GNUNET_NO);
+  GNUNET_STATISTICS_update (plugin->env->stats,
+                            "# UDP, total, msgs in buffers",
+                            -1, GNUNET_NO);
+  if (udpw->session->addrlen == sizeof (struct sockaddr_in))
+    GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
+                                 plugin->ipv4_queue_tail, udpw);
+  if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
+    GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
+                                 plugin->ipv6_queue_tail, udpw);
+}
+
 /**
  * Functions with this signature are called whenever we need
  * to close a session due to a disconnect or failure to
@@ -850,7 +1024,7 @@ disconnect_session (struct Session *s)
     next = udpw->next;
     if (udpw->session == s)
     {
-      GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
+      dequeue (plugin, udpw);
       call_continuation(udpw, GNUNET_SYSERR);
       GNUNET_free (udpw);
     }
@@ -861,7 +1035,7 @@ disconnect_session (struct Session *s)
     next = udpw->next;
     if (udpw->session == s)
     {
-      GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
+      dequeue (plugin, udpw);
       call_continuation(udpw, GNUNET_SYSERR);
       GNUNET_free (udpw);
     }
@@ -886,7 +1060,7 @@ disconnect_session (struct Session *s)
                                                        &s->target.hashPubKey,
                                                        s));
   GNUNET_STATISTICS_set(plugin->env->stats,
-                        "# UDP sessions active",
+                        "# UDP, sessions active",
                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
                         GNUNET_NO);
   if (s->rc > 0)
@@ -1194,7 +1368,7 @@ udp_plugin_get_session (void *cls,
                                                     s,
                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
   GNUNET_STATISTICS_set(plugin->env->stats,
-                        "# UDP sessions active",
+                        "# UDP, sessions active",
                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
                         GNUNET_NO);
   return s;
@@ -1204,7 +1378,12 @@ udp_plugin_get_session (void *cls,
 static void 
 enqueue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
 {
-
+  GNUNET_STATISTICS_update (plugin->env->stats,
+                            "# UDP, total, bytes in buffers",
+                            udpw->msg_size, GNUNET_NO);
+  GNUNET_STATISTICS_update (plugin->env->stats,
+                            "# UDP, total, msgs in buffers",
+                            1, GNUNET_NO);
   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
     GNUNET_CONTAINER_DLL_insert (plugin->ipv4_queue_head,
                                  plugin->ipv4_queue_tail, udpw);
@@ -1214,6 +1393,7 @@ enqueue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
 }
 
 
+
 /**
  * Fragment message was transmitted via UDP, let fragmentation know
  * to send the next fragment now.
@@ -1253,6 +1433,7 @@ enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
  
   LOG (GNUNET_ERROR_TYPE_DEBUG, 
        "Enqueuing fragment with %u bytes\n", msg_len);
+  frag_ctx->fragments_used ++;
   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
   udpw->session = frag_ctx->session;
   udpw->msg_buf = (char *) &udpw[1];
@@ -1262,6 +1443,7 @@ enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
   udpw->cont_cls = udpw;
   udpw->timeout = frag_ctx->timeout;
   udpw->frag_ctx = frag_ctx;
+  udpw->msg_type = MSG_FRAGMENTED;
   memcpy (udpw->msg_buf, msg, msg_len);
   enqueue (plugin, udpw);
   schedule_select (plugin);
@@ -1331,13 +1513,6 @@ udp_plugin_send (void *cls,
        udpmlen,
        GNUNET_i2s (&s->target),
        GNUNET_a2s(s->sock_addr, s->addrlen));
-  
-  GNUNET_STATISTICS_update (plugin->env->stats,
-                            "# bytes currently in UDP buffers",
-                            msgbuf_size, GNUNET_NO);
-  GNUNET_STATISTICS_update (plugin->env->stats,
-                            "# bytes payload asked to transmit via UDP",
-                            msgbuf_size, GNUNET_NO);
 
 
   /* Message */
@@ -1360,9 +1535,17 @@ udp_plugin_send (void *cls,
     udpw->cont = cont;
     udpw->cont_cls = cont_cls;
     udpw->frag_ctx = NULL;
+    udpw->msg_type = MSG_UNFRAGMENTED;
     memcpy (udpw->msg_buf, udp, sizeof (struct UDPMessage));
     memcpy (&udpw->msg_buf[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
     enqueue (plugin, udpw);
+
+    GNUNET_STATISTICS_update (plugin->env->stats,
+                              "# UDP, unfragmented msgs, messages, attempt",
+                              1, GNUNET_NO);
+    GNUNET_STATISTICS_update (plugin->env->stats,
+                              "# UDP, unfragmented msgs, bytes payload, attempt",
+                              udpw->payload_size, GNUNET_NO);
   }
   else
   {
@@ -1381,11 +1564,23 @@ udp_plugin_send (void *cls,
     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
               UDP_MTU,
               &plugin->tracker,
-              s->last_expected_delay,
+              GNUNET_TIME_relative_max (s->last_expected_delay, GNUNET_TIME_UNIT_SECONDS),
               &udp->header,
               &enqueue_fragment,
               frag_ctx);
+
+    frag_ctx->exp_delay = s->last_expected_delay;
     s->frag_ctx = frag_ctx;
+
+    GNUNET_STATISTICS_update (plugin->env->stats,
+                              "# UDP, fragmented msgs, messages, pending",
+                              1, GNUNET_NO);
+    GNUNET_STATISTICS_update (plugin->env->stats,
+                              "# UDP, fragmented msgs, messages, attempt",
+                              1, GNUNET_NO);
+    GNUNET_STATISTICS_update (plugin->env->stats,
+                              "# UDP, fragmented msgs, bytes payload, attempt",
+                              frag_ctx->payload_size, GNUNET_NO);
   }
   schedule_select (plugin);
   return udpmlen;
@@ -1674,16 +1869,17 @@ ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
        delay);
   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
   udpw->msg_size = msize;
+  udpw->payload_size = 0;
   udpw->session = s;
   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
   udpw->msg_buf = (char *)&udpw[1];
+  udpw->msg_type = MSG_ACK;
   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
   udp_ack->header.size = htons ((uint16_t) msize);
   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
   udp_ack->delay = htonl (delay);
   udp_ack->sender = *rc->plugin->env->my_identity;
   memcpy (&udp_ack[1], msg, ntohs (msg->size));
-
   enqueue (rc->plugin, udpw);
 }
 
@@ -1710,6 +1906,9 @@ read_process_ack (struct Plugin *plugin,
                  char *addr,
                  socklen_t fromlen)
 {
+  struct UDP_MessageWrapper dummy;
+  struct UDP_MessageWrapper * udpw;
+  struct UDP_MessageWrapper * tmp;
   const struct GNUNET_MessageHeader *ack;
   const struct UDP_ACK_Message *udp_ack;
   struct LookupContext l_ctx;
@@ -1748,6 +1947,9 @@ read_process_ack (struct Plugin *plugin,
     return;
   }
 
+  if (0 != memcmp (&l_ctx.res->target, &udp_ack->sender, sizeof (struct GNUNET_PeerIdentity)))
+    GNUNET_break (0);
+
   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1764,8 +1966,6 @@ read_process_ack (struct Plugin *plugin,
        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
   s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
 
-  struct UDP_MessageWrapper * udpw;
-  struct UDP_MessageWrapper * tmp;
   if (s->addrlen == sizeof (struct sockaddr_in6))
   {
     udpw = plugin->ipv6_queue_head;
@@ -1774,7 +1974,7 @@ read_process_ack (struct Plugin *plugin,
       tmp = udpw->next;
       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
       {
-        GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
+        dequeue (plugin, udpw);
         GNUNET_free (udpw);
       }
       udpw = tmp;
@@ -1788,34 +1988,29 @@ read_process_ack (struct Plugin *plugin,
       tmp = udpw->next;
       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
       {
-        GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
+        dequeue (plugin, udpw);
         GNUNET_free (udpw);
       }
       udpw = tmp;
     }
   }
+/*
+  LOG (GNUNET_ERROR_TYPE_ERROR,
+       "Fragmented message sent: fragments needed: %u , payload %u bytes, used on wire %u bytes, overhead: %f, expected delay: %llu\n",
+       s->frag_ctx->fragments_used,
+       s->frag_ctx->payload_size,
+       s->frag_ctx->on_wire_size,
+       ((float) s->frag_ctx->on_wire_size) / s->frag_ctx->payload_size,
+       s->frag_ctx->exp_delay.rel_value);
+*/
+  dummy.msg_type = MSG_FRAGMENTED_COMPLETE;
+  dummy.msg_buf = NULL;
+  dummy.msg_size = s->frag_ctx->on_wire_size;
+  dummy.payload_size = s->frag_ctx->payload_size;
+  dummy.frag_ctx = s->frag_ctx;
+  dummy.session = s;
 
-  GNUNET_STATISTICS_update (plugin->env->stats,
-                        "# bytes payload transmitted via UDP",
-                        s->frag_ctx->payload_size, GNUNET_NO);
-
-  if (s->frag_ctx->on_wire_size >= s->frag_ctx->payload_size)
-  {
-      GNUNET_STATISTICS_update (plugin->env->stats,
-                            "# bytes overhead transmitted via UDP",
-                            s->frag_ctx->on_wire_size - s->frag_ctx->payload_size, GNUNET_NO);
-  }
-
-
-  if (s->frag_ctx->cont != NULL)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "Calling continuation for fragmented message to `%s' with result %s\n",
-        GNUNET_i2s (&s->target), "OK");
-
-    s->frag_ctx->cont (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK,
-                       s->frag_ctx->payload_size, s->frag_ctx->on_wire_size);
-  }
+  call_continuation (&dummy, GNUNET_OK);
 
   GNUNET_free (s->frag_ctx);
   s->frag_ctx = NULL;
@@ -1939,7 +2134,7 @@ udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
   }
 
   GNUNET_STATISTICS_update (plugin->env->stats,
-                            "# bytes received via UDP",
+                            "# UDP, total, bytes, received",
                             size, GNUNET_NO);
 
   switch (ntohs (msg->type))
@@ -1981,28 +2176,33 @@ remove_timeout_messages_and_select (struct UDP_MessageWrapper *head,
     if (GNUNET_TIME_UNIT_ZERO.rel_value == remaining.rel_value)
     {
       /* Message timed out */
-      call_continuation(udpw, GNUNET_SYSERR);
-      if (NULL == udpw->frag_ctx)
-      {
-        /* Not fragmented message */
-        LOG (GNUNET_ERROR_TYPE_DEBUG,
-             "Message for peer `%s' with size %u timed out\n",
-             GNUNET_i2s(&udpw->session->target), udpw->payload_size);
-      }
-      else
-      {
+      call_continuation (udpw, GNUNET_SYSERR);
+      switch (udpw->msg_type) {
+        case MSG_UNFRAGMENTED:
+          /* Not fragmented message */
+          LOG (GNUNET_ERROR_TYPE_DEBUG,
+               "Message for peer `%s' with size %u timed out\n",
+               GNUNET_i2s(&udpw->session->target), udpw->payload_size);
+          break;
+        case MSG_FRAGMENTED:
           /* Fragmented message */
+          call_continuation (udpw, GNUNET_SYSERR);
           LOG (GNUNET_ERROR_TYPE_DEBUG,
                "Fragment for message for peer `%s' with size %u timed out\n",
                GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->payload_size);
           udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy (udpw->frag_ctx->frag);
           GNUNET_free (udpw->frag_ctx);
           udpw->session->frag_ctx = NULL;
-      }
 
-      GNUNET_STATISTICS_update (plugin->env->stats,
-                                "# bytes currently in UDP buffers",
-                                -udpw->msg_size, GNUNET_NO);
+          break;
+        case MSG_ACK:
+          LOG (GNUNET_ERROR_TYPE_DEBUG,
+               "ACK Message for peer `%s' with size %u timed out\n",
+               GNUNET_i2s(&udpw->session->target), udpw->payload_size);
+          break;
+        default:
+          break;
+      }
 
       GNUNET_STATISTICS_update (plugin->env->stats,
                                 "# messages dismissed due to timeout",
@@ -2010,13 +2210,13 @@ remove_timeout_messages_and_select (struct UDP_MessageWrapper *head,
       /* Remove message */
       if (sock == plugin->sockv4)
       {
-        GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
+        dequeue (plugin, udpw);
         GNUNET_free (udpw);
         udpw = plugin->ipv4_queue_head;
       }
       if (sock == plugin->sockv6)
       {
-        GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
+        dequeue (plugin, udpw);
         GNUNET_free (udpw);
         udpw = plugin->ipv6_queue_head;
       }
@@ -2120,6 +2320,12 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
     /* Failure */
     analyze_send_error (plugin, sa, slen, errno);
     call_continuation(udpw, GNUNET_SYSERR);
+    GNUNET_STATISTICS_update (plugin->env->stats,
+                            "# UDP, total, bytes, sent, failure",
+                            sent, GNUNET_NO);
+    GNUNET_STATISTICS_update (plugin->env->stats,
+                              "# UDP, total, messages, sent, failure",
+                              1, GNUNET_NO);
   }
   else
   {
@@ -2129,21 +2335,16 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
          (unsigned int) (udpw->msg_size), GNUNET_i2s(&udpw->session->target) ,GNUNET_a2s (sa, slen), (int) sent,
          (sent < 0) ? STRERROR (errno) : "ok");
     GNUNET_STATISTICS_update (plugin->env->stats,
-                              "# bytes transmitted via UDP",
+                              "# UDP, total, bytes, sent, success",
                               sent, GNUNET_NO);
+    GNUNET_STATISTICS_update (plugin->env->stats,
+                              "# UDP, total, messages, sent, success",
+                              1, GNUNET_NO);
     if (NULL != udpw->frag_ctx)
         udpw->frag_ctx->on_wire_size += udpw->msg_size;
     call_continuation (udpw, GNUNET_OK);
   }
-
-  GNUNET_STATISTICS_update (plugin->env->stats,
-                            "# bytes currently in UDP buffers",
-                            -udpw->msg_size, GNUNET_NO);
-
-  if (sock == plugin->sockv4)
-    GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
-  else if (sock == plugin->sockv6)
-    GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
+  dequeue (plugin, udpw);
   GNUNET_free (udpw);
   udpw = NULL;
 
@@ -2610,18 +2811,20 @@ libgnunet_plugin_transport_udp_done (void *cls)
   while (udpw != NULL)
   {
     struct UDP_MessageWrapper *tmp = udpw->next;
-    GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
+    dequeue (plugin, udpw);
     call_continuation(udpw, GNUNET_SYSERR);
     GNUNET_free (udpw);
+
     udpw = tmp;
   }
   udpw = plugin->ipv6_queue_head;
   while (udpw != NULL)
   {
     struct UDP_MessageWrapper *tmp = udpw->next;
-    GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
+    dequeue (plugin, udpw);
     call_continuation(udpw, GNUNET_SYSERR);
     GNUNET_free (udpw);
+
     udpw = tmp;
   }