Add missing include
[oweals/gnunet.git] / src / transport / gnunet-service-transport_manipulation.c
index fa8621e3d32ecf5d5c64fa37ff12117a90ca7af5..1b708cb7097e77a827089b42a3e055e9254b4323 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2010,2011 Christian Grothoff (and other contributing authors)
+     (C) 2010-2013 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 #include "gnunet-service-transport.h"
 #include "transport.h"
 
-#define DELAY 0
-#define DISTANCE 1
+enum TRAFFIC_METRIC_DIRECTION
+{
+       TM_SEND = 0,
+       TM_RECEIVE = 1,
+       TM_BOTH = 2
+};
 
-struct GST_ManipulationHandle man_handle;
 
+/**
+ * Struct containing information about manipulations to a specific peer
+ */
+struct TM_Peer;
 
-struct GST_ManipulationHandle
+/**
+ * Manipulation entry
+ */
+struct PropManipulationEntry
 {
-       struct GNUNET_CONTAINER_MultiHashMap *peers;
+       /**
+        * Next in DLL
+        */
+       struct PropManipulationEntry *next;
+
+       /**
+        * Previous in DLL
+        */
+       struct PropManipulationEntry *prev;
 
        /**
-        * General inbound delay
+        * ATS type in HBO
         */
-       struct GNUNET_TIME_Relative delay_in;
+       uint32_t type;
 
        /**
-        * General outbound delay
+        * Value in HBO
         */
-       struct GNUNET_TIME_Relative delay_out;
+       uint32_t metrics[TM_BOTH];
 
 };
 
+/**
+ * Struct containing information about manipulations to a specific peer
+ */
+struct TM_Peer
+{
+       /**
+        * Peer ID
+        */
+       struct GNUNET_PeerIdentity peer;
 
-struct TM_Peer;
+       struct PropManipulationEntry *head;
+       struct PropManipulationEntry *tail;
 
+       /**
+        * Peer specific manipulation metrics
+        */
+       uint32_t metrics [TM_BOTH][GNUNET_ATS_QualityPropertiesCount];
+
+       /**
+        * Task to schedule delayed sendding
+        */
+       GNUNET_SCHEDULER_TaskIdentifier send_delay_task;
+
+       /**
+        * Send queue DLL head
+        */
+       struct DelayQueueEntry *send_head;
+
+       /**
+        * Send queue DLL tail
+        */
+       struct DelayQueueEntry *send_tail;
+};
 
 
+struct GST_ManipulationHandle
+{
+       /**
+        * Hashmap contain all peers currently manipulated
+        */
+       struct GNUNET_CONTAINER_MultiPeerMap *peers;
+
+       /**
+        * Peer containing information for general manipulation
+        */
+       struct TM_Peer general;
+};
+
+
+
+/**
+ * Entry in the delay queue for an outbound delayed message
+ */
 struct DelayQueueEntry
 {
+       /**
+        * Next in DLL
+        */
        struct DelayQueueEntry *prev;
+
+       /**
+        * Previous in DLL
+        */
        struct DelayQueueEntry *next;
+
+       /**
+        * Peer this entry is belonging to
+        * if (NULL == tmp): enqueued in generic DLL and scheduled by generic_send_delay_task
+        * else: enqueued in tmp->send_head and tmp->send_tail and scheduled by tmp->send_delay_task
+        */
        struct TM_Peer *tmp;
+
+       /**
+        * Peer ID
+        */
+       struct GNUNET_PeerIdentity id;
+
+       /**
+        * Absolute time when to send
+        */
        struct GNUNET_TIME_Absolute sent_at;
+
+       /**
+        * The message
+        */
        void *msg;
+
+       /**
+        * The message size
+        */
        size_t msg_size;
+
+       /**
+        * Message timeout
+        */
        struct GNUNET_TIME_Relative timeout;
+
+       /**
+        * Transports send continuation
+        */
        GST_NeighbourSendContinuation cont;
+
+       /**
+        * Transports send continuation cls
+        */
        void *cont_cls;
 };
 
-struct TM_Peer
-{
-       struct GNUNET_PeerIdentity peer;
-       uint32_t metrics [TM_BOTH][GNUNET_ATS_QualityPropertiesCount];
-       GNUNET_SCHEDULER_TaskIdentifier send_delay_task;
-       struct DelayQueueEntry *send_head;
-       struct DelayQueueEntry *send_tail;
-};
+struct GST_ManipulationHandle man_handle;
 
