add changelog
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
index f07e1c88d1e5a14fae380f6d4c83c4766c94e1f2..8d68d90a67665ee9add0c61d9f4b1554bf94c02d 100644 (file)
@@ -1,19 +1,19 @@
 /*
- This file is part of GNUnet.
- Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
  This file is part of GNUnet.
  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
 
- GNUnet is free software: you can redistribute it and/or modify it
- under the terms of the GNU Affero General Public License as published
- by the Free Software Foundation, either version 3 of the License,
- or (at your option) any later version.
  GNUnet is free software: you can redistribute it and/or modify it
  under the terms of the GNU Affero General Public License as published
  by the Free Software Foundation, either version 3 of the License,
  or (at your option) any later version.
 
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- Affero General Public License for more details.
  GNUnet is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Affero General Public License for more details.
 
- You should have received a copy of the GNU Affero General Public License
- along with this program.  If not, see <http://www.gnu.org/licenses/>.
  You should have received a copy of the GNU Affero General Public License
  along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
      SPDX-License-Identifier: AGPL3.0-or-later
  */
  *
  * TODO:
  * Implement next:
- * - realize "pull" based logic (#handle_client_send()) for
- *   `struct PendingMessage` which waits for a queue on any
- *   applicable route to be 'ready', in contrast
- *   to the 'push' based routing we use for control messages.
- *   Basically, when a queue goes idle, it should "search"
- *   via its neighbour for either virtual links or DVH's that
- *   have it as first hop and then find messages in those
- *   virtual links!
- * - realize transport-to-transport flow control (needed in case
- *   communicators do not offer flow control).  Note that we may not
- *   want to simply delay the ACKs as that may cause unnecessary
- *   re-transmissions. => Introduce proper flow and congestion window(s)!
  * - review retransmission logic, right now there is no smartness there!
- *   => congestion control, flow control, etc [PERFORMANCE-BASICS]
+ *   => congestion control, etc [PERFORMANCE-BASICS]
  *
- * Optimizations:
+ * Optimizations-Statistics:
+ * - Track ACK losses based on ACK-counter [ROUTING]
+ * - Need to track total bandwidth per VirtualLink and adjust how frequently
+ *   we send FC messages based on bandwidth-delay-product (and relation
+ *   to the window size!). See OPTIMIZE-FC-BDP.
+ * - Consider more statistics in #check_connection_quality() [FIXME-CONQ-STATISTICS]
+ * - Adapt available_fc_window_size, using larger values for high-bandwidth
+ *   and high-latency links *if* we have the RAM [GOODPUT / utilization / stalls]
+ * - Set last_window_consum_limit promise properly based on
+ *   latency and bandwidth of the respective connection [GOODPUT / utilization / stalls]
+ *
+ * Optimizations-DV:
  * - When forwarding DV learn messages, if a peer is reached that
  *   has a *bidirectional* link to the origin beyond 1st hop,
  *   do NOT forward it to peers _other_ than the origin, as
  *   there is clearly a better path directly from the origin to
  *   whatever else we could reach.
- * - AcknowledgementUUIDPs are overkill with 256 bits (128 would do)
- *   => Need 128 bit hash map though! [BANDWIDTH, MEMORY]
- * - queue_send_msg and route_message both by API design have to make copies
- *   of the payload, and route_message on top of that requires a malloc/free.
- *   Change design to approximate "zero" copy better... [CPU]
- * - could avoid copying body of message into each fragment and keep
- *   fragments as just pointers into the original message and only
- *   fully build fragments just before transmission (optimization, should
- *   reduce CPU and memory use) [CPU, MEMORY]
- * - if messages are below MTU, consider adding ACKs and other stuff
- *   to the same transmission to avoid tiny messages (requires planning at
- *   receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT]
  * - When we passively learned DV (with unconfirmed freshness), we
  *   right now add the path to our list but with a zero path_valid_until
  *   time and only use it for unconfirmed routes.  However, we could consider
  *   triggering an explicit validation mechansim ourselves, specifically routing
  *   a challenge-response message over the path [ROUTING]
- * - Track ACK losses based on ACK-counter [ROUTING]
+ * = if available, try to confirm unconfirmed DV paths when trying to establish
+ *   virtual link for a `struct IncomingRequest`. (i.e. if DVH is
+ *   unconfirmed, incoming requests cause us to try to validate a passively
+ *   learned path (requires new message type!))
+ *
+ * Optimizations-Fragmentation:
+ * - Fragments send over a reliable channel could do without the
+ *   AcknowledgementUUIDP altogether, as they won't be acked! [BANDWIDTH]
+ *   (-> have 2nd type of acknowledgment message; low priority, as we
+ *       do not have an MTU-limited *reliable* communicator) [FIXME-FRAG-REL-UUID]
+ * - if messages are below MTU, consider adding ACKs and other stuff
+ *   to the same transmission to avoid tiny messages (requires planning at
+ *   receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT]
  *
- * Design realizations / discussion:
- * - communicators do flow control by calling MQ "notify sent"
- *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
- *   or explicitly via backchannel FC ACKs.  As long as the
- *   channel is not full, they may 'notify sent' even if the other
- *   peer has not yet confirmed receipt. The other peer confirming
- *   is _only_ for FC, not for more reliable transmission; reliable
- *   transmission (i.e. of fragments) is left to _transport_.
- * - ACKs sent back in uni-directional communicators are done via
- *   the background channel API; here transport _may_ initially
- *   broadcast (with bounded # hops) if no path is known;
- * - transport should _integrate_ DV-routing and build a view of
- *   the network; then background channel traffic can be
- *   routed via DV as well as explicit "DV" traffic.
- * - background channel is also used for ACKs and NAT traversal support
- * - transport service is responsible for AEAD'ing the background
- *   channel, timestamps and monotonic time are used against replay
- *   of old messages -> peerstore needs to be supplied with
- *   "latest timestamps seen" data
- * - if transport implements DV, we likely need a 3rd peermap
- *   in addition to ephemerals and (direct) neighbours
- *   ==> check if stuff needs to be moved out of "Neighbour"
- * - transport should encapsualte core-level messages and do its
- *   own ACKing for RTT/goodput/loss measurements _and_ fragment
- *   for retransmission
+ * Optimizations-internals:
+ * - queue_send_msg by API design has to make a copy
+ *   of the payload, and route_message on top of that requires a malloc/free.
+ *   Change design to approximate "zero" copy better... [CPU]
+ * - could avoid copying body of message into each fragment and keep
+ *   fragments as just pointers into the original message and only
+ *   fully build fragments just before transmission (optimization, should
+ *   reduce CPU and memory use) [CPU, MEMORY]
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
  */
 #define MAX_CUMMULATIVE_ACKS 64
 
+/**
+ * What is the 1:n chance that we send a Flow control response when
+ * receiving a flow control message that did not change anything for
+ * us? Basically, this is used in the case where both peers are stuck
+ * on flow control (no window changes), but one might continue sending
+ * flow control messages to the other peer as the first FC message
+ * when things stalled got lost, and then subsequently the other peer
+ * does *usually* not respond as nothing changed.  So to ensure that
+ * eventually the FC messages stop, we do send with 1/8th probability
+ * an FC message even if nothing changed.  That prevents one peer
+ * being stuck in sending (useless) FC messages "forever".
+ */
+#define FC_NO_CHANGE_REPLY_PROBABILITY 8
+
 /**
  * What is the size we assume for a read operation in the
  * absence of an MTU for the purpose of flow control?
  */
 #define GOODPUT_AGING_SLOTS 4
 
+/**
+ * How big is the flow control window size by default;
+ * limits per-neighbour RAM utilization.
+ */
+#define DEFAULT_WINDOW_SIZE (128 * 1024)
+
+/**
+ * For how many incoming connections do we try to create a
+ * virtual link for (at the same time!).  This does NOT
+ * limit the number of incoming connections, just the number
+ * for which we are actively trying to find working addresses
+ * in the absence (!) of our own applications wanting the
+ * link to go up.
+ */
+#define MAX_INCOMING_REQUEST 16
+
 /**
  * Maximum number of peers we select for forwarding DVInit
  * messages at the same time (excluding initiator).
 #define DELAY_WARN_THRESHOLD \
   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
+/**
+ * If a DVBox could not be forwarded after this number of
+ * seconds we drop it.
+ */
+#define DV_FORWARD_TIMEOUT \
+  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
+
 /**
  * We only consider queues as "quality" connections when
  * suppressing the generation of DV initiation messages if
@@ -305,9 +324,9 @@ struct MessageUUIDP
 struct AcknowledgementUUIDP
 {
   /**
-   * The UUID value.  Not actually a hash, but a random value.
+   * The UUID value.
    */
-  struct GNUNET_ShortHashCode value;
+  struct GNUNET_Uuid value;
 };
 
 
@@ -345,7 +364,6 @@ struct TransportBackchannelEncapsulationMessage
  */
 struct EphemeralConfirmationPS
 {
-
   /**
    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
    */
@@ -386,7 +404,6 @@ struct EphemeralConfirmationPS
  */
 struct TransportDVBoxPayloadP
 {
-
   /**
    * Sender's peer identity.
    */
@@ -791,14 +808,13 @@ struct TransportDVBoxMessage
  */
 struct TransportValidationChallengeMessage
 {
-
   /**
    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE
    */
   struct GNUNET_MessageHeader header;
 
   /**
-   * Zero.
+   * Always zero.
    */
   uint32_t reserved GNUNET_PACKED;
 
@@ -808,8 +824,8 @@ struct TransportValidationChallengeMessage
   struct ChallengeNonceP challenge;
 
   /**
-   * Timestamp of the sender, to be copied into the reply
-   * to allow sender to calculate RTT.
+   * Timestamp of the sender, to be copied into the reply to allow
+   * sender to calculate RTT.  Must be monotonically increasing!
    */
   struct GNUNET_TIME_AbsoluteNBO sender_time;
 };
@@ -821,7 +837,6 @@ struct TransportValidationChallengeMessage
  */
 struct TransportValidationPS
 {
-
   /**
    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE
    */
@@ -841,19 +856,18 @@ struct TransportValidationPS
 
 
 /**
- * Message send to a peer to respond to a
+ * Message  send to a peer to respond to a
  * #GNUNET_MESSAGE_TYPE_ADDRESS_VALIDATION_CHALLENGE
  */
 struct TransportValidationResponseMessage
 {
-
   /**
    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE
    */
   struct GNUNET_MessageHeader header;
 
   /**
-   * Zero.
+   * Always zero.
    */
   uint32_t reserved GNUNET_PACKED;
 
@@ -882,6 +896,65 @@ struct TransportValidationResponseMessage
 };
 
 
+/**
+ * Message for Transport-to-Transport Flow control. Specifies the size
+ * of the flow control window, including how much we believe to have
+ * consumed (at transmission time), how much we believe to be allowed
+ * (at transmission time), and how much the other peer is allowed to
+ * send to us, and how much data we already received from the other
+ * peer.
+ */
+struct TransportFlowControlMessage
+{
+  /**
+   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Sequence number of the flow control message. Incremented by one
+   * for each message.  Starts at zero when a virtual link goes up.
+   * Used to detect one-sided connection drops. On wrap-around, the
+   * flow control counters will be reset as if the connection had
+   * dropped.
+   */
+  uint32_t seq GNUNET_PACKED;
+
+  /**
+   * Flow control window size in bytes, in NBO.
+   * The receiver can send this many bytes at most.
+   */
+  uint64_t inbound_window_size GNUNET_PACKED;
+
+  /**
+   * How many bytes has the sender sent that count for flow control at
+   * this time.  Used to allow the receiver to estimate the packet
+   * loss rate.
+   */
+  uint64_t outbound_sent GNUNET_PACKED;
+
+  /**
+   * Latest flow control window size we learned from the other peer,
+   * in bytes, in NBO.  We are limited to sending at most this many
+   * bytes to the other peer.  May help the other peer detect when
+   * flow control messages were lost and should thus be retransmitted.
+   * In particular, if the delta to @e outbound_sent is too small,
+   * this signals that we are stalled.
+   */
+  uint64_t outbound_window_size GNUNET_PACKED;
+
+  /**
+   * Timestamp of the sender.  Must be monotonically increasing!
+   * Used to enable receiver to ignore out-of-order packets in
+   * combination with the @e seq. Note that @e seq will go down
+   * (back to zero) whenever either side believes the connection
+   * was dropped, allowing the peers to detect that they need to
+   * reset the counters for the number of bytes sent!
+   */
+  struct GNUNET_TIME_AbsoluteNBO sender_time;
+};
+
+
 GNUNET_NETWORK_STRUCT_END
 
 
@@ -957,7 +1030,6 @@ enum RouteMessageOptions
  */
 struct LearnLaunchEntry
 {
-
   /**
    * Kept (also) in a DLL sorted by launch time.
    */
@@ -1056,6 +1128,16 @@ struct PendingMessage;
  */
 struct DistanceVectorHop;
 
+/**
+ * A virtual link is another reachable peer that is known to CORE.  It
+ * can be either a `struct Neighbour` with at least one confirmed
+ * `struct Queue`, or a `struct DistanceVector` with at least one
+ * confirmed `struct DistanceVectorHop`.  With a virtual link we track
+ * data that is per neighbour that is not specific to how the
+ * connectivity is established.
+ */
+struct VirtualLink;
+
 
 /**
  * Context from #handle_incoming_msg().  Closure for many
@@ -1063,7 +1145,6 @@ struct DistanceVectorHop;
  */
 struct CommunicatorMessageContext
 {
-
   /**
    * Kept in a DLL of `struct VirtualLink` if waiting for CORE
    * flow control to unchoke.
@@ -1094,6 +1175,41 @@ struct CommunicatorMessageContext
 };
 
 
+/**
+ * Closure for #core_env_sent_cb.
+ */
+struct CoreSentContext
+{
+  /**
+   * Kept in a DLL to clear @e vl in case @e vl is lost.
+   */
+  struct CoreSentContext *next;
+
+  /**
+   * Kept in a DLL to clear @e vl in case @e vl is lost.
+   */
+  struct CoreSentContext *prev;
+
+  /**
+   * Virtual link this is about.
+   */
+  struct VirtualLink *vl;
+
+  /**
+   * How big was the message.
+   */
+  uint16_t size;
+
+  /**
+   * By how much should we increment @e vl's
+   * incoming_fc_window_size_used once we are done sending to CORE?
+   * Use to ensure we do not increment twice if there is more than one
+   * CORE client.
+   */
+  uint16_t isize;
+};
+
+
 /**
  * A virtual link is another reachable peer that is known to CORE.  It
  * can be either a `struct Neighbour` with at least one confirmed
@@ -1131,6 +1247,16 @@ struct VirtualLink
    */
   struct PendingMessage *pending_msg_tail;
 
+  /**
+   * Kept in a DLL to clear @e vl in case @e vl is lost.
+   */
+  struct CoreSentContext *csc_tail;
+
+  /**
+   * Kept in a DLL to clear @e vl in case @e vl is lost.
+   */
+  struct CoreSentContext *csc_head;
+
   /**
    * Task scheduled to possibly notfiy core that this peer is no
    * longer counting as confirmed.  Runs the #core_visibility_check(),
@@ -1139,6 +1265,12 @@ struct VirtualLink
    */
   struct GNUNET_SCHEDULER_Task *visibility_task;
 
+  /**
+   * Task scheduled to periodically retransmit FC messages (in
+   * case one got lost).
+   */
+  struct GNUNET_SCHEDULER_Task *fc_retransmit_task;
+
   /**
    * Neighbour used by this virtual link, NULL if @e dv is used.
    */
@@ -1149,6 +1281,35 @@ struct VirtualLink
    */
   struct DistanceVector *dv;
 
+  /**
+   * Sender timestamp of @e n_challenge, used to generate out-of-order
+   * challenges (as sender's timestamps must be monotonically
+   * increasing).  FIXME: where do we need this?
+   */
+  struct GNUNET_TIME_Absolute n_challenge_time;
+
+  /**
+   * When did we last send a
+   * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message?
+   * Used to determine whether it is time to re-transmit the message.
+   */
+  struct GNUNET_TIME_Absolute last_fc_transmission;
+
+  /**
+   * Sender timestamp of the last
+   * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message we have
+   * received.  Note that we do not persist this monotonic time as we
+   * do not really have to worry about ancient flow control window
+   * sizes after restarts.
+   */
+  struct GNUNET_TIME_Absolute last_fc_timestamp;
+
+  /**
+   * Expected RTT from the last FC transmission. (Zero if the last
+   * attempt failed, but could theoretically be zero even on success.)
+   */
+  struct GNUNET_TIME_Relative last_fc_rtt;
+
   /**
    * Used to generate unique UUIDs for messages that are being
    * fragmented.
@@ -1156,12 +1317,98 @@ struct VirtualLink
   uint64_t message_uuid_ctr;
 
   /**
-   * How many more messages can we send to core before we exhaust
+   * Memory allocated for this virtual link.  Expresses how much RAM
+   * we are willing to allocate to this virtual link.  OPTIMIZE-ME:
+   * Can be adapted to dedicate more RAM to links that need it, while
+   * sticking to some overall RAM limit.  For now, set to
+   * #DEFAULT_WINDOW_SIZE.
+   */
+  uint64_t available_fc_window_size;
+
+  /**
+   * Memory actually used to buffer packets on this virtual link.
+   * Expresses how much RAM we are currently using for virtual link.
+   * Note that once CORE is done with a packet, we decrement the value
+   * here.
+   */
+  uint64_t incoming_fc_window_size_ram;
+
+  /**
+   * Last flow control window size we provided to the other peer, in
+   * bytes.  We are allowing the other peer to send this
+   * many bytes.
+   */
+  uint64_t incoming_fc_window_size;
+
+  /**
+   * How much of the window did the other peer successfully use (and
+   * we already passed it on to CORE)? Must be below @e
+   * incoming_fc_window_size.   We should effectively signal the
+   * other peer that the window is this much bigger at the next
+   * opportunity / challenge.
+   */
+  uint64_t incoming_fc_window_size_used;
+
+  /**
+   * What is our current estimate on the message loss rate for the sender?
+   * Based on the difference between how much the sender sent according
+   * to the last #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message
+   * (@e outbound_sent field) and how much we actually received at that
+   * time (@e incoming_fc_window_size_used).  This delta is then
+   * added onto the @e incoming_fc_window_size when determining the
+   * @e outbound_window_size we send to the other peer.  Initially zero.
+   * May be negative if we (due to out-of-order delivery) actually received
+   * more than the sender claims to have sent in its last FC message.
+   */
+  int64_t incoming_fc_window_size_loss;
+
+  /**
+   * Our current flow control window size in bytes.  We
+   * are allowed to transmit this many bytes to @a n.
+   */
+  uint64_t outbound_fc_window_size;
+
+  /**
+   * How much of our current flow control window size have we
+   * used (in bytes).  Must be below
+   * @e outbound_fc_window_size.
+   */
+  uint64_t outbound_fc_window_size_used;
+
+  /**
+   * What is the most recent FC window the other peer sent us
+   * in `outbound_window_size`? This is basically the window
+   * size value the other peer has definitively received from
+   * us. If it matches @e incoming_fc_window_size, we should
+   * not send a FC message to increase the FC window. However,
+   * we may still send an FC message to notify the other peer
+   * that we received the other peer's FC message.
+   */
+  uint64_t last_outbound_window_size_received;
+
+  /**
+   * Generator for the sequence numbers of
+   * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL messages we send.
+   */
+  uint32_t fc_seq_gen;
+
+  /**
+   * Last sequence number of a
+   * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message we have
+   * received.
+   */
+  uint32_t last_fc_seq;
+
+  /**
+   * How many more messages can we send to CORE before we exhaust
    * the receive window of CORE for this peer? If this hits zero,
    * we must tell communicators to stop providing us more messages
    * for this peer.  In fact, the window can go negative as we
    * have multiple communicators, so per communicator we can go
-   * down by one into the negative range.
+   * down by one into the negative range. Furthermore, we count
+   * delivery per CORE client, so if we had multiple cores, that
+   * might also cause a negative window size here (as one message
+   * would decrement the window by one per CORE client).
    */
   int core_recv_window;
 };
@@ -1172,7 +1419,6 @@ struct VirtualLink
  */
 struct PendingAcknowledgement
 {
-
   /**
    * If @e pm is non-NULL, this is the DLL in which this acknowledgement
    * is kept in relation to its pending message.
@@ -1267,7 +1513,6 @@ struct PendingAcknowledgement
  */
 struct DistanceVectorHop
 {
-
   /**
    * Kept in a MDLL, sorted by @e timeout.
    */
@@ -1350,7 +1595,6 @@ struct DistanceVectorHop
  */
 struct DistanceVector
 {
-
   /**
    * To which peer is this a route?
    */
@@ -1375,7 +1619,7 @@ struct DistanceVector
    * Do we have a confirmed working queue and are thus visible to
    * CORE?  If so, this is the virtual link, otherwise NULL.
    */
-  struct VirtualLink *link;
+  struct VirtualLink *vl;
 
   /**
    * Signature affirming @e ephemeral_key of type
@@ -1416,7 +1660,6 @@ struct DistanceVector
  */
 struct QueueEntry
 {
-
   /**
    * Kept as a DLL.
    */
@@ -1565,6 +1808,12 @@ struct Queue
    * Connection status for this queue.
    */
   enum GNUNET_TRANSPORT_ConnectionStatus cs;
+
+  /**
+   * Set to #GNUNET_YES if this queue is idle waiting for some
+   * virtual link to give it a pending message.
+   */
+  int idle;
 };
 
 
@@ -1573,7 +1822,6 @@ struct Queue
  */
 struct ReassemblyContext
 {
-
   /**
    * Original message ID for of the message that all the fragments
    * belong to.
@@ -1633,7 +1881,6 @@ struct ReassemblyContext
  */
 struct Neighbour
 {
-
   /**
    * Which peer is this about?
    */
@@ -1696,7 +1943,7 @@ struct Neighbour
    * Do we have a confirmed working queue and are thus visible to
    * CORE?  If so, this is the virtual link, otherwise NULL.
    */
-  struct VirtualLink *link;
+  struct VirtualLink *vl;
 
   /**
    * Latest DVLearn monotonic time seen from this peer.  Initialized only
@@ -1712,12 +1959,39 @@ struct Neighbour
 };
 
 
+/**
+ * Another peer attempted to talk to us, we should try to establish
+ * a connection in the other direction.
+ */
+struct IncomingRequest
+{
+  /**
+   * Kept in a DLL.
+   */
+  struct IncomingRequest *next;
+
+  /**
+   * Kept in a DLL.
+   */
+  struct IncomingRequest *prev;
+
+  /**
+   * Handle for watching the peerstore for HELLOs for this peer.
+   */
+  struct GNUNET_PEERSTORE_WatchContext *wc;
+
+  /**
+   * Which peer is this about?
+   */
+  struct GNUNET_PeerIdentity pid;
+};
+
+
 /**
  * A peer that an application (client) would like us to talk to directly.
  */
 struct PeerRequest
 {
-
   /**
    * Which peer is this about?
    */
@@ -1752,7 +2026,6 @@ struct PeerRequest
  */
 enum PendingMessageType
 {
-
   /**
    * Ordinary message received from the CORE service.
    */
@@ -1769,15 +2042,9 @@ enum PendingMessageType
   PMT_RELIABILITY_BOX = 2,
 
   /**
-   * Any type of acknowledgement.
+   * Pending message created during #forward_dv_box().
    */
-  PMT_ACKNOWLEDGEMENT = 3,
-
-  /**
-   * Control traffic generated by the TRANSPORT service itself.
-   */
-  PMT_CONTROL = 4
-
+  PMT_DV_BOX = 3
 };
 
 
@@ -1852,8 +2119,8 @@ struct PendingMessage
   struct PendingAcknowledgement *pa_tail;
 
   /**
-   * This message, reliability boxed. Only possibly available if @e pmt is
-   * #PMT_CORE.
+   * This message, reliability *or* DV-boxed. Only possibly available
+   * if @e pmt is #PMT_CORE.
    */
   struct PendingMessage *bpm;
 
@@ -2009,7 +2276,6 @@ struct AcknowledgementCummulator
  */
 struct AddressListEntry
 {
-
   /**
    * Kept in a DLL.
    */
@@ -2063,7 +2329,6 @@ struct AddressListEntry
  */
 struct TransportClient
 {
-
   /**
    * Kept in a DLL.
    */
@@ -2091,13 +2356,11 @@ struct TransportClient
 
   union
   {
-
     /**
      * Information for @e type #CT_CORE.
      */
     struct
     {
-
       /**
        * Head of list of messages pending for this client, sorted by
        * transmission time ("next_attempt" + possibly internal prioritization).
@@ -2108,7 +2371,6 @@ struct TransportClient
        * Tail of list of messages pending for this client.
        */
       struct PendingMessage *pending_msg_tail;
-
     } core;
 
     /**
@@ -2116,7 +2378,6 @@ struct TransportClient
      */
     struct
     {
-
       /**
        * Peer identity to monitor the addresses of.
        * Zero to monitor all neighbours.  Valid if
@@ -2128,7 +2389,6 @@ struct TransportClient
        * Is this a one-shot monitor?
        */
       int one_shot;
-
     } monitor;
 
 
@@ -2176,7 +2436,6 @@ struct TransportClient
        * Characteristics of this communicator.
        */
       enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
-
     } communicator;
 
     /**
@@ -2184,15 +2443,12 @@ struct TransportClient
      */
     struct
     {
-
       /**
        * Map of requests for peers the given client application would like to
        * see connections for.  Maps from PIDs to `struct PeerRequest`.
        */
       struct GNUNET_CONTAINER_MultiPeerMap *requests;
-
     } application;
-
   } details;
 };
 
@@ -2203,7 +2459,6 @@ struct TransportClient
  */
 struct ValidationState
 {
-
   /**
    * For which peer is @a address to be validated (or possibly valid)?
    * Serves as key in the #validation_map.
@@ -2292,6 +2547,13 @@ struct ValidationState
    */
   struct GNUNET_PEERSTORE_StoreContext *sc;
 
+  /**
+   * Self-imposed limit on the previous flow control window. (May be zero,
+   * if we never used data from the previous window or are establishing the
+   * connection for the first time).
+   */
+  uint32_t last_window_consum_limit;
+
   /**
    * We are technically ready to send the challenge, but we are waiting for
    * the respective queue to become available for transmission.
@@ -2411,7 +2673,7 @@ static struct GNUNET_CONTAINER_MultiPeerMap *ack_cummulators;
  * Map of pending acknowledgements, mapping `struct AcknowledgementUUID` to
  * a `struct PendingAcknowledgement`.
  */
-static struct GNUNET_CONTAINER_MultiShortmap *pending_acks;
+static struct GNUNET_CONTAINER_MultiUuidmap *pending_acks;
 
 /**
  * Map from PIDs to `struct DistanceVector` entries describing
@@ -2480,6 +2742,23 @@ static struct PendingAcknowledgement *pa_head;
  */
 static struct PendingAcknowledgement *pa_tail;
 
+/**
+ * List of incomming connections where we are trying
+ * to get a connection back established. Length
+ * kept in #ir_total.
+ */
+static struct IncomingRequest *ir_head;
+
+/**
+ * Tail of DLL starting at #ir_head.
+ */
+static struct IncomingRequest *ir_tail;
+
+/**
+ * Length of the DLL starting at #ir_head.
+ */
+static unsigned int ir_total;
+
 /**
  * Generator of `logging_uuid` in `struct PendingMessage`.
  */
@@ -2522,6 +2801,22 @@ get_age ()
 }
 
 
+/**
+ * Release @a ir data structure.
+ *
+ * @param ir data structure to release
+ */
+static void
+free_incoming_request (struct IncomingRequest *ir)
+{
+  GNUNET_CONTAINER_DLL_remove (ir_head, ir_tail, ir);
+  GNUNET_assert (ir_total > 0);
+  ir_total--;
+  GNUNET_PEERSTORE_watch_cancel (ir->wc);
+  GNUNET_free (ir);
+}
+
+
 /**
  * Release @a pa data structure.
  *
@@ -2552,9 +2847,9 @@ free_pending_acknowledgement (struct PendingAcknowledgement *pa)
     pa->queue = NULL;
   }
   GNUNET_assert (GNUNET_YES ==
-                 GNUNET_CONTAINER_multishortmap_remove (pending_acks,
-                                                        &pa->ack_uuid.value,
-                                                        pa));
+                 GNUNET_CONTAINER_multiuuidmap_remove (pending_acks,
+                                                       &pa->ack_uuid.value,
+                                                       pa));
   GNUNET_free (pa);
 }
 
@@ -2628,7 +2923,11 @@ free_pending_message (struct PendingMessage *pm)
     GNUNET_assert (pm == pm->qe->pm);
     pm->qe->pm = NULL;
   }
-  GNUNET_free_non_null (pm->bpm);
+  if (NULL != pm->bpm)
+  {
+    free_fragment_tree (pm->bpm);
+    GNUNET_free (pm->bpm);
+  }
   GNUNET_free (pm);
 }
 
@@ -2642,15 +2941,28 @@ static void
 free_virtual_link (struct VirtualLink *vl)
 {
   struct PendingMessage *pm;
+  struct CoreSentContext *csc;
 
   while (NULL != (pm = vl->pending_msg_head))
     free_pending_message (pm);
-  GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl);
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl));
   if (NULL != vl->visibility_task)
   {
     GNUNET_SCHEDULER_cancel (vl->visibility_task);
     vl->visibility_task = NULL;
   }
+  if (NULL != vl->fc_retransmit_task)
+  {
+    GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
+    vl->fc_retransmit_task = NULL;
+  }
+  while (NULL != (csc = vl->csc_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, csc);
+    GNUNET_assert (vl == csc->vl);
+    csc->vl = NULL;
+  }
   GNUNET_break (NULL == vl->n);
   GNUNET_break (NULL == vl->dv);
   GNUNET_free (vl);
@@ -2665,7 +2977,9 @@ free_virtual_link (struct VirtualLink *vl)
 static void
 free_validation_state (struct ValidationState *vs)
 {
-  GNUNET_CONTAINER_multipeermap_remove (validation_map, &vs->pid, vs);
+  GNUNET_assert (
+    GNUNET_YES ==
+    GNUNET_CONTAINER_multipeermap_remove (validation_map, &vs->pid, vs));
   GNUNET_CONTAINER_heap_remove_node (vs->hn);
   vs->hn = NULL;
   if (NULL != vs->sc)
@@ -2679,7 +2993,7 @@ free_validation_state (struct ValidationState *vs)
 
 
 /**
- * Lookup neighbour record for peer @a pid.
+ * Lookup neighbour for peer @a pid.
  *
  * @param pid neighbour to look for
  * @return NULL if we do not have this peer as a neighbour
@@ -2691,6 +3005,19 @@ lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
 }
 
 
+/**
+ * Lookup virtual link for peer @a pid.
+ *
+ * @param pid virtual link to look for
+ * @return NULL if we do not have this peer as a virtual link
+ */
+static struct VirtualLink *
+lookup_virtual_link (const struct GNUNET_PeerIdentity *pid)
+{
+  return GNUNET_CONTAINER_multipeermap_get (links, pid);
+}
+
+
 /**
  * Details about what to notify monitors about.
  */
@@ -2752,26 +3079,83 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh)
 
 
 /**
- * Free entry in #dv_routes.  First frees all hops to the target, and
- * if there are no entries left, frees @a dv as well.
+ * Task run to check whether the hops of the @a cls still
+ * are validated, or if we need to core about disconnection.
  *
- * @param dv route to free
+ * @param cls a `struct VirtualLink`
  */
 static void
-free_dv_route (struct DistanceVector *dv)
-{
-  struct DistanceVectorHop *dvh;
+check_link_down (void *cls);
 
-  while (NULL != (dvh = dv->dv_head))
-    free_distance_vector_hop (dvh);
-  if (NULL == dv->dv_head)
-  {
-    GNUNET_assert (
-      GNUNET_YES ==
-      GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
-    if (NULL != dv->timeout_task)
-      GNUNET_SCHEDULER_cancel (dv->timeout_task);
-    GNUNET_free (dv);
+
+/**
+ * Send message to CORE clients that we lost a connection.
+ *
+ * @param pid peer the connection was for
+ */
+static void
+cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Informing CORE clients about disconnect from %s\n",
+              GNUNET_i2s (pid));
+  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
+  {
+    struct GNUNET_MQ_Envelope *env;
+    struct DisconnectInfoMessage *dim;
+
+    if (CT_CORE != tc->type)
+      continue;
+    env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
+    dim->peer = *pid;
+    GNUNET_MQ_send (tc->mq, env);
+  }
+}
+
+
+/**
+ * Free entry in #dv_routes.  First frees all hops to the target, and
+ * if there are no entries left, frees @a dv as well.
+ *
+ * @param dv route to free
+ */
+static void
+free_dv_route (struct DistanceVector *dv)
+{
+  struct DistanceVectorHop *dvh;
+
+  while (NULL != (dvh = dv->dv_head))
+    free_distance_vector_hop (dvh);
+  if (NULL == dv->dv_head)
+  {
+    struct VirtualLink *vl;
+
+    GNUNET_assert (
+      GNUNET_YES ==
+      GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
+    if (NULL != (vl = dv->vl))
+    {
+      GNUNET_assert (dv == vl->dv);
+      vl->dv = NULL;
+      if (NULL == vl->n)
+      {
+        cores_send_disconnect_info (&dv->target);
+        free_virtual_link (vl);
+      }
+      else
+      {
+        GNUNET_SCHEDULER_cancel (vl->visibility_task);
+        vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl);
+      }
+      dv->vl = NULL;
+    }
+
+    if (NULL != dv->timeout_task)
+    {
+      GNUNET_SCHEDULER_cancel (dv->timeout_task);
+      dv->timeout_task = NULL;
+    }
+    GNUNET_free (dv);
   }
 }
 
@@ -2906,7 +3290,7 @@ reassembly_cleanup_task (void *cls)
   while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
   {
     if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout)
-               .rel_value_us)
+        .rel_value_us)
     {
       free_reassembly_context (rc);
       continue;
@@ -2950,6 +3334,7 @@ static void
 free_neighbour (struct Neighbour *neighbour)
 {
   struct DistanceVectorHop *dvh;
+  struct VirtualLink *vl;
 
   GNUNET_assert (NULL == neighbour->queue_head);
   GNUNET_assert (GNUNET_YES ==
@@ -2989,6 +3374,22 @@ free_neighbour (struct Neighbour *neighbour)
     GNUNET_PEERSTORE_store_cancel (neighbour->sc);
     neighbour->sc = NULL;
   }
+  if (NULL != (vl = neighbour->vl))
+  {
+    GNUNET_assert (neighbour == vl->n);
+    vl->n = NULL;
+    if (NULL == vl->dv)
+    {
+      cores_send_disconnect_info (&vl->target);
+      free_virtual_link (vl);
+    }
+    else
+    {
+      GNUNET_SCHEDULER_cancel (vl->visibility_task);
+      vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl);
+    }
+    neighbour->vl = NULL;
+  }
   GNUNET_free (neighbour);
 }
 
@@ -3033,31 +3434,6 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid)
 }
 
 
-/**
- * Send message to CORE clients that we lost a connection.
- *
- * @param pid peer the connection was for
- */
-static void
-cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Informing CORE clients about disconnect from %s\n",
-              GNUNET_i2s (pid));
-  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
-  {
-    struct GNUNET_MQ_Envelope *env;
-    struct DisconnectInfoMessage *dim;
-
-    if (CT_CORE != tc->type)
-      continue;
-    env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
-    dim->peer = *pid;
-    GNUNET_MQ_send (tc->mq, env);
-  }
-}
-
-
 /**
  * We believe we are ready to transmit a message on a queue. Gives the
  * message to the communicator for transmission (updating the tracker,
@@ -3070,19 +3446,15 @@ transmit_on_queue (void *cls);
 
 
 /**
- * Schedule next run of #transmit_on_queue().  Does NOTHING if
- * we should run immediately or if the message queue is empty.
- * Test for no task being added AND queue not being empty to
- * transmit immediately afterwards!  This function must only
- * be called if the message queue is non-empty!
+ * Called whenever something changed that might effect when we
+ * try to do the next transmission on @a queue using #transmit_on_queue().
  *
  * @param queue the queue to do scheduling for
- * @param inside_job set to #GNUNET_YES if called from
- *            #transmit_on_queue() itself and NOT setting
- *            the task means running immediately
+ * @param p task priority to use, if @a queue is scheduled
  */
 static void
-schedule_transmit_on_queue (struct Queue *queue, int inside_job)
+schedule_transmit_on_queue (struct Queue *queue,
+                            enum GNUNET_SCHEDULER_Priority p)
 {
   if (queue->tc->details.communicator.total_queue_length >=
       COMMUNICATOR_TOTAL_QUEUE_LIMIT)
@@ -3092,6 +3464,7 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
       "# Transmission throttled due to communicator queue limit",
       1,
       GNUNET_NO);
+    queue->idle = GNUNET_NO;
     return;
   }
   if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
@@ -3100,38 +3473,18 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
                               "# Transmission throttled due to queue queue limit",
                               1,
                               GNUNET_NO);
+    queue->idle = GNUNET_NO;
     return;
   }
-#if FIXME - NEXT
-  struct Neighbour *n = queue->neighbour;
-  struct GNUNET_TIME_Relative out_delay;
-
-  if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us))
-  {
-    GNUNET_log (
-      GNUNET_ERROR_TYPE_DEBUG,
-      "Schedule transmission <%llu> on queue %llu of %s decides to run immediately\n",
-      pm->logging_uuid,
-      (unsigned long long) queue->qid,
-      GNUNET_i2s (&n->pid));
-    return; /* we should run immediately! */
-  }
-  /* queue has changed since we were scheduled, reschedule again */
+  /* queue might indeed be ready, schedule it */
+  if (NULL != queue->transmit_task)
+    GNUNET_SCHEDULER_cancel (queue->transmit_task);
   queue->transmit_task =
