fixing compiler warnings
[oweals/gnunet.git] / src / transport / gnunet-service-transport_manipulation.c
index 34c8ab173910ae833ba8b3be6d0ccd3ae51f9633..7620cdbb1c88969be1fcfd75fd0e2482292bb9de 100644 (file)
@@ -1,22 +1,22 @@
 /*
    This file is part of GNUnet.
-     (C) 2010,2011 Christian Grothoff (and other contributing authors)
-
    GNUnet is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published
    by the Free Software Foundation; either version 3, or (at your
    option) any later version.
-
    GNUnet is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    General Public License for more details.
-
    You should have received a copy of the GNU General Public License
    along with GNUnet; see the file COPYING.  If not, write to the
    Free Software Foundation, Inc., 59 Temple Place - Suite 330,
    Boston, MA 02111-1307, USA.
-*/
+ This file is part of GNUnet.
+ Copyright (C) 2010-2013 GNUnet e.V.
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING.  If not, write to the
Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1301, USA.
+ */
 
 /**
  * @file transport/gnunet-service-transport_manipulation.c
 #include "gnunet-service-transport.h"
 #include "transport.h"
 
-static struct GNUNET_CONTAINER_MultiHashMap *peers;
 
-#define DELAY 0
-#define DISTANCE 1
+/**
+ * Struct containing information about manipulations to a specific peer
+ */
+struct TM_Peer
+{
+  /**
+   * Peer ID
+   */
+  struct GNUNET_PeerIdentity peer;
+
+  /**
+   * How long to delay incoming messages for this peer.
+   */
+  struct GNUNET_TIME_Relative delay_in;
+
+  /**
+   * How long to delay outgoing messages for this peer.
+   */
+  struct GNUNET_TIME_Relative delay_out;
+
+  /**
+   * Manipulated properties to use for this peer.
+   */
+  struct GNUNET_ATS_Properties properties;
+
+  /**
+   * Task to schedule delayed sendding
+   */
+  struct GNUNET_SCHEDULER_Task *send_delay_task;
+
+  /**
+   * Send queue DLL head
+   */
+  struct DelayQueueEntry *send_head;
+
+  /**
+   * Send queue DLL tail
+   */
+  struct DelayQueueEntry *send_tail;
+};
 
-struct TM_Peer;
 
+/**
+ * Entry in the delay queue for an outbound delayed message
+ */
 struct DelayQueueEntry
 {
-       struct DelayQueueEntry *prev;
-       struct DelayQueueEntry *next;
-       struct TM_Peer *tmp;
-       struct GNUNET_TIME_Absolute sent_at;
-       void *msg;
-       size_t msg_size;
-       struct GNUNET_TIME_Relative timeout;
-       GST_NeighbourSendContinuation cont;
-       void *cont_cls;
+  /**
+   * Next in DLL
+   */
+  struct DelayQueueEntry *prev;
+
+  /**
+   * Previous in DLL
+   */
+  struct DelayQueueEntry *next;
+
+  /**
+   * Peer this entry is belonging to if (NULL == tmp): enqueued in
+   * generic DLL and scheduled by generic_send_delay_task else:
+   * enqueued in tmp->send_head and tmp->send_tail and scheduled by
+   * tmp->send_delay_task
+   */
+  struct TM_Peer *tmp;
+
+  /**
+   * Peer ID
+   */
+  struct GNUNET_PeerIdentity id;
+
+  /**
+   * Absolute time when to send
+   */
+  struct GNUNET_TIME_Absolute sent_at;
+
+  /**
+   * The message
+   */
+  void *msg;
+
+  /**
+   * The message size
+   */
+  size_t msg_size;
+
+  /**
+   * Message timeout
+   */
+  struct GNUNET_TIME_Relative timeout;
+
+  /**
+   * Transports send continuation
+   */
+  GST_NeighbourSendContinuation cont;
+
+  /**
+   * Transports send continuation cls
+   */
+  void *cont_cls;
 };
 