+/**
+ * DLL head for delayed messages based on general delay
+ */
+struct DelayQueueEntry *generic_dqe_head;
 
+/**
+ * DLL tail for delayed messages based on general delay
+ */
+struct DelayQueueEntry *generic_dqe_tail;
+
+/**
+ * Task to schedule delayed sending based on general delay
+ */
+GNUNET_SCHEDULER_TaskIdentifier generic_send_delay_task;
 
 static void
-set_delay(struct TM_Peer *tmp, struct GNUNET_PeerIdentity *peer, int direction, uint32_t value)
+set_metric (struct TM_Peer *dest, int direction, uint32_t type, uint32_t value)
 {
-       uint32_t val;
-       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Set traffic metrics %s for peer `%s' in direction %s to %u\n",
-                       "DELAY", GNUNET_i2s(peer),
-                       (TM_BOTH == direction) ? "BOTH" : (TM_SEND == direction) ? "SEND": "RECEIVE", value);
-
-       if (UINT32_MAX == value)
-               val = UINT32_MAX - 1; /* prevent overflow */
-       else if (0 == value)
-               val = UINT32_MAX; /* disable */
-       else
-               val = value;
+       struct PropManipulationEntry *cur;
+       for (cur = dest->head; NULL != cur; cur = cur->next)
+       {
+               if (cur->type == type)
+                       break;
+       }
+       if (NULL == cur)
+       {
+               cur = GNUNET_malloc (sizeof (struct PropManipulationEntry));
+               GNUNET_CONTAINER_DLL_insert (dest->head, dest->tail, cur);
+               cur->type = type;
+               cur->metrics[TM_SEND] = UINT32_MAX;
+               cur->metrics[TM_RECEIVE] = UINT32_MAX;
+       }
+
 
        switch (direction) {
                case TM_BOTH:
-                       tmp->metrics[TM_SEND][DELAY] = val;
-                       tmp->metrics[TM_RECEIVE][DELAY] = val;
+                       cur->metrics[TM_SEND] = value;
+                       cur->metrics[TM_RECEIVE] = value;
                        break;
                case TM_SEND:
-                       tmp->metrics[TM_SEND][DELAY] = val;
+                       cur->metrics[TM_SEND] = value;
                        break;
                case TM_RECEIVE:
-                       tmp->metrics[TM_RECEIVE][DELAY] = val;
+                       cur->metrics[TM_RECEIVE] = value;
                        break;
                default:
                        break;
@@ -117,56 +235,97 @@ set_delay(struct TM_Peer *tmp, struct GNUNET_PeerIdentity *peer, int direction,
 
 }
 
+static uint32_t
+find_metric (struct TM_Peer *dest, uint32_t type, int direction)
+{
+       struct PropManipulationEntry *cur;
+
+       for (cur = dest->head; NULL != cur; cur = cur->next)
+       {
+               if (cur->type == type)
+                       return cur->metrics[direction];
+
+       }
+       return UINT32_MAX;
+}
+
+/**
+ * Clean up metrics for a peer
+ */
+
 static void
-set_distance (struct TM_Peer *tmp, struct GNUNET_PeerIdentity *peer, int direction, uint32_t value)
+free_metric (struct TM_Peer *dest)
 {
-       uint32_t val;
-       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Set traffic metrics %s for peer `%s' in direction %s to %u\n",
-                       "DISTANCE", GNUNET_i2s(peer),
-                       (TM_BOTH == direction) ? "BOTH" : (TM_SEND == direction) ? "SEND": "RECEIVE", value);
-
-       if (UINT32_MAX == value)
-               val = UINT32_MAX - 1; /* prevent overflow */
-       else if (0 == value)
-               val = UINT32_MAX; /* disable */
-       else
-               val = value;
+       struct PropManipulationEntry *cur;
+       struct PropManipulationEntry *next;
 