-    GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue);
-  if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Next transmission <%llu> on queue `%s' in %s (high delay)\n",
-                pm->logging_uuid,
-                queue->address,
-                GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES));
-  else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Next transmission <%llu> on queue `%s' in %s\n",
-                pm->logging_uuid,
-                queue->address,
-                GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES));
-#endif
+    GNUNET_SCHEDULER_add_with_priority (p, &transmit_on_queue, queue);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Considering transmission on queue `%s' to %s\n",
+              queue->address,
+              GNUNET_i2s (&queue->neighbour->pid));
 }
 
 
@@ -3156,15 +3509,21 @@ check_link_down (void *cls)
        pos = pos->next_dv)
     dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
   if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+  {
+    vl->dv->vl = NULL;
     vl->dv = NULL;
+  }
   q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
   for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
     q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
-  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+  if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
+  {
+    vl->n->vl = NULL;
     vl->n = NULL;
+  }
   if ((NULL == vl->n) && (NULL == vl->dv))
   {
-    cores_send_disconnect_info (&dv->target);
+    cores_send_disconnect_info (&vl->target);
     free_virtual_link (vl);
     return;
   }
@@ -3185,8 +3544,8 @@ free_queue (struct Queue *queue)
 {
   struct Neighbour *neighbour = queue->neighbour;
   struct TransportClient *tc = queue->tc;
-  struct MonitorEvent me = {.cs = GNUNET_TRANSPORT_CS_DOWN,
-                            .rtt = GNUNET_TIME_UNIT_FOREVER_REL};
+  struct MonitorEvent me = { .cs = GNUNET_TRANSPORT_CS_DOWN,
+                             .rtt = GNUNET_TIME_UNIT_FOREVER_REL };
   struct QueueEntry *qe;
   int maxxed;
   struct PendingAcknowledgement *pa;
@@ -3229,7 +3588,7 @@ free_queue (struct Queue *queue)
   if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT <
                    tc->details.communicator.total_queue_length))
   {
-    /* Communicator dropped below threshold, resume all queues */
+    /* Communicator dropped below threshold, resume all _other_ queues */
     GNUNET_STATISTICS_update (
       GST_stats,
       "# Transmission throttled due to communicator queue limit",
@@ -3237,12 +3596,12 @@ free_queue (struct Queue *queue)
       GNUNET_NO);
     for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
          s = s->next_client)
-      schedule_transmit_on_queue (s, GNUNET_NO);
+      schedule_transmit_on_queue (s, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   }
   notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
   GNUNET_free (queue);
 
-  vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid);
+  vl = lookup_virtual_link (&neighbour->pid);
   if ((NULL != vl) && (neighbour == vl->n))
   {
     GNUNET_SCHEDULER_cancel (vl->visibility_task);
@@ -3335,32 +3694,36 @@ client_disconnect_cb (void *cls,
   {
   case CT_NONE:
     break;
+
   case CT_CORE: {
-    struct PendingMessage *pm;
+      struct PendingMessage *pm;
 
-    while (NULL != (pm = tc->details.core.pending_msg_head))
-    {
-      GNUNET_CONTAINER_MDLL_remove (client,
-                                    tc->details.core.pending_msg_head,
-                                    tc->details.core.pending_msg_tail,
-                                    pm);
-      pm->client = NULL;
+      while (NULL != (pm = tc->details.core.pending_msg_head))
+      {
+        GNUNET_CONTAINER_MDLL_remove (client,
+                                      tc->details.core.pending_msg_head,
+                                      tc->details.core.pending_msg_tail,
+                                      pm);
+        pm->client = NULL;
+      }
     }
-  }
-  break;
+    break;
+
   case CT_MONITOR:
     break;
+
   case CT_COMMUNICATOR: {
-    struct Queue *q;
-    struct AddressListEntry *ale;
+      struct Queue *q;
+      struct AddressListEntry *ale;
+
+      while (NULL != (q = tc->details.communicator.queue_head))
+        free_queue (q);
+      while (NULL != (ale = tc->details.communicator.addr_head))
+        free_address_list_entry (ale);
+      GNUNET_free (tc->details.communicator.address_prefix);
+    }
+    break;
 
-    while (NULL != (q = tc->details.communicator.queue_head))
-      free_queue (q);
-    while (NULL != (ale = tc->details.communicator.addr_head))
-      free_address_list_entry (ale);
-    GNUNET_free (tc->details.communicator.address_prefix);
-  }
-  break;
   case CT_APPLICATION:
     GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
                                            &stop_peer_request,
@@ -3455,8 +3818,8 @@ check_client_send (void *cls, const struct OutboundMessage *obm)
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-  size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
-  if (size < sizeof (struct GNUNET_MessageHeader))
+  size = ntohs (obm->header.size) - sizeof(struct OutboundMessage);
+  if (size < sizeof(struct GNUNET_MessageHeader))
   {
     GNUNET_break (0);
     return GNUNET_SYSERR;
@@ -3483,11 +3846,12 @@ client_send_response (struct PendingMessage *pm)
 {
   struct TransportClient *tc = pm->client;
   struct VirtualLink *vl = pm->vl;
-  struct GNUNET_MQ_Envelope *env;
-  struct SendOkMessage *som;
 
   if (NULL != tc)
   {
+    struct GNUNET_MQ_Envelope *env;
+    struct SendOkMessage *som;
+
     env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
     som->peer = vl->target;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -3528,8 +3892,8 @@ pick_random_dv_hops (const struct DistanceVector *dv,
   {
     if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
         (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
-           .rel_value_us == 0))
-      continue; /* pos unconfirmed and confirmed required */
+         .rel_value_us == 0))
+      continue;   /* pos unconfirmed and confirmed required */
     num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
     dv_count++;
   }
@@ -3568,8 +3932,8 @@ pick_random_dv_hops (const struct DistanceVector *dv,
 
     if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
         (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
-           .rel_value_us == 0))
-      continue; /* pos unconfirmed and confirmed required */
+         .rel_value_us == 0))
+      continue;   /* pos unconfirmed and confirmed required */
     for (unsigned int i = 0; i < hops_array_length; i++)
       if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
         hops_array[dv_count++] = pos;
@@ -3579,87 +3943,6 @@ pick_random_dv_hops (const struct DistanceVector *dv,
 }
 
 
-/**
- * Client asked for transmission to a peer.  Process the request.
- *
- * @param cls the client
- * @param obm the send message that was sent
- */
-static void
-handle_client_send (void *cls, const struct OutboundMessage *obm)
-{
-  struct TransportClient *tc = cls;
-  struct PendingMessage *pm;
-  const struct GNUNET_MessageHeader *obmm;
-  uint32_t bytes_msg;
-  struct VirtualLink *vl;
-  enum GNUNET_MQ_PriorityPreferences pp;
-  int was_empty;
-
-  GNUNET_assert (CT_CORE == tc->type);
-  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
-  bytes_msg = ntohs (obmm->size);
-  pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
-  vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer);
-  if (NULL == vl)
-  {
-    /* Failure: don't have this peer as a neighbour (anymore).
-       Might have gone down asynchronously, so this is NOT
-       a protocol violation by CORE. Still count the event,
-       as this should be rare. */
-    GNUNET_SERVICE_client_continue (tc->client);
-    GNUNET_STATISTICS_update (GST_stats,
-                              "# messages dropped (neighbour unknown)",
-                              1,
-                              GNUNET_NO);
-    return;
-  }
-
-  pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
-  pm->logging_uuid = logging_uuid_gen++;
-  pm->prefs = pp;
-  pm->client = tc;
-  pm->vl = vl;
-  pm->bytes_msg = bytes_msg;
-  memcpy (&pm[1], obmm, bytes_msg);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Sending %u bytes as <%llu> to %s\n",
-              bytes_msg,
-              pm->logging_uuid,
-              GNUNET_i2s (&obm->peer));
-  GNUNET_CONTAINER_MDLL_insert (client,
-                                tc->details.core.pending_msg_head,
-                                tc->details.core.pending_msg_tail,
-                                pm);
-  was_empty = (NULL == vl->pending_msg_head);
-  GNUNET_CONTAINER_MDLL_insert (vl,
-                                vl->pending_msg_head,
-                                vl->pending_msg_tail,
-                                pm);
-  if (! was_empty)
-    return; /* all queues must already be busy */
-#if 0
-  // FIXME: check if any DVH or neighbour queue of 'vl'
-  // is ready for transmission now. If so, encapsulate
-  // 'pm' accordingly and send!
-  for (struct Queue *queue = target->queue_head; NULL != queue;
-       queue = queue->next_neighbour)
-  {
-    /* try transmission on any queue that is idle */
-    if (NULL == queue->transmit_task)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Queue %llu to %s is idle, triggering transmission\n",
-                  (unsigned long long) queue->qid,
-                  GNUNET_i2s (&queue->neighbour->pid));
-      queue->transmit_task =
-        GNUNET_SCHEDULER_add_now (&transmit_on_queue, queue);
-    }
-  }
-#endif
-}
-
-
 /**
  * Communicator started.  Test message is well-formed.
  *
@@ -3680,7 +3963,7 @@ check_communicator_available (
     return GNUNET_SYSERR;
   }
   tc->type = CT_COMMUNICATOR;
-  size = ntohs (cam->header.size) - sizeof (*cam);
+  size = ntohs (cam->header.size) - sizeof(*cam);
   if (0 == size)
     return GNUNET_OK; /* receive-only communicator */
   GNUNET_MQ_check_zero_termination (cam);
@@ -3736,7 +4019,7 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
     GNUNET_SERVICE_client_drop (tc->client);
     return;
   }
-  vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer);
+  vl = lookup_virtual_link (&rom->peer);
   if (NULL == vl)
   {
     GNUNET_STATISTICS_update (GST_stats,
@@ -3773,12 +4056,12 @@ handle_communicator_available (
   struct TransportClient *tc = cls;
   uint16_t size;
 
-  size = ntohs (cam->header.size) - sizeof (*cam);
+  size = ntohs (cam->header.size) - sizeof(*cam);
   if (0 == size)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Receive-only communicator connected\n");
-    return; /* receive-only communicator */
+    return;   /* receive-only communicator */
   }
   tc->details.communicator.address_prefix =
     GNUNET_strdup ((const char *) &cam[1]);
@@ -3809,7 +4092,7 @@ check_communicator_backchannel (
   uint16_t isize;
 
   (void) cls;
-  msize = ntohs (cb->header.size) - sizeof (*cb);
+  msize = ntohs (cb->header.size) - sizeof(*cb);
   inbox = (const struct GNUNET_MessageHeader *) &cb[1];
   isize = ntohs (inbox->size);
   if (isize >= msize)
@@ -3851,7 +4134,7 @@ update_ephemeral (struct DistanceVector *dv)
                  GNUNET_CRYPTO_ecdhe_key_create2 (&dv->private_key));
   GNUNET_CRYPTO_ecdhe_key_get_public (&dv->private_key, &dv->ephemeral_key);
   ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
-  ec.purpose.size = htonl (sizeof (ec));
+  ec.purpose.size = htonl (sizeof(ec));
   ec.target = dv->target;
   ec.ephemeral_key = dv->ephemeral_key;
   GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
@@ -3861,7 +4144,7 @@ update_ephemeral (struct DistanceVector *dv)
 
 
 /**
- * Send the control message @a payload on @a queue.
+ * Send the message @a payload on @a queue.
  *
  * @param queue the queue to use for transmission
  * @param pm pending message to update once transmission is done, may be NULL!
@@ -3879,11 +4162,12 @@ queue_send_msg (struct Queue *queue,
   struct GNUNET_TRANSPORT_SendMessageTo *smt;
   struct GNUNET_MQ_Envelope *env;
 
+  queue->idle = GNUNET_NO;
   GNUNET_log (
     GNUNET_ERROR_TYPE_DEBUG,
     "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n",
     (unsigned int) payload_size,
-    pm->logging_uuid,
+    (NULL == pm) ? 0 : pm->logging_uuid,
     (unsigned long long) queue->qid,
     GNUNET_i2s (&queue->neighbour->pid));
   env = GNUNET_MQ_msg_extra (smt,
@@ -3910,6 +4194,11 @@ queue_send_msg (struct Queue *queue,
     GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
     queue->queue_length++;
     queue->tc->details.communicator.total_queue_length++;
+    if (COMMUNICATOR_TOTAL_QUEUE_LIMIT ==
+        queue->tc->details.communicator.total_queue_length)
+      queue->idle = GNUNET_NO;
+    if (QUEUE_LENGTH_LIMIT == queue->queue_length)
+      queue->idle = GNUNET_NO;
     GNUNET_MQ_send (queue->tc->mq, env);
   }
 }
@@ -3923,8 +4212,9 @@ queue_send_msg (struct Queue *queue,
  * @param hdr message to send as payload
  * @param options whether queues must be confirmed or not,
  *        and whether we may pick multiple (2) queues
+ * @return expected RTT for transmission, #GNUNET_TIME_UNIT_FOREVER_REL if sending failed
  */
-static void
+static struct GNUNET_TIME_Relative
 route_via_neighbour (const struct Neighbour *n,
                      const struct GNUNET_MessageHeader *hdr,
                      enum RouteMessageOptions options)
@@ -3933,6 +4223,7 @@ route_via_neighbour (const struct Neighbour *n,
   unsigned int candidates;
   unsigned int sel1;
   unsigned int sel2;
+  struct GNUNET_TIME_Relative rtt;
 
   /* Pick one or two 'random' queues from n (under constraints of options) */
   now = GNUNET_TIME_absolute_get ();
@@ -3960,9 +4251,10 @@ route_via_neighbour (const struct Neighbour *n,
                               "# route selection failed (all no valid queue)",
                               1,
                               GNUNET_NO);
-    return;
+    return GNUNET_TIME_UNIT_FOREVER_REL;
   }
 
+  rtt = GNUNET_TIME_UNIT_FOREVER_REL;
   sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates);
   if (0 == (options & RMO_REDUNDANT))
     sel2 = candidates; /* picks none! */
@@ -3983,11 +4275,13 @@ route_via_neighbour (const struct Neighbour *n,
                     GNUNET_i2s (&n->pid),
                     pos->address,
                     (sel1 == candidates) ? 1 : 2);
+        rtt = GNUNET_TIME_relative_min (rtt, pos->pd.aged_rtt);
         queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
       }
       candidates++;
     }
   }
+  return rtt;
 }
 
 
@@ -4006,7 +4300,6 @@ struct DVKeyState
    */
   struct
   {
-
     /**
      * Key used for HMAC calculations (via #GNUNET_CRYPTO_hmac()).
      */
@@ -4021,7 +4314,6 @@ struct DVKeyState
      * Counter value to use during setup.
      */
     char aes_ctr[128 / 8];
-
   } material;
 };
 
@@ -4042,13 +4334,13 @@ dv_setup_key_state_from_km (const struct GNUNET_HashCode *km,
   /* must match #dh_key_derive_eph_pub */
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CRYPTO_kdf (&key->material,
-                                    sizeof (key->material),
+                                    sizeof(key->material),
                                     "transport-backchannel-key",
                                     strlen ("transport-backchannel-key"),
                                     &km,
-                                    sizeof (km),
+                                    sizeof(km),
                                     iv,
-                                    sizeof (*iv)));
+                                    sizeof(*iv)));
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Deriving backchannel key based on KM %s and IV %s\n",
               GNUNET_h2s (km),
@@ -4059,10 +4351,10 @@ dv_setup_key_state_from_km (const struct GNUNET_HashCode *km,
                     0 /* flags */);
   gcry_cipher_setkey (key->cipher,
                       &key->material.aes_key,
-                      sizeof (key->material.aes_key));
+                      sizeof(key->material.aes_key));
   gcry_cipher_setctr (key->cipher,
                       &key->material.aes_ctr,
-                      sizeof (key->material.aes_ctr));
+                      sizeof(key->material.aes_ctr));
 }
 
 
@@ -4179,7 +4471,7 @@ static void
 dv_key_clean (struct DVKeyState *key)
 {
   gcry_cipher_close (key->cipher);
-  GNUNET_CRYPTO_zero_keys (&key->material, sizeof (key->material));
+  GNUNET_CRYPTO_zero_keys (&key->material, sizeof(key->material));
 }
 
 
@@ -4209,8 +4501,9 @@ typedef void (*DVMessageHandler) (void *cls,
  * @param use function to call with the encapsulated message
  * @param use_cls closure for @a use
  * @param options whether path must be confirmed or not, to be passed to @a use
+ * @return expected RTT for transmission, #GNUNET_TIME_UNIT_FOREVER_REL if sending failed
  */
-static void
+static struct GNUNET_TIME_Relative
 encapsulate_for_dv (struct DistanceVector *dv,
                     unsigned int num_dvhs,
                     struct DistanceVectorHop **dvhs,
@@ -4222,10 +4515,11 @@ encapsulate_for_dv (struct DistanceVector *dv,
   struct TransportDVBoxMessage box_hdr;
   struct TransportDVBoxPayloadP payload_hdr;
   uint16_t enc_body_size = ntohs (hdr->size);
-  char enc[sizeof (struct TransportDVBoxPayloadP) + enc_body_size] GNUNET_ALIGN;
+  char enc[sizeof(struct TransportDVBoxPayloadP) + enc_body_size] GNUNET_ALIGN;
   struct TransportDVBoxPayloadP *enc_payload_hdr =
     (struct TransportDVBoxPayloadP *) enc;
   struct DVKeyState key;
+  struct GNUNET_TIME_Relative rtt;
 
   /* Encrypt payload */
   box_hdr.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
@@ -4235,48 +4529,48 @@ encapsulate_for_dv (struct DistanceVector *dv,
   payload_hdr.sender_sig = dv->sender_sig;
   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
                               &box_hdr.iv,
-                              sizeof (box_hdr.iv));
+                              sizeof(box_hdr.iv));
   dh_key_derive_eph_pid (&dv->private_key, &dv->target, &box_hdr.iv, &key);
   payload_hdr.sender = GST_my_identity;
   payload_hdr.monotonic_time = GNUNET_TIME_absolute_hton (dv->monotime);
-  dv_encrypt (&key, &payload_hdr, enc_payload_hdr, sizeof (payload_hdr));
+  dv_encrypt (&key, &payload_hdr, enc_payload_hdr, sizeof(payload_hdr));
   dv_encrypt (&key,
               hdr,
-              &enc[sizeof (struct TransportDVBoxPayloadP)],
+              &enc[sizeof(struct TransportDVBoxPayloadP)],
               enc_body_size);
-  dv_hmac (&key, &box_hdr.hmac, enc, sizeof (enc));
+  dv_hmac (&key, &box_hdr.hmac, enc, sizeof(enc));
   dv_key_clean (&key);
-
+  rtt = GNUNET_TIME_UNIT_FOREVER_REL;
   /* For each selected path, take the pre-computed header and body
      and add the path in the middle of the message; then send it. */
   for (unsigned int i = 0; i < num_dvhs; i++)
   {
     struct DistanceVectorHop *dvh = dvhs[i];
     unsigned int num_hops = dvh->distance + 1;
-    char buf[sizeof (struct TransportDVBoxMessage) +
-             sizeof (struct GNUNET_PeerIdentity) * num_hops +
-             sizeof (struct TransportDVBoxPayloadP) +
-             enc_body_size] GNUNET_ALIGN;
+    char buf[sizeof(struct TransportDVBoxMessage)
+             + sizeof(struct GNUNET_PeerIdentity) * num_hops
+             + sizeof(struct TransportDVBoxPayloadP)
+             enc_body_size] GNUNET_ALIGN;
     struct GNUNET_PeerIdentity *dhops;
 
-    box_hdr.header.size = htons (sizeof (buf));
+    box_hdr.header.size = htons (sizeof(buf));
     box_hdr.num_hops = htons (num_hops);
-    memcpy (buf, &box_hdr, sizeof (box_hdr));
-    dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof (box_hdr)];
+    memcpy (buf, &box_hdr, sizeof(box_hdr));
+    dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)];
     memcpy (dhops,
             dvh->path,
-            dvh->distance * sizeof (struct GNUNET_PeerIdentity));
+            dvh->distance * sizeof(struct GNUNET_PeerIdentity));
     dhops[dvh->distance] = dv->target;
     if (GNUNET_EXTRA_LOGGING > 0)
     {
       char *path;
 
       path = GNUNET_strdup (GNUNET_i2s (&GST_my_identity));
-      for (unsigned int i = 0; i <= num_hops; i++)
+      for (unsigned int j = 0; j <= num_hops; j++)
       {
         char *tmp;
 
-        GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[i]));
+        GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[j]));
         GNUNET_free (path);
         path = tmp;
       }
@@ -4289,13 +4583,14 @@ encapsulate_for_dv (struct DistanceVector *dv,
                   path);
       GNUNET_free (path);
     }
-
-    memcpy (&dhops[num_hops], enc, sizeof (enc));
+    rtt = GNUNET_TIME_relative_min (rtt, dvh->pd.aged_rtt);
+    memcpy (&dhops[num_hops], enc, sizeof(enc));
     use (use_cls,
          dvh->next_hop,
          (const struct GNUNET_MessageHeader *) buf,
          options);
   }
+  return rtt;
 }
 
 
@@ -4315,28 +4610,34 @@ send_dv_to_neighbour (void *cls,
                       enum RouteMessageOptions options)
 {
   (void) cls;
-  route_via_neighbour (next_hop, hdr, options);
+  (void) route_via_neighbour (next_hop, hdr, options);
 }
 
 
 /**
  * We need to transmit @a hdr to @a target.  If necessary, this may
- * involve DV routing.
+ * involve DV routing.  This function routes without applying flow
+ * control or congestion control and should only be used for control
+ * traffic.
  *
  * @param target peer to receive @a hdr
  * @param hdr header of the message to route and #GNUNET_free()
  * @param options which transmission channels are allowed
+ * @return expected RTT for transmission, #GNUNET_TIME_UNIT_FOREVER_REL if sending failed
  */
-static void
-route_message (const struct GNUNET_PeerIdentity *target,
-               const struct GNUNET_MessageHeader *hdr,
-               enum RouteMessageOptions options)
+static struct GNUNET_TIME_Relative
+route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
+                                  const struct GNUNET_MessageHeader *hdr,
+                                  enum RouteMessageOptions options)
 {
   struct VirtualLink *vl;
   struct Neighbour *n;
   struct DistanceVector *dv;
+  struct GNUNET_TIME_Relative rtt1;
+  struct GNUNET_TIME_Relative rtt2;
 
-  vl = GNUNET_CONTAINER_multipeermap_get (links, target);
+  vl = lookup_virtual_link (target);
+  GNUNET_assert (NULL != vl);
   n = vl->n;
   dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
   if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
@@ -4358,7 +4659,7 @@ route_message (const struct GNUNET_PeerIdentity *target,
                               "# Messages dropped in routing: no acceptable method",
                               1,
                               GNUNET_NO);
-    return;
+    return GNUNET_TIME_UNIT_FOREVER_REL;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Routing message of type %u to %s with options %X\n",
@@ -4377,9 +4678,11 @@ route_message (const struct GNUNET_PeerIdentity *target,
   if ((NULL != n) && (NULL != dv))
     options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
                                   enough for redunancy, so clear the flag. */
+  rtt1 = GNUNET_TIME_UNIT_FOREVER_REL;
+  rtt2 = GNUNET_TIME_UNIT_FOREVER_REL;
   if (NULL != n)
   {
-    route_via_neighbour (n, hdr, options);
+    rtt1 = route_via_neighbour (n, hdr, options);
   }
   if (NULL != dv)
   {
@@ -4394,82 +4697,287 @@ route_message (const struct GNUNET_PeerIdentity *target,
     {
       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                   "Failed to route message, could not determine DV path\n");
-      return;
+      return rtt1;
     }
-    encapsulate_for_dv (dv,
-                        res,
-                        hops,
-                        hdr,
-                        &send_dv_to_neighbour,
-                        NULL,
-                        options & (~RMO_REDUNDANT));
+    rtt2 = encapsulate_for_dv (dv,
+                               res,
+                               hops,
+                               hdr,
+                               &send_dv_to_neighbour,
+                               NULL,
+                               options & (~RMO_REDUNDANT));
   }
+  return GNUNET_TIME_relative_min (rtt1, rtt2);
 }
 
 
 /**
- * Communicator requests backchannel transmission.  Process the request.
- * Just repacks it into our `struct TransportBackchannelEncapsulationMessage *`
- * (which for now has exactly the same format, only a different message type)
- * and passes it on for routing.
+ * Something changed on the virtual link with respect to flow
+ * control. Consider retransmitting the FC window size.
  *
- * @param cls the client
- * @param cb the send message that was sent
+ * @param cls a `struct VirtualLink` to work with
  */
 static void
-handle_communicator_backchannel (
-  void *cls,
-  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
+consider_sending_fc (void *cls)
 {
-  struct TransportClient *tc = cls;
-  const struct GNUNET_MessageHeader *inbox =
-    (const struct GNUNET_MessageHeader *) &cb[1];
-  uint16_t isize = ntohs (inbox->size);
-  const char *is = ((const char *) &cb[1]) + isize;
-  char
-    mbuf[isize +
-         sizeof (struct TransportBackchannelEncapsulationMessage)] GNUNET_ALIGN;
-  struct TransportBackchannelEncapsulationMessage *be =
-    (struct TransportBackchannelEncapsulationMessage *) mbuf;
+  struct VirtualLink *vl = cls;
+  struct GNUNET_TIME_Absolute monotime;
+  struct TransportFlowControlMessage fc;
+  struct GNUNET_TIME_Relative duration;
+  struct GNUNET_TIME_Relative rtt;
+
+  duration = GNUNET_TIME_absolute_get_duration (vl->last_fc_transmission);
+  /* OPTIMIZE-FC-BDP: decide sane criteria on when to do this, instead of doing
+     it always! */
+  /* For example, we should probably ONLY do this if a bit more than
+     an RTT has passed, or if the window changed "significantly" since
+     then. See vl->last_fc_rtt! NOTE: to do this properly, we also
+     need an estimate for the bandwidth-delay-product for the entire
+     VL, as that determines "significantly". We have the delay, but
+     the bandwidth statistics need to be added for the VL!*/(void) duration;
 
-  /* 0-termination of 'is' was checked already in
-     #check_communicator_backchannel() */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Preparing backchannel transmission to %s:%s of type %u\n",
-              GNUNET_i2s (&cb->pid),
-              is,
-              ntohs (inbox->size));
-  /* encapsulate and encrypt message */
-  be->header.type =
-    htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
-  be->header.size = htons (sizeof (mbuf));
-  memcpy (&be[1], inbox, isize);
-  memcpy (&mbuf[sizeof (struct TransportBackchannelEncapsulationMessage) +
-                isize],
-          is,
-          strlen (is) + 1);
-  route_message (&cb->pid, &be->header, RMO_DV_ALLOWED);
-  GNUNET_SERVICE_client_continue (tc->client);
+              "Sending FC seq %u to %s with new window %llu\n",
+              (unsigned int) vl->fc_seq_gen,
+              GNUNET_i2s (&vl->target),
+              (unsigned long long) vl->incoming_fc_window_size);
+  monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
+  vl->last_fc_transmission = monotime;
+  fc.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL);
+  fc.header.size = htons (sizeof(fc));
+  fc.seq = htonl (vl->fc_seq_gen++);
+  fc.inbound_window_size = GNUNET_htonll (vl->incoming_fc_window_size);
+  fc.outbound_sent = GNUNET_htonll (vl->outbound_fc_window_size_used);
+  fc.outbound_window_size = GNUNET_htonll (vl->outbound_fc_window_size);
+  fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
+  rtt = route_control_message_without_fc (&vl->target, &fc.header, RMO_NONE);
+  if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
+  {
+    rtt = GNUNET_TIME_UNIT_SECONDS;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "FC retransmission to %s failed, will retry in %s\n",
+                GNUNET_i2s (&vl->target),
+                GNUNET_STRINGS_relative_time_to_string (rtt, GNUNET_YES));
+    vl->last_fc_rtt = GNUNET_TIME_UNIT_ZERO;
+  }
+  else
+  {
+    /* OPTIMIZE-FC-BDP: rtt is not ideal, we can do better! */
+    vl->last_fc_rtt = rtt;
+  }
+  if (NULL != vl->fc_retransmit_task)
+    GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
+  vl->fc_retransmit_task =
+    GNUNET_SCHEDULER_add_delayed (rtt, &consider_sending_fc, vl);
 }
 
 
 /**
- * Address of our peer added.  Test message is well-formed.
+ * There is a message at the head of the pending messages for @a vl
+ * which may be ready for transmission. Check if a queue is ready to
+ * take it.
  *
- * @param cls the client
- * @param aam the send message that was sent
- * @return #GNUNET_OK if message is well-formed
+ * This function must (1) check for flow control to ensure that we can
+ * right now send to @a vl, (2) check that the pending message in the
+ * queue is actually eligible, (3) determine if any applicable queue
+ * (direct neighbour or DVH path) is ready to accept messages, and
+ * (4) prioritize based on the preferences associated with the
+ * pending message.
+ *
+ * So yeah, easy.
+ *
+ * @param vl virtual link where we should check for transmission
  */
-static int
-check_add_address (void *cls,
-                   const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
+static void
+check_vl_transmission (struct VirtualLink *vl)
 {
-  struct TransportClient *tc = cls;
-
-  if (CT_COMMUNICATOR != tc->type)
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
+  struct Neighbour *n = vl->n;
+  struct DistanceVector *dv = vl->dv;
+  struct GNUNET_TIME_Absolute now;
+  int elig;
+
+  /* Check that we have an eligible pending message!
+     (cheaper than having #transmit_on_queue() find out!) */
+  elig = GNUNET_NO;
+  for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
+       pm = pm->next_vl)
+  {
+    if (NULL != pm->qe)
+      continue;   /* not eligible, is in a queue! */
+    if (pm->bytes_msg + vl->outbound_fc_window_size_used >
+        vl->outbound_fc_window_size)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Stalled transmision on VL %s due to flow control: %llu < %llu\n",
+                  GNUNET_i2s (&vl->target),
+                  (unsigned long long) vl->outbound_fc_window_size,
+                  (unsigned long long) (pm->bytes_msg
+                                        + vl->outbound_fc_window_size_used));
+      consider_sending_fc (vl);
+      return;     /* We have a message, but flow control says "nope" */
+    }
+    elig = GNUNET_YES;
+    break;
+  }
+  if (GNUNET_NO == elig)
+    return;
+
+  /* Notify queues at direct neighbours that we are interested */
+  now = GNUNET_TIME_absolute_get ();
+  if (NULL != n)
+  {
+    for (struct Queue *queue = n->queue_head; NULL != queue;
+         queue = queue->next_neighbour)
+      if ((GNUNET_YES == queue->idle) &&
+          (queue->validated_until.abs_value_us > now.abs_value_us))
+        schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+  }
+  /* Notify queues via DV that we are interested */
+  if (NULL != dv)
+  {
+    /* Do DV with lower scheduler priority, which effectively means that
+       IF a neighbour exists and is available, we prefer it. */
+    for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+         pos = pos->next_dv)
+    {
+      struct Neighbour *nh = pos->next_hop;
+
+      if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
+        continue;     /* skip this one: path not validated */
+      for (struct Queue *queue = nh->queue_head; NULL != queue;
+           queue = queue->next_neighbour)
+        if ((GNUNET_YES == queue->idle) &&
+            (queue->validated_until.abs_value_us > now.abs_value_us))
+          schedule_transmit_on_queue (queue,
+                                      GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
+    }
+  }
+}
+
+
+/**
+ * Client asked for transmission to a peer.  Process the request.
+ *
+ * @param cls the client
+ * @param obm the send message that was sent
+ */
+static void
+handle_client_send (void *cls, const struct OutboundMessage *obm)
+{
+  struct TransportClient *tc = cls;
+  struct PendingMessage *pm;
+  const struct GNUNET_MessageHeader *obmm;
+  uint32_t bytes_msg;
+  struct VirtualLink *vl;
+  enum GNUNET_MQ_PriorityPreferences pp;
+
+  GNUNET_assert (CT_CORE == tc->type);
+  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
+  bytes_msg = ntohs (obmm->size);
+  pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
+  vl = lookup_virtual_link (&obm->peer);
+  if (NULL == vl)
+  {
+    /* Failure: don't have this peer as a neighbour (anymore).
+       Might have gone down asynchronously, so this is NOT
+       a protocol violation by CORE. Still count the event,
+       as this should be rare. */
+    GNUNET_SERVICE_client_continue (tc->client);
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# messages dropped (neighbour unknown)",
+                              1,
+                              GNUNET_NO);
+    return;
+  }
+
+  pm = GNUNET_malloc (sizeof(struct PendingMessage) + bytes_msg);
+  pm->logging_uuid = logging_uuid_gen++;
+  pm->prefs = pp;
+  pm->client = tc;
+  pm->vl = vl;
+  pm->bytes_msg = bytes_msg;
+  memcpy (&pm[1], obmm, bytes_msg);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending %u bytes as <%llu> to %s\n",
+              bytes_msg,
+              pm->logging_uuid,
+              GNUNET_i2s (&obm->peer));
+  GNUNET_CONTAINER_MDLL_insert (client,
+                                tc->details.core.pending_msg_head,
+                                tc->details.core.pending_msg_tail,
+                                pm);
+  GNUNET_CONTAINER_MDLL_insert (vl,
+                                vl->pending_msg_head,
+                                vl->pending_msg_tail,
+                                pm);
+  check_vl_transmission (vl);
+}
+
+
+/**
+ * Communicator requests backchannel transmission.  Process the request.
+ * Just repacks it into our `struct TransportBackchannelEncapsulationMessage *`
+ * (which for now has exactly the same format, only a different message type)
+ * and passes it on for routing.
+ *
+ * @param cls the client
+ * @param cb the send message that was sent
+ */
+static void
+handle_communicator_backchannel (
+  void *cls,
+  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
+{
+  struct TransportClient *tc = cls;
+  const struct GNUNET_MessageHeader *inbox =
+    (const struct GNUNET_MessageHeader *) &cb[1];
+  uint16_t isize = ntohs (inbox->size);
+  const char *is = ((const char *) &cb[1]) + isize;
+  char
+    mbuf[isize
+         + sizeof(struct
+                  TransportBackchannelEncapsulationMessage)] GNUNET_ALIGN;
+  struct TransportBackchannelEncapsulationMessage *be =
+    (struct TransportBackchannelEncapsulationMessage *) mbuf;
+
+  /* 0-termination of 'is' was checked already in
+   #check_communicator_backchannel() */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Preparing backchannel transmission to %s:%s of type %u\n",
+              GNUNET_i2s (&cb->pid),
+              is,
+              ntohs (inbox->size));
+  /* encapsulate and encrypt message */
+  be->header.type =
+    htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
+  be->header.size = htons (sizeof(mbuf));
+  memcpy (&be[1], inbox, isize);
+  memcpy (&mbuf[sizeof(struct TransportBackchannelEncapsulationMessage)
+                + isize],
+          is,
+          strlen (is) + 1);
+  route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
+  GNUNET_SERVICE_client_continue (tc->client);
+}
+
+
+/**
+ * Address of our peer added.  Test message is well-formed.
+ *
+ * @param cls the client
+ * @param aam the send message that was sent
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_add_address (void *cls,
+                   const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
+{
+  struct TransportClient *tc = cls;
+
+  if (CT_COMMUNICATOR != tc->type)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
   GNUNET_MQ_check_zero_termination (aam);
   return GNUNET_OK;
@@ -4580,8 +5088,8 @@ handle_add_address (void *cls,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Communicator added address `%s'!\n",
               (const char *) &aam[1]);