-struct TM_Peer
-{
-       struct GNUNET_PeerIdentity peer;
-       uint32_t metrics [TM_BOTH][GNUNET_ATS_QualityPropertiesCount];
-       GNUNET_SCHEDULER_TaskIdentifier send_delay_task;
-       struct DelayQueueEntry *send_head;
-       struct DelayQueueEntry *send_tail;
-};
+/**
+ * Hashmap contain all peers currently manipulated
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *peers;
+
+/**
+ * Inbound delay to apply to all peers.
+ */
+static struct GNUNET_TIME_Relative delay_in;
 
+/**
+ * Outbound delay to apply to all peers.
+ */
+static struct GNUNET_TIME_Relative delay_out;
 
+/**
+ * DLL head for delayed messages based on general delay
+ */
+static struct DelayQueueEntry *generic_dqe_head;
 
-static void
-set_delay(struct TM_Peer *tmp, struct GNUNET_PeerIdentity *peer, int direction, uint32_t value)
-{
-       uint32_t val;
-       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Set traffic metrics %s for peer `%s' in direction %s to %u\n",
-                       "DELAY", GNUNET_i2s(peer),
-                       (TM_BOTH == direction) ? "BOTH" : (TM_SEND == direction) ? "SEND": "RECEIVE", value);
-
-       if (UINT32_MAX == value)
-               val = UINT32_MAX - 1; /* prevent overflow */
-       else if (0 == value)
-               val = UINT32_MAX; /* disable */
-       else
-               val = value;
-
-       switch (direction) {
-               case TM_BOTH:
-                       tmp->metrics[TM_SEND][DELAY] = val;
-                       tmp->metrics[TM_RECEIVE][DELAY] = val;
-                       break;
-               case TM_SEND:
-                       tmp->metrics[TM_SEND][DELAY] = val;
-                       break;
-               case TM_RECEIVE:
-                       tmp->metrics[TM_RECEIVE][DELAY] = val;
-                       break;
-               default:
-                       break;
-       }
+/**
+ * DLL tail for delayed messages based on general delay
+ */
+static struct DelayQueueEntry *generic_dqe_tail;
+
+/**
+ * Task to schedule delayed sending based on general delay
+ */
+static struct GNUNET_SCHEDULER_Task *generic_send_delay_task;
 
+
+/**
+ * Set traffic metric to manipulate
+ *
+ * @param cls closure
+ * @param client client sending message
+ * @param message containing information
+ */
+void
+GST_manipulation_set_metric (void *cls,
+                             struct GNUNET_SERVER_Client *client,
+                             const struct GNUNET_MessageHeader *message)
+{
+  const struct TrafficMetricMessage *tm;
+  static struct GNUNET_PeerIdentity zero;
+  struct TM_Peer *tmp;
+
+  tm = (const struct TrafficMetricMessage *) message;
+  if (0 == memcmp (&tm->peer,
+                   &zero,
+                   sizeof(struct GNUNET_PeerIdentity)))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Received traffic metrics for all peers\n");
+    delay_in = GNUNET_TIME_relative_ntoh (tm->delay_in);
+    delay_out = GNUNET_TIME_relative_ntoh (tm->delay_out);
+    GNUNET_SERVER_receive_done (client,
+                                GNUNET_OK);
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received traffic metrics for peer `%s'\n",
+              GNUNET_i2s(&tm->peer));
+  if (NULL ==
+      (tmp = GNUNET_CONTAINER_multipeermap_get (peers,
+                                                &tm->peer)))
+  {
+    tmp = GNUNET_new (struct TM_Peer);
+    tmp->peer = tm->peer;
+    GNUNET_CONTAINER_multipeermap_put (peers,
+                                       &tm->peer,
+                                       tmp,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+  }
+  GNUNET_ATS_properties_ntoh (&tmp->properties,
+                              &tm->properties);
+  tmp->delay_in = GNUNET_TIME_relative_ntoh (tm->delay_in);
+  tmp->delay_out = GNUNET_TIME_relative_ntoh (tm->delay_out);
+  GNUNET_SERVER_receive_done (client,
+                              GNUNET_OK);
 }
 
+
+/**
+ * We have delayed transmission, now it is time to send the
+ * message.
+ *
+ * @param cls the `struct DelayQueueEntry` to transmit
+ */
 static void