-       switch (direction) {
-       case TM_BOTH:
-               tmp->metrics[TM_SEND][DISTANCE] = val;
-               tmp->metrics[TM_RECEIVE][DISTANCE] = val;
-               break;
-       case TM_SEND:
-               tmp->metrics[TM_SEND][DISTANCE] = val;
-               break;
-       case TM_RECEIVE:
-               tmp->metrics[TM_RECEIVE][DISTANCE] = val;
-               break;
-       default:
-               break;
+       for (cur = dest->head; NULL != cur; cur = next)
+       {
+               next = cur->next;
+               GNUNET_CONTAINER_DLL_remove (dest->head, dest->tail, cur);
+               GNUNET_free (cur);
        }
 }
 
+
+/**
+ * Set traffic metric to manipulate
+ *
+ * @param cls closure
+ * @param client client sending message
+ * @param message containing information
+ */
 void
 GST_manipulation_set_metric (void *cls, struct GNUNET_SERVER_Client *client,
     const struct GNUNET_MessageHeader *message)
 {
        struct TrafficMetricMessage *tm = (struct TrafficMetricMessage *) message;
+       struct GNUNET_PeerIdentity dummy;
        struct GNUNET_ATS_Information *ats;
        struct TM_Peer *tmp;
        uint32_t type;
        uint32_t value;
+       uint16_t direction;
        int c;
        int c2;
 
        if (0 == ntohs (tm->ats_count))
          GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
 
+       direction = TM_BOTH;
+       switch (ntohs(tm->direction)) {
+               case 1:
+                       direction = TM_SEND;
+                       break;
+               case 2:
+                       direction = TM_RECEIVE;
+                       break;
+               case 3:
+                       direction = TM_BOTH;
+                       break;
+               default:
+                       break;
+       }
+
+       memset (&dummy, '\0', sizeof (struct GNUNET_PeerIdentity));
+       if (0 == memcmp (&tm->peer, &dummy, sizeof (struct GNUNET_PeerIdentity)))
+       {
+                       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received traffic metrics for all peers \n");
+
+                       ats = (struct GNUNET_ATS_Information *) &tm[1];
+                       for (c = 0; c < ntohs (tm->ats_count); c++)
+                       {
+                                       type = htonl (ats[c].type);
+                                       value = htonl (ats[c].value);
+                                       set_metric (&man_handle.general, direction, type, value);
+                       }
+                       return;
+       }
+
        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received traffic metrics for peer `%s'\n",
                        GNUNET_i2s(&tm->peer));
 
-       if (NULL == (tmp = GNUNET_CONTAINER_multihashmap_get (man_handle.peers, &tm->peer.hashPubKey)))
+       if (NULL == (tmp = GNUNET_CONTAINER_multipeermap_get (man_handle.peers, &tm->peer)))
        {
                        tmp = GNUNET_malloc (sizeof (struct TM_Peer));
                        tmp->peer = (tm->peer);
@@ -177,7 +336,9 @@ GST_manipulation_set_metric (void *cls, struct GNUNET_SERVER_Client *client,
                                                        tmp->metrics[c][c2] = UINT32_MAX;
                                        }
                        }
-                       GNUNET_CONTAINER_multihashmap_put (man_handle.peers, &tm->peer.hashPubKey, tmp, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+                       GNUNET_CONTAINER_multipeermap_put (man_handle.peers,
+                                                          &tm->peer, tmp,
+                                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
        }
 
        ats = (struct GNUNET_ATS_Information *) &tm[1];
@@ -185,16 +346,7 @@ GST_manipulation_set_metric (void *cls, struct GNUNET_SERVER_Client *client,
        {
                        type = htonl (ats[c].type);
                        value = htonl (ats[c].value);
-                       switch (type) {
-                               case GNUNET_ATS_QUALITY_NET_DELAY:
-                                       set_delay (tmp, &tm->peer, ntohs (tm->direction), value);
-                                       break;
-                               case GNUNET_ATS_QUALITY_NET_DISTANCE:
-                                       set_distance (tmp, &tm->peer, ntohs (tm->direction), value);
-                                       break;
-                               default:
-                                       break;
-                       }
+                       set_metric (tmp, direction, type, value);
        }
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -207,182 +359,397 @@ send_delayed (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
        struct DelayQueueEntry *next;
        struct TM_Peer *tmp = dqe->tmp;
        struct GNUNET_TIME_Relative delay;
-       tmp->send_delay_task = GNUNET_SCHEDULER_NO_TASK;
-       GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
-       GST_neighbours_send (&tmp->peer, dqe->msg, dqe->msg_size, dqe->timeout, dqe->cont, dqe->cont_cls);
 
-       next = tmp->send_head;
-       if (NULL != next)
+       if (NULL != tmp)
        {
-                       /* More delayed messages */
-                       delay = GNUNET_TIME_absolute_get_remaining (next->sent_at);
-                       tmp->send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
-       }
+                       GNUNET_break (GNUNET_YES == GST_neighbours_test_connected (&dqe->id));
+                       tmp->send_delay_task = GNUNET_SCHEDULER_NO_TASK;
+                       GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
+                       GST_neighbours_send (&dqe->id, dqe->msg, dqe->msg_size, dqe->timeout, dqe->cont, dqe->cont_cls);
 
+                       next = tmp->send_head;
+                       if (NULL != next)
+                       {
+                               /* More delayed messages */
+                               delay = GNUNET_TIME_absolute_get_remaining (next->sent_at);
+                               tmp->send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, next);
+                       }
+       }
+       else
+       {
+                       /* Remove from generic queue */
+                       GNUNET_break (GNUNET_YES == GST_neighbours_test_connected (&dqe->id));
+                       generic_send_delay_task = GNUNET_SCHEDULER_NO_TASK;
+                       GNUNET_CONTAINER_DLL_remove (generic_dqe_head, generic_dqe_tail, dqe);
+                       GST_neighbours_send (&dqe->id, dqe->msg, dqe->msg_size, dqe->timeout, dqe->cont, dqe->cont_cls);
+                       next = generic_dqe_head;
+                       if (NULL != next)
+                       {
+                               /* More delayed messages */
+                               delay = GNUNET_TIME_absolute_get_remaining (next->sent_at);
+                               generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, next);
+                       }
+       }
        GNUNET_free (dqe);
 }
 
+
+/**
+ * Adapter function between transport's send function and transport plugins
+ *
+ * @param target the peer the message to send to
+ * @param msg the message received
+ * @param msg_size message size
+ * @param timeout timeout
+ * @param cont the continuation to call after sending
+ * @param cont_cls cls for continuation
+ */
 void
 GST_manipulation_send (const struct GNUNET_PeerIdentity *target, const void *msg,
     size_t msg_size, struct GNUNET_TIME_Relative timeout,
     GST_NeighbourSendContinuation cont, void *cont_cls)
 {
+  struct TM_Peer *tmp;
+  struct DelayQueueEntry *dqe;
+  struct GNUNET_TIME_Relative delay;
+
+  if (NULL != (tmp = GNUNET_CONTAINER_multipeermap_get (man_handle.peers, target)))
+  {
+    GNUNET_break (GNUNET_YES == GST_neighbours_test_connected(target));
+    /* Manipulate here */
+    /* Delay */
+    if (UINT32_MAX != find_metric(tmp, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND))
+    {
+      /* We have a delay */
+      delay.rel_value_us = find_metric (tmp, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND);
+      dqe = GNUNET_malloc (sizeof (struct DelayQueueEntry) + msg_size);
+      dqe->id = *target;
+      dqe->tmp = tmp;
+      dqe->sent_at = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get(), delay);
+      dqe->cont = cont;
+      dqe->cont_cls = cont_cls;
+      dqe->msg = &dqe[1];
+      dqe->msg_size = msg_size;
+      dqe->timeout = timeout;
+      memcpy (dqe->msg, msg, msg_size);
+      GNUNET_CONTAINER_DLL_insert_tail (tmp->send_head, tmp->send_tail, dqe);
+      if (GNUNET_SCHEDULER_NO_TASK == tmp->send_delay_task)
+        tmp->send_delay_task =GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+          "Delaying %u byte message to peer `%s' with generic delay for %ms\n",
+          msg_size, GNUNET_i2s (target),
+          GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
+      return;
+    }
+  }
+  else if (UINT32_MAX != find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND))
+  {
+    GNUNET_break (GNUNET_YES == GST_neighbours_test_connected(target));
+    /* We have a delay */
+    delay.rel_value_us = find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND);
+    dqe = GNUNET_malloc (sizeof (struct DelayQueueEntry) + msg_size);
+    dqe->id = *target;
+    dqe->tmp = NULL;
+    dqe->sent_at = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get(), delay);
+    dqe->cont = cont;
+    dqe->cont_cls = cont_cls;
+    dqe->msg = &dqe[1];
+    dqe->msg_size = msg_size;
+    dqe->timeout = timeout;
+    memcpy (dqe->msg, msg, msg_size);
+    GNUNET_CONTAINER_DLL_insert_tail (generic_dqe_head, generic_dqe_tail, dqe);
+    if (GNUNET_SCHEDULER_NO_TASK == generic_send_delay_task)
+    {
+        generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
+    }
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+        "Delaying %u byte message to peer `%s' with peer specific delay for %s\n",
+        msg_size, GNUNET_i2s (target),
+        GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
+    return;
+  }
+
+  /* Normal sending */
+  GST_neighbours_send (target, msg, msg_size, timeout, cont, cont_cls);
+}
+
+
+/**
+ * Function that will be called to manipulate ATS information according to
+ * current manipulation settings
+ *
+ * @param peer the peer
+ * @param address binary address
+ * @param session the session
+ * @param ats the ats information
+ * @param ats_count the number of ats information
+ */
+struct GNUNET_ATS_Information *
+GST_manipulation_manipulate_metrics (const struct GNUNET_PeerIdentity *peer,
+               const struct GNUNET_HELLO_Address *address,
+               struct Session *session,
+               const struct GNUNET_ATS_Information *ats,
+               uint32_t ats_count)
+{
+       struct GNUNET_ATS_Information *ats_new = GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) *ats_count);
        struct TM_Peer *tmp;