-  slen = ntohs (aam->header.size) - sizeof (*aam);
-  ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
+  slen = ntohs (aam->header.size) - sizeof(*aam);
+  ale = GNUNET_malloc (sizeof(struct AddressListEntry) + slen);
   ale->tc = tc;
   ale->address = (const char *) &ale[1];
   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
@@ -4607,6 +5115,7 @@ handle_del_address (void *cls,
                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
 {
   struct TransportClient *tc = cls;
+  struct AddressListEntry *alen;
 
   if (CT_COMMUNICATOR != tc->type)
   {
@@ -4616,8 +5125,9 @@ handle_del_address (void *cls,
   }
   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
        NULL != ale;
-       ale = ale->next)
+       ale = alen)
   {
+    alen = ale->next;
     if (dam->aid != ale->aid)
       continue;
     GNUNET_assert (ale->tc == tc);
@@ -4644,6 +5154,34 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
                       const struct GNUNET_MessageHeader *msg);
 
 
+/**
+ * Function called when we are done giving a message of a certain
+ * size to CORE and should thus decrement the number of bytes of
+ * RAM reserved for that peer's MQ.
+ *
+ * @param cls a `struct CoreSentContext`
+ */
+static void
+core_env_sent_cb (void *cls)
+{
+  struct CoreSentContext *ctx = cls;
+  struct VirtualLink *vl = ctx->vl;
+
+  if (NULL == vl)
+  {
+    /* lost the link in the meantime, ignore */
+    GNUNET_free (ctx);
+    return;
+  }
+  GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, ctx);
+  GNUNET_assert (vl->incoming_fc_window_size_ram >= ctx->size);
+  vl->incoming_fc_window_size_ram -= ctx->size;
+  vl->incoming_fc_window_size_used += ctx->isize;
+  consider_sending_fc (vl);
+  GNUNET_free (ctx);
+}
+
+
 /**
  * Communicator gave us an unencapsulated message to pass as-is to
  * CORE.  Process the request.
@@ -4660,8 +5198,8 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
   uint16_t size = ntohs (mh->size);
   int have_core;
 
-  if ((size > UINT16_MAX - sizeof (struct InboundMessage)) ||
-      (size < sizeof (struct GNUNET_MessageHeader)))
+  if ((size > UINT16_MAX - sizeof(struct InboundMessage)) ||
+      (size < sizeof(struct GNUNET_MessageHeader)))
   {
     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
 
@@ -4670,7 +5208,7 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
     GNUNET_SERVICE_client_drop (client);
     return;
   }
-  vl = GNUNET_CONTAINER_multipeermap_get (links, &cmc->im.sender);
+  vl = lookup_virtual_link (&cmc->im.sender);
   if (NULL == vl)
   {
     /* FIXME: sender is giving us messages for CORE but we don't have
@@ -4679,8 +5217,7 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
        we pass this on, CORE would be confused (link down, messages
        arrive).  We should investigate more if this happens often,
        or in a persistent manner, and possibly do "something" about
-       it. Thus logging as error for now. */
-    GNUNET_break_op (0);
+       it. Thus logging as error for now. */GNUNET_break_op (0);
     GNUNET_STATISTICS_update (GST_stats,
                               "# CORE messages droped (virtual link still down)",
                               1,
@@ -4689,30 +5226,65 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
     finish_cmc_handling (cmc);
     return;
   }