-set_distance (struct TM_Peer *tmp, struct GNUNET_PeerIdentity *peer, int direction, uint32_t value)
+send_delayed (void *cls)
 {
-       uint32_t val;
-       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Set traffic metrics %s for peer `%s' in direction %s to %u\n",
-                       "DISTANCE", GNUNET_i2s(peer),
-                       (TM_BOTH == direction) ? "BOTH" : (TM_SEND == direction) ? "SEND": "RECEIVE", value);
-
-       if (UINT32_MAX == value)
-               val = UINT32_MAX - 1; /* prevent overflow */
-       else if (0 == value)
-               val = UINT32_MAX; /* disable */
-       else
-               val = value;
-
-       switch (direction) {
-       case TM_BOTH:
-               tmp->metrics[TM_SEND][DISTANCE] = val;
-               tmp->metrics[TM_RECEIVE][DISTANCE] = val;
-               break;
-       case TM_SEND:
-               tmp->metrics[TM_SEND][DISTANCE] = val;
-               break;
-       case TM_RECEIVE:
-               tmp->metrics[TM_RECEIVE][DISTANCE] = val;
-               break;
-       default:
-               break;
-       }
+  struct DelayQueueEntry *dqe = cls;
+  struct DelayQueueEntry *next;
+  struct TM_Peer *tmp = dqe->tmp;
+  struct GNUNET_TIME_Relative delay;
+
+  GNUNET_break (GNUNET_YES ==
+                GST_neighbours_test_connected (&dqe->id));
+  if (NULL != tmp)
+  {
+    tmp->send_delay_task = NULL;
+    GNUNET_CONTAINER_DLL_remove (tmp->send_head,
+                                 tmp->send_tail,
+                                 dqe);
+    next = tmp->send_head;
+    if (NULL != next)
+    {
+      /* More delayed messages */
+      delay = GNUNET_TIME_absolute_get_remaining(next->sent_at);
+      tmp->send_delay_task = GNUNET_SCHEDULER_add_delayed(delay,
+                                                          &send_delayed, next);
+    }
+  }
+  else
+  {
+    /* Remove from generic queue */
+    generic_send_delay_task = NULL;
+    GNUNET_CONTAINER_DLL_remove (generic_dqe_head,
+                                 generic_dqe_tail,
+                                 dqe);
+    next = generic_dqe_head;
+    if (NULL != next)
+    {
+      /* More delayed messages */
+      delay = GNUNET_TIME_absolute_get_remaining(next->sent_at);
+      generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                                              &send_delayed,
+                                                              next);
+    }
+  }
+  GST_neighbours_send (&dqe->id,
+                       dqe->msg,
+                       dqe->msg_size,
+                       dqe->timeout,
+                       dqe->cont,
+                       dqe->cont_cls);
+  GNUNET_free(dqe);
 }
 
+
+/**
+ * Adapter function between transport's send function and transport plugins.
+ * Delays message transmission if an artificial delay is configured.
+ *
+ * @param target the peer the message to send to
+ * @param msg the message received
+ * @param msg_size message size
+ * @param timeout timeout
+ * @param cont the continuation to call after sending
+ * @param cont_cls cls for @a cont
+ */
 void
-GST_manipulation_set_metric (void *cls, struct GNUNET_SERVER_Client *client,
-    const struct GNUNET_MessageHeader *message)
+GST_manipulation_send (const struct GNUNET_PeerIdentity *target,
+                       const void *msg,
+                       size_t msg_size,
+                       struct GNUNET_TIME_Relative timeout,
+                       GST_NeighbourSendContinuation cont,
+                       void *cont_cls)
 {
-       struct TrafficMetricMessage *tm = (struct TrafficMetricMessage *) message;
-       struct GNUNET_ATS_Information *ats;
-       struct TM_Peer *tmp;
-       uint32_t type;
-       uint32_t value;
-       int c;
-       int c2;
-
-       if (0 == ntohs (tm->ats_count))
-         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-
-       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received traffic metrics for peer `%s'\n",
-                       GNUNET_i2s(&tm->peer));
-
-       if (NULL == (tmp = GNUNET_CONTAINER_multihashmap_get (peers, &tm->peer.hashPubKey)))
-       {
-                       tmp = GNUNET_malloc (sizeof (struct TM_Peer));
-                       tmp->peer = (tm->peer);
-                       for (c = 0; c < TM_BOTH; c++)
-                       {
-                                       for (c2 = 0; c2 < GNUNET_ATS_QualityPropertiesCount; c2++)
-                                       {
-                                                       tmp->metrics[c][c2] = UINT32_MAX;
-                                       }
-                       }
-                       GNUNET_CONTAINER_multihashmap_put (peers, &tm->peer.hashPubKey, tmp, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
-       }
-
-       ats = (struct GNUNET_ATS_Information *) &tm[1];
-       for (c = 0; c < ntohs (tm->ats_count); c++)
-       {
-                       type = htonl (ats[c].type);
-                       value = htonl (ats[c].value);
-                       switch (type) {
-                               case GNUNET_ATS_QUALITY_NET_DELAY:
-                                       set_delay (tmp, &tm->peer, ntohs (tm->direction), value);
-                                       break;
-                               case GNUNET_ATS_QUALITY_NET_DISTANCE:
-                                       set_distance (tmp, &tm->peer, ntohs (tm->direction), value);
-                                       break;
-                               default:
-                                       break;
-                       }
-       }
-
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  struct TM_Peer *tmp;
+  struct DelayQueueEntry *dqe;
+  struct GNUNET_TIME_Relative delay;
+
+  if (NULL != (tmp =
+               GNUNET_CONTAINER_multipeermap_get (peers,
+                                                  target)))
+    delay = tmp->delay_out;
+  else
+    delay = delay_out;
+  if (0 == delay.rel_value_us)
+  {
+    /* Normal sending */
+    GST_neighbours_send (target,
+                         msg,
+                         msg_size,
+                         timeout,
+                         cont, cont_cls);
+    return;
+  }
+  dqe = GNUNET_malloc (sizeof (struct DelayQueueEntry) + msg_size);
+  dqe->id = *target;
+  dqe->tmp = tmp;
+  dqe->sent_at = GNUNET_TIME_relative_to_absolute (delay);
+  dqe->cont = cont;
+  dqe->cont_cls = cont_cls;
+  dqe->msg = &dqe[1];
+  dqe->msg_size = msg_size;
+  dqe->timeout = timeout;
+  memcpy (dqe->msg,
+          msg,
+          msg_size);
+  if (NULL == tmp)
+  {
+    GNUNET_CONTAINER_DLL_insert_tail (generic_dqe_head,
+                                      generic_dqe_tail,
+                                      dqe);
+    if (NULL == generic_send_delay_task)
+      generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                                              &send_delayed,
+                                                              dqe);
+  }
+  else
+  {
+    GNUNET_CONTAINER_DLL_insert_tail (tmp->send_head,
+                                      tmp->send_tail,
+                                      dqe);
+    if (NULL == tmp->send_delay_task)
+      tmp->send_delay_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                                           &send_delayed,
+                                                           dqe);
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Delaying %u byte message to peer `%s' with peer specific delay for %s\n",
+              (unsigned int) msg_size,
+              GNUNET_i2s (target),
+              GNUNET_STRINGS_relative_time_to_string (delay,
+                                                      GNUNET_YES));
 }
 
-static void
-send_delayed (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-       struct DelayQueueEntry *dqe = cls;
-       struct DelayQueueEntry *next;
-       struct TM_Peer *tmp = dqe->tmp;
-       struct GNUNET_TIME_Relative delay;
-       tmp->send_delay_task = GNUNET_SCHEDULER_NO_TASK;
-       GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
-       GST_neighbours_send (&tmp->peer, dqe->msg, dqe->msg_size, dqe->timeout, dqe->cont, dqe->cont_cls);
-
-       next = tmp->send_head;
-       if (NULL != next)
-       {
-                       /* More delayed messages */
-                       delay = GNUNET_TIME_absolute_get_remaining (next->sent_at);
-                       tmp->send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
-       }
-
-       GNUNET_free (dqe);
-}
 
+/**
+ * Function that will be called to manipulate ATS information according to
+ * current manipulation settings
+ *
+ * @param address binary address
+ * @param session the session
+ * @param prop[IN|OUT] metrics to modify
+ */
 void
-GST_manipulation_send (const struct GNUNET_PeerIdentity *target, const void *msg,
-    size_t msg_size, struct GNUNET_TIME_Relative timeout,
-    GST_NeighbourSendContinuation cont, void *cont_cls)
+GST_manipulation_manipulate_metrics (const struct GNUNET_HELLO_Address *address,
+                                     struct GNUNET_ATS_Session *session,
+                                     struct GNUNET_ATS_Properties *prop)
 {
-       struct TM_Peer *tmp;
-       struct DelayQueueEntry *dqe;
-       struct GNUNET_TIME_Relative delay;
-
-       if (NULL != (tmp = GNUNET_CONTAINER_multihashmap_get (peers, &target->hashPubKey)))
-       {
-                       /* Manipulate here */
-                       /* Delay */
-                       if (UINT32_MAX != tmp->metrics[TM_SEND][DELAY])
-                       {
-                                       /* We have a delay */
-                                       delay.rel_value = tmp->metrics[TM_SEND][DELAY];
-                                       dqe = GNUNET_malloc (sizeof (struct DelayQueueEntry) + msg_size);
-                                       dqe->tmp = tmp;
-                                       dqe->sent_at = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get(), delay);
-                                       dqe->cont = cont;
-                                       dqe->cont_cls = cont_cls;
-                                       dqe->msg = &dqe[1];
-                                       dqe->msg_size = msg_size;
-                                       dqe->timeout = timeout;
-                                       memcpy (dqe->msg, msg, msg_size);
-                                       GNUNET_CONTAINER_DLL_insert_tail (tmp->send_head, tmp->send_tail, dqe);
-                                       if (GNUNET_SCHEDULER_NO_TASK == tmp->send_delay_task)
-                                               tmp->send_delay_task =GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
-                                       return;
-                       }
-       }
-       /* Normal sending */
-       GST_neighbours_send (target, msg, msg_size, timeout, cont, cont_cls);
+  const struct GNUNET_PeerIdentity *peer = &address->peer;
+  struct TM_Peer *tmp;
+
+  tmp = GNUNET_CONTAINER_multipeermap_get (peers,
+                                           peer);
+  if (NULL != tmp)
+    *prop = tmp->properties;
 }
 
+
+/**
+ * Adapter function between transport plugins and transport receive function
+ * manipulation delays for next send.
+ *
+ * @param cls the closure for transport
+ * @param address the address and the peer the message was received from
+ * @param message the message received
+ * @param session the session the message was received on
+ * @return manipulated delay for next receive
+ */
 struct GNUNET_TIME_Relative
-GST_manipulation_recv (void *cls, const struct GNUNET_PeerIdentity *peer,
-    const struct GNUNET_MessageHeader *message,
-    const struct GNUNET_ATS_Information *ats,
-    uint32_t ats_count, struct Session *session,
-    const char *sender_address,
-    uint16_t sender_address_len)
+GST_manipulation_recv (void *cls,
+                       const struct GNUNET_HELLO_Address *address,
+                       struct GNUNET_ATS_Session *session,
+                       const struct GNUNET_MessageHeader *message)
 {
-       struct TM_Peer *tmp;
-       int d;
-       struct GNUNET_ATS_Information ats_new[ats_count];
-       struct GNUNET_TIME_Relative q_delay;
-       struct GNUNET_TIME_Relative m_delay;
-
-       for (d = 0; d < ats_count; d++)
-
-       if (NULL != (tmp = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey)))
-       {
-                       /* Manipulate distance */
-                       for (d = 0; d < ats_count; d++)
-                       {
-                                       ats_new[d] = ats[d];
-                                       /* Set distance */
-                                       if ((ntohl(ats[d].type) == GNUNET_ATS_QUALITY_NET_DISTANCE) &&
-                                                (UINT32_MAX != tmp->metrics[TM_RECEIVE][DISTANCE]))
-                                                       ats_new[d].value = htonl(tmp->metrics[TM_RECEIVE][DISTANCE]);
-                       }
-                       /* Manipulate receive delay */
-                       if (UINT32_MAX != tmp->metrics[TM_RECEIVE][DELAY])
-                       {
-                                       m_delay.rel_value = tmp->metrics[TM_RECEIVE][DELAY];
-                                       q_delay = GST_receive_callback (cls, peer, message, &ats_new[0], ats_count,
-                                                       session, sender_address, sender_address_len);
-
-                                       if (q_delay.rel_value >= m_delay.rel_value)
-                                       {
-                                                       return q_delay;
-                                       }
-                                       else
-                                       {
-                                                       return m_delay;
-                                       }
-                       }
-                       else
-                               return GST_receive_callback (cls, peer, message, &ats_new[0], ats_count,
-                                               session, sender_address, sender_address_len);
-       }
-
-       return GST_receive_callback (cls, peer, message, ats, ats_count,
-                       session, sender_address, sender_address_len);
+  struct TM_Peer *tmp;
+  struct GNUNET_TIME_Relative quota_delay;
+  struct GNUNET_TIME_Relative m_delay;
+
+  if (NULL !=
+      (tmp = GNUNET_CONTAINER_multipeermap_get (peers,
+                                                &address->peer)))
+    m_delay = tmp->delay_in;
+  else
+    m_delay = delay_in;
+
+  quota_delay = GST_receive_callback (cls,
+                                      address,
+                                      session,
+                                      message);
+  m_delay = GNUNET_TIME_relative_max (m_delay,
+                                      quota_delay);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Delaying next receive for peer `%s' for %s\n",
+              GNUNET_i2s (&address->peer),
+              GNUNET_STRINGS_relative_time_to_string (m_delay,
+                                                      GNUNET_YES));
+  return m_delay;
 }
 