-       struct DelayQueueEntry *dqe;
-       struct GNUNET_TIME_Relative delay;
+       uint32_t m_tmp;
+       uint32_t g_tmp;
+       int d;
+       tmp = GNUNET_CONTAINER_multipeermap_get (man_handle.peers, peer);
 
-       if (NULL != (tmp = GNUNET_CONTAINER_multihashmap_get (man_handle.peers, &target->hashPubKey)))
-       {
-                       /* Manipulate here */
-                       /* Delay */
-                       if (UINT32_MAX != tmp->metrics[TM_SEND][DELAY])
-                       {
-                                       /* We have a delay */
-                                       delay.rel_value = tmp->metrics[TM_SEND][DELAY];
-                                       dqe = GNUNET_malloc (sizeof (struct DelayQueueEntry) + msg_size);
-                                       dqe->tmp = tmp;
-                                       dqe->sent_at = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get(), delay);
-                                       dqe->cont = cont;
-                                       dqe->cont_cls = cont_cls;
-                                       dqe->msg = &dqe[1];
-                                       dqe->msg_size = msg_size;
-                                       dqe->timeout = timeout;
-                                       memcpy (dqe->msg, msg, msg_size);
-                                       GNUNET_CONTAINER_DLL_insert_tail (tmp->send_head, tmp->send_tail, dqe);
-                                       if (GNUNET_SCHEDULER_NO_TASK == tmp->send_delay_task)
-                                               tmp->send_delay_task =GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
-                                       return;
-                       }
-       }
-       else if (man_handle.delay_out.rel_value != 0)
+       for (d = 0; d < ats_count; d++)
        {
-                       /* We have a delay */
-                       delay = man_handle.delay_out;
-                       dqe = GNUNET_malloc (sizeof (struct DelayQueueEntry) + msg_size);
-                       dqe->tmp = tmp;
-                       dqe->sent_at = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get(), delay);
-                       dqe->cont = cont;
-                       dqe->cont_cls = cont_cls;
-                       dqe->msg = &dqe[1];
-                       dqe->msg_size = msg_size;
-                       dqe->timeout = timeout;
-                       memcpy (dqe->msg, msg, msg_size);
-                       GNUNET_CONTAINER_DLL_insert_tail (tmp->send_head, tmp->send_tail, dqe);
-                       if (GNUNET_SCHEDULER_NO_TASK == tmp->send_delay_task)
-                               tmp->send_delay_task =GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
-                       return;
+               ats_new[d] = ats[d];
+               m_tmp = UINT32_MAX;
+               if (NULL != tmp)
+                       m_tmp = find_metric (tmp, ntohl(ats[d].type), TM_RECEIVE);
+               g_tmp = find_metric (&man_handle.general, ntohl(ats[d].type), TM_RECEIVE);
+
+               if (UINT32_MAX != g_tmp)
+                               ats_new[d].value = htonl(g_tmp);
+               if (UINT32_MAX != m_tmp)
+                               ats_new[d].value = htonl(m_tmp);
        }
 