+  if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# CORE messages droped (FC arithmetic overflow)",
+                              1,
+                              GNUNET_NO);
+
+    finish_cmc_handling (cmc);
+    return;
+  }
+  if (vl->incoming_fc_window_size_ram + size > vl->available_fc_window_size)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# CORE messages droped (FC window overflow)",
+                              1,
+                              GNUNET_NO);
+    finish_cmc_handling (cmc);
+    return;
+  }
+
   /* Forward to all CORE clients */
   have_core = GNUNET_NO;
   for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
   {
     struct GNUNET_MQ_Envelope *env;
     struct InboundMessage *im;
+    struct CoreSentContext *ctx;
 
     if (CT_CORE != tc->type)
       continue;
-    have_core = GNUNET_YES;
+    vl->incoming_fc_window_size_ram += size;
     env = GNUNET_MQ_msg_extra (im, size, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+    ctx = GNUNET_new (struct CoreSentContext);
+    ctx->vl = vl;
+    ctx->size = size;
+    ctx->isize = (GNUNET_NO == have_core) ? size : 0;
+    have_core = GNUNET_YES;
+    GNUNET_CONTAINER_DLL_insert (vl->csc_head, vl->csc_tail, ctx);
+    GNUNET_MQ_notify_sent (env, &core_env_sent_cb, ctx);
     im->peer = cmc->im.sender;
     memcpy (&im[1], mh, size);
     GNUNET_MQ_send (tc->mq, env);
+    vl->core_recv_window--;
   }
-  vl->core_recv_window--;
   if (GNUNET_NO == have_core)
+  {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "Dropped message to CORE: no CORE client connected!\n");
-  else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Delivered message from %s of type %u to CORE\n",
-                GNUNET_i2s (&cmc->im.sender),
-                ntohs (mh->type));
+    /* Nevertheless, count window as used, as it is from the
+       perspective of the other peer! */
+    vl->incoming_fc_window_size_used += size;
+    /* TODO-M1 */
+    finish_cmc_handling (cmc);
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Delivered message from %s of type %u to CORE\n",
+              GNUNET_i2s (&cmc->im.sender),
+              ntohs (mh->type));
   if (vl->core_recv_window > 0)
   {
     finish_cmc_handling (cmc);
@@ -4735,7 +5307,7 @@ static int
 check_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
 {
   uint16_t size = ntohs (fb->header.size);
-  uint16_t bsize = size - sizeof (*fb);
+  uint16_t bsize = size - sizeof(*fb);
 
   (void) cls;
   if (0 == bsize)
@@ -4785,9 +5357,9 @@ static void
 transmit_cummulative_ack_cb (void *cls)
 {
   struct AcknowledgementCummulator *ac = cls;
-  char buf[sizeof (struct TransportReliabilityAckMessage) +
-           ac->ack_counter *
-             sizeof (struct TransportCummulativeAckPayloadP)] GNUNET_ALIGN;
+  char buf[sizeof(struct TransportReliabilityAckMessage)
+           + ac->ack_counter
+           * sizeof(struct TransportCummulativeAckPayloadP)] GNUNET_ALIGN;
   struct TransportReliabilityAckMessage *ack =
     (struct TransportReliabilityAckMessage *) buf;
   struct TransportCummulativeAckPayloadP *ap;
@@ -4800,8 +5372,8 @@ transmit_cummulative_ack_cb (void *cls)
   GNUNET_assert (0 < ac->ack_counter);
   ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
   ack->header.size =
-    htons (sizeof (*ack) +
-           ac->ack_counter * sizeof (struct TransportCummulativeAckPayloadP));
+    htons (sizeof(*ack)
+           + ac->ack_counter * sizeof(struct TransportCummulativeAckPayloadP));
   ack->ack_counter = htonl (ac->ack_counter++);
   ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
   for (unsigned int i = 0; i < ac->ack_counter; i++)
@@ -4810,7 +5382,7 @@ transmit_cummulative_ack_cb (void *cls)
     ap[i].ack_delay = GNUNET_TIME_relative_hton (
       GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
   }
-  route_message (&ac->target, &ack->header, RMO_DV_ALLOWED);
+  route_control_message_without_fc (&ac->target, &ack->header, RMO_DV_ALLOWED);
   ac->num_acks = 0;
   ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT,
                                            &destroy_ack_cummulator,
@@ -4835,7 +5407,7 @@ cummulative_ack (const struct GNUNET_PeerIdentity *pid,
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Scheduling ACK %s for transmission to %s\n",
-              GNUNET_sh2s (&ack_uuid->value),
+              GNUNET_uuid2s (&ack_uuid->value),
               GNUNET_i2s (pid));
   ac = GNUNET_CONTAINER_multipeermap_get (ack_cummulators, pid);
   if (NULL == ac)
@@ -4958,14 +5530,14 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
   msize = ntohs (fb->msg_size);
   fc.message_uuid = fb->msg_uuid;
   fc.rc = NULL;
-  GNUNET_CONTAINER_multihashmap32_get_multiple (n->reassembly_map,
-                                                fb->msg_uuid.uuid,
-                                                &find_by_message_uuid,
-                                                &fc);
+  (void) GNUNET_CONTAINER_multihashmap32_get_multiple (n->reassembly_map,
+                                                       fb->msg_uuid.uuid,
+                                                       &find_by_message_uuid,
+                                                       &fc);
   if (NULL == (rc = fc.rc))
   {
-    rc = GNUNET_malloc (sizeof (*rc) + msize + /* reassembly payload buffer */
-                        (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
+    rc = GNUNET_malloc (sizeof(*rc) + msize    /* reassembly payload buffer */
+                        + (msize + 7) / 8 * sizeof(uint8_t) /* bitfield */);
     rc->msg_uuid = fb->msg_uuid;
     rc->neighbour = n;
     rc->msg_size = msize;
@@ -5009,7 +5581,7 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
   }
 
   /* reassemble */
-  fsize = ntohs (fb->header.size) - sizeof (*fb);
+  fsize = ntohs (fb->header.size) - sizeof(*fb);
   if (0 == fsize)
   {
     GNUNET_break (0);
@@ -5017,6 +5589,13 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
     return;
   }
   frag_off = ntohs (fb->frag_off);
+  if (frag_off + fsize > msize)
+  {
+    /* Fragment (plus fragment size) exceeds message size! */
+    GNUNET_break_op (0);
+    finish_cmc_handling (cmc);
+    return;
+  }
   memcpy (&target[frag_off], &fb[1], fsize);
   /* update bitfield and msg_missing */
   for (unsigned int i = frag_off; i < frag_off + fsize; i++)
@@ -5103,19 +5682,19 @@ handle_reliability_box (void *cls,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received reliability box from %s with UUID %s of type %u\n",
               GNUNET_i2s (&cmc->im.sender),
-              GNUNET_sh2s (&rb->ack_uuid.value),
+              GNUNET_uuid2s (&rb->ack_uuid.value),
               (unsigned int) ntohs (inbox->type));
   rtt = GNUNET_TIME_UNIT_SECONDS; /* FIXME: should base this on "RTT", but we
                                      do not really have an RTT for the
-                                     *incoming* queue (should we have
+                                   incoming* queue (should we have
                                      the sender add it to the rb message?) */
   cummulative_ack (
     &cmc->im.sender,
     &rb->ack_uuid,
     (0 == ntohl (rb->ack_countdown))
-      ? GNUNET_TIME_UNIT_ZERO_ABS
-      : GNUNET_TIME_relative_to_absolute (
-          GNUNET_TIME_relative_divide (rtt, 8 /* FIXME: magic constant */)));
+    ? GNUNET_TIME_UNIT_ZERO_ABS
+    : GNUNET_TIME_relative_to_absolute (
+      GNUNET_TIME_relative_divide (rtt, 8 /* FIXME: magic constant */)));
   /* continue with inner message */
   /* FIXME: check that inbox is NOT a DV Box, fragment or another
      reliability box (not allowed!) */
@@ -5209,6 +5788,58 @@ update_dvh_performance (struct DistanceVectorHop *dvh,
 }
 
 
+/**
+ * We have completed transmission of @a pm, remove it from
+ * the transmission queues (and if it is a fragment, continue
+ * up the tree as necessary).
+ *
+ * @param pm pending message that was transmitted
+ */
+static void
+completed_pending_message (struct PendingMessage *pm)
+{
+  struct PendingMessage *pos;
+
+  switch (pm->pmt)
+  {
+  case PMT_CORE:
+  case PMT_RELIABILITY_BOX:
+    /* Full message sent, we are done */
+    client_send_response (pm);
+    return;
+
+  case PMT_FRAGMENT_BOX:
+    /* Fragment sent over reliabile channel */
+    free_fragment_tree (pm);
+    pos = pm->frag_parent;
+    GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
+    GNUNET_free (pm);
+    /* check if subtree is done */
+    while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
+           (pos != pm))
+    {
+      pm = pos;
+      pos = pm->frag_parent;
+      GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
+      GNUNET_free (pm);
+    }
+
+    /* Was this the last applicable fragmment? */
+    if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
+        (pos->frag_off == pos->bytes_msg))
+      client_send_response (pos);
+    return;
+
+  case PMT_DV_BOX:
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Completed transmission of message %llu (DV Box)\n",
+                pm->logging_uuid);
+    free_pending_message (pm);
+    return;
+  }
+}
+
+
 /**
  * The @a pa was acknowledged, process the acknowledgement.
  *
@@ -5220,7 +5851,6 @@ static void
 handle_acknowledged (struct PendingAcknowledgement *pa,
                      struct GNUNET_TIME_Relative ack_delay)
 {
-  struct PendingMessage *pm = pa->pm;
   struct GNUNET_TIME_Relative delay;
 
   delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time);
@@ -5232,25 +5862,8 @@ handle_acknowledged (struct PendingAcknowledgement *pa,
     update_queue_performance (pa->queue, delay, pa->message_size);
   if (NULL != pa->dvh)
     update_dvh_performance (pa->dvh, delay, pa->message_size);
-  if (NULL != pm)
-  {
-    if (NULL != pm->frag_parent)
-    {
-      pm = pm->frag_parent;
-      free_fragment_tree (pa->pm);
-    }
-    while ((NULL != pm->frag_parent) && (NULL == pm->head_frag))
-    {
-      struct PendingMessage *parent = pm->frag_parent;
-
-      free_fragment_tree (pm);
-      pm = parent;
-    }
-    if (NULL != pm->head_frag)
-      pm = NULL; /* we are done, otherwise free 'pm' below */
-  }
-  if (NULL != pm)
-    free_pending_message (pm);
+  if (NULL != pa->pm)
+    completed_pending_message (pa->pm);
   free_pending_acknowledgement (pa);
 }
 