+
+/**
+ * Initialize traffic manipulation
+ */
 void
 GST_manipulation_init ()
 {
-       peers = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
+  struct GNUNET_TIME_Relative delay;
+
+  if ( (GNUNET_OK ==
+        GNUNET_CONFIGURATION_get_value_time (GST_cfg,
+                                             "transport",
+                                             "MANIPULATE_DELAY_IN",
+                                             &delay)) &&
+       (delay.rel_value_us > 0) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Delaying inbound traffic for %s\n",
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES));
+    delay_in = delay;
+  }
+  if ( (GNUNET_OK ==
+        GNUNET_CONFIGURATION_get_value_time (GST_cfg,
+                                             "transport",
+                                             "MANIPULATE_DELAY_OUT",
+                                             &delay)) &&
+       (delay.rel_value_us > 0) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Delaying outbound traffic for %s\n",
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES));
+    delay_out = delay;
+  }
+  peers = GNUNET_CONTAINER_multipeermap_create (4,
+                                                GNUNET_NO);
 }
 
-int free_tmps (void *cls,
-                                                        const struct GNUNET_HashCode * key,
-                                                        void *value)
+
+/**
+ * Notify manipulation about disconnect so it can discard queued messages
+ *
+ * @param peer the disconnecting peer
+ */
+void
+GST_manipulation_peer_disconnect (const struct GNUNET_PeerIdentity *peer)
 {
-       struct DelayQueueEntry *dqe;
-       struct DelayQueueEntry *next;
-       if (NULL != value)
-       {
-                       struct TM_Peer *tmp = (struct TM_Peer *) value;
-                       GNUNET_CONTAINER_multihashmap_remove (peers, key, value);
-                       next = tmp->send_head;
-                       while (NULL != (dqe = next))
-                       {
-                                       next = dqe->next;
-                                       GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
-                                       GNUNET_free (dqe);
-                       }
-                       if (GNUNET_SCHEDULER_NO_TASK != tmp->send_delay_task)
-                       {
-                                       GNUNET_SCHEDULER_cancel (tmp->send_delay_task);
-                                       tmp->send_delay_task = GNUNET_SCHEDULER_NO_TASK;
-                       }
-                       GNUNET_free (tmp);
-       }
-       return GNUNET_OK;
+  struct TM_Peer *tmp;
+  struct DelayQueueEntry *dqe;
+  struct DelayQueueEntry *next;
+
+  tmp = GNUNET_CONTAINER_multipeermap_get (peers,
+                                           peer);
+  if (NULL != tmp)
+  {
+    while (NULL != (dqe = tmp->send_head))
+    {
+      GNUNET_CONTAINER_DLL_remove (tmp->send_head,
+                                   tmp->send_tail,
+                                   dqe);
+      if (NULL != dqe->cont)
+        dqe->cont (dqe->cont_cls,
+                   GNUNET_SYSERR,
+                   dqe->msg_size,
+                   0);
+      GNUNET_free(dqe);
+    }
+  }
+  next = generic_dqe_head;
+  while (NULL != (dqe = next))
+  {
+    next = dqe->next;
+    if (0 == memcmp(peer, &dqe->id, sizeof(dqe->id)))
+    {
+      GNUNET_CONTAINER_DLL_remove (generic_dqe_head,
+                                   generic_dqe_tail,
+                                   dqe);
+      if (NULL != dqe->cont)
+        dqe->cont (dqe->cont_cls,
+                   GNUNET_SYSERR,
+                   dqe->msg_size,
+                   0);
+      GNUNET_free(dqe);
+    }
+  }
+  if (NULL != generic_send_delay_task)
+  {
+    GNUNET_SCHEDULER_cancel (generic_send_delay_task);
+    generic_send_delay_task = NULL;
+    if (NULL != generic_dqe_head)
+      generic_send_delay_task
+        = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining(generic_dqe_head->sent_at),
+                                        &send_delayed,
+                                        generic_dqe_head);
+  }
 }
 