-       /* Normal sending */
-       GST_neighbours_send (target, msg, msg_size, timeout, cont, cont_cls);
+       return ats_new;
 }
 
+
+/**
+ * Adapter function between transport plugins and transport receive function
+ * manipulation delays for next send.
+ *
+ * @param cls the closure for transport
+ * @param peer the peer the message was received from
+ * @param message the message received
+ * @param session the session the message was received on
+ * @param sender_address the sender address
+ * @param sender_address_len the length of the sender address
+ * @return manipulated delay for next receive
+ */
 struct GNUNET_TIME_Relative
-GST_manipulation_recv (void *cls, const struct GNUNET_PeerIdentity *peer,
+GST_manipulation_recv (void *cls,
+               const struct GNUNET_PeerIdentity *peer,
     const struct GNUNET_MessageHeader *message,
-    const struct GNUNET_ATS_Information *ats,
-    uint32_t ats_count, struct Session *session,
+    struct Session *session,
     const char *sender_address,
     uint16_t sender_address_len)
 {
        struct TM_Peer *tmp;
-       int d;
-       struct GNUNET_ATS_Information ats_new[ats_count];
-       struct GNUNET_TIME_Relative q_delay;
+       uint32_t p_recv_delay;
+       uint32_t g_recv_delay;
+       struct GNUNET_TIME_Relative quota_delay;
        struct GNUNET_TIME_Relative m_delay;
 
-       for (d = 0; d < ats_count; d++)
+       g_recv_delay = find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_RECEIVE);
+       if ((g_recv_delay >= GNUNET_TIME_UNIT_ZERO.rel_value_us) && (UINT32_MAX != g_recv_delay))
+         m_delay.rel_value_us = g_recv_delay; /* Global delay */
+       else
+         m_delay = GNUNET_TIME_UNIT_ZERO;
 
-       if (NULL != (tmp = GNUNET_CONTAINER_multihashmap_get (man_handle.peers, &peer->hashPubKey)))
+       if (NULL != (tmp = GNUNET_CONTAINER_multipeermap_get (man_handle.peers, peer)))
        {
-                       /* Manipulate distance */
-                       for (d = 0; d < ats_count; d++)
-                       {
-                                       ats_new[d] = ats[d];
-                                       /* Set distance */
-                                       if ((ntohl(ats[d].type) == GNUNET_ATS_QUALITY_NET_DISTANCE) &&
-                                                (UINT32_MAX != tmp->metrics[TM_RECEIVE][DISTANCE]))
-                                                       ats_new[d].value = htonl(tmp->metrics[TM_RECEIVE][DISTANCE]);
-                       }
-                       /* Manipulate receive delay */
-                       if (UINT32_MAX != tmp->metrics[TM_RECEIVE][DELAY])
-                       {
-                                       m_delay.rel_value = tmp->metrics[TM_RECEIVE][DELAY];
-                                       q_delay = GST_receive_callback (cls, peer, message, &ats_new[0], ats_count,
-                                                       session, sender_address, sender_address_len);
-
-                                       if (q_delay.rel_value >= m_delay.rel_value)
-                                       {
-                                                       return q_delay;
-                                       }
-                                       else
-                                       {
-                                                       return m_delay;
-                                       }
-                       }
-                       else
-                               return GST_receive_callback (cls, peer, message, &ats_new[0], ats_count,
-                                               session, sender_address, sender_address_len);
+         /* Manipulate receive delay */
+         p_recv_delay = find_metric (tmp, GNUNET_ATS_QUALITY_NET_DELAY, TM_RECEIVE);
+         if (UINT32_MAX != p_recv_delay)
+           m_delay.rel_value_us = p_recv_delay; /* Peer specific delay */
        }
 
-       return GST_receive_callback (cls, peer, message, ats, ats_count,
+       quota_delay = GST_receive_callback (cls, peer, message,
                        session, sender_address, sender_address_len);
+
+       if (quota_delay.rel_value_us > m_delay.rel_value_us)
+               m_delay = quota_delay;
+
+       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                   "Delaying next receive for peer `%s' for %s\n",
+                   GNUNET_i2s (peer),
+                   GNUNET_STRINGS_relative_time_to_string (m_delay, GNUNET_YES));
+       return m_delay;
+
 }
 