@@ -5269,15 +5882,15 @@ check_reliability_ack (void *cls,
   unsigned int n_acks;
 
   (void) cls;
-  n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
-           sizeof (struct TransportCummulativeAckPayloadP);
+  n_acks = (ntohs (ra->header.size) - sizeof(*ra))
+           / sizeof(struct TransportCummulativeAckPayloadP);
   if (0 == n_acks)
   {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
-  if ((ntohs (ra->header.size) - sizeof (*ra)) !=
-      n_acks * sizeof (struct TransportCummulativeAckPayloadP))
+  if ((ntohs (ra->header.size) - sizeof(*ra)) !=
+      n_acks * sizeof(struct TransportCummulativeAckPayloadP))
   {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
@@ -5299,23 +5912,22 @@ handle_reliability_ack (void *cls,
 {
   struct CommunicatorMessageContext *cmc = cls;
   const struct TransportCummulativeAckPayloadP *ack;
-  struct PendingAcknowledgement *pa;
   unsigned int n_acks;
   uint32_t ack_counter;
 
-  n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
-           sizeof (struct TransportCummulativeAckPayloadP);
+  n_acks = (ntohs (ra->header.size) - sizeof(*ra))
+           / sizeof(struct TransportCummulativeAckPayloadP);
   ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
   for (unsigned int i = 0; i < n_acks; i++)
   {
-    pa =
-      GNUNET_CONTAINER_multishortmap_get (pending_acks, &ack[i].ack_uuid.value);
+    struct PendingAcknowledgement *pa =
+      GNUNET_CONTAINER_multiuuidmap_get (pending_acks, &ack[i].ack_uuid.value);
     if (NULL == pa)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                   "Received ACK from %s with UUID %s which is unknown to us!\n",
                   GNUNET_i2s (&cmc->im.sender),
-                  GNUNET_sh2s (&ack[i].ack_uuid.value));
+                  GNUNET_uuid2s (&ack[i].ack_uuid.value));
       GNUNET_STATISTICS_update (
         GST_stats,
         "# FRAGMENT_ACKS dropped, no matching pending message",
@@ -5326,12 +5938,12 @@ handle_reliability_ack (void *cls,
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Received ACK from %s with UUID %s\n",
                 GNUNET_i2s (&cmc->im.sender),
-                GNUNET_sh2s (&ack[i].ack_uuid.value));
+                GNUNET_uuid2s (&ack[i].ack_uuid.value));
     handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
   }
 
   ack_counter = htonl (ra->ack_counter);
-  (void) ack_counter; /* silence compiler warning for now */
+  (void) ack_counter;  /* silence compiler warning for now */
   // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
   // (DV and/or Neighbour?)
   finish_cmc_handling (cmc);
@@ -5350,7 +5962,7 @@ check_backchannel_encapsulation (
   void *cls,
   const struct TransportBackchannelEncapsulationMessage *be)
 {
-  uint16_t size = ntohs (be->header.size) - sizeof (*be);
+  uint16_t size = ntohs (be->header.size) - sizeof(*be);
   const struct GNUNET_MessageHeader *inbox =
     (const struct GNUNET_MessageHeader *) &be[1];
   const char *is;
@@ -5476,7 +6088,7 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop)
   struct DistanceVector *dv = hop->dv;
   struct VirtualLink *vl;
 
-  vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target);
+  vl = lookup_virtual_link (&dv->target);
   if (NULL != vl)
   {
     /* Link was already up, remember dv is also now available and we are done */
@@ -5494,7 +6106,9 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop)
     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
   vl->target = dv->target;
   vl->dv = dv;
+  dv->vl = vl;
   vl->core_recv_window = RECV_WINDOW_SIZE;
+  vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
   vl->visibility_task =
     GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
   GNUNET_break (GNUNET_YES ==
@@ -5503,6 +6117,7 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop)
                   &vl->target,
                   vl,
                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  consider_sending_fc (vl);
   /* We lacked a confirmed connection to the target
      before, so tell CORE about it (finally!) */
   cores_send_connect_info (&dv->target);
@@ -5635,7 +6250,7 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
         if (last_timeout.rel_value_us <
             GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
                                            DV_PATH_DISCOVERY_FREQUENCY)
-              .rel_value_us)
+            .rel_value_us)
         {
           /* Some peer send DV learn messages too often, we are learning
              the same path faster than it would be useful; do not forward! */
@@ -5664,14 +6279,14 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Discovered new DV path to %s\n",
               GNUNET_i2s (&dv->target));
-  hop = GNUNET_malloc (sizeof (struct DistanceVectorHop) +
-                       sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
+  hop = GNUNET_malloc (sizeof(struct DistanceVectorHop)
+                       + sizeof(struct GNUNET_PeerIdentity) * (path_len - 2));
   hop->next_hop = next_hop;
   hop->dv = dv;
   hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
   memcpy (&hop[1],
           &path[2],
-          sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
+          sizeof(struct GNUNET_PeerIdentity) * (path_len - 2));
   hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
   hop->path_valid_until = path_valid_until;
   hop->distance = path_len - 2;
@@ -5702,7 +6317,7 @@ check_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
   const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
 
   (void) cls;
-  if (size != sizeof (*dvl) + num_hops * sizeof (struct DVPathEntryP))
+  if (size != sizeof(*dvl) + num_hops * sizeof(struct DVPathEntryP))
   {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
@@ -5749,8 +6364,8 @@ forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
                   struct GNUNET_TIME_Absolute in_time)
 {
   struct DVPathEntryP *dhops;
-  char buf[sizeof (struct TransportDVLearnMessage) +
-           (nhops + 1) * sizeof (struct DVPathEntryP)] GNUNET_ALIGN;
+  char buf[sizeof(struct TransportDVLearnMessage)
+           + (nhops + 1) * sizeof(struct DVPathEntryP)] GNUNET_ALIGN;
   struct TransportDVLearnMessage *fwd = (struct TransportDVLearnMessage *) buf;
   struct GNUNET_TIME_Relative nnd;
 
@@ -5761,8 +6376,8 @@ forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
               GNUNET_i2s2 (next_hop));
   GNUNET_assert (nhops < MAX_DV_HOPS_ALLOWED);
   fwd->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
-  fwd->header.size = htons (sizeof (struct TransportDVLearnMessage) +
-                            (nhops + 1) * sizeof (struct DVPathEntryP));
+  fwd->header.size = htons (sizeof(struct TransportDVLearnMessage)
+                            + (nhops + 1) * sizeof(struct DVPathEntryP));
   fwd->num_hops = htons (nhops + 1);
   fwd->bidirectional = htons (bi_history);
   nnd = GNUNET_TIME_relative_add (GNUNET_TIME_absolute_get_duration (in_time),
@@ -5773,22 +6388,24 @@ forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
   fwd->initiator = msg->initiator;
   fwd->challenge = msg->challenge;
   dhops = (struct DVPathEntryP *) &fwd[1];
-  GNUNET_memcpy (dhops, hops, sizeof (struct DVPathEntryP) * nhops);
+  GNUNET_memcpy (dhops, hops, sizeof(struct DVPathEntryP) * nhops);
   dhops[nhops].hop = GST_my_identity;
   {
-    struct DvHopPS dhp = {.purpose.purpose =
-                            htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
-                          .purpose.size = htonl (sizeof (dhp)),
-                          .pred = dhops[nhops - 1].hop,
-                          .succ = *next_hop,
-                          .challenge = msg->challenge};
+    struct DvHopPS dhp = { .purpose.purpose =
+                             htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
+                           .purpose.size = htonl (sizeof(dhp)),
+                           .pred = dhops[nhops - 1].hop,
+                           .succ = *next_hop,
+                           .challenge = msg->challenge };
 
     GNUNET_assert (GNUNET_OK ==
                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
                                              &dhp.purpose,
                                              &dhops[nhops].hop_sig));
   }
-  route_message (next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED);
+  route_control_message_without_fc (next_hop,
+                                    &fwd->header,
+                                    RMO_UNCONFIRMED_ALLOWED);
 }
 
 