+
+/**
+ * Free manipulation information about a peer.
+ *
+ * @param cls NULL
+ * @param key peer the info is about
+ * @param value a `struct TM_Peer` to free
+ * @return #GNUNET_OK (continue to iterate)
+ */
+static int
+free_tmps (void *cls,
+           const struct GNUNET_PeerIdentity *key,
+           void *value)
+{
+  struct TM_Peer *tmp = value;
+  struct DelayQueueEntry *dqe;
+
+  GNUNET_break (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_remove (peers,
+                                                      key,
+                                                      value));
+  while (NULL != (dqe = tmp->send_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (tmp->send_head,
+                                 tmp->send_tail,
+                                 dqe);
+    if (NULL != dqe->cont)
+      dqe->cont (dqe->cont_cls,
+                 GNUNET_SYSERR,
+                 dqe->msg_size,
+                 0);
+    GNUNET_free (dqe);
+  }
+  if (NULL != tmp->send_delay_task)
+  {
+    GNUNET_SCHEDULER_cancel (tmp->send_delay_task);
+    tmp->send_delay_task = NULL;
+  }
+  GNUNET_free (tmp);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Stop traffic manipulation
+ */
 void
 GST_manipulation_stop ()
 {
-       GNUNET_CONTAINER_multihashmap_iterate (peers, &free_tmps,NULL);
-
-       GNUNET_CONTAINER_multihashmap_destroy (peers);
-       peers = NULL;
+  struct DelayQueueEntry *cur;
+
+  GNUNET_CONTAINER_multipeermap_iterate (peers,
+                                         &free_tmps,
+                                         NULL);
+  GNUNET_CONTAINER_multipeermap_destroy (peers);
+  peers = NULL;
+  while (NULL != (cur = generic_dqe_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (generic_dqe_head,
+                                 generic_dqe_tail,
+                                 cur);
+    if (NULL != cur->cont)
+      cur->cont (cur->cont_cls,
+                 GNUNET_SYSERR,
+                 cur->msg_size,
+                 0);
+    GNUNET_free (cur);
+  }
+  if (NULL != generic_send_delay_task)
+  {
+    GNUNET_SCHEDULER_cancel (generic_send_delay_task);
+    generic_send_delay_task = NULL;
+  }
 }
 
-
 /* end of file gnunet-service-transport_manipulation.c */