+
+/**
+ * Initialize traffic manipulation
+ *
+ * @param GST_cfg configuration handle
+ */
 void
 GST_manipulation_init (const struct GNUNET_CONFIGURATION_Handle *GST_cfg)
 {
+  unsigned long long tmp;
+  struct GNUNET_TIME_Relative delay;
+
+  if ( (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
+                                                         "transport",
+                                                         "MANIPULATE_DISTANCE_IN",
+                                                           &tmp)) &&
+       (tmp > 0) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+               "Setting inbound distance_in to %llu\n",
+               (unsigned long long) tmp);
+    set_metric (&man_handle.general, TM_RECEIVE, GNUNET_ATS_QUALITY_NET_DISTANCE, tmp);
+  }
+
+  if ( (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
+                                                           "transport",
+                                                           "MANIPULATE_DISTANCE_OUT",
+                                                           &tmp)) &&
+       (tmp > 0) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+               "Setting outbound distance_in to %llu\n",
+               (unsigned long long) tmp);
+    set_metric (&man_handle.general, TM_SEND,
+               GNUNET_ATS_QUALITY_NET_DISTANCE, tmp);
+  }
+
+  if ( (GNUNET_OK == GNUNET_CONFIGURATION_get_value_time (GST_cfg,
+                                                         "transport",
+                                                         "MANIPULATE_DELAY_IN",
+                                                         &delay)) &&
+       (delay.rel_value_us > 0) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+               "Delaying inbound traffic for %s\n",
+               GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
+    set_metric (&man_handle.general, TM_RECEIVE,
+               GNUNET_ATS_QUALITY_NET_DELAY,
+               delay.rel_value_us);
+  }
+  if ( (GNUNET_OK == GNUNET_CONFIGURATION_get_value_time (GST_cfg,
+                                                         "transport",
+                                                         "MANIPULATE_DELAY_OUT",
+                                                         &delay)) &&
+       (delay.rel_value_us > 0) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+               "Delaying outbound traffic for %s\n",
+               GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
+    set_metric (&man_handle.general,
+               TM_SEND,
+               GNUNET_ATS_QUALITY_NET_DELAY,
+               delay.rel_value_us);
+  }
+  man_handle.peers = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
+}
 