@@ -5808,11 +6425,11 @@ validate_dv_initiator_signature (
   const struct ChallengeNonceP *challenge,
   const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
 {
-  struct DvInitPS ip = {.purpose.purpose = htonl (
-                          GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
-                        .purpose.size = htonl (sizeof (ip)),
-                        .monotonic_time = sender_monotonic_time,
-                        .challenge = *challenge};
+  struct DvInitPS ip = { .purpose.purpose = htonl (
+                           GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
+                         .purpose.size = htonl (sizeof(ip)),
+                         .monotonic_time = sender_monotonic_time,
+                         .challenge = *challenge };
 
   if (
     GNUNET_OK !=
@@ -5895,7 +6512,8 @@ dv_neighbour_selection (void *cls,
     return GNUNET_YES; /* skip initiator */
   for (unsigned int i = 0; i < nsc->nhops; i++)
     if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
-      return GNUNET_YES; /* skip peers on path */
+      return GNUNET_YES;
+  /* skip peers on path */
   nsc->num_eligible++;
   return GNUNET_YES;
 }
@@ -5923,7 +6541,8 @@ dv_neighbour_transmission (void *cls,
     return GNUNET_YES; /* skip initiator */
   for (unsigned int i = 0; i < nsc->nhops; i++)
     if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
-      return GNUNET_YES; /* skip peers on path */
+      return GNUNET_YES;
+  /* skip peers on path */
   for (unsigned int i = 0; i < nsc->num_selections; i++)
   {
     if (nsc->selections[i] == nsc->num_eligible)
@@ -6000,7 +6619,7 @@ calculate_fork_degree (unsigned int hops_taken,
   if (hops_taken >= 64)
   {
     GNUNET_break (0);
-    return 0; /* precaution given bitshift below */
+    return 0;   /* precaution given bitshift below */
   }
   for (unsigned int i = 1; i < hops_taken; i++)
   {
@@ -6065,7 +6684,7 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
   struct GNUNET_TIME_Absolute in_time;
   struct Neighbour *n;
 
-  nhops = ntohs (dvl->bidirectional); /* 0 = sender is initiator */
+  nhops = ntohs (dvl->bidirectional);  /* 0 = sender is initiator */
   bi_history = ntohs (dvl->bidirectional);
   hops = (const struct DVPathEntryP *) &dvl[1];
   if (0 == nhops)
@@ -6130,7 +6749,7 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
                                 &dvl->initiator,
                                 GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
                                 &dvl->monotonic_time,
-                                sizeof (dvl->monotonic_time),
+                                sizeof(dvl->monotonic_time),
                                 GNUNET_TIME_UNIT_FOREVER_ABS,
                                 GNUNET_PEERSTORE_STOREOPTION_REPLACE,
                                 &neighbour_store_dvmono_cb,
@@ -6141,13 +6760,13 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
      If signature verification load too high, implement random drop strategy */
   for (unsigned int i = 0; i < nhops; i++)
   {
-    struct DvHopPS dhp = {.purpose.purpose =
-                            htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
-                          .purpose.size = htonl (sizeof (dhp)),
-                          .pred = (0 == i) ? dvl->initiator : hops[i - 1].hop,
-                          .succ = (nhops - 1 == i) ? GST_my_identity
-                                                   : hops[i + 1].hop,
-                          .challenge = dvl->challenge};
+    struct DvHopPS dhp = { .purpose.purpose =
+                             htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
+                           .purpose.size = htonl (sizeof(dhp)),
+                           .pred = (0 == i) ? dvl->initiator : hops[i - 1].hop,
+                           .succ = (nhops == i + 1) ? GST_my_identity
+                                   : hops[i + 1].hop,
+                           .challenge = dvl->challenge };
 
     if (GNUNET_OK !=
         GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP,
@@ -6200,7 +6819,7 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
 
     // Need also something to lookup initiation time
     // to compute RTT! -> add RTT argument here?
-    latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
+    latency = GNUNET_TIME_UNIT_FOREVER_REL;   // FIXME: initialize properly
     // (based on dvl->challenge, we can identify time of origin!)
 
     network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum);
@@ -6235,14 +6854,14 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
     struct GNUNET_PeerIdentity path[nhops + 2];
 
     path[0] = GST_my_identity;
-    path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
+    path[1] = hops[nhops - 1].hop;   /* direct neighbour == predecessor! */
     for (unsigned int i = 0; i < nhops; i++)
     {
       int iret;
 
       if (0 == (bi_history & (1 << i)))
-        break; /* i-th hop not bi-directional, stop learning! */
-      if (i == nhops)
+        break;     /* i-th hop not bi-directional, stop learning! */
+      if (i == nhops - 1)
       {
         path[i + 2] = dvl->initiator;
       }
@@ -6329,7 +6948,7 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
                                            &dv_neighbour_selection,
                                            &nsc);
     if (0 == nsc.num_eligible)
-      return; /* done here, cannot forward to anyone else */
+      return;   /* done here, cannot forward to anyone else */
     nsc.num_selections = calculate_fork_degree (nhops, n_cnt, nsc.num_eligible);
     nsc.num_selections =
       GNUNET_MIN (MAX_DV_DISCOVERY_SELECTION, nsc.num_selections);
@@ -6339,8 +6958,8 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
     for (unsigned int i = 0; i < nsc.num_selections; i++)
       nsc.selections[i] =
         (nsc.num_selections == n_cnt)
-          ? i /* all were selected, avoid collisions by chance */
-          : GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, n_cnt);
+        ? i   /* all were selected, avoid collisions by chance */
+        : GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, n_cnt);
     nsc.num_eligible = 0;
     GNUNET_CONTAINER_multipeermap_iterate (neighbours,
                                            &dv_neighbour_transmission,
@@ -6365,8 +6984,8 @@ check_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
     (const struct GNUNET_PeerIdentity *) &dvb[1];
 
   (void) cls;
-  if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) +
-               sizeof (struct GNUNET_MessageHeader))
+  if (size < sizeof(*dvb) + num_hops * sizeof(struct GNUNET_PeerIdentity)
+      + sizeof(struct GNUNET_MessageHeader))
   {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
@@ -6403,17 +7022,39 @@ forward_dv_box (struct Neighbour *next_hop,
                 const void *enc_payload,
                 uint16_t enc_payload_size)
 {
-  char buf[sizeof (struct TransportDVBoxMessage) +
-           num_hops * sizeof (struct GNUNET_PeerIdentity) + enc_payload_size];
-  struct GNUNET_PeerIdentity *dhops =
-    (struct GNUNET_PeerIdentity *) &buf[sizeof (struct TransportDVBoxMessage)];
-
-  memcpy (buf, hdr, sizeof (*hdr));
-  memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity));
+  struct VirtualLink *vl = next_hop->vl;
+  struct PendingMessage *pm;
+  size_t msg_size;
+  char *buf;
+  struct GNUNET_PeerIdentity *dhops;
+
+  GNUNET_assert (NULL != vl);
+  msg_size = sizeof(struct TransportDVBoxMessage)
+             + num_hops * sizeof(struct GNUNET_PeerIdentity) + enc_payload_size;
+  pm = GNUNET_malloc (sizeof(struct PendingMessage) + msg_size);
+  pm->pmt = PMT_DV_BOX;
+  pm->vl = vl;
+  pm->timeout = GNUNET_TIME_relative_to_absolute (DV_FORWARD_TIMEOUT);
+  pm->logging_uuid = logging_uuid_gen++;
+  pm->prefs = GNUNET_MQ_PRIO_BACKGROUND;
+  pm->bytes_msg = msg_size;
+  buf = (char *) &pm[1];
+  memcpy (buf, hdr, sizeof(*hdr));
+  dhops =
+    (struct GNUNET_PeerIdentity *) &buf[sizeof(struct TransportDVBoxMessage)];
+  memcpy (dhops, hops, num_hops * sizeof(struct GNUNET_PeerIdentity));
   memcpy (&dhops[num_hops], enc_payload, enc_payload_size);
-  route_message (&next_hop->pid,
-                 (const struct GNUNET_MessageHeader *) buf,
-                 RMO_NONE);
+  GNUNET_CONTAINER_MDLL_insert (vl,
+                                vl->pending_msg_head,
+                                vl->pending_msg_tail,
+                                pm);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Created pending message %llu for DV Box with next hop %s (%u/%u)\n",
+              pm->logging_uuid,
+              GNUNET_i2s (&next_hop->pid),
+              (unsigned int) num_hops,
+              (unsigned int) total_hops);
+  check_vl_transmission (vl);
 }
 
 
@@ -6525,7 +7166,7 @@ backtalker_monotime_cb (void *cls,
     b->cmc = NULL;
     return;
   }
-  if (sizeof (*mtbe) != record->value_size)
+  if (sizeof(*mtbe) != record->value_size)
   {
     GNUNET_break (0);
     return;
@@ -6600,7 +7241,7 @@ update_backtalker_monotime (struct Backtalker *b)
                             &b->pid,
                             GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
                             &mtbe,
-                            sizeof (mtbe),
+                            sizeof(mtbe),
                             GNUNET_TIME_UNIT_FOREVER_ABS,
                             GNUNET_PEERSTORE_STOREOPTION_REPLACE,
                             &backtalker_monotime_store_cb,
@@ -6619,13 +7260,13 @@ static void
 handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
 {
   struct CommunicatorMessageContext *cmc = cls;
-  uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
+  uint16_t size = ntohs (dvb->header.size) - sizeof(*dvb);
   uint16_t num_hops = ntohs (dvb->num_hops);
   const struct GNUNET_PeerIdentity *hops =
     (const struct GNUNET_PeerIdentity *) &dvb[1];
   const char *enc_payload = (const char *) &hops[num_hops];
   uint16_t enc_payload_size =
-    size - (num_hops * sizeof (struct GNUNET_PeerIdentity));
+    size - (num_hops * sizeof(struct GNUNET_PeerIdentity));
   struct DVKeyState key;
   struct GNUNET_HashCode hmac;
   const char *hdr;
@@ -6674,8 +7315,8 @@ handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
       forward_dv_box (n,
                       dvb,
                       ntohs (dvb->total_hops) + 1,
-                      num_hops - i - 1, /* number of hops left */
-                      &hops[i + 1], /* remaining hops */
+                      num_hops - i - 1,    /* number of hops left */
+                      &hops[i + 1],    /* remaining hops */
                       enc_payload,
                       enc_payload_size);
       GNUNET_STATISTICS_update (GST_stats,
@@ -6706,7 +7347,7 @@ handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
 
   dh_key_derive_eph_pub (&dvb->ephemeral_key, &dvb->iv, &key);
   hdr = (const char *) &dvb[1];
-  hdr_len = ntohs (dvb->header.size) - sizeof (*dvb);
+  hdr_len = ntohs (dvb->header.size) - sizeof(*dvb);
   dv_hmac (&key, &hmac, hdr, hdr_len);
   if (0 != GNUNET_memcmp (&hmac, &dvb->hmac))
   {
@@ -6720,16 +7361,16 @@ handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
     struct Backtalker *b;
     struct GNUNET_TIME_Absolute monotime;
     struct TransportDVBoxPayloadP ppay;
-    char body[hdr_len - sizeof (ppay)] GNUNET_ALIGN;
+    char body[hdr_len - sizeof(ppay)] GNUNET_ALIGN;
     const struct GNUNET_MessageHeader *mh =
       (const struct GNUNET_MessageHeader *) body;
 
     GNUNET_assert (hdr_len >=
-                   sizeof (ppay) + sizeof (struct GNUNET_MessageHeader));
-    dv_decrypt (&key, &ppay, hdr, sizeof (ppay));
-    dv_decrypt (&key, &body, &hdr[sizeof (ppay)], hdr_len - sizeof (ppay));
+                   sizeof(ppay) + sizeof(struct GNUNET_MessageHeader));
+    dv_decrypt (&key, &ppay, hdr, sizeof(ppay));
+    dv_decrypt (&key, &body, &hdr[sizeof(ppay)], hdr_len - sizeof(ppay));
     dv_key_clean (&key);
-    if (ntohs (mh->size) != sizeof (body))
+    if (ntohs (mh->size) != sizeof(body))
     {
       GNUNET_break_op (0);
       finish_cmc_handling (cmc);
@@ -6742,10 +7383,12 @@ handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
       GNUNET_break_op (0);
       finish_cmc_handling (cmc);
       return;
+
     case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN:
       GNUNET_break_op (0);
       finish_cmc_handling (cmc);
       return;
+
     default:
       /* permitted, continue */
       break;
@@ -6772,15 +7415,16 @@ handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
       struct EphemeralConfirmationPS ec;
 
       ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
-      ec.purpose.size = htonl (sizeof (ec));
+      ec.purpose.size = htonl (sizeof(ec));
       ec.target = GST_my_identity;
       ec.ephemeral_key = dvb->ephemeral_key;
       if (
         GNUNET_OK !=
-        GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL,
-                                    &ec.purpose,
-                                    &ppay.sender_sig,
-                                    &ppay.sender.public_key))
+        GNUNET_CRYPTO_eddsa_verify (
+          GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL,
+          &ec.purpose,
+          &ppay.sender_sig,
+          &ppay.sender.public_key))
       {
         /* Signature invalid, disard! */
         GNUNET_break_op (0);
@@ -6809,10 +7453,10 @@ handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
     }
     /* setup data structure to cache signature AND check
        monotonic time with PEERSTORE before forwarding backchannel payload */
-    b = GNUNET_malloc (sizeof (struct Backtalker) + sizeof (body));
+    b = GNUNET_malloc (sizeof(struct Backtalker) + sizeof(body));
     b->pid = ppay.sender;
-    b->body_size = sizeof (body);
-    memcpy (&b[1], body, sizeof (body));
+    b->body_size = sizeof(body);
+    memcpy (&b[1], body, sizeof(body));
     GNUNET_assert (GNUNET_YES ==
                    GNUNET_CONTAINER_multipeermap_put (
                      backtalkers,
@@ -6859,54 +7503,278 @@ check_incoming_msg (void *cls,
 
 
 /**
- * Communicator gave us a transport address validation challenge.  Process the
- * request.
- *
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
- * @param tvc the message that was received
+ * Closure for #check_known_address.
  */
-static void
-handle_validation_challenge (
-  void *cls,
-  const struct TransportValidationChallengeMessage *tvc)
+struct CheckKnownAddressContext
 {
-  struct CommunicatorMessageContext *cmc = cls;
-  struct TransportValidationResponseMessage *tvr;
+  /**
+   * Set to the address we are looking for.
+   */
+  const char *address;
 
-  if (cmc->total_hops > 0)
-  {
-    /* DV routing is not allowed for validation challenges! */
-    GNUNET_break_op (0);
+  /**
+   * Set to a matching validation state, if one was found.
+   */
+  struct ValidationState *vs;
+};
+
+
+/**
+ * Test if the validation state in @a value matches the
+ * address from @a cls.
+ *
+ * @param cls a `struct CheckKnownAddressContext`
+ * @param pid unused (must match though)
+ * @param value a `struct ValidationState`
+ * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
+ */
+static int
+check_known_address (void *cls,
+                     const struct GNUNET_PeerIdentity *pid,
+                     void *value)
+{
+  struct CheckKnownAddressContext *ckac = cls;
+  struct ValidationState *vs = value;
+
+  (void) pid;
+  if (0 != strcmp (vs->address, ckac->address))
+    return GNUNET_OK;
+  ckac->vs = vs;
+  return GNUNET_NO;
+}
+
+
+/**
+ * Task run periodically to validate some address based on #validation_heap.
+ *
+ * @param cls NULL
+ */
+static void
+validation_start_cb (void *cls);
+
+
+/**
+ * Set the time for next_challenge of @a vs to @a new_time.
+ * Updates the heap and if necessary reschedules the job.
+ *
+ * @param vs validation state to update
+ * @param new_time new time for revalidation
+ */
+static void
+update_next_challenge_time (struct ValidationState *vs,
+                            struct GNUNET_TIME_Absolute new_time)
+{
+  struct GNUNET_TIME_Relative delta;
+
+  if (new_time.abs_value_us == vs->next_challenge.abs_value_us)
+    return; /* be lazy */
+  vs->next_challenge = new_time;
+  if (NULL == vs->hn)
+    vs->hn =
+      GNUNET_CONTAINER_heap_insert (validation_heap, vs, new_time.abs_value_us);
+  else
+    GNUNET_CONTAINER_heap_update_cost (vs->hn, new_time.abs_value_us);
+  if ((vs != GNUNET_CONTAINER_heap_peek (validation_heap)) &&
+      (NULL != validation_task))
+    return;
+  if (NULL != validation_task)
+    GNUNET_SCHEDULER_cancel (validation_task);
+  /* randomize a bit */
+  delta.rel_value_us =
+    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+                              MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
+  new_time = GNUNET_TIME_absolute_add (new_time, delta);
+  validation_task =
+    GNUNET_SCHEDULER_add_at (new_time, &validation_start_cb, NULL);
+}
+
+
+/**
+ * Start address validation.
+ *
+ * @param pid peer the @a address is for
+ * @param address an address to reach @a pid (presumably)
+ */
+static void
+start_address_validation (const struct GNUNET_PeerIdentity *pid,
+                          const char *address)
+{
+  struct GNUNET_TIME_Absolute now;
+  struct ValidationState *vs;
+  struct CheckKnownAddressContext ckac = { .address = address, .vs = NULL };
+
+  (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
+                                                     pid,
+                                                     &check_known_address,
+                                                     &ckac);
+  if (NULL != (vs = ckac.vs))
+  {
+    /* if 'vs' is not currently valid, we need to speed up retrying the
+     * validation */
+    if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
+    {
+      /* reduce backoff as we got a fresh advertisement */
+      vs->challenge_backoff =
+        GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
+                                  GNUNET_TIME_relative_divide (
+                                    vs->challenge_backoff,
+                                    2));
+      update_next_challenge_time (vs,
+                                  GNUNET_TIME_relative_to_absolute (
+                                    vs->challenge_backoff));
+    }
+    return;
+  }
+  now = GNUNET_TIME_absolute_get ();
+  vs = GNUNET_new (struct ValidationState);
+  vs->pid = *pid;
+  vs->valid_until =
+    GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME);
+  vs->first_challenge_use = now;
+  vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
+  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+                              &vs->challenge,
+                              sizeof(vs->challenge));
+  vs->address = GNUNET_strdup (address);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Starting address validation `%s' of peer %s using challenge %s\n",
+              address,
+              GNUNET_i2s (pid),
+              GNUNET_sh2s (&vs->challenge.value));
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multipeermap_put (
+                   validation_map,
+                   &vs->pid,
+                   vs,
+                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  update_next_challenge_time (vs, now);
+}
+
+
+/**
+ * Function called by PEERSTORE for each matching record.
+ *
+ * @param cls closure, a `struct IncomingRequest`
+ * @param record peerstore record information
+ * @param emsg error message, or NULL if no errors
+ */
+static void
+handle_hello_for_incoming (void *cls,
+                           const struct GNUNET_PEERSTORE_Record *record,
+                           const char *emsg)
+{
+  struct IncomingRequest *ir = cls;
+  const char *val;
+
+  if (NULL != emsg)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Got failure from PEERSTORE: %s\n",
+                emsg);
+    return;
+  }
+  val = record->value;
+  if ((0 == record->value_size) || ('\0' != val[record->value_size - 1]))
+  {
+    GNUNET_break (0);
+    return;
+  }
+  start_address_validation (&ir->pid, (const char *) record->value);
+}
+
+
+/**
+ * Communicator gave us a transport address validation challenge.  Process the
+ * request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call
+ * #finish_cmc_handling() when done)
+ * @param tvc the message that was received
+ */
+static void
+handle_validation_challenge (
+  void *cls,
+  const struct TransportValidationChallengeMessage *tvc)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  struct TransportValidationResponseMessage tvr;
+  struct VirtualLink *vl;
+  struct GNUNET_TIME_RelativeNBO validity_duration;
+  struct IncomingRequest *ir;
+  struct Neighbour *n;
+  struct GNUNET_PeerIdentity sender;
+
+  /* DV-routed messages are not allowed for validation challenges */
+  if (cmc->total_hops > 0)
+  {
+    GNUNET_break_op (0);
     finish_cmc_handling (cmc);
     return;
   }
+  validity_duration = cmc->im.expected_address_validity;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received address validation challenge %s\n",
               GNUNET_sh2s (&tvc->challenge.value));
-  tvr = GNUNET_new (struct TransportValidationResponseMessage);
-  tvr->header.type =
+  /* If we have a virtual link, we use this mechanism to signal the
+     size of the flow control window, and to allow the sender
+     to ask for increases. If for us the virtual link is still down,
+     we will always give a window size of zero. */
+  tvr.header.type =
     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE);
-  tvr->header.size = htons (sizeof (*tvr));
-  tvr->challenge = tvc->challenge;
-  tvr->origin_time = tvc->sender_time;
-  tvr->validity_duration = cmc->im.expected_address_validity;
+  tvr.header.size = htons (sizeof(tvr));
+  tvr.reserved = htonl (0);
+  tvr.challenge = tvc->challenge;
+  tvr.origin_time = tvc->sender_time;
+  tvr.validity_duration = validity_duration;
   {
     /* create signature */
     struct TransportValidationPS tvp =
-      {.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
-       .purpose.size = htonl (sizeof (tvp)),
-       .validity_duration = tvr->validity_duration,
-       .challenge = tvc->challenge};
+    .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
+      .purpose.size = htonl (sizeof(tvp)),
+      .validity_duration = validity_duration,
+      .challenge = tvc->challenge };
 
     GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
                                                           &tvp.purpose,
-                                                          &tvr->signature));
+                                                          &tvr.signature));
   }
-  route_message (&cmc->im.sender,
-                 &tvr->header,
-                 RMO_ANYTHING_GOES | RMO_REDUNDANT);
+  route_control_message_without_fc (&cmc->im.sender,
+                                    &tvr.header,
+                                    RMO_ANYTHING_GOES | RMO_REDUNDANT);
+  sender = cmc->im.sender;
   finish_cmc_handling (cmc);
+  vl = lookup_virtual_link (&sender);
+  if (NULL != vl)
+    return;
+
+  /* For us, the link is still down, but we need bi-directional
+     connections (for flow-control and for this to be useful for
+     CORE), so we must try to bring the link up! */
+
+  /* (1) Check existing queues, if any, we may be lucky! */
+  n = lookup_neighbour (&sender);
+  if (NULL != n)
+    for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
+      start_address_validation (&sender, q->address);
+  /* (2) Also try to see if we have addresses in PEERSTORE for this peer
+     we could use */
+  for (ir = ir_head; NULL != ir; ir = ir->next)
+    if (0 == GNUNET_memcmp (&ir->pid, &sender))
+      return;
+  /* we are already trying */
+  ir = GNUNET_new (struct IncomingRequest);
+  ir->pid = sender;
+  GNUNET_CONTAINER_DLL_insert (ir_head, ir_tail, ir);
+  ir->wc = GNUNET_PEERSTORE_watch (peerstore,
+                                   "transport",
+                                   &ir->pid,
+                                   GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
+                                   &handle_hello_for_incoming,
+                                   ir);
+  ir_total++;
+  /* Bound attempts we do in parallel here, might otherwise get excessive */
+  while (ir_total > MAX_INCOMING_REQUEST)
+    free_incoming_request (ir_head);
 }
 
 
@@ -6974,51 +7842,6 @@ peerstore_store_validation_cb (void *cls, int success)
 }
 
 
-/**
- * Task run periodically to validate some address based on #validation_heap.
- *
- * @param cls NULL
- */
-static void
-validation_start_cb (void *cls);
-
-
-/**
- * Set the time for next_challenge of @a vs to @a new_time.
- * Updates the heap and if necessary reschedules the job.
- *
- * @param vs validation state to update
- * @param new_time new time for revalidation
- */
-static void
-update_next_challenge_time (struct ValidationState *vs,
-                            struct GNUNET_TIME_Absolute new_time)
-{
-  struct GNUNET_TIME_Relative delta;
-
-  if (new_time.abs_value_us == vs->next_challenge.abs_value_us)
-    return; /* be lazy */
-  vs->next_challenge = new_time;
-  if (NULL == vs->hn)
-    vs->hn =
-      GNUNET_CONTAINER_heap_insert (validation_heap, vs, new_time.abs_value_us);
-  else
-    GNUNET_CONTAINER_heap_update_cost (vs->hn, new_time.abs_value_us);
-  if ((vs != GNUNET_CONTAINER_heap_peek (validation_heap)) &&
-      (NULL != validation_task))
-    return;
-  if (NULL != validation_task)
-    GNUNET_SCHEDULER_cancel (validation_task);
-  /* randomize a bit */
-  delta.rel_value_us =
-    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
-                              MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
-  new_time = GNUNET_TIME_absolute_add (new_time, delta);
-  validation_task =
-    GNUNET_SCHEDULER_add_at (new_time, &validation_start_cb, NULL);
-}
-
-
 /**
  * Find the queue matching @a pid and @a address.
  *
@@ -7059,8 +7882,8 @@ handle_validation_response (
 {
   struct CommunicatorMessageContext *cmc = cls;
   struct ValidationState *vs;
-  struct CheckKnownChallengeContext ckac = {.challenge = &tvr->challenge,
-                                            .vs = NULL};
+  struct CheckKnownChallengeContext ckac = { .challenge = &tvr->challenge,
+                                             .vs = NULL };
   struct GNUNET_TIME_Absolute origin_time;
   struct Queue *q;
   struct Neighbour *n;
@@ -7099,10 +7922,10 @@ handle_validation_response (
   {
     /* check signature */
     struct TransportValidationPS tvp =
-      {.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
-       .purpose.size = htonl (sizeof (tvp)),
-       .validity_duration = tvr->validity_duration,
-       .challenge = tvr->challenge};
+    .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
+      .purpose.size = htonl (sizeof(tvp)),
+      .validity_duration = tvr->validity_duration,
+      .challenge = tvr->challenge };
 
     if (
       GNUNET_OK !=
@@ -7131,7 +7954,7 @@ handle_validation_response (
   vs->challenge_backoff = GNUNET_TIME_UNIT_ZERO;
   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
                               &vs->challenge,
-                              sizeof (vs->challenge));
+                              sizeof(vs->challenge));
   vs->first_challenge_use = GNUNET_TIME_absolute_subtract (
     vs->validated_until,
     GNUNET_TIME_relative_multiply (vs->validation_rtt,
@@ -7169,14 +7992,22 @@ handle_validation_response (
   q->validated_until = vs->validated_until;
   q->pd.aged_rtt = vs->validation_rtt;
   n = q->neighbour;
-  vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid);
+  vl = lookup_virtual_link (&vs->pid);
   if (NULL != vl)
   {
     /* Link was already up, remember n is also now available and we are done */
-    vl->n = n;
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Virtual link to %s could now also direct neighbour!\n",
-                GNUNET_i2s (&vs->pid));
+    if (NULL == vl->n)
+    {
+      vl->n = n;
+      n->vl = vl;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Virtual link to %s could now also direct neighbour!\n",
+                  GNUNET_i2s (&vs->pid));
+    }
+    else
+    {
+      GNUNET_assert (n == vl->n);
+    }
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -7185,7 +8016,9 @@ handle_validation_response (
   vl = GNUNET_new (struct VirtualLink);
   vl->target = n->pid;
   vl->n = n;
+  n->vl = vl;
   vl->core_recv_window = RECV_WINDOW_SIZE;
+  vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
   vl->visibility_task =
     GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
   GNUNET_break (GNUNET_YES ==
@@ -7194,6 +8027,7 @@ handle_validation_response (
                   &vl->target,
                   vl,
                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  consider_sending_fc (vl);
   /* We lacked a confirmed connection to the target
      before, so tell CORE about it (finally!) */
   cores_send_connect_info (&n->pid);
@@ -7216,9 +8050,94 @@ handle_incoming_msg (void *cls,
   cmc->tc = tc;
   cmc->im = *im;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received message via communicator from peer %s\n",
-              GNUNET_i2s (&im->sender));
-  demultiplex_with_cmc (cmc, (const struct GNUNET_MessageHeader *) &im[1]);
+              "Received message via communicator from peer %s\n",
+              GNUNET_i2s (&im->sender));
+  demultiplex_with_cmc (cmc, (const struct GNUNET_MessageHeader *) &im[1]);
+}
+
+
+/**
+ * Communicator gave us a transport address validation response.  Process the
+ * request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call
+ * #finish_cmc_handling() when done)
+ * @param fc the message that was received
+ */
+static void
+handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  struct VirtualLink *vl;
+  uint32_t seq;
+  struct GNUNET_TIME_Absolute st;
+  uint64_t os;
+  uint64_t wnd;
+
+  vl = lookup_virtual_link (&cmc->im.sender);
+  if (NULL == vl)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# FC dropped: virtual link unknown",
+                              1,
+                              GNUNET_NO);
+    finish_cmc_handling (cmc);
+    return;
+  }
+  st = GNUNET_TIME_absolute_ntoh (fc->sender_time);
+  if (st.abs_value_us < vl->last_fc_timestamp.abs_value_us)
+  {
+    /* out of order, drop */
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# FC dropped: message out of order",
+                              1,
+                              GNUNET_NO);
+    finish_cmc_handling (cmc);
+    return;
+  }
+  seq = ntohl (fc->seq);
+  if (seq < vl->last_fc_seq)
+  {
+    /* Wrap-around/reset of other peer; start all counters from zero */
+    vl->outbound_fc_window_size_used = 0;
+  }
+  vl->last_fc_seq = seq;
+  vl->last_fc_timestamp = st;
+  vl->outbound_fc_window_size = GNUNET_ntohll (fc->inbound_window_size);
+  os = GNUNET_ntohll (fc->outbound_sent);
+  vl->incoming_fc_window_size_loss =
+    (int64_t) (os - vl->incoming_fc_window_size_used);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received FC from %s, seq %u, new window %llu (loss at %lld)\n",
+              GNUNET_i2s (&vl->target),
+              (unsigned int) seq,
+              (unsigned long long) vl->outbound_fc_window_size,
+              (long long) vl->incoming_fc_window_size_loss);
+  wnd = GNUNET_ntohll (fc->outbound_window_size);
+  if ((wnd < vl->incoming_fc_window_size) ||
+      (vl->last_outbound_window_size_received != wnd) ||
+      (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX)
+       % FC_NO_CHANGE_REPLY_PROBABILITY))
+  {
+    /* Consider re-sending our FC message, as clearly the
+       other peer's idea of the window is not up-to-date */
+    consider_sending_fc (vl);
+  }
+  if ((wnd == vl->incoming_fc_window_size) &&
+      (vl->last_outbound_window_size_received == wnd) &&
+      (NULL != vl->fc_retransmit_task))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Stopping FC retransmission to %s: peer is current at window %llu\n",
+                GNUNET_i2s (&vl->target),
+                (unsigned long long) wnd);
+    GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
+    vl->fc_retransmit_task = NULL;
+  }
+  vl->last_outbound_window_size_received = wnd;
+  /* FC window likely increased, check transmission possibilities! */
+  check_vl_transmission (vl);
+  finish_cmc_handling (cmc);
 }
 
 
@@ -7234,41 +8153,45 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
                       const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_MQ_MessageHandler handlers[] =
-    {GNUNET_MQ_hd_var_size (fragment_box,
-                            GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
-                            struct TransportFragmentBoxMessage,
-                            &cmc),
-     GNUNET_MQ_hd_var_size (reliability_box,
-                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
-                            struct TransportReliabilityBoxMessage,
-                            &cmc),
-     GNUNET_MQ_hd_var_size (reliability_ack,
-                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
-                            struct TransportReliabilityAckMessage,
-                            &cmc),
-     GNUNET_MQ_hd_var_size (backchannel_encapsulation,
-                            GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
-                            struct TransportBackchannelEncapsulationMessage,
-                            &cmc),
-     GNUNET_MQ_hd_var_size (dv_learn,
-                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
-                            struct TransportDVLearnMessage,
-                            &cmc),
-     GNUNET_MQ_hd_var_size (dv_box,
-                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
-                            struct TransportDVBoxMessage,
-                            &cmc),
-     GNUNET_MQ_hd_fixed_size (
-       validation_challenge,
-       GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
-       struct TransportValidationChallengeMessage,
-       &cmc),
-     GNUNET_MQ_hd_fixed_size (
-       validation_response,
-       GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,
-       struct TransportValidationResponseMessage,
-       &cmc),
-     GNUNET_MQ_handler_end ()};
+  { GNUNET_MQ_hd_var_size (fragment_box,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
+                           struct TransportFragmentBoxMessage,
+                           &cmc),
+    GNUNET_MQ_hd_var_size (reliability_box,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
+                           struct TransportReliabilityBoxMessage,
+                           &cmc),
+    GNUNET_MQ_hd_var_size (reliability_ack,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
+                           struct TransportReliabilityAckMessage,
+                           &cmc),
+    GNUNET_MQ_hd_var_size (backchannel_encapsulation,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
+                           struct TransportBackchannelEncapsulationMessage,
+                           &cmc),
+    GNUNET_MQ_hd_var_size (dv_learn,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
+                           struct TransportDVLearnMessage,
+                           &cmc),
+    GNUNET_MQ_hd_var_size (dv_box,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
+                           struct TransportDVBoxMessage,
+                           &cmc),
+    GNUNET_MQ_hd_fixed_size (
+      validation_challenge,
+      GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
+      struct TransportValidationChallengeMessage,
+      &cmc),
+    GNUNET_MQ_hd_fixed_size (flow_control,
+                             GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL,
+                             struct TransportFlowControlMessage,
+                             &cmc),
+    GNUNET_MQ_hd_fixed_size (
+      validation_response,
+      GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,
+      struct TransportValidationResponseMessage,
+      &cmc),
+    GNUNET_MQ_handler_end () };
   int ret;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -7351,12 +8274,13 @@ prepare_pending_acknowledgement (struct Queue *queue,
   {
     GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
                                 &pa->ack_uuid,
-                                sizeof (pa->ack_uuid));
-  } while (GNUNET_YES != GNUNET_CONTAINER_multishortmap_put (
-                           pending_acks,
-                           &pa->ack_uuid.value,
-                           pa,
-                           GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+                                sizeof(pa->ack_uuid));
+  }
+  while (GNUNET_YES != GNUNET_CONTAINER_multiuuidmap_put (
+           pending_acks,
+           &pa->ack_uuid.value,
+           pa,
+           GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   GNUNET_CONTAINER_MDLL_insert (queue, queue->pa_head, queue->pa_tail, pa);
   GNUNET_CONTAINER_MDLL_insert (pm, pm->pa_head, pm->pa_tail, pa);
   if (NULL != dvh)
@@ -7365,7 +8289,7 @@ prepare_pending_acknowledgement (struct Queue *queue,
   pa->message_size = pm->bytes_msg;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Waiting for ACKnowledgment `%s' for <%llu>\n",
-              GNUNET_sh2s (&pa->ack_uuid.value),
+              GNUNET_uuid2s (&pa->ack_uuid.value),
               pm->logging_uuid);
   return pa;
 }
@@ -7392,8 +8316,8 @@ fragment_message (struct Queue *queue,
   uint16_t mtu;
 
   mtu = (0 == queue->mtu)
-          ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
-          : queue->mtu;
+        ? UINT16_MAX - sizeof(struct GNUNET_TRANSPORT_SendMessageTo)
+        : queue->mtu;
   set_pending_message_uuid (pm);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Fragmenting message %llu <%llu> to %s for MTU %u\n",
@@ -7404,7 +8328,7 @@ fragment_message (struct Queue *queue,
   pa = prepare_pending_acknowledgement (queue, dvh, pm);
 
   /* This invariant is established in #handle_add_queue_message() */
-  GNUNET_assert (mtu > sizeof (struct TransportFragmentBoxMessage));
+  GNUNET_assert (mtu > sizeof(struct TransportFragmentBoxMessage));
 
   /* select fragment for transmission, descending the tree if it has
      been expanded until we are at a leaf or at a fragment that is small
@@ -7414,7 +8338,7 @@ fragment_message (struct Queue *queue,
   while (((ff->bytes_msg > mtu) || (pm == ff)) &&
          (ff->frag_off == ff->bytes_msg) && (NULL != ff->head_frag))
   {
-    ff = ff->head_frag; /* descent into fragmented fragments */
+    ff = ff->head_frag;   /* descent into fragmented fragments */
   }
 
   if (((ff->bytes_msg > mtu) || (pm == ff)) && (pm->frag_off < pm->bytes_msg))
@@ -7436,31 +8360,31 @@ fragment_message (struct Queue *queue,
       const struct TransportFragmentBoxMessage *tfbo;
 
       tfbo = (const struct TransportFragmentBoxMessage *) orig;
-      orig += sizeof (struct TransportFragmentBoxMessage);
-      msize -= sizeof (struct TransportFragmentBoxMessage);
+      orig += sizeof(struct TransportFragmentBoxMessage);
+      msize -= sizeof(struct TransportFragmentBoxMessage);
       xoff = ntohs (tfbo->frag_off);
     }
-    fragmax = mtu - sizeof (struct TransportFragmentBoxMessage);
+    fragmax = mtu - sizeof(struct TransportFragmentBoxMessage);
     fragsize = GNUNET_MIN (msize - ff->frag_off, fragmax);
     frag =
-      GNUNET_malloc (sizeof (struct PendingMessage) +
-                     sizeof (struct TransportFragmentBoxMessage) + fragsize);
+      GNUNET_malloc (sizeof(struct PendingMessage)
+                     + sizeof(struct TransportFragmentBoxMessage) + fragsize);
     frag->logging_uuid = logging_uuid_gen++;
     frag->vl = pm->vl;
     frag->frag_parent = ff;
     frag->timeout = pm->timeout;
-    frag->bytes_msg = sizeof (struct TransportFragmentBoxMessage) + fragsize;
+    frag->bytes_msg = sizeof(struct TransportFragmentBoxMessage) + fragsize;
     frag->pmt = PMT_FRAGMENT_BOX;
     msg = (char *) &frag[1];
     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
     tfb.header.size =
-      htons (sizeof (struct TransportFragmentBoxMessage) + fragsize);
+      htons (sizeof(struct TransportFragmentBoxMessage) + fragsize);
     tfb.ack_uuid = pa->ack_uuid;
     tfb.msg_uuid = pm->msg_uuid;
     tfb.frag_off = htons (ff->frag_off + xoff);
     tfb.msg_size = htons (pm->bytes_msg);
-    memcpy (msg, &tfb, sizeof (tfb));
-    memcpy (&msg[sizeof (tfb)], &orig[ff->frag_off], fragsize);
+    memcpy (msg, &tfb, sizeof(tfb));
+    memcpy (&msg[sizeof(tfb)], &orig[ff->frag_off], fragsize);
     GNUNET_CONTAINER_MDLL_insert (frag, ff->head_frag, ff->tail_frag, frag);
     ff->frag_off += fragsize;
     ff = frag;
@@ -7507,7 +8431,7 @@ reliability_box_message (struct Queue *queue,
   if (NULL != pm->bpm)
     return pm->bpm; /* already computed earlier: do nothing */
   GNUNET_assert (NULL == pm->head_frag);
-  if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
+  if (pm->bytes_msg + sizeof(rbox) > UINT16_MAX)
   {
     /* failed hard */
     GNUNET_break (0);
@@ -7521,24 +8445,24 @@ reliability_box_message (struct Queue *queue,
               queue->address);
   pa = prepare_pending_acknowledgement (queue, dvh, pm);
 
-  bpm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (rbox) +
-                       pm->bytes_msg);
+  bpm = GNUNET_malloc (sizeof(struct PendingMessage) + sizeof(rbox)
+                       pm->bytes_msg);
   bpm->logging_uuid = logging_uuid_gen++;
   bpm->vl = pm->vl;
   bpm->frag_parent = pm;
   GNUNET_CONTAINER_MDLL_insert (frag, pm->head_frag, pm->tail_frag, bpm);
   bpm->timeout = pm->timeout;
   bpm->pmt = PMT_RELIABILITY_BOX;
-  bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
+  bpm->bytes_msg = pm->bytes_msg + sizeof(rbox);
   set_pending_message_uuid (bpm);
   rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
-  rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
-  rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
+  rbox.header.size = htons (sizeof(rbox) + pm->bytes_msg);
+  rbox.ack_countdown = htonl (0);  // FIXME: implement ACK countdown support
 
   rbox.ack_uuid = pa->ack_uuid;
   msg = (char *) &bpm[1];
-  memcpy (msg, &rbox, sizeof (rbox));
-  memcpy (&msg[sizeof (rbox)], &pm[1], pm->bytes_msg);
+  memcpy (msg, &rbox, sizeof(rbox));
+  memcpy (&msg[sizeof(rbox)], &pm[1], pm->bytes_msg);
   pm->bpm = bpm;
   return bpm;
 }
@@ -7604,10 +8528,206 @@ update_pm_next_attempt (struct PendingMessage *pm,
 
 
 /**
- * We believe we are ready to transmit a message on a queue.
- * Gives the message to the
- * communicator for transmission (updating the tracker, and re-scheduling
- * itself if applicable).
+ * Context for #select_best_pending_from_link().
+ */
+struct PendingMessageScoreContext
+{
+  /**
+   * Set to the best message that was found, NULL for none.
+   */
+  struct PendingMessage *best;
+
+  /**
+   * DVH that @e best should take, or NULL for direct transmission.
+   */
+  struct DistanceVectorHop *dvh;
+
+  /**
+   * What is the estimated total overhead for this message?
+   */
+  size_t real_overhead;
+
+  /**
+   * Number of pending messages we seriously considered this time.
+   */
+  unsigned int consideration_counter;
+
+  /**
+   * Did we have to fragment?
+   */
+  int frag;
+
+  /**
+   * Did we have to reliability box?
+   */
+  int relb;
+};
+
+
+/**
+ * Select the best pending message from @a vl for transmission
+ * via @a queue.
+ *
+ * @param sc[in,out] best message so far (NULL for none), plus scoring data
+ * @param queue the queue that will be used for transmission
+ * @param vl the virtual link providing the messages
+ * @param dvh path we are currently considering, or NULL for none
+ * @param overhead number of bytes of overhead to be expected
+ *        from DV encapsulation (0 for without DV)
+ */
+static void
+select_best_pending_from_link (struct PendingMessageScoreContext *sc,
+                               struct Queue *queue,
+                               struct VirtualLink *vl,
+                               struct DistanceVectorHop *dvh,
+                               size_t overhead)
+{
+  struct GNUNET_TIME_Absolute now;
+
+  now = GNUNET_TIME_absolute_get ();
+  for (struct PendingMessage *pos = vl->pending_msg_head; NULL != pos;
+       pos = pos->next_vl)
+  {
+    size_t real_overhead = overhead;
+    int frag;
+    int relb;
+
+    if ((NULL != dvh) && (PMT_DV_BOX == pos->pmt))
+      continue;   /* DV messages must not be DV-routed to next hop! */
+    if (pos->next_attempt.abs_value_us > now.abs_value_us)
+      break;   /* too early for all messages, they are sorted by next_attempt */
+    if (NULL != pos->qe)
+      continue;   /* not eligible */
+    sc->consideration_counter++;
+    /* determine if we have to fragment, if so add fragmentation
+       overhead! */
+    frag = GNUNET_NO;
+    if (((0 != queue->mtu) &&
+         (pos->bytes_msg + real_overhead > queue->mtu)) ||
+        (pos->bytes_msg > UINT16_MAX - sizeof(struct
+                                              GNUNET_TRANSPORT_SendMessageTo))
+        ||
+        (NULL != pos->head_frag /* fragments already exist, should
+                                     respect that even if MTU is 0 for
+                                     this queue */))
+    {
+      frag = GNUNET_YES;
+      if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)
+      {
+        /* FIXME-FRAG-REL-UUID: we could use an optimized, shorter fragmentation
+           header without the ACK UUID when using a *reliable* channel! */
+      }
+      real_overhead = overhead + sizeof(struct TransportFragmentBoxMessage);
+    }
+    /* determine if we have to reliability-box, if so add reliability box
+       overhead */
+    relb = GNUNET_NO;
+    if ((GNUNET_NO == frag) &&
+        (0 == (pos->prefs & GNUNET_MQ_PREF_UNRELIABLE)) &&
+        (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc))
+    {
+      relb = GNUNET_YES;
+      real_overhead += sizeof(struct TransportReliabilityBoxMessage);
+    }
+
+    /* Finally, compare to existing 'best' in sc to see if this 'pos' pending
+       message would beat it! */
+    if (NULL != sc->best)
+    {
+      /* CHECK if pos fits queue BETTER (=smaller) than pm, if not: continue;
+         OPTIMIZE-ME: This is a heuristic, which so far has NOT been
+         experimentally validated. There may be some huge potential for
+         improvement here. Also, we right now only compare how well the
+         given message fits _this_ queue, and do not consider how well other
+         queues might suit the message. Taking other queues into consideration
+         may further improve the result, but could also be expensive
+         in terms of CPU time.  */long long sc_score = sc->frag * 40 + sc->relb * 20 + sc->real_overhead;
+      long long pm_score = frag * 40 + relb * 20 + real_overhead;
+      long long time_delta =
+        (sc->best->next_attempt.abs_value_us - pos->next_attempt.abs_value_us)
+        / 1000LL;
+
+      /* "time_delta" considers which message has been 'ready' for transmission
+         for longer, if a message has a preference for low latency, increase
+         the weight of the time_delta by 10x if it is favorable for that message */
+      if ((0 != (pos->prefs & GNUNET_MQ_PREF_LOW_LATENCY)) &&
+          (0 != (sc->best->prefs & GNUNET_MQ_PREF_LOW_LATENCY)))
+        time_delta *= 10;     /* increase weight (always, both are low latency) */
+      else if ((0 != (pos->prefs & GNUNET_MQ_PREF_LOW_LATENCY)) &&
+               (time_delta > 0))
+        time_delta *=
+          10;     /* increase weight, favors 'pos', which is low latency */
+      else if ((0 != (sc->best->prefs & GNUNET_MQ_PREF_LOW_LATENCY)) &&
+               (time_delta < 0))
+        time_delta *=
+          10;     /* increase weight, favors 'sc->best', which is low latency */
+      if (0 != queue->mtu)
+      {
+        /* Grant bonus if we are bellow MTU, larger bonus the closer we will
+           be to the MTU */
+        if (queue->mtu > sc->real_overhead + sc->best->bytes_msg)
+          sc_score -= queue->mtu - (sc->real_overhead + sc->best->bytes_msg);
+        if (queue->mtu > real_overhead + pos->bytes_msg)
+          pm_score -= queue->mtu - (real_overhead + pos->bytes_msg);
+      }
+      if (sc_score + time_delta > pm_score)
+        continue;     /* sc_score larger, keep sc->best */
+    }
+    sc->best = pos;
+    sc->dvh = dvh;
+    sc->frag = frag;
+    sc->relb = relb;
+  }
+}
+
+
+/**
+ * Function to call to further operate on the now DV encapsulated
+ * message @a hdr, forwarding it via @a next_hop under respect of
+ * @a options.
+ *
+ * @param cls a `struct PendingMessageScoreContext`
+ * @param next_hop next hop of the DV path
+ * @param hdr encapsulated message, technically a `struct TransportDFBoxMessage`
+ * @param options options of the original message
+ */
+static void
+extract_box_cb (void *cls,
+                struct Neighbour *next_hop,
+                const struct GNUNET_MessageHeader *hdr,
+                enum RouteMessageOptions options)
+{
+  struct PendingMessageScoreContext *sc = cls;
+  struct PendingMessage *pm = sc->best;
+  struct PendingMessage *bpm;
+  uint16_t bsize = ntohs (hdr->size);
+
+  GNUNET_assert (NULL == pm->bpm);
+  bpm = GNUNET_malloc (sizeof(struct PendingMessage) + bsize);
+  bpm->logging_uuid = logging_uuid_gen++;
+  bpm->pmt = PMT_DV_BOX;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Creating DV Box %llu for original message %llu (next hop is %s)\n",
+              bpm->logging_uuid,
+              pm->logging_uuid,
+              GNUNET_i2s (&next_hop->pid));
+  memcpy (&bpm[1], hdr, bsize);
+  pm->bpm = bpm;
+}
+
+
+/**
+ * We believe we are ready to transmit a `struct PendingMessage` on a
+ * queue, the big question is which one!  We need to see if there is
+ * one pending that is allowed by flow control and congestion control
+ * and (ideally) matches our queue's performance profile.
+ *
+ * If such a message is found, we give the message to the communicator
+ * for transmission (updating the tracker, and re-scheduling ourselves
+ * if applicable).
+ *
+ * If no such message is found, the queue's `idle` field must be set
+ * to #GNUNET_YES.
  *
  * @param cls the `struct Queue` to process transmissions for
  */
@@ -7615,128 +8735,143 @@ static void
 transmit_on_queue (void *cls)
 {
   struct Queue *queue = cls;
-
-  queue->transmit_task = NULL;
-#if FIXME - NEXT
   struct Neighbour *n = queue->neighbour;
+  struct PendingMessageScoreContext sc;
   struct PendingMessage *pm;
-  struct PendingMessage *s;
-  uint32_t overhead;
 
-  if (NULL == (pm = n->pending_msg_head))
+  queue->transmit_task = NULL;
+  if (NULL == n->vl)
   {
-    /* no message pending, nothing to do here! */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "No messages waiting on queue %s to %s, going to sleep\n",
-                queue->address,
-                GNUNET_i2s (&n->pid));
+                "Virtual link `%s' is down, cannot have PM for queue `%s'\n",
+                GNUNET_i2s (&n->pid),
+                queue->address);
+    queue->idle = GNUNET_YES;
     return;
   }
-  if (NULL != pm->qe)
+  memset (&sc, 0, sizeof(sc));
+  select_best_pending_from_link (&sc, queue, n->vl, NULL, 0);
+  if (NULL == sc.best)
   {
-    /* message still pending with communciator!
-       LOGGING-FIXME: Use stats? Should this not be rare? */
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Waiting on communicator for queue %s to %s, going to sleep\n",
-                queue->address,
-                GNUNET_i2s (&n->pid));
-    return;
+    /* Also look at DVH that have the n as first hop! */
+    for (struct DistanceVectorHop *dvh = n->dv_head; NULL != dvh;
+         dvh = dvh->next_neighbour)
+    {
+      select_best_pending_from_link (&sc,
+                                     queue,
+                                     dvh->dv->vl,
+                                     dvh,
+                                     sizeof(struct GNUNET_PeerIdentity)
+                                     * (1 + dvh->distance)
+                                     + sizeof(struct TransportDVBoxMessage)
+                                     + sizeof(struct TransportDVBoxPayloadP));
+    }
   }
-  schedule_transmit_on_queue (queue, GNUNET_YES);
-  if (NULL != queue->transmit_task)
+  if (NULL == sc.best)
   {
-    GNUNET_log (
-      GNUNET_ERROR_TYPE_DEBUG,
-      "Scheduled transmission on queue %s to %s for later, going to sleep\n",
-      queue->address,
-      GNUNET_i2s (&n->pid));
-    return; /* do it later */
-  }
-  overhead = 0;
-  if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
-    overhead += sizeof (struct TransportReliabilityBoxMessage);
-  s = pm;
-  if ( ( (0 != queue->mtu) &&
-        (pm->bytes_msg + overhead > queue->mtu) ) ||
-       (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
-       (NULL != pm->head_frag /* fragments already exist, should
-                                respect that even if MTU is 0 for
-                                this queue */) )
-    s = fragment_message (queue, pm->dvh, s);
-  if (NULL == s)
-  {
-    /* Fragmentation failed, try next message... */
+    /* no message pending, nothing to do here! */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Fragmentation failed queue %s to %s for <%llu>, trying again\n",
+                "No pending messages, queue `%s' to %s now idle\n",
                 queue->address,
-                GNUNET_i2s (&n->pid),
-                pm->logging_uuid);
-    schedule_transmit_on_queue (queue, GNUNET_NO);
-    return;
-  }
-  if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
-    // FIXME-OPTIMIZE: and if reliability was requested for 's' by core!
-    s = reliability_box_message (queue, pm->dvh, s);
-  if (NULL == s)
-  {
-    /* Reliability boxing failed, try next message... */
-    GNUNET_log (
-      GNUNET_ERROR_TYPE_DEBUG,
-      "Reliability boxing failed queue %s to %s for <%llu>, trying again\n",
-      queue->address,
-      GNUNET_i2s (&n->pid),
-      pm->logging_uuid);
-    schedule_transmit_on_queue (queue, GNUNET_NO);
+                GNUNET_i2s (&n->pid));
+    queue->idle = GNUNET_YES;
     return;
   }
 
-  /* Pass 's' for transission to the communicator */
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Passing message <%llu> to queue %s for peer %s\n",
-              s->logging_uuid,
-              queue->address,
-              GNUNET_i2s (&n->pid));
-  queue_send_msg (queue, s, &s[1], s->bytes_msg);
-  // FIXME: do something similar to the logic below
-  // in defragmentation / reliability ACK handling!
-
-  /* Check if this transmission somehow conclusively finished handing 'pm'
-     even without any explicit ACKs */
-  if ((PMT_CORE == s->pmt) &&
-      (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
+  /* Given selection in `sc`, do transmission */
+  pm = sc.best;
+  if (NULL != sc.dvh)
   {
-    /* Full message sent, and over reliabile channel */
-    client_send_response (pm);
+    GNUNET_assert (PMT_DV_BOX != pm->pmt);
+    if (NULL != sc.best->bpm)
+    {
+      /* We did this boxing before, but possibly for a different path!
+         Discard old DV box!  OPTIMIZE-ME: we might want to check if
+         it is the same and then not re-build the message... */
+      free_pending_message (sc.best->bpm);
+      sc.best->bpm = NULL;
+    }
+    encapsulate_for_dv (sc.dvh->dv,
+                        1,
+                        &sc.dvh,
+                        (const struct GNUNET_MessageHeader *) &sc.best[1],
+                        &extract_box_cb,
+                        &sc,
+                        RMO_NONE);
+    GNUNET_assert (NULL != sc.best->bpm);
+    pm = sc.best->bpm;
+  }
+  if (GNUNET_YES == sc.frag)
+  {
+    pm = fragment_message (queue, sc.dvh, pm);
+    if (NULL == pm)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Fragmentation failed queue %s to %s for <%llu>, trying again\n",
+                  queue->address,
+                  GNUNET_i2s (&n->pid),
+                  sc.best->logging_uuid);
+      schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+      return;
+    }
   }
-  else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
-            queue->tc->details.communicator.cc) &&
-           (PMT_FRAGMENT_BOX == s->pmt))
+  else if (GNUNET_YES == sc.relb)
   {
-    struct PendingMessage *pos;
-
-    /* Fragment sent over reliabile channel */
-    free_fragment_tree (s);
-    pos = s->frag_parent;
-    GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
-    GNUNET_free (s);
-    /* check if subtree is done */
-    while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
-           (pos != pm))
+    pm = reliability_box_message (queue, sc.dvh, pm);
+    if (NULL == pm)
     {
-      s = pos;
-      pos = s->frag_parent;
-      GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
-      GNUNET_free (s);
+      /* Reliability boxing failed, try next message... */
+      GNUNET_log (
+        GNUNET_ERROR_TYPE_DEBUG,
+        "Reliability boxing failed queue %s to %s for <%llu>, trying again\n",
+        queue->address,
+        GNUNET_i2s (&n->pid),
+        sc.best->logging_uuid);
+      schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+      return;
     }
+  }
 