-       if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_time (GST_cfg,
-                       "transport", "MANIPULATE_DELAY_IN", &man_handle.delay_in))
-               GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying inbound traffic for %llu ms\n",
-                               (unsigned long long) man_handle.delay_in.rel_value);
-       else
-               man_handle.delay_in.rel_value = 0;
-
-       if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_time (GST_cfg,
-                       "transport", "MANIPULATE_DELAY_OUT", &man_handle.delay_out))
-               GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying outbound traffic for %llu ms\n",
-                       (unsigned long long) man_handle.delay_out.rel_value);
-       else
-               man_handle.delay_out.rel_value = 0;
 
-       man_handle.peers = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
+static int
+free_tmps (void *cls,
+          const struct GNUNET_PeerIdentity *key,
+          void *value)
+{
+  struct DelayQueueEntry *dqe;
+  struct DelayQueueEntry *next;
+
+  if (NULL != value)
+  {
+    struct TM_Peer *tmp = (struct TM_Peer *) value;
+
+    if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove (man_handle.peers, key, value))
+      GNUNET_break (0);
+    free_metric (tmp);
+    next = tmp->send_head;
+    while (NULL != (dqe = next))
+      {
+       next = dqe->next;
+       GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
+       if (NULL != dqe->cont)
+         dqe->cont (dqe->cont_cls, GNUNET_SYSERR, dqe->msg_size, 0);
+       GNUNET_free (dqe);
+      }
+    if (GNUNET_SCHEDULER_NO_TASK != tmp->send_delay_task)
+      {
+       GNUNET_SCHEDULER_cancel (tmp->send_delay_task);
+       tmp->send_delay_task = GNUNET_SCHEDULER_NO_TASK;
+      }
+    GNUNET_free (tmp);
+  }
+  return GNUNET_OK;
 }
 
-int free_tmps (void *cls,
-                                                        const struct GNUNET_HashCode * key,
-                                                        void *value)
+
+/**
+ * Notify manipulation about disconnect so it can discard queued messages
+ *
+ * @param peer the disconnecting peer
+ */
+void
+GST_manipulation_peer_disconnect (const struct GNUNET_PeerIdentity *peer)
 {
+       struct TM_Peer *tmp;
        struct DelayQueueEntry *dqe;
        struct DelayQueueEntry *next;
-       if (NULL != value)
+
+       if (NULL != (tmp = GNUNET_CONTAINER_multipeermap_get (man_handle.peers, peer)))
        {
-                       struct TM_Peer *tmp = (struct TM_Peer *) value;
-                       GNUNET_CONTAINER_multihashmap_remove (man_handle.peers, key, value);
                        next = tmp->send_head;
                        while (NULL != (dqe = next))
                        {
                                        next = dqe->next;
                                        GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
+                                       if (NULL != dqe->cont)
+                                                       dqe->cont (dqe->cont_cls, GNUNET_SYSERR, dqe->msg_size, 0);
                                        GNUNET_free (dqe);
                        }
-                       if (GNUNET_SCHEDULER_NO_TASK != tmp->send_delay_task)
+       }
+       else if (UINT32_MAX != find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND))
+       {
+                       next = generic_dqe_head;
+                       while (NULL != (dqe = next))
                        {
-                                       GNUNET_SCHEDULER_cancel (tmp->send_delay_task);
-                                       tmp->send_delay_task = GNUNET_SCHEDULER_NO_TASK;
+                                       next = dqe->next;
+                                       if (0 == memcmp (peer, &dqe->id, sizeof (dqe->id)))
+                                       {
+                                                       GNUNET_CONTAINER_DLL_remove (generic_dqe_head, generic_dqe_tail, dqe);
+                                                       if (NULL != dqe->cont)
+                                                               dqe->cont (dqe->cont_cls, GNUNET_SYSERR, dqe->msg_size, 0);
+                                                       GNUNET_free (dqe);
+                                       }
+                       }
+                       if (GNUNET_SCHEDULER_NO_TASK != generic_send_delay_task)
+                       {
+                                       GNUNET_SCHEDULER_cancel (generic_send_delay_task);
+                                       if (NULL != generic_dqe_head)
+                                               generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (
+                                                               GNUNET_TIME_absolute_get_remaining(generic_dqe_head->sent_at),
+                                                               &send_delayed, generic_dqe_head);
                        }
-                       GNUNET_free (tmp);
        }
-       return GNUNET_OK;
 }
 
+
+/**
+ * Stop traffic manipulation
+ */
 void
 GST_manipulation_stop ()
 {
-       GNUNET_CONTAINER_multihashmap_iterate (man_handle.peers, &free_tmps,NULL);
+       struct DelayQueueEntry *cur;
+       struct DelayQueueEntry *next;
+       GNUNET_CONTAINER_multipeermap_iterate (man_handle.peers, &free_tmps,NULL);
+       GNUNET_CONTAINER_multipeermap_destroy (man_handle.peers);
+
+       next = generic_dqe_head;
+       while (NULL != (cur = next))
+       {
+                       next = cur->next;
+                       GNUNET_CONTAINER_DLL_remove (generic_dqe_head, generic_dqe_tail, cur);
+                       if (NULL != cur->cont)
+                               cur->cont (cur->cont_cls, GNUNET_SYSERR, cur->msg_size, 0);
+                       GNUNET_free (cur);
+       }
+       if (GNUNET_SCHEDULER_NO_TASK != generic_send_delay_task)
+       {
+                       GNUNET_SCHEDULER_cancel (generic_send_delay_task);
+                       generic_send_delay_task = GNUNET_SCHEDULER_NO_TASK;
+       }
 
-       GNUNET_CONTAINER_multihashmap_destroy (man_handle.peers);
+       free_metric (&man_handle.general);
        man_handle.peers = NULL;
 }