-    /* Was this the last applicable fragmment? */
-    if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
-      client_send_response (pm);
+  /* Pass 'pm' for transission to the communicator */
+  GNUNET_log (
+    GNUNET_ERROR_TYPE_DEBUG,
+    "Passing message <%llu> to queue %s for peer %s (considered %u others)\n",
+    pm->logging_uuid,
+    queue->address,
+    GNUNET_i2s (&n->pid),
+    sc.consideration_counter);
+
+  /* Flow control: increment amount of traffic sent; if we are routing
+     via DV (and thus the ultimate target of the pending message is for
+     a different virtual link than the one of the queue), then we need
+     to use up not only the window of the direct link but also the
+     flow control window for the DV link! */pm->vl->outbound_fc_window_size_used += pm->bytes_msg;
+
+  if (pm->vl != queue->neighbour->vl)
+  {
+    /* If the virtual link of the queue differs, this better be distance
+       vector routing! */
+    GNUNET_assert (NULL != sc.dvh);
+    /* If we do distance vector routing, we better not do this for a
+       message that was itself DV-routed */
+    GNUNET_assert (PMT_DV_BOX != sc.best->pmt);
+    /* We use the size of the unboxed message here, to avoid counting
+       the DV-Box header which is eaten up on the way by intermediaries */
+    queue->neighbour->vl->outbound_fc_window_size_used += sc.best->bytes_msg;
   }
-  else if (PMT_CORE != pm->pmt)
+  else
   {
-    /* This was an acknowledgement of some type, always free */
-    free_pending_message (pm);
+    GNUNET_assert (NULL == sc.dvh);
+  }
+
+  queue_send_msg (queue, pm, &pm[1], pm->bytes_msg);
+
+  /* Check if this transmission somehow conclusively finished handing 'pm'
+     even without any explicit ACKs */
+  if ((PMT_CORE == pm->pmt) ||
+      (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
+  {
+    completed_pending_message (pm);
   }
   else
   {
@@ -7745,18 +8880,17 @@ transmit_on_queue (void *cls)
        characteristics (i.e. RTT); it takes one RTT for the message to
        arrive and the ACK to come back in the best case; but the other
        side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
-       retransmitting.  Note that in the future this heuristic should
-       likely be improved further (measure RTT stability, consider
-       message urgency and size when delaying ACKs, etc.) */
-    update_pm_next_attempt (s,
+       retransmitting.
+
+       OPTIMIZE: Note that in the future this heuristic should likely
+       be improved further (measure RTT stability, consider message
+       urgency and size when delaying ACKs, etc.) */update_pm_next_attempt (pm,
                             GNUNET_TIME_relative_to_absolute (
                               GNUNET_TIME_relative_multiply (queue->pd.aged_rtt,
                                                              4)));
   }
-
   /* finally, re-schedule queue transmission task itself */
-  schedule_transmit_on_queue (queue, GNUNET_NO);
-#endif
+  schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
 }
 
 
@@ -7871,7 +9005,7 @@ handle_send_message_ack (void *cls,
     for (struct Queue *queue = tc->details.communicator.queue_head;
          NULL != queue;
          queue = queue->next_client)
-      schedule_transmit_on_queue (queue, GNUNET_NO);
+      schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   }
   else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
   {
@@ -7880,7 +9014,7 @@ handle_send_message_ack (void *cls,
                               "# Transmission throttled due to queue queue limit",
                               -1,
                               GNUNET_NO);
-    schedule_transmit_on_queue (qe->queue, GNUNET_NO);
+    schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   }
 
   if (NULL != (pm = qe->pm))
@@ -7894,21 +9028,7 @@ handle_send_message_ack (void *cls,
        transmit on queue for queues of the neighbour */
     vl = pm->vl;
     if (vl->pending_msg_head == pm)
-    {
-#if FIXME - NEXT
-      for (struct Queue *queue = n->queue_head; NULL != queue;
-           queue = queue->next_neighbour)
-        schedule_transmit_on_queue (queue, GNUNET_NO);
-#endif
-    }
-    if (GNUNET_OK != ntohl (sma->status))
-    {
-      GNUNET_log (
-        GNUNET_ERROR_TYPE_INFO,
-        "Queue failed in transmission <%llu>, will try retransmission immediately\n",
-        pm->logging_uuid);
-      update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS);
-    }
+      check_vl_transmission (vl);
   }
   GNUNET_free (qe);
 }
@@ -7935,10 +9055,10 @@ notify_client_queues (void *cls,
   for (struct Queue *q = neighbour->queue_head; NULL != q;
        q = q->next_neighbour)
   {
-    struct MonitorEvent me = {.rtt = q->pd.aged_rtt,
-                              .cs = q->cs,
-                              .num_msg_pending = q->num_msg_pending,
-                              .num_bytes_pending = q->num_bytes_pending};
+    struct MonitorEvent me = { .rtt = q->pd.aged_rtt,
+                               .cs = q->cs,
+                               .num_msg_pending = q->num_msg_pending,
+                               .num_bytes_pending = q->num_bytes_pending };
 
     notify_monitor (tc, pid, q->address, q->nt, &me);
   }
@@ -8018,7 +9138,7 @@ suggest_to_connect (const struct GNUNET_PeerIdentity *pid, const char *address)
   prefix = GNUNET_HELLO_address_to_prefix (address);
   if (NULL == prefix)
   {
-    GNUNET_break (0); /* We got an invalid address!? */
+    GNUNET_break (0);  /* We got an invalid address!? */
     return;
   }
   tc = lookup_communicator (prefix);
@@ -8032,6 +9152,7 @@ suggest_to_connect (const struct GNUNET_PeerIdentity *pid, const char *address)
                 "Cannot connect to %s at `%s', no matching communicator present\n",
                 GNUNET_i2s (pid),
                 address);
+    GNUNET_free (prefix);
     return;
   }
   /* forward suggestion for queue creation to communicator */
@@ -8040,6 +9161,7 @@ suggest_to_connect (const struct GNUNET_PeerIdentity *pid, const char *address)
               (unsigned int) idgen,
               prefix,
               address);
+  GNUNET_free (prefix);
   alen = strlen (address) + 1;
   env =
     GNUNET_MQ_msg_extra (cqm, alen, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
@@ -8062,10 +9184,10 @@ validation_transmit_on_queue (struct Queue *q, struct ValidationState *vs)
 {
   struct TransportValidationChallengeMessage tvc;
 
-  vs->last_challenge_use = GNUNET_TIME_absolute_get ();
+  vs->last_challenge_use = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
   tvc.header.type =
     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE);
-  tvc.header.size = htons (sizeof (tvc));
+  tvc.header.size = htons (sizeof(tvc));
   tvc.reserved = htonl (0);
   tvc.challenge = vs->challenge;
   tvc.sender_time = GNUNET_TIME_absolute_hton (vs->last_challenge_use);
@@ -8073,7 +9195,7 @@ validation_transmit_on_queue (struct Queue *q, struct ValidationState *vs)
               "Sending address validation challenge %s to %s\n",
               GNUNET_sh2s (&tvc.challenge.value),
               GNUNET_i2s (&q->neighbour->pid));
-  queue_send_msg (q, NULL, &tvc, sizeof (tvc));
+  queue_send_msg (q, NULL, &tvc, sizeof(tvc));
 }
 
 
@@ -8103,8 +9225,8 @@ validation_start_cb (void *cls)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                 "Address validation task not scheduled anymore, nothing to do\n");
-    return; /* woopsie, no more addresses known, should only
-               happen if we're really a lonely peer */
+    return;   /* woopsie, no more addresses known, should only
+                 happen if we're really a lonely peer */
   }
   q = find_queue (&vs->pid, vs->address);
   if (NULL == q)
@@ -8183,7 +9305,7 @@ check_connection_quality (void *cls,
     ctx->num_queues++;
     if (0 == ctx->k--)
       ctx->q = q;
-    /* OPTIMIZE-FIXME: in the future, add reliability / goodput
+    /* FIXME-CONQ-STATISTICS: in the future, add reliability / goodput
        statistics and consider those as well here? */
     if (q->pd.aged_rtt.rel_value_us < DV_QUALITY_RTT_THRESHOLD.rel_value_us)
       do_inc = GNUNET_YES;
@@ -8252,7 +9374,7 @@ start_dv_learn (void *cls)
   lle = GNUNET_new (struct LearnLaunchEntry);
   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
                               &lle->challenge,
-                              sizeof (lle->challenge));
+                              sizeof(lle->challenge));
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Starting launch DV learn with challenge %s\n",
               GNUNET_sh2s (&lle->challenge.value));
@@ -8264,18 +9386,18 @@ start_dv_learn (void *cls)
                   lle,
                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   dvl.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
-  dvl.header.size = htons (sizeof (dvl));
+  dvl.header.size = htons (sizeof(dvl));
   dvl.num_hops = htons (0);
   dvl.bidirectional = htons (0);
   dvl.non_network_delay = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
   dvl.monotonic_time =
     GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
   {
-    struct DvInitPS dvip = {.purpose.purpose = htonl (
-                              GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
-                            .purpose.size = htonl (sizeof (dvip)),
-                            .monotonic_time = dvl.monotonic_time,
-                            .challenge = lle->challenge};
+    struct DvInitPS dvip = { .purpose.purpose = htonl (
+                               GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
+                             .purpose.size = htonl (sizeof(dvip)),
+                             .monotonic_time = dvl.monotonic_time,
+                             .challenge = lle->challenge };
 
     GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
                                                           &dvip.purpose,
@@ -8296,7 +9418,7 @@ start_dv_learn (void *cls)
   /* Do this as close to transmission time as possible! */
   lle->launch_time = GNUNET_TIME_absolute_get ();
 
-  queue_send_msg (qqc.q, NULL, &dvl, sizeof (dvl));
+  queue_send_msg (qqc.q, NULL, &dvl, sizeof(dvl));
   /* reschedule this job, randomizing the time it runs (but no
      actual backoff!) */
   dvlearn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_randomize (
@@ -8360,7 +9482,7 @@ neighbour_dv_monotime_cb (void *cls,
     n->dv_monotime_available = GNUNET_YES;
     return;
   }
-  if (sizeof (*mtbe) != record->value_size)
+  if (sizeof(*mtbe) != record->value_size)
   {
     GNUNET_break (0);
     return;
@@ -8388,7 +9510,7 @@ handle_add_queue_message (void *cls,
   const char *addr;
   uint16_t addr_len;
 
-  if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBoxMessage))
+  if (ntohl (aqm->mtu) <= sizeof(struct TransportFragmentBoxMessage))
   {
     /* MTU so small as to be useless for transmissions,
        required for #fragment_message()! */
@@ -8415,14 +9537,14 @@ handle_add_queue_message (void *cls,
                                 &neighbour_dv_monotime_cb,
                                 neighbour);
   }
-  addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
+  addr_len = ntohs (aqm->header.size) - sizeof(*aqm);
   addr = (const char *) &aqm[1];
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "New queue %s to %s available with QID %llu\n",
               addr,
               GNUNET_i2s (&aqm->receiver),
               (unsigned long long) aqm->qid);
-  queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
+  queue = GNUNET_malloc (sizeof(struct Queue) + addr_len);
   queue->tc = tc;
   queue->address = (const char *) &queue[1];
   queue->pd.aged_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
@@ -8431,10 +9553,11 @@ handle_add_queue_message (void *cls,
   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
   queue->neighbour = neighbour;
+  queue->idle = GNUNET_YES;
   memcpy (&queue[1], addr, addr_len);
   /* notify monitors about new queue */
   {
-    struct MonitorEvent me = {.rtt = queue->pd.aged_rtt, .cs = queue->cs};
+    struct MonitorEvent me = { .rtt = queue->pd.aged_rtt, .cs = queue->cs };
 
     notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
   }
@@ -8448,10 +9571,12 @@ handle_add_queue_message (void *cls,
                                 queue);
   /* check if valdiations are waiting for the queue */
   (void)
-    GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
-                                                &aqm->receiver,
-                                                &check_validation_request_pending,
-                                                queue);
+  GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
+                                              &aqm->receiver,
+                                              &check_validation_request_pending,
+                                              queue);
+  /* look for traffic for this queue */
+  schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   /* might be our first queue, try launching DV learning */
   if (NULL == dvlearn_task)
     dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL);
@@ -8553,120 +9678,17 @@ handle_suggest_cancel (void *cls, const struct ExpressPreferenceMessage *msg)
 }
 
 
-/**
- * Closure for #check_known_address.
- */
-struct CheckKnownAddressContext
-{
-  /**
-   * Set to the address we are looking for.
-   */
-  const char *address;
-
-  /**
-   * Set to a matching validation state, if one was found.
-   */
-  struct ValidationState *vs;
-};
-
-
-/**
- * Test if the validation state in @a value matches the
- * address from @a cls.
- *
- * @param cls a `struct CheckKnownAddressContext`
- * @param pid unused (must match though)
- * @param value a `struct ValidationState`
- * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
- */
-static int
-check_known_address (void *cls,
-                     const struct GNUNET_PeerIdentity *pid,
-                     void *value)
-{
-  struct CheckKnownAddressContext *ckac = cls;
-  struct ValidationState *vs = value;
-
-  (void) pid;
-  if (0 != strcmp (vs->address, ckac->address))
-    return GNUNET_OK;
-  ckac->vs = vs;
-  return GNUNET_NO;
-}
-
-
-/**
- * Start address validation.
- *
- * @param pid peer the @a address is for
- * @param address an address to reach @a pid (presumably)
- */
-static void
-start_address_validation (const struct GNUNET_PeerIdentity *pid,
-                          const char *address)
-{
-  struct GNUNET_TIME_Absolute now;
-  struct ValidationState *vs;
-  struct CheckKnownAddressContext ckac = {.address = address, .vs = NULL};
-
-  (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
-                                                     pid,
-                                                     &check_known_address,
-                                                     &ckac);
-  if (NULL != (vs = ckac.vs))
-  {
-    /* if 'vs' is not currently valid, we need to speed up retrying the
-     * validation */
-    if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
-    {
-      /* reduce backoff as we got a fresh advertisement */
-      vs->challenge_backoff =
-        GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
-                                  GNUNET_TIME_relative_divide (vs->challenge_backoff,
-                                                               2));
-      update_next_challenge_time (vs,
-                                  GNUNET_TIME_relative_to_absolute (
-                                    vs->challenge_backoff));
-    }
-    return;
-  }
-  now = GNUNET_TIME_absolute_get ();
-  vs = GNUNET_new (struct ValidationState);
-  vs->pid = *pid;
-  vs->valid_until =
-    GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME);
-  vs->first_challenge_use = now;
-  vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
-  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
-                              &vs->challenge,
-                              sizeof (vs->challenge));
-  vs->address = GNUNET_strdup (address);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Starting address validation `%s' of peer %s using challenge %s\n",
-              address,
-              GNUNET_i2s (pid),
-              GNUNET_sh2s (&vs->challenge.value));
-  GNUNET_assert (GNUNET_YES ==
-                 GNUNET_CONTAINER_multipeermap_put (
-                   validation_map,
-                   &vs->pid,
-                   vs,
-                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  update_next_challenge_time (vs, now);
-}
-
-
 /**
  * Function called by PEERSTORE for each matching record.
  *
- * @param cls closure
+ * @param cls closure, a `struct PeerRequest`
  * @param record peerstore record information
  * @param emsg error message, or NULL if no errors
  */
 static void
-handle_hello (void *cls,
-              const struct GNUNET_PEERSTORE_Record *record,
-              const char *emsg)
+handle_hello_for_client (void *cls,
+                         const struct GNUNET_PEERSTORE_Record *record,
+                         const char *emsg)
 {
   struct PeerRequest *pr = cls;
   const char *val;
@@ -8724,10 +9746,10 @@ handle_suggest (void *cls, const struct ExpressPreferenceMessage *msg)
   pr->bw = msg->bw;
   pr->pk = (enum GNUNET_MQ_PriorityPreferences) ntohl (msg->pk);
   if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_put (
-                      tc->details.application.requests,
-                      &pr->pid,
-                      pr,
-                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
+        tc->details.application.requests,
+        &pr->pid,
+        pr,
+        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
   {
     GNUNET_break (0);
     GNUNET_free (pr);
@@ -8738,7 +9760,7 @@ handle_suggest (void *cls, const struct ExpressPreferenceMessage *msg)
                                    "transport",
                                    &pr->pid,
                                    GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
-                                   &handle_hello,
+                                   &handle_hello_for_client,
                                    pr);
   GNUNET_SERVICE_client_continue (tc->client);
 }
@@ -8797,7 +9819,7 @@ free_neighbour_cb (void *cls,
 
   (void) cls;
   (void) pid;
-  GNUNET_break (0); // should this ever happen?
+  GNUNET_break (0);  // should this ever happen?
   free_neighbour (neighbour);
 
   return GNUNET_OK;
@@ -8858,9 +9880,7 @@ free_validation_state_cb (void *cls,
  * @return #GNUNET_OK (always)
  */
 static int
-free_pending_ack_cb (void *cls,
-                     const struct GNUNET_ShortHashCode *key,
-                     void *value)
+free_pending_ack_cb (void *cls, const struct GNUNET_Uuid *key, void *value)
 {
   struct PendingAcknowledgement *pa = value;
 
@@ -8903,6 +9923,7 @@ static void
 do_shutdown (void *cls)
 {
   struct LearnLaunchEntry *lle;
+
   (void) cls;
 
   GNUNET_CONTAINER_multipeermap_iterate (neighbours, &free_neighbour_cb, NULL);
@@ -8926,10 +9947,10 @@ do_shutdown (void *cls)
                                          NULL);
   GNUNET_CONTAINER_multipeermap_destroy (ack_cummulators);
   ack_cummulators = NULL;
-  GNUNET_CONTAINER_multishortmap_iterate (pending_acks,
-                                          &free_pending_ack_cb,
-                                          NULL);
-  GNUNET_CONTAINER_multishortmap_destroy (pending_acks);
+  GNUNET_CONTAINER_multiuuidmap_iterate (pending_acks,
+                                         &free_pending_ack_cb,
+                                         NULL);
+  GNUNET_CONTAINER_multiuuidmap_destroy (pending_acks);
   pending_acks = NULL;
   GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (neighbours));
   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
@@ -8947,6 +9968,9 @@ do_shutdown (void *cls)
                                          NULL);
   GNUNET_CONTAINER_multipeermap_destroy (validation_map);
   validation_map = NULL;
+  while (NULL != ir_head)
+    free_incoming_request (ir_head);
+  GNUNET_assert (0 == ir_total);
   while (NULL != (lle = lle_head))
   {
     GNUNET_CONTAINER_DLL_remove (lle_head, lle_tail, lle);
@@ -8980,7 +10004,7 @@ run (void *cls,
   hello_mono_time = GNUNET_TIME_absolute_get_monotonic (c);
   GST_cfg = c;
   backtalkers = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
-  pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES);
+  pending_acks = GNUNET_CONTAINER_multiuuidmap_create (32768, GNUNET_YES);
   ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
   neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
   links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES);