split T2T-FC from CR for address validation
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
index 08a8ec893fd41a0c524589cc67e48688239467f8..83c057795a1226834ea0235e2434dc8b629dee91 100644 (file)
  *
  * TODO:
  * Implement next:
- * - properly encrypt *all* DV traffic, not only backchannel;
- *   rename BackchannelEncapsulation logic to DVEncapsulation!
- * - realize transport-to-transport flow control (needed in case
- *   communicators do not offer flow control).  Note that we may not
- *   want to simply delay the ACKs as that may cause unnecessary
- *   re-transmissions. => Introduce proper flow and congestion window(s)!
+ * - FIXME-FC: realize transport-to-transport flow control (needed in case
+ *   communicators do not offer flow control).
+ *   We do transmit FC window sizes now.  Left:
+ *   for SENDING)
+ *   - Increment "outbound_fc_window_size_used" on transmission
+ *   - Throttle sending if "outbound_fc_window_size_used" reaches limit
+ *   - Send *new* challenge when we get close to the limit (including
+ *     at the beginning when the limit is zero!)
+ *   - Retransmit challenge if it goes unanswered!
+ *
+ *   for DV)
+ *   - send challenges via DV (when DVH is confirmed *and* we care about
+ *     the target to get window size, or when DVH is unconfirmed (passive
+ *     learning!) to confirm it!)
+ *   - handle challenge responses in this case (note: validity period of addresses
+ *     will be zero!)
+ *   - if available, try to use DV paths when trying to establish
+ *     virtual link for a `struct IncomingRequest`. (i.e. if DVH is
+ *     unconfirmed, incoming requests also trigger challenge-via-DV!)
+ *
  * - review retransmission logic, right now there is no smartness there!
- *   => congestion control, flow control, etc [PERFORMANCE-BASICS]
+ *   => congestion control, etc [PERFORMANCE-BASICS]
  *
  * Optimizations:
  * - When forwarding DV learn messages, if a peer is reached that
@@ -41,7 +55,7 @@
  *   whatever else we could reach.
  * - AcknowledgementUUIDPs are overkill with 256 bits (128 would do)
  *   => Need 128 bit hash map though! [BANDWIDTH, MEMORY]
- * - queue_send_msg and route_message both by API design have to make copies
+ * - queue_send_msg by API design has to make a copy
  *   of the payload, and route_message on top of that requires a malloc/free.
  *   Change design to approximate "zero" copy better... [CPU]
  * - could avoid copying body of message into each fragment and keep
  *   triggering an explicit validation mechansim ourselves, specifically routing
  *   a challenge-response message over the path [ROUTING]
  * - Track ACK losses based on ACK-counter [ROUTING]
+ * - Fragments send over a reliable channel could do without the
+ *   AcknowledgementUUIDP altogether, as they won't be acked! [BANDWIDTH]
+ *   (-> have 2nd type of acknowledgment message; low priority, as we
+ *       do not have an MTU-limited *reliable* communicator)
+ * - Adapt available_fc_window_size, using larger values for high-bandwidth
+ *   and high-latency links *if* we have the RAM [GOODPUT / utilization / stalls]
+ * - Set last_window_consum_limit promise properly based on
+ *   latency and bandwidth of the respective connection [GOODPUT / utilization / stalls]
+ * - re-sending challenge response without a challenge when we have
+ *   significantly increased the FC window (upon CORE being done with messages)
+ *   so as to avoid the sender having to give us a fresh challenge [BANDWIDTH]
+ *   Also can re-use signature in this case [CPU]. Marked with "TODO-M1"
  *
  * Design realizations / discussion:
  * - communicators do flow control by calling MQ "notify sent"
  */
 #define GOODPUT_AGING_SLOTS 4
 
+/**
+ * How big is the flow control window size by default;
+ * limits per-neighbour RAM utilization.
+ */
+#define DEFAULT_WINDOW_SIZE (128 * 1024)
+
+/**
+ * For how many incoming connections do we try to create a
+ * virtual link for (at the same time!).  This does NOT
+ * limit the number of incoming connections, just the number
+ * for which we are actively trying to find working addresses
+ * in the absence (!) of our own applications wanting the
+ * link to go up.
+ */
+#define MAX_INCOMING_REQUEST 16
+
 /**
  * Maximum number of peers we select for forwarding DVInit
  * messages at the same time (excluding initiator).
@@ -327,37 +369,10 @@ struct TransportBackchannelEncapsulationMessage
    */
   struct GNUNET_MessageHeader header;
 
-  /**
-   * Reserved, always zero.
-   */
-  uint32_t reserved GNUNET_PACKED;
-
-  /**
-   * Target's peer identity (as backchannels may be transmitted
-   * indirectly, or even be broadcast).
-   */
-  struct GNUNET_PeerIdentity target;
-
-  /**
-   * Ephemeral key setup by the sender for @e target, used
-   * to encrypt the payload.
-   */
-  struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
-
-  /**
-   * We use an IV here as the @e ephemeral_key is re-used for
-   * #EPHEMERAL_VALIDITY time to avoid re-signing it all the time.
-   */
-  struct GNUNET_ShortHashCode iv;
-
-  /**
-   * HMAC over the ciphertext of the encrypted, variable-size
-   * body that follows.  Verified via DH of @e target and
-   * @e ephemeral_key
-   */
-  struct GNUNET_HashCode hmac;
+  /* Followed by *another* message header which is the message to
+     the communicator */
 
-  /* Followed by encrypted, variable-size payload */
+  /* Followed by a 0-terminated name of the communicator */
 };
 
 
@@ -405,7 +420,7 @@ struct EphemeralConfirmationPS
  * Plaintext of the variable-size payload that is encrypted
  * within a `struct TransportBackchannelEncapsulationMessage`
  */
-struct TransportBackchannelRequestPayloadP
+struct TransportDVBoxPayloadP
 {
 
   /**
@@ -432,10 +447,7 @@ struct TransportBackchannelRequestPayloadP
   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
 
   /* Followed by a `struct GNUNET_MessageHeader` with a message
-     for a communicator */
-
-  /* Followed by a 0-termianted string specifying the name of
-     the communicator which is to receive the message */
+     for the target peer */
 };
 
 
@@ -746,6 +758,13 @@ struct TransportDVLearnMessage
  * shortcut.
  *
  * If a peer finds itself still on the list, it must drop the message.
+ *
+ * The payload of the box can only be decrypted and verified by the
+ * ultimate receiver. Intermediaries do not learn the sender's
+ * identity and the path the message has taken.  However, the first
+ * hop does learn the sender as @e total_hops would be zero and thus
+ * the predecessor must be the origin (so this is not really useful
+ * for anonymization).
  */
 struct TransportDVBoxMessage
 {
@@ -757,27 +776,48 @@ struct TransportDVBoxMessage
   /**
    * Number of total hops this messages travelled. In NBO.
    * @e origin sets this to zero, to be incremented at
-   * each hop.
+   * each hop.  Peers should limit the @e total_hops value
+   * they accept from other peers.
    */
   uint16_t total_hops GNUNET_PACKED;
 
   /**
-   * Number of hops this messages includes. In NBO.
+   * Number of hops this messages includes. In NBO.  Reduced by one
+   * or more at each hop.  Peers should limit the @e num_hops value
+   * they accept from other peers.
    */
   uint16_t num_hops GNUNET_PACKED;
 
   /**
-   * Identity of the peer that originated the message.
+   * Ephemeral key setup by the sender for target, used to encrypt the
+   * payload.  Intermediaries must not change this value.
    */
-  struct GNUNET_PeerIdentity origin;
+  struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
+
+  /**
+   * We use an IV here as the @e ephemeral_key is re-used for
+   * #EPHEMERAL_VALIDITY time to avoid re-signing it all the time.
+   * Intermediaries must not change this value.
+   */
+  struct GNUNET_ShortHashCode iv;
+
+  /**
+   * HMAC over the ciphertext of the encrypted, variable-size body
+   * that follows.  Verified via DH of target and @e ephemeral_key.
+   * Intermediaries must not change this value.
+   */
+  struct GNUNET_HashCode hmac;
 
   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
      excluding the @e origin and the current peer, the last must be
      the ultimate target; if @e num_hops is zero, the receiver of this
      message is the ultimate target. */
 
-  /* Followed by the actual message, which itself may be
-     another box, but not a DV_LEARN or DV_BOX message! */
+  /* Followed by encrypted, variable-size payload, which
+     must begin with a `struct TransportDVBoxPayloadP` */
+
+  /* Followed by the actual message, which itself must not be a
+     a DV_LEARN or DV_BOX message! */
 };
 
 
@@ -794,7 +834,7 @@ struct TransportValidationChallengeMessage
   struct GNUNET_MessageHeader header;
 
   /**
-   * Zero.
+   * Always zero.
    */
   uint32_t reserved GNUNET_PACKED;
 
@@ -804,8 +844,8 @@ struct TransportValidationChallengeMessage
   struct ChallengeNonceP challenge;
 
   /**
-   * Timestamp of the sender, to be copied into the reply
-   * to allow sender to calculate RTT.
+   * Timestamp of the sender, to be copied into the reply to allow
+   * sender to calculate RTT.  Must be monotonically increasing!
    */
   struct GNUNET_TIME_AbsoluteNBO sender_time;
 };
@@ -837,7 +877,7 @@ struct TransportValidationPS
 
 
 /**
- * Message send to a peer to respond to a
+ * Message  send to a peer to respond to a
  * #GNUNET_MESSAGE_TYPE_ADDRESS_VALIDATION_CHALLENGE
  */
 struct TransportValidationResponseMessage
@@ -849,7 +889,7 @@ struct TransportValidationResponseMessage
   struct GNUNET_MessageHeader header;
 
   /**
-   * Zero.
+   * Always zero.
    */
   uint32_t reserved GNUNET_PACKED;
 
@@ -878,6 +918,66 @@ struct TransportValidationResponseMessage
 };
 
 
+/**
+ * Message for Transport-to-Transport Flow control. Specifies the size
+ * of the flow control window, including how much we believe to have
+ * consumed (at transmission time), how much we believe to be allowed
+ * (at transmission time), and how much the other peer is allowed to
+ * send to us, and how much data we already received from the other
+ * peer.
+ */
+struct TransportFlowControlMessage
+{
+  /**
+   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Sequence number of the flow control message. Incremented by one
+   * for each message.  Starts at zero when a virtual link goes up.
+   * Used to detect one-sided connection drops. On wrap-around, the
+   * flow control counters will be reset as if the connection had
+   * dropped.
+   */ 
+  uint32_t seq GNUNET_PACKED;
+  
+  /**
+   * Flow control window size in bytes, in NBO.
+   * The receiver can send this many bytes at most.
+   */
+  uint64_t inbound_window_size GNUNET_PACKED;
+
+  /**
+   * How many bytes has the sender sent that count for flow control at
+   * this time.  Used to allow the receiver to estimate the packet
+   * loss rate.
+   */
+  uint64_t outbound_sent GNUNET_PACKED;
+
+  /**
+   * Latest flow control window size we learned from the other peer,
+   * in bytes, in NBO.  We are limited to sending at most this many
+   * bytes to the other peer.  May help the other peer detect when
+   * flow control messages were lost and should thus be retransmitted.
+   * In particular, if the delta to @e outbound_sent is too small,
+   * this signals that we are stalled.
+   */
+  uint64_t outbound_window_size GNUNET_PACKED;
+
+  /**
+   * Timestamp of the sender.  Must be monotonically increasing!
+   * Used to enable receiver to ignore out-of-order packets in
+   * combination with the @e seq. Note that @e seq will go down
+   * (back to zero) whenever either side believes the connection
+   * was dropped, allowing the peers to detect that they need to
+   * reset the counters for the number of bytes sent!
+   */
+  struct GNUNET_TIME_AbsoluteNBO sender_time;
+  
+};
+
+
 GNUNET_NETWORK_STRUCT_END
 
 
@@ -977,56 +1077,6 @@ struct LearnLaunchEntry
 };
 
 
-/**
- * Entry in our cache of ephemeral keys we currently use.  This way, we only
- * sign an ephemeral once per @e target, and then can re-use it over multiple
- * #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION messages (as
- * signing is expensive and in some cases we may use backchannel messages a
- * lot).
- */
-struct EphemeralCacheEntry
-{
-
-  /**
-   * Target's peer identity (we don't re-use ephemerals
-   * to limit linkability of messages).
-   */
-  struct GNUNET_PeerIdentity target;
-
-  /**
-   * Signature affirming @e ephemeral_key of type
-   * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
-   */
-  struct GNUNET_CRYPTO_EddsaSignature sender_sig;
-
-  /**
-   * How long is @e sender_sig valid
-   */
-  struct GNUNET_TIME_Absolute ephemeral_validity;
-
-  /**
-   * What time was @e sender_sig created
-   */
-  struct GNUNET_TIME_Absolute monotime;
-
-  /**
-   * Our ephemeral key.
-   */
-  struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
-
-  /**
-   * Our private ephemeral key.
-   */
-  struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
-
-  /**
-   * Node in the ephemeral cache for this entry.
-   * Used for expiration.
-   */
-  struct GNUNET_CONTAINER_HeapNode *hn;
-};
-
-
 /**
  * Information we keep per #GOODPUT_AGING_SLOTS about historic
  * (or current) transmission performance.
@@ -1102,6 +1152,16 @@ struct PendingMessage;
  */
 struct DistanceVectorHop;
 
+/**
+ * A virtual link is another reachable peer that is known to CORE.  It
+ * can be either a `struct Neighbour` with at least one confirmed
+ * `struct Queue`, or a `struct DistanceVector` with at least one
+ * confirmed `struct DistanceVectorHop`.  With a virtual link we track
+ * data that is per neighbour that is not specific to how the
+ * connectivity is established.
+ */
+struct VirtualLink;
+
 
 /**
  * Context from #handle_incoming_msg().  Closure for many
@@ -1140,6 +1200,42 @@ struct CommunicatorMessageContext
 };
 
 
+/**
+ * Closure for #core_env_sent_cb.
+ */
+struct CoreSentContext
+{
+
+  /**
+   * Kept in a DLL to clear @e vl in case @e vl is lost.
+   */
+  struct CoreSentContext *next;
+
+  /**
+   * Kept in a DLL to clear @e vl in case @e vl is lost.
+   */
+  struct CoreSentContext *prev;
+
+  /**
+   * Virtual link this is about.
+   */
+  struct VirtualLink *vl;
+
+  /**
+   * How big was the message.
+   */
+  uint16_t size;
+
+  /**
+   * By how much should we increment @e vl's
+   * incoming_fc_window_size_used once we are done sending to CORE?
+   * Use to ensure we do not increment twice if there is more than one
+   * CORE client.
+   */
+  uint16_t isize;
+};
+
+
 /**
  * A virtual link is another reachable peer that is known to CORE.  It
  * can be either a `struct Neighbour` with at least one confirmed
@@ -1167,6 +1263,26 @@ struct VirtualLink
    */
   struct CommunicatorMessageContext *cmc_tail;
 
+  /**
+   * Head of list of messages pending for this VL.
+   */
+  struct PendingMessage *pending_msg_head;
+
+  /**
+   * Tail of list of messages pending for this VL.
+   */
+  struct PendingMessage *pending_msg_tail;
+
+  /**
+   * Kept in a DLL to clear @e vl in case @e vl is lost.
+   */
+  struct CoreSentContext *csc_tail;
+
+  /**
+   * Kept in a DLL to clear @e vl in case @e vl is lost.
+   */
+  struct CoreSentContext *csc_head;
+
   /**
    * Task scheduled to possibly notfiy core that this peer is no
    * longer counting as confirmed.  Runs the #core_visibility_check(),
@@ -1184,14 +1300,124 @@ struct VirtualLink
    * Distance vector used by this virtual link, NULL if @e n is used.
    */
   struct DistanceVector *dv;
+  
+  /**
+   * Last challenge we received from @a n.
+   * FIXME: where do we need this?
+   */
+  struct ChallengeNonceP n_challenge;
+
+  /**
+   * Last challenge we used with @a n for flow control. 
+   * FIXME: where do we need this?
+   */
+  struct ChallengeNonceP my_challenge;
+
+  /**
+   * Sender timestamp of @e n_challenge, used to generate out-of-order
+   * challenges (as sender's timestamps must be monotonically
+   * increasing).  FIXME: where do we need this?
+   */
+  struct GNUNET_TIME_Absolute n_challenge_time;
+
+  /**
+   * Sender timestamp of the last
+   * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message we have
+   * received.  Note that we do not persist this monotonic time as we
+   * do not really have to worry about ancient flow control window
+   * sizes after restarts.
+   */
+  struct GNUNET_TIME_Absolute last_fc_timestamp;
+
+  /**
+   * Used to generate unique UUIDs for messages that are being
+   * fragmented.
+   */
+  uint64_t message_uuid_ctr;
+
+  /**
+   * Memory allocated for this virtual link.  Expresses how much RAM
+   * we are willing to allocate to this virtual link.  OPTIMIZE-ME:
+   * Can be adapted to dedicate more RAM to links that need it, while
+   * sticking to some overall RAM limit.  For now, set to
+   * #DEFAULT_WINDOW_SIZE.
+   */
+  uint64_t available_fc_window_size;
+
+  /**
+   * Memory actually used to buffer packets on this virtual link.
+   * Expresses how much RAM we are currently using for virtual link.
+   * Note that once CORE is done with a packet, we decrement the value
+   * here.
+   */
+  uint64_t incoming_fc_window_size_ram;
+
+  /**
+   * Last flow control window size we provided to the other peer, in
+   * bytes.  We are allowing the other peer to send this
+   * many bytes.
+   */
+  uint64_t incoming_fc_window_size;
+
+  /**
+   * How much of the window did the other peer successfully use (and
+   * we already passed it on to CORE)? Must be below @e
+   * incoming_fc_window_size.   We should effectively signal the
+   * other peer that the window is this much bigger at the next
+   * opportunity / challenge.
+   */
+  uint64_t incoming_fc_window_size_used;
+
+  /**
+   * What is our current estimate on the message loss rate for the sender?
+   * Based on the difference between how much the sender sent according
+   * to the last #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message
+   * (@e outbound_sent field) and how much we actually received at that
+   * time (@e incoming_fc_window_size_used).  This delta is then 
+   * added onto the @e incoming_fc_window_size when determining the
+   * @e outbound_window_size we send to the other peer.  Initially zero.
+   * May be negative if we (due to out-of-order delivery) actually received
+   * more than the sender claims to have sent in its last FC message.
+   */
+  int64_t incoming_fc_window_size_loss;
+
+  /**
+   * Our current flow control window size in bytes.  We
+   * are allowed to transmit this many bytes to @a n as per
+   * our @e my_challenge "account".
+   */
+  uint64_t outbound_fc_window_size;
+
+  /**
+   * How much of our current flow control window size have we
+   * used (in bytes).  Must be below
+   * @e outbound_fc_window_size.
+   */
+  uint64_t outbound_fc_window_size_used;
 
   /**
-   * How many more messages can we send to core before we exhaust
+   * Generator for the sequence numbers of
+   * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL messages we send.
+   */
+  uint32_t fc_seq_gen;
+
+  /**
+   * Last sequence number of a
+   * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message we have
+   * received.
+   */
+  uint32_t last_fc_seq;
+  
+  /**
+   * How many more messages can we send to CORE before we exhaust
    * the receive window of CORE for this peer? If this hits zero,
    * we must tell communicators to stop providing us more messages
    * for this peer.  In fact, the window can go negative as we
    * have multiple communicators, so per communicator we can go
-   * down by one into the negative range.
+   * down by one into the negative range. Furthermore, we count
+   * delivery per CORE client, so if we had multiple cores, that
+   * might also cause a negative window size here (as one message
+   * would decrement the window by one per CORE client).
    */
   int core_recv_window;
 };
@@ -1318,16 +1544,6 @@ struct DistanceVectorHop
    */
   struct DistanceVectorHop *prev_neighbour;
 
-  /**
-   * Head of MDLL of messages routed via this path.
-   */
-  struct PendingMessage *pending_msg_head;
-
-  /**
-   * Tail of MDLL of messages routed via this path.
-   */
-  struct PendingMessage *pending_msg_tail;
-
   /**
    * Head of DLL of PAs that used our @a path.
    */
@@ -1415,7 +1631,33 @@ struct DistanceVector
    * Do we have a confirmed working queue and are thus visible to
    * CORE?  If so, this is the virtual link, otherwise NULL.
    */
-  struct VirtualLink *link;
+  struct VirtualLink *vl;
+
+  /**
+   * Signature affirming @e ephemeral_key of type
+   * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
+   */
+  struct GNUNET_CRYPTO_EddsaSignature sender_sig;
+
+  /**
+   * How long is @e sender_sig valid
+   */
+  struct GNUNET_TIME_Absolute ephemeral_validity;
+
+  /**
+   * What time was @e sender_sig created
+   */
+  struct GNUNET_TIME_Absolute monotime;
+
+  /**
+   * Our ephemeral key.
+   */
+  struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
+
+  /**
+   * Our private ephemeral key.
+   */
+  struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
 };
 
 
@@ -1579,6 +1821,12 @@ struct Queue
    * Connection status for this queue.
    */
   enum GNUNET_TRANSPORT_ConnectionStatus cs;
+
+  /**
+   * Set to #GNUNET_YES if this queue is idle waiting for some
+   * virtual link to give it a pending message.
+   */
+  int idle;
 };
 
 
@@ -1672,16 +1920,6 @@ struct Neighbour
    */
   struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
 
-  /**
-   * Head of list of messages pending for this neighbour.
-   */
-  struct PendingMessage *pending_msg_head;
-
-  /**
-   * Tail of list of messages pending for this neighbour.
-   */
-  struct PendingMessage *pending_msg_tail;
-
   /**
    * Head of MDLL of DV hops that have this neighbour as next hop. Must be
    * purged if this neighbour goes down.
@@ -1720,7 +1958,7 @@ struct Neighbour
    * Do we have a confirmed working queue and are thus visible to
    * CORE?  If so, this is the virtual link, otherwise NULL.
    */
-  struct VirtualLink *link;
+  struct VirtualLink *vl;
 
   /**
    * Latest DVLearn monotonic time seen from this peer.  Initialized only
@@ -1728,12 +1966,6 @@ struct Neighbour
    */
   struct GNUNET_TIME_Absolute last_dv_learn_monotime;
 
-  /**
-   * Used to generate unique UUIDs for messages that are being
-   * fragmented.
-   */
-  uint64_t message_uuid_ctr;
-
   /**
    * Do we have the lastest value for @e last_dv_learn_monotime from
    * PEERSTORE yet, or are we still waiting for a reply of PEERSTORE?
@@ -1743,18 +1975,47 @@ struct Neighbour
 
 
 /**
- * A peer that an application (client) would like us to talk to directly.
+ * Another peer attempted to talk to us, we should try to establish
+ * a connection in the other direction.
  */
-struct PeerRequest
+struct IncomingRequest
 {
 
   /**
-   * Which peer is this about?
+   * Kept in a DLL.
    */
-  struct GNUNET_PeerIdentity pid;
+  struct IncomingRequest *next;
 
   /**
-   * Client responsible for the request.
+   * Kept in a DLL.
+   */
+  struct IncomingRequest *prev;
+
+  /**
+   * Handle for watching the peerstore for HELLOs for this peer.
+   */
+  struct GNUNET_PEERSTORE_WatchContext *wc;
+
+  /**
+   * Which peer is this about?
+   */
+  struct GNUNET_PeerIdentity pid;
+};
+
+
+/**
+ * A peer that an application (client) would like us to talk to directly.
+ */
+struct PeerRequest
+{
+
+  /**
+   * Which peer is this about?
+   */
+  struct GNUNET_PeerIdentity pid;
+
+  /**
+   * Client responsible for the request.
    */
   struct TransportClient *tc;
 
@@ -1796,17 +2057,7 @@ enum PendingMessageType
   /**
    * Reliability box.
    */
-  PMT_RELIABILITY_BOX = 2,
-
-  /**
-   * Any type of acknowledgement.
-   */
-  PMT_ACKNOWLEDGEMENT = 3,
-
-  /**
-   * Control traffic generated by the TRANSPORT service itself.
-   */
-  PMT_CONTROL = 4
+  PMT_RELIABILITY_BOX = 2
 
 };
 
@@ -1840,14 +2091,14 @@ enum PendingMessageType
 struct PendingMessage
 {
   /**
-   * Kept in a MDLL of messages for this @a target.
+   * Kept in a MDLL of messages for this @a vl.
    */
-  struct PendingMessage *next_neighbour;
+  struct PendingMessage *next_vl;
 
   /**
-   * Kept in a MDLL of messages for this @a target.
+   * Kept in a MDLL of messages for this @a vl.
    */
-  struct PendingMessage *prev_neighbour;
+  struct PendingMessage *prev_vl;
 
   /**
    * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
@@ -1871,18 +2122,6 @@ struct PendingMessage
    */
   struct PendingMessage *prev_frag;
 
-  /**
-   * Kept in a MDLL of messages using this @a dvh (if @e dvh is
-   * non-NULL).
-   */
-  struct PendingMessage *next_dvh;
-
-  /**
-   * Kept in a MDLL of messages using this @a dvh (if @e dvh is
-   * non-NULL).
-   */
-  struct PendingMessage *prev_dvh;
-
   /**
    * Head of DLL of PAs for this pending message.
    */
@@ -1900,16 +2139,9 @@ struct PendingMessage
   struct PendingMessage *bpm;
 
   /**
-   * Target of the request (for transmission, may not be ultimate
-   * destination!).
+   * Target of the request (always the ultimate destination!).
    */
-  struct Neighbour *target;
-
-  /**
-   * Distance vector path selected for this message, or
-   * NULL if transmitted directly.
-   */
-  struct DistanceVectorHop *dvh;
+  struct VirtualLink *vl;
 
   /**
    * Set to non-NULL value if this message is currently being given to a
@@ -2341,6 +2573,13 @@ struct ValidationState
    */
   struct GNUNET_PEERSTORE_StoreContext *sc;
 
+  /**
+   * Self-imposed limit on the previous flow control window. (May be zero,
+   * if we never used data from the previous window or are establishing the
+   * connection for the first time).
+   */
+  uint32_t last_window_consum_limit;
+
   /**
    * We are technically ready to send the challenge, but we are waiting for
    * the respective queue to become available for transmission.
@@ -2507,26 +2746,6 @@ static struct GNUNET_CONTAINER_Heap *validation_heap;
  */
 static struct GNUNET_PEERSTORE_Handle *peerstore;
 
-/**
- * Heap sorting `struct EphemeralCacheEntry` by their
- * key/signature validity.
- */
-static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
-
-/**
- * Hash map for looking up `struct EphemeralCacheEntry`s
- * by peer identity. (We may have ephemerals in our
- * cache for which we do not have a neighbour entry,
- * and similar many neighbours may not need ephemerals,
- * so we use a second map.)
- */
-static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
-
-/**
- * Task to free expired ephemerals.
- */
-static struct GNUNET_SCHEDULER_Task *ephemeral_task;
-
 /**
  * Task run to initiate DV learning.
  */
@@ -2549,6 +2768,23 @@ static struct PendingAcknowledgement *pa_head;
  */
 static struct PendingAcknowledgement *pa_tail;
 
+/**
+ * List of incomming connections where we are trying
+ * to get a connection back established. Length
+ * kept in #ir_total.
+ */
+static struct IncomingRequest *ir_head;
+
+/**
+ * Tail of DLL starting at #ir_head.
+ */
+static struct IncomingRequest *ir_tail;
+
+/**
+ * Length of the DLL starting at #ir_head.
+ */
+static unsigned int ir_total;
+
 /**
  * Generator of `logging_uuid` in `struct PendingMessage`.
  */
@@ -2591,6 +2827,22 @@ get_age ()
 }
 
 
+/**
+ * Release @a ir data structure.
+ *
+ * @param ir data structure to release
+ */
+static void
+free_incoming_request (struct IncomingRequest *ir)
+{
+  GNUNET_CONTAINER_DLL_remove (ir_head, ir_tail, ir);
+  GNUNET_assert (ir_total > 0);
+  ir_total--;
+  GNUNET_PEERSTORE_watch_cancel (ir->wc);
+  GNUNET_free (ir);
+}
+
+
 /**
  * Release @a pa data structure.
  *
@@ -2629,16 +2881,76 @@ free_pending_acknowledgement (struct PendingAcknowledgement *pa)
 
 
 /**
- * Free cached ephemeral key.
+ * Free fragment tree below @e root, excluding @e root itself.
+ * FIXME: this does NOT seem to have the intended semantics
+ * based on how this is called. Seems we generally DO expect
+ * @a root to be free'ed itself as well!
+ *
+ * @param root root of the tree to free
+ */
+static void
+free_fragment_tree (struct PendingMessage *root)
+{
+  struct PendingMessage *frag;
+
+  while (NULL != (frag = root->head_frag))
+  {
+    struct PendingAcknowledgement *pa;
+
+    free_fragment_tree (frag);
+    while (NULL != (pa = frag->pa_head))
+    {
+      GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
+      pa->pm = NULL;
+    }
+    GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
+    GNUNET_free (frag);
+  }
+}
+
+
+/**
+ * Release memory associated with @a pm and remove @a pm from associated
+ * data structures.  @a pm must be a top-level pending message and not
+ * a fragment in the tree.  The entire tree is freed (if applicable).
  *
- * @param ece cached signature to free
+ * @param pm the pending message to free
  */
 static void
-free_ephemeral (struct EphemeralCacheEntry *ece)
+free_pending_message (struct PendingMessage *pm)
 {
-  GNUNET_CONTAINER_multipeermap_remove (ephemeral_map, &ece->target, ece);
-  GNUNET_CONTAINER_heap_remove_node (ece->hn);
-  GNUNET_free (ece);
+  struct TransportClient *tc = pm->client;
+  struct VirtualLink *vl = pm->vl;
+  struct PendingAcknowledgement *pa;
+
+  if (NULL != tc)
+  {
+    GNUNET_CONTAINER_MDLL_remove (client,
+                                  tc->details.core.pending_msg_head,
+                                  tc->details.core.pending_msg_tail,
+                                  pm);
+  }
+  if (NULL != vl)
+  {
+    GNUNET_CONTAINER_MDLL_remove (vl,
+                                  vl->pending_msg_head,
+                                  vl->pending_msg_tail,
+                                  pm);
+  }
+  while (NULL != (pa = pm->pa_head))
+  {
+    GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
+    pa->pm = NULL;
+  }
+
+  free_fragment_tree (pm);
+  if (NULL != pm->qe)
+  {
+    GNUNET_assert (pm == pm->qe->pm);
+    pm->qe->pm = NULL;
+  }
+  GNUNET_free_non_null (pm->bpm);
+  GNUNET_free (pm);
 }
 
 
@@ -2650,12 +2962,23 @@ free_ephemeral (struct EphemeralCacheEntry *ece)
 static void
 free_virtual_link (struct VirtualLink *vl)
 {
+  struct PendingMessage *pm;
+  struct CoreSentContext *csc;
+
+  while (NULL != (pm = vl->pending_msg_head))
+    free_pending_message (pm);
   GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl);
   if (NULL != vl->visibility_task)
   {
     GNUNET_SCHEDULER_cancel (vl->visibility_task);
     vl->visibility_task = NULL;
   }
+  while (NULL != (csc = vl->csc_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, csc);
+    GNUNET_assert (vl == csc->vl);
+    csc->vl = NULL;
+  }
   GNUNET_break (NULL == vl->n);
   GNUNET_break (NULL == vl->dv);
   GNUNET_free (vl);
@@ -2684,7 +3007,7 @@ free_validation_state (struct ValidationState *vs)
 
 
 /**
- * Lookup neighbour record for peer @a pid.
+ * Lookup neighbour for peer @a pid.
  *
  * @param pid neighbour to look for
  * @return NULL if we do not have this peer as a neighbour
@@ -2696,6 +3019,19 @@ lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
 }
 
 
+/**
+ * Lookup virtual link for peer @a pid.
+ *
+ * @param pid virtual link to look for
+ * @return NULL if we do not have this peer as a virtual link
+ */
+static struct VirtualLink *
+lookup_virtual_link (const struct GNUNET_PeerIdentity *pid)
+{
+  return GNUNET_CONTAINER_multipeermap_get (links, pid);
+}
+
+
 /**
  * Details about what to notify monitors about.
  */
@@ -2744,16 +3080,7 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh)
   struct Neighbour *n = dvh->next_hop;
   struct DistanceVector *dv = dvh->dv;
   struct PendingAcknowledgement *pa;
-  struct PendingMessage *pm;
 
-  while (NULL != (pm = dvh->pending_msg_head))
-  {
-    GNUNET_CONTAINER_MDLL_remove (dvh,
-                                  dvh->pending_msg_head,
-                                  dvh->pending_msg_tail,
-                                  pm);
-    pm->dvh = NULL;
-  }
   while (NULL != (pa = dvh->pa_head))
   {
     GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
@@ -2765,6 +3092,41 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh)
 }
 
 
+/**
+ * Task run to check whether the hops of the @a cls still
+ * are validated, or if we need to core about disconnection.
+ *
+ * @param cls a `struct VirtualLink`
+ */
+static void
+check_link_down (void *cls);
+
+
+/**
+ * Send message to CORE clients that we lost a connection.
+ *
+ * @param pid peer the connection was for
+ */
+static void
+cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Informing CORE clients about disconnect from %s\n",
+              GNUNET_i2s (pid));
+  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
+  {
+    struct GNUNET_MQ_Envelope *env;
+    struct DisconnectInfoMessage *dim;
+
+    if (CT_CORE != tc->type)
+      continue;
+    env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
+    dim->peer = *pid;
+    GNUNET_MQ_send (tc->mq, env);
+  }
+}
+
+
 /**
  * Free entry in #dv_routes.  First frees all hops to the target, and
  * if there are no entries left, frees @a dv as well.
@@ -2780,11 +3142,33 @@ free_dv_route (struct DistanceVector *dv)
     free_distance_vector_hop (dvh);
   if (NULL == dv->dv_head)
   {
+    struct VirtualLink *vl;
+
     GNUNET_assert (
       GNUNET_YES ==
       GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
+    if (NULL != (vl = dv->vl))
+    {
+      GNUNET_assert (dv == vl->dv);
+      vl->dv = NULL;
+      if (NULL == vl->n)
+      {
+        cores_send_disconnect_info (&dv->target);
+        free_virtual_link (vl);
+      }
+      else
+      {
+        GNUNET_SCHEDULER_cancel (vl->visibility_task);
+        vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl);
+      }
+      dv->vl = NULL;
+    }
+
     if (NULL != dv->timeout_task)
+    {
       GNUNET_SCHEDULER_cancel (dv->timeout_task);
+      dv->timeout_task = NULL;
+    }
     GNUNET_free (dv);
   }
 }
@@ -2964,6 +3348,7 @@ static void
 free_neighbour (struct Neighbour *neighbour)
 {
   struct DistanceVectorHop *dvh;
+  struct VirtualLink *vl;
 
   GNUNET_assert (NULL == neighbour->queue_head);
   GNUNET_assert (GNUNET_YES ==
@@ -3003,6 +3388,22 @@ free_neighbour (struct Neighbour *neighbour)
     GNUNET_PEERSTORE_store_cancel (neighbour->sc);
     neighbour->sc = NULL;
   }
+  if (NULL != (vl = neighbour->vl))
+  {
+    GNUNET_assert (neighbour == vl->n);
+    vl->n = NULL;
+    if (NULL == vl->dv)
+    {
+      cores_send_disconnect_info (&vl->target);
+      free_virtual_link (vl);
+    }
+    else
+    {
+      GNUNET_SCHEDULER_cancel (vl->visibility_task);
+      vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl);
+    }
+    neighbour->vl = NULL;
+  }
   GNUNET_free (neighbour);
 }
 
@@ -3047,31 +3448,6 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid)
 }
 
 
-/**
- * Send message to CORE clients that we lost a connection.
- *
- * @param pid peer the connection was for
- */
-static void
-cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Informing CORE clients about disconnect from %s\n",
-              GNUNET_i2s (pid));
-  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
-  {
-    struct GNUNET_MQ_Envelope *env;
-    struct DisconnectInfoMessage *dim;
-
-    if (CT_CORE != tc->type)
-      continue;
-    env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
-    dim->peer = *pid;
-    GNUNET_MQ_send (tc->mq, env);
-  }
-}
-
-
 /**
  * We believe we are ready to transmit a message on a queue. Gives the
  * message to the communicator for transmission (updating the tracker,
@@ -3084,25 +3460,16 @@ transmit_on_queue (void *cls);
 
 
 /**
- * Schedule next run of #transmit_on_queue().  Does NOTHING if
- * we should run immediately or if the message queue is empty.
- * Test for no task being added AND queue not being empty to
- * transmit immediately afterwards!  This function must only
- * be called if the message queue is non-empty!
+ * Called whenever something changed that might effect when we
+ * try to do the next transmission on @a queue using #transmit_on_queue().
  *
  * @param queue the queue to do scheduling for
- * @param inside_job set to #GNUNET_YES if called from
- *            #transmit_on_queue() itself and NOT setting
- *            the task means running immediately
+ * @param p task priority to use, if @a queue is scheduled
  */
 static void
-schedule_transmit_on_queue (struct Queue *queue, int inside_job)
+schedule_transmit_on_queue (struct Queue *queue,
+                            enum GNUNET_SCHEDULER_Priority p)
 {
-  struct Neighbour *n = queue->neighbour;
-  struct PendingMessage *pm = n->pending_msg_head;
-  struct GNUNET_TIME_Relative out_delay;
-
-  GNUNET_assert (NULL != pm);
   if (queue->tc->details.communicator.total_queue_length >=
       COMMUNICATOR_TOTAL_QUEUE_LIMIT)
   {
@@ -3111,6 +3478,7 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
       "# Transmission throttled due to communicator queue limit",
       1,
       GNUNET_NO);
+    queue->idle = GNUNET_NO;
     return;
   }
   if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
@@ -3119,35 +3487,18 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
                               "# Transmission throttled due to queue queue limit",
                               1,
                               GNUNET_NO);
+    queue->idle = GNUNET_NO;
     return;
   }
-
-  out_delay = GNUNET_TIME_absolute_get_remaining (pm->next_attempt);
-  if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us))
-  {
-    GNUNET_log (
-      GNUNET_ERROR_TYPE_DEBUG,
-      "Schedule transmission <%llu> on queue %llu of %s decides to run immediately\n",
-      pm->logging_uuid,
-      (unsigned long long) queue->qid,
-      GNUNET_i2s (&n->pid));
-    return; /* we should run immediately! */
-  }
-  /* queue has changed since we were scheduled, reschedule again */
+  /* queue might indeed be ready, schedule it */
+  if (NULL != queue->transmit_task)
+    GNUNET_SCHEDULER_cancel (queue->transmit_task);
   queue->transmit_task =
-    GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue);
-  if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Next transmission <%llu> on queue `%s' in %s (high delay)\n",
-                pm->logging_uuid,
-                queue->address,
-                GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES));
-  else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Next transmission <%llu> on queue `%s' in %s\n",
-                pm->logging_uuid,
-                queue->address,
-                GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES));
+    GNUNET_SCHEDULER_add_with_priority (p, &transmit_on_queue, queue);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Considering transmission on queue `%s' to %s\n",
+              queue->address,
+              GNUNET_i2s (&queue->neighbour->pid));
 }
 
 
@@ -3172,15 +3523,21 @@ check_link_down (void *cls)
        pos = pos->next_dv)
     dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
   if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+  {
+    vl->dv->vl = NULL;
     vl->dv = NULL;
+  }
   q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
   for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
     q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
-  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+  if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
+  {
+    vl->n->vl = NULL;
     vl->n = NULL;
+  }
   if ((NULL == vl->n) && (NULL == vl->dv))
   {
-    cores_send_disconnect_info (&dv->target);
+    cores_send_disconnect_info (&vl->target);
     free_virtual_link (vl);
     return;
   }
@@ -3245,7 +3602,7 @@ free_queue (struct Queue *queue)
   if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT <
                    tc->details.communicator.total_queue_length))
   {
-    /* Communicator dropped below threshold, resume all queues */
+    /* Communicator dropped below threshold, resume all _other_ queues */
     GNUNET_STATISTICS_update (
       GST_stats,
       "# Transmission throttled due to communicator queue limit",
@@ -3253,12 +3610,12 @@ free_queue (struct Queue *queue)
       GNUNET_NO);
     for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
          s = s->next_client)
-      schedule_transmit_on_queue (s, GNUNET_NO);
+      schedule_transmit_on_queue (s, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   }
   notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
   GNUNET_free (queue);
 
-  vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid);
+  vl = lookup_virtual_link (&neighbour->pid);
   if ((NULL != vl) && (neighbour == vl->n))
   {
     GNUNET_SCHEDULER_cancel (vl->visibility_task);
@@ -3487,85 +3844,6 @@ check_client_send (void *cls, const struct OutboundMessage *obm)
 }
 
 
-/**
- * Free fragment tree below @e root, excluding @e root itself.
- * FIXME: this does NOT seem to have the intended semantics
- * based on how this is called. Seems we generally DO expect
- * @a root to be free'ed itself as well!
- *
- * @param root root of the tree to free
- */
-static void
-free_fragment_tree (struct PendingMessage *root)
-{
-  struct PendingMessage *frag;
-
-  while (NULL != (frag = root->head_frag))
-  {
-    struct PendingAcknowledgement *pa;
-
-    free_fragment_tree (frag);
-    while (NULL != (pa = frag->pa_head))
-    {
-      GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
-      pa->pm = NULL;
-    }
-    GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
-    GNUNET_free (frag);
-  }
-}
-
-
-/**
- * Release memory associated with @a pm and remove @a pm from associated
- * data structures.  @a pm must be a top-level pending message and not
- * a fragment in the tree.  The entire tree is freed (if applicable).
- *
- * @param pm the pending message to free
- */
-static void
-free_pending_message (struct PendingMessage *pm)
-{
-  struct TransportClient *tc = pm->client;
-  struct Neighbour *target = pm->target;
-  struct DistanceVectorHop *dvh = pm->dvh;
-  struct PendingAcknowledgement *pa;
-
-  if (NULL != tc)
-  {
-    GNUNET_CONTAINER_MDLL_remove (client,
-                                  tc->details.core.pending_msg_head,
-                                  tc->details.core.pending_msg_tail,
-                                  pm);
-  }
-  if (NULL != dvh)
-  {
-    GNUNET_CONTAINER_MDLL_remove (dvh,
-                                  dvh->pending_msg_head,
-                                  dvh->pending_msg_tail,
-                                  pm);
-  }
-  GNUNET_CONTAINER_MDLL_remove (neighbour,
-                                target->pending_msg_head,
-                                target->pending_msg_tail,
-                                pm);
-  while (NULL != (pa = pm->pa_head))
-  {
-    GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
-    pa->pm = NULL;
-  }
-
-  free_fragment_tree (pm);
-  if (NULL != pm->qe)
-  {
-    GNUNET_assert (pm == pm->qe->pm);
-    pm->qe->pm = NULL;
-  }
-  GNUNET_free_non_null (pm->bpm);
-  GNUNET_free (pm);
-}
-
-
 /**
  * Send a response to the @a pm that we have processed a "send"
  * request.  Sends a confirmation to the "core" client responsible for
@@ -3577,90 +3855,24 @@ static void
 client_send_response (struct PendingMessage *pm)
 {
   struct TransportClient *tc = pm->client;
-  struct Neighbour *target = pm->target;
+  struct VirtualLink *vl = pm->vl;
   struct GNUNET_MQ_Envelope *env;
   struct SendOkMessage *som;
 
   if (NULL != tc)
   {
     env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
-    som->peer = target->pid;
+    som->peer = vl->target;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Confirming transmission of <%llu> to %s\n",
                 pm->logging_uuid,
-                GNUNET_i2s (&pm->target->pid));
+                GNUNET_i2s (&vl->target));
     GNUNET_MQ_send (tc->mq, env);
   }
   free_pending_message (pm);
 }
 
 
-/**
- * Create a DV Box message.
- *
- * @param total_hops how many hops did the message take so far
- * @param num_hops length of the @a hops array
- * @param origin origin of the message
- * @param hops next peer(s) to the destination, including destination
- * @param payload encrypted (!) payload of the box
- * @param payload_size number of bytes in @a payload
- * @return boxed message (caller must #GNUNET_free() it).
- */
-static struct TransportDVBoxMessage *
-create_dv_box (uint16_t total_hops,
-               const struct GNUNET_PeerIdentity *origin,
-               const struct GNUNET_PeerIdentity *target,
-               uint16_t num_hops,
-               const struct GNUNET_PeerIdentity *hops,
-               const void *payload,
-               uint16_t payload_size)
-{
-  struct TransportDVBoxMessage *dvb;
-  struct GNUNET_PeerIdentity *dhops;
-
-  GNUNET_assert (UINT16_MAX <
-                 sizeof (struct TransportDVBoxMessage) +
-                   sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) +
-                   payload_size);
-  dvb = GNUNET_malloc (sizeof (struct TransportDVBoxMessage) +
-                       sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) +
-                       payload_size);
-  dvb->header.size =
-    htons (sizeof (struct TransportDVBoxMessage) +
-           sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) + payload_size);
-  dvb->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
-  dvb->total_hops = htons (total_hops);
-  dvb->num_hops = htons (num_hops + 1);
-  dvb->origin = *origin;
-  dhops = (struct GNUNET_PeerIdentity *) &dvb[1];
-  memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity));
-  dhops[num_hops] = *target;
-  memcpy (&dhops[num_hops + 1], payload, payload_size);
-
-  if (GNUNET_EXTRA_LOGGING > 0)
-  {
-    char *path;
-
-    path = GNUNET_strdup (GNUNET_i2s (&dvb->origin));
-    for (unsigned int i = 0; i <= num_hops; i++)
-    {
-      char *tmp;
-
-      GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[i]));
-      GNUNET_free (path);
-      path = tmp;
-    }
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Creating DVBox for %u bytes of payload via %s\n",
-                (unsigned int) payload_size,
-                path);
-    GNUNET_free (path);
-  }
-
-  return dvb;
-}
-
-
 /**
  * Pick @a hops_array_length random DV paths satisfying @a options
  *
@@ -3740,6 +3952,79 @@ pick_random_dv_hops (const struct DistanceVector *dv,
 }
 
 
+/**
+ * There is a message at the head of the pending messages for @a vl
+ * which may be ready for transmission. Check if a queue is ready to
+ * take it.
+ *
+ * This function must (1) check for flow control to ensure that we can
+ * right now send to @a vl, (2) check that the pending message in the
+ * queue is actually eligible, (3) determine if any applicable queue
+ * (direct neighbour or DVH path) is ready to accept messages, and
+ * (4) prioritize based on the preferences associated with the
+ * pending message.
+ *
+ * So yeah, easy.
+ *
+ * @param vl virtual link where we should check for transmission
+ */
+static void
+check_vl_transmission (struct VirtualLink *vl)
+{
+  struct Neighbour *n = vl->n;
+  struct DistanceVector *dv = vl->dv;
+  struct GNUNET_TIME_Absolute now;
+  int elig;
+
+  /* FIXME-FC: need to implement virtual link flow control! */
+
+  /* Check that we have an eligible pending message!
+     (cheaper than having #transmit_on_queue() find out!) */
+  elig = GNUNET_NO;
+  for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
+       pm = pm->next_vl)
+  {
+    if (NULL != pm->qe)
+      continue; /* not eligible, is in a queue! */
+    elig = GNUNET_YES;
+    break;
+  }
+  if (GNUNET_NO == elig)
+    return;
+
+  /* Notify queues at direct neighbours that we are interested */
+  now = GNUNET_TIME_absolute_get ();
+  if (NULL != n)
+  {
+    for (struct Queue *queue = n->queue_head; NULL != queue;
+         queue = queue->next_neighbour)
+      if ((GNUNET_YES == queue->idle) &&
+          (queue->validated_until.abs_value_us > now.abs_value_us))
+        schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+  }
+  /* Notify queues via DV that we are interested */
+  if (NULL != dv)
+  {
+    /* Do DV with lower scheduler priority, which effectively means that
+       IF a neighbour exists and is available, we prefer it. */
+    for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+         pos = pos->next_dv)
+    {
+      struct Neighbour *nh = pos->next_hop;
+
+      if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
+        continue; /* skip this one: path not validated */
+      for (struct Queue *queue = nh->queue_head; NULL != queue;
+           queue = queue->next_neighbour)
+        if ((GNUNET_YES == queue->idle) &&
+            (queue->validated_until.abs_value_us > now.abs_value_us))
+          schedule_transmit_on_queue (queue,
+                                      GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
+    }
+  }
+}
+
+
 /**
  * Client asked for transmission to a peer.  Process the request.
  *
@@ -3752,14 +4037,7 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
   struct TransportClient *tc = cls;
   struct PendingMessage *pm;
   const struct GNUNET_MessageHeader *obmm;
-  struct Neighbour *target;
-  struct DistanceVector *dv;
-  struct DistanceVectorHop *dvh;
   uint32_t bytes_msg;
-  int was_empty;
-  const void *payload;
-  size_t payload_size;
-  struct TransportDVBoxMessage *dvb;
   struct VirtualLink *vl;
   enum GNUNET_MQ_PriorityPreferences pp;
 
@@ -3767,7 +4045,7 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
   bytes_msg = ntohs (obmm->size);
   pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
-  vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer);
+  vl = lookup_virtual_link (&obm->peer);
   if (NULL == vl)
   {
     /* Failure: don't have this peer as a neighbour (anymore).
@@ -3781,87 +4059,28 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
                               GNUNET_NO);
     return;
   }
-  target = lookup_neighbour (&obm->peer);
-  if (NULL == target)
-    dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer);
-  else
-    dv = NULL;
-  GNUNET_assert ((NULL != target) || (NULL != dv));
-  if (NULL == target)
-  {
-    unsigned int res;
-    struct DistanceVectorHop *dvh;
-
-    res = pick_random_dv_hops (dv, RMO_NONE, &dvh, 1);
-    GNUNET_assert (1 == res);
-    target = dvh->next_hop;
-    /* FIXME: encrypt bytes_msg at &obm[1] to &obm->peer first! */
-    dvb = create_dv_box (0,
-                         &GST_my_identity,
-                         &obm->peer,
-                         dvh->distance,
-                         dvh->path,
-                         &obm[1],
-                         bytes_msg);
-    payload = dvb;
-    payload_size = ntohs (dvb->header.size);
-  }
-  else
-  {
-    dvh = NULL;
-    dvb = NULL;
-    payload = &obm[1];
-    payload_size = bytes_msg;
-  }
 
-  was_empty = (NULL == target->pending_msg_head);
-  pm = GNUNET_malloc (sizeof (struct PendingMessage) + payload_size);
+  pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
   pm->logging_uuid = logging_uuid_gen++;
   pm->prefs = pp;
   pm->client = tc;
-  pm->target = target;
-  pm->bytes_msg = payload_size;
-  memcpy (&pm[1], payload, payload_size);
-  GNUNET_free_non_null (dvb);
-  dvb = NULL;
-  pm->dvh = dvh;
+  pm->vl = vl;
+  pm->bytes_msg = bytes_msg;
+  memcpy (&pm[1], obmm, bytes_msg);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Sending %u bytes as <%llu> to %s using %s\n",
+              "Sending %u bytes as <%llu> to %s\n",
               bytes_msg,
               pm->logging_uuid,
-              GNUNET_i2s (&obm->peer),
-              (NULL == target) ? "distance vector path" : "direct queue");
-  if (NULL != dvh)
-  {
-    GNUNET_CONTAINER_MDLL_insert (dvh,
-                                  dvh->pending_msg_head,
-                                  dvh->pending_msg_tail,
-                                  pm);
-  }
-  GNUNET_CONTAINER_MDLL_insert (neighbour,
-                                target->pending_msg_head,
-                                target->pending_msg_tail,
-                                pm);
+              GNUNET_i2s (&obm->peer));
   GNUNET_CONTAINER_MDLL_insert (client,
                                 tc->details.core.pending_msg_head,
                                 tc->details.core.pending_msg_tail,
                                 pm);
-  if (! was_empty)
-    return; /* all queues must already be busy */
-  for (struct Queue *queue = target->queue_head; NULL != queue;
-       queue = queue->next_neighbour)
-  {
-    /* try transmission on any queue that is idle */
-    if (NULL == queue->transmit_task)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Queue %llu to %s is idle, triggering transmission\n",
-                  (unsigned long long) queue->qid,
-                  GNUNET_i2s (&queue->neighbour->pid));
-      queue->transmit_task =
-        GNUNET_SCHEDULER_add_now (&transmit_on_queue, queue);
-    }
-  }
+  GNUNET_CONTAINER_MDLL_insert (vl,
+                                vl->pending_msg_head,
+                                vl->pending_msg_tail,
+                                pm);
+  check_vl_transmission (vl);
 }
 
 
@@ -3941,7 +4160,7 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
     GNUNET_SERVICE_client_drop (tc->client);
     return;
   }
-  vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer);
+  vl = lookup_virtual_link (&rom->peer);
   if (NULL == vl)
   {
     GNUNET_STATISTICS_update (GST_stats,
@@ -4015,13 +4234,6 @@ check_communicator_backchannel (
 
   (void) cls;
   msize = ntohs (cb->header.size) - sizeof (*cb);
-  if (((size_t) (UINT16_MAX - msize)) >
-      sizeof (struct TransportBackchannelEncapsulationMessage) +
-        sizeof (struct TransportBackchannelRequestPayloadP))
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
   inbox = (const struct GNUNET_MessageHeader *) &cb[1];
   isize = ntohs (inbox->size);
   if (isize >= msize)
@@ -4032,7 +4244,7 @@ check_communicator_backchannel (
   is = (const char *) inbox;
   is += isize;
   msize -= isize;
-  GNUNET_assert (msize > 0);
+  GNUNET_assert (0 < msize);
   if ('\0' != is[msize - 1])
   {
     GNUNET_break (0);
@@ -4043,102 +4255,37 @@ check_communicator_backchannel (
 
 
 /**
- * Remove memory used by expired ephemeral keys.
- *
- * @param cls NULL
- */
-static void
-expire_ephemerals (void *cls)
-{
-  struct EphemeralCacheEntry *ece;
-
-  (void) cls;
-  ephemeral_task = NULL;
-  while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
-  {
-    if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity)
-               .rel_value_us)
-    {
-      free_ephemeral (ece);
-      continue;
-    }
-    ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
-                                              &expire_ephemerals,
-                                              NULL);
-    return;
-  }
-}
-
-
-/**
- * Lookup ephemeral key in our #ephemeral_map. If no valid one exists,
- * generate one, cache it and return it.
+ * Ensure ephemeral keys in our @a dv are current. If no current one exists,
+ * set it up.
  *
- * @param pid peer to look up ephemeral for
- * @param private_key[out] set to the private key
- * @param ephemeral_key[out] set to the key
- * @param ephemeral_sender_sig[out] set to the signature
- * @param monotime[out] set to the monotime used for the signature
+ * @param dv[in,out] virtual link to update ephemeral for
  */
 static void
-lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
-                  struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
-                  struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
-                  struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
-                  struct GNUNET_TIME_Absolute *monotime)
+update_ephemeral (struct DistanceVector *dv)
 {
-  struct EphemeralCacheEntry *ece;
   struct EphemeralConfirmationPS ec;
 
-  ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map, pid);
-  if ((NULL != ece) &&
-      (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity)
-              .rel_value_us))
-  {
-    free_ephemeral (ece);
-    ece = NULL;
-  }
-  if (NULL == ece)
-  {
-    ece = GNUNET_new (struct EphemeralCacheEntry);
-    ece->target = *pid;
-    ece->monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
-    ece->ephemeral_validity =
-      GNUNET_TIME_absolute_add (ece->monotime, EPHEMERAL_VALIDITY);
-    GNUNET_assert (GNUNET_OK ==
-                   GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
-    GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key, &ece->ephemeral_key);
-    ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
-    ec.purpose.size = htonl (sizeof (ec));
-    ec.target = *pid;
-    ec.ephemeral_key = ece->ephemeral_key;
-    GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
-                                                          &ec.purpose,
-                                                          &ece->sender_sig));
-    ece->hn =
-      GNUNET_CONTAINER_heap_insert (ephemeral_heap,
-                                    ece,
-                                    ece->ephemeral_validity.abs_value_us);
-    GNUNET_assert (GNUNET_OK ==
-                   GNUNET_CONTAINER_multipeermap_put (
-                     ephemeral_map,
-                     &ece->target,
-                     ece,
-                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-    if (NULL == ephemeral_task)
-      ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
-                                                &expire_ephemerals,
-                                                NULL);
-  }
-  *private_key = ece->private_key;
-  *ephemeral_key = ece->ephemeral_key;
-  *ephemeral_sender_sig = ece->sender_sig;
-  *monotime = ece->monotime;
+  if (0 !=
+      GNUNET_TIME_absolute_get_remaining (dv->ephemeral_validity).rel_value_us)
+    return;
+  dv->monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
+  dv->ephemeral_validity =
+    GNUNET_TIME_absolute_add (dv->monotime, EPHEMERAL_VALIDITY);
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CRYPTO_ecdhe_key_create2 (&dv->private_key));
+  GNUNET_CRYPTO_ecdhe_key_get_public (&dv->private_key, &dv->ephemeral_key);
+  ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
+  ec.purpose.size = htonl (sizeof (ec));
+  ec.target = dv->target;
+  ec.ephemeral_key = dv->ephemeral_key;
+  GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+                                                        &ec.purpose,
+                                                        &dv->sender_sig));
 }
 
 
 /**
- * Send the control message @a payload on @a queue.
+ * Send the message @a payload on @a queue.
  *
  * @param queue the queue to use for transmission
  * @param pm pending message to update once transmission is done, may be NULL!
@@ -4156,6 +4303,7 @@ queue_send_msg (struct Queue *queue,
   struct GNUNET_TRANSPORT_SendMessageTo *smt;
   struct GNUNET_MQ_Envelope *env;
 
+  queue->idle = GNUNET_NO;
   GNUNET_log (
     GNUNET_ERROR_TYPE_DEBUG,
     "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n",
@@ -4187,6 +4335,11 @@ queue_send_msg (struct Queue *queue,
     GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
     queue->queue_length++;
     queue->tc->details.communicator.total_queue_length++;
+    if (COMMUNICATOR_TOTAL_QUEUE_LIMIT ==
+        queue->tc->details.communicator.total_queue_length)
+      queue->idle = GNUNET_NO;
+    if (QUEUE_LENGTH_LIMIT == queue->queue_length)
+      queue->idle = GNUNET_NO;
     GNUNET_MQ_send (queue->tc->mq, env);
   }
 }
@@ -4239,6 +4392,7 @@ route_via_neighbour (const struct Neighbour *n,
                               GNUNET_NO);
     return;
   }
+
   sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates);
   if (0 == (options & RMO_REDUNDANT))
     sel2 = candidates; /* picks none! */
@@ -4268,198 +4422,52 @@ route_via_neighbour (const struct Neighbour *n,
 
 
 /**
- * Given a distance vector path @a dvh route @a payload to
- * the ultimate destination respecting @a options.
- * Sets up the boxed message and queues it at the next hop.
- *
- * @param dvh choice of the path for the message
- * @param payload encrypted body to transmit
- * @param payload_len number of bytes in @a payload
- * @param options options to use for control
+ * Structure of the key material used to encrypt backchannel messages.
  */
-static void
-forward_via_dvh (const struct DistanceVectorHop *dvh,
-                 const void *payload,
-                 size_t payload_len,
-                 enum RouteMessageOptions options)
+struct DVKeyState
 {
-  struct TransportDVBoxMessage *dvb;
+  /**
+   * State of our block cipher.
+   */
+  gcry_cipher_hd_t cipher;
 
-  dvb = create_dv_box (0,
-                       &GST_my_identity,
-                       &dvh->dv->target,
-                       dvh->distance,
-                       dvh->path,
-                       payload,
-                       payload_len);
-  route_via_neighbour (dvh->next_hop, &dvb->header, options);
-  GNUNET_free (dvb);
-}
+  /**
+   * Actual key material.
+   */
+  struct
+  {
 
+    /**
+     * Key used for HMAC calculations (via #GNUNET_CRYPTO_hmac()).
+     */
+    struct GNUNET_CRYPTO_AuthKey hmac_key;
 
-/**
- * Pick a path of @a dv under constraints @a options and schedule
- * transmission of @a hdr.
- *
- * @param n neighbour to send to
- * @param hdr message to send as payload
- * @param options whether path must be confirmed or not
- *        and whether we may pick multiple (2) paths
- */
-static void
-route_via_dv (const struct DistanceVector *dv,
-              const struct GNUNET_MessageHeader *hdr,
-              enum RouteMessageOptions options)
-{
-  struct DistanceVectorHop *hops[2];
-  unsigned int res;
+    /**
+     * Symmetric key to use for encryption.
+     */
+    char aes_key[256 / 8];
 
-  res = pick_random_dv_hops (dv,
-                             options,
-                             hops,
-                             (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
-  if (0 == res)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                "Failed to route message, could not determine DV path\n");
-    return;
-  }
-  // FIXME: we should encrypt `hdr` here first!
-  for (unsigned int i = 0; i < res; i++)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Routing message of type %u to %s using DV (#%u/%u)\n",
-                ntohs (hdr->type),
-                GNUNET_i2s (&dv->target),
-                i + 1,
-                res + 1);
-    forward_via_dvh (hops[i],
-                     hdr,
-                     ntohs (
-                       hdr->size), /* FIXME: can't do this once encrypted... */
-                     options & (~RMO_REDUNDANT));
-  }
-}
+    /**
+     * Counter value to use during setup.
+     */
+    char aes_ctr[128 / 8];
+
+  } material;
+};
 
 
 /**
- * We need to transmit @a hdr to @a target.  If necessary, this may
- * involve DV routing.
+ * Given the key material in @a km and the initialization vector
+ * @a iv, setup the key material for the backchannel in @a key.
  *
- * @param target peer to receive @a hdr
- * @param hdr header of the message to route and #GNUNET_free()
- * @param options which transmission channels are allowed
+ * @param km raw master secret
+ * @param iv initialization vector
+ * @param key[out] symmetric cipher and HMAC state to generate
  */
 static void
-route_message (const struct GNUNET_PeerIdentity *target,
-               struct GNUNET_MessageHeader *hdr,
-               enum RouteMessageOptions options)
-{
-  struct VirtualLink *vl;
-  struct Neighbour *n;
-  struct DistanceVector *dv;
-
-  vl = GNUNET_CONTAINER_multipeermap_get (links, target);
-  n = vl->n;
-  dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
-  if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
-  {
-    /* if confirmed is required, and we do not have anything
-       confirmed, drop respective options */
-    if (NULL == n)
-      n = lookup_neighbour (target);
-    if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
-      dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target);
-  }
-  if ((NULL == n) && (NULL == dv))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                "Cannot route message of type %u to %s: no route\n",
-                ntohs (hdr->type),
-                GNUNET_i2s (target));
-    GNUNET_STATISTICS_update (GST_stats,
-                              "# Messages dropped in routing: no acceptable method",
-                              1,
-                              GNUNET_NO);
-    GNUNET_free (hdr);
-    return;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Routing message of type %u to %s with options %X\n",
-              ntohs (hdr->type),
-              GNUNET_i2s (target),
-              (unsigned int) options);
-  /* If both dv and n are possible and we must choose:
-     flip a coin for the choice between the two; for now 50/50 */
-  if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
-  {
-    if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 2))
-      n = NULL;
-    else
-      dv = NULL;
-  }
-  if ((NULL != n) && (NULL != dv))
-    options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
-                                  enough for redunancy, so clear the flag. */
-  if (NULL != n)
-  {
-    route_via_neighbour (n, hdr, options);
-  }
-  if (NULL != dv)
-  {
-    route_via_dv (dv, hdr, options);
-  }
-  GNUNET_free (hdr);
-}
-
-
-/**
- * Structure of the key material used to encrypt backchannel messages.
- */
-struct BackchannelKeyState
-{
-  /**
-   * State of our block cipher.
-   */
-  gcry_cipher_hd_t cipher;
-
-  /**
-   * Actual key material.
-   */
-  struct
-  {
-
-    /**
-     * Key used for HMAC calculations (via #GNUNET_CRYPTO_hmac()).
-     */
-    struct GNUNET_CRYPTO_AuthKey hmac_key;
-
-    /**
-     * Symmetric key to use for encryption.
-     */
-    char aes_key[256 / 8];
-
-    /**
-     * Counter value to use during setup.
-     */
-    char aes_ctr[128 / 8];
-
-  } material;
-};
-
-
-/**
- * Given the key material in @a km and the initialization vector
- * @a iv, setup the key material for the backchannel in @a key.
- *
- * @param km raw master secret
- * @param iv initialization vector
- * @param key[out] symmetric cipher and HMAC state to generate
- */
-static void
-bc_setup_key_state_from_km (const struct GNUNET_HashCode *km,
-                            const struct GNUNET_ShortHashCode *iv,
-                            struct BackchannelKeyState *key)
+dv_setup_key_state_from_km (const struct GNUNET_HashCode *km,
+                            const struct GNUNET_ShortHashCode *iv,
+                            struct DVKeyState *key)
 {
   /* must match #dh_key_derive_eph_pub */
   GNUNET_assert (GNUNET_YES ==
@@ -4502,14 +4510,14 @@ dh_key_derive_eph_pid (
   const struct GNUNET_CRYPTO_EcdhePrivateKey *priv_ephemeral,
   const struct GNUNET_PeerIdentity *target,
   const struct GNUNET_ShortHashCode *iv,
-  struct BackchannelKeyState *key)
+  struct DVKeyState *key)
 {
   struct GNUNET_HashCode km;
 
   GNUNET_assert (GNUNET_YES == GNUNET_CRYPTO_ecdh_eddsa (priv_ephemeral,
                                                          &target->public_key,
                                                          &km));
-  bc_setup_key_state_from_km (&km, iv, key);
+  dv_setup_key_state_from_km (&km, iv, key);
 }
 
 
@@ -4525,14 +4533,14 @@ dh_key_derive_eph_pid (
 static void
 dh_key_derive_eph_pub (const struct GNUNET_CRYPTO_EcdhePublicKey *pub_ephemeral,
                        const struct GNUNET_ShortHashCode *iv,
-                       struct BackchannelKeyState *key)
+                       struct DVKeyState *key)
 {
   struct GNUNET_HashCode km;
 
   GNUNET_assert (GNUNET_YES == GNUNET_CRYPTO_eddsa_ecdh (GST_my_private_key,
                                                          pub_ephemeral,
                                                          &km));
-  bc_setup_key_state_from_km (&km, iv, key);
+  dv_setup_key_state_from_km (&km, iv, key);
 }
 
 
@@ -4546,7 +4554,7 @@ dh_key_derive_eph_pub (const struct GNUNET_CRYPTO_EcdhePublicKey *pub_ephemeral,
  * @param data_size number of bytes in @a data
  */
 static void
-bc_hmac (const struct BackchannelKeyState *key,
+dv_hmac (const struct DVKeyState *key,
          struct GNUNET_HashCode *hmac,
          const void *data,
          size_t data_size)
@@ -4565,10 +4573,7 @@ bc_hmac (const struct BackchannelKeyState *key,
  * @param in_size number of bytes of input in @a in and available at @a dst
  */
 static void
-bc_encrypt (struct BackchannelKeyState *key,
-            const void *in,
-            void *dst,
-            size_t in_size)
+dv_encrypt (struct DVKeyState *key, const void *in, void *dst, size_t in_size)
 {
   GNUNET_assert (0 ==
                  gcry_cipher_encrypt (key->cipher, dst, in_size, in, in_size));
@@ -4585,7 +4590,7 @@ bc_encrypt (struct BackchannelKeyState *key,
  * @param out_size number of bytes of input in @a ciph and available in @a out
  */
 static void
-bc_decrypt (struct BackchannelKeyState *key,
+dv_decrypt (struct DVKeyState *key,
             void *out,
             const void *ciph,
             size_t out_size)
@@ -4601,15 +4606,242 @@ bc_decrypt (struct BackchannelKeyState *key,
  * @param key key material to clean up (memory must not be free'd!)
  */
 static void
-bc_key_clean (struct BackchannelKeyState *key)
+dv_key_clean (struct DVKeyState *key)
 {
   gcry_cipher_close (key->cipher);
   GNUNET_CRYPTO_zero_keys (&key->material, sizeof (key->material));
 }
 
 
+/**
+ * Function to call to further operate on the now DV encapsulated
+ * message @a hdr, forwarding it via @a next_hop under respect of
+ * @a options.
+ *
+ * @param cls closure
+ * @param next_hop next hop of the DV path
+ * @param hdr encapsulated message, technically a `struct TransportDFBoxMessage`
+ * @param options options of the original message
+ */
+typedef void (*DVMessageHandler) (void *cls,
+                                  struct Neighbour *next_hop,
+                                  const struct GNUNET_MessageHeader *hdr,
+                                  enum RouteMessageOptions options);
+
+/**
+ * Pick a path of @a dv under constraints @a options and schedule
+ * transmission of @a hdr.
+ *
+ * @param target neighbour to ultimately send to
+ * @param num_dvhs length of the @a dvhs array
+ * @param dvhs array of hops to send the message to
+ * @param hdr message to send as payload
+ * @param use function to call with the encapsulated message
+ * @param use_cls closure for @a use
+ * @param options whether path must be confirmed or not, to be passed to @a use
+ */
+static void
+encapsulate_for_dv (struct DistanceVector *dv,
+                    unsigned int num_dvhs,
+                    struct DistanceVectorHop **dvhs,
+                    const struct GNUNET_MessageHeader *hdr,
+                    DVMessageHandler use,
+                    void *use_cls,
+                    enum RouteMessageOptions options)
+{
+  struct TransportDVBoxMessage box_hdr;
+  struct TransportDVBoxPayloadP payload_hdr;
+  uint16_t enc_body_size = ntohs (hdr->size);
+  char enc[sizeof (struct TransportDVBoxPayloadP) + enc_body_size] GNUNET_ALIGN;
+  struct TransportDVBoxPayloadP *enc_payload_hdr =
+    (struct TransportDVBoxPayloadP *) enc;
+  struct DVKeyState key;
+
+  /* Encrypt payload */
+  box_hdr.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
+  box_hdr.total_hops = htons (0);
+  update_ephemeral (dv);
+  box_hdr.ephemeral_key = dv->ephemeral_key;
+  payload_hdr.sender_sig = dv->sender_sig;
+  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+                              &box_hdr.iv,
+                              sizeof (box_hdr.iv));
+  dh_key_derive_eph_pid (&dv->private_key, &dv->target, &box_hdr.iv, &key);
+  payload_hdr.sender = GST_my_identity;
+  payload_hdr.monotonic_time = GNUNET_TIME_absolute_hton (dv->monotime);
+  dv_encrypt (&key, &payload_hdr, enc_payload_hdr, sizeof (payload_hdr));
+  dv_encrypt (&key,
+              hdr,
+              &enc[sizeof (struct TransportDVBoxPayloadP)],
+              enc_body_size);
+  dv_hmac (&key, &box_hdr.hmac, enc, sizeof (enc));
+  dv_key_clean (&key);
+
+  /* For each selected path, take the pre-computed header and body
+     and add the path in the middle of the message; then send it. */
+  for (unsigned int i = 0; i < num_dvhs; i++)
+  {
+    struct DistanceVectorHop *dvh = dvhs[i];
+    unsigned int num_hops = dvh->distance + 1;
+    char buf[sizeof (struct TransportDVBoxMessage) +
+             sizeof (struct GNUNET_PeerIdentity) * num_hops +
+             sizeof (struct TransportDVBoxPayloadP) +
+             enc_body_size] GNUNET_ALIGN;
+    struct GNUNET_PeerIdentity *dhops;
+
+    box_hdr.header.size = htons (sizeof (buf));
+    box_hdr.num_hops = htons (num_hops);
+    memcpy (buf, &box_hdr, sizeof (box_hdr));
+    dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof (box_hdr)];
+    memcpy (dhops,
+            dvh->path,
+            dvh->distance * sizeof (struct GNUNET_PeerIdentity));
+    dhops[dvh->distance] = dv->target;
+    if (GNUNET_EXTRA_LOGGING > 0)
+    {
+      char *path;
+
+      path = GNUNET_strdup (GNUNET_i2s (&GST_my_identity));
+      for (unsigned int i = 0; i <= num_hops; i++)
+      {
+        char *tmp;
+
+        GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[i]));
+        GNUNET_free (path);
+        path = tmp;
+      }
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Routing message of type %u to %s using DV (#%u/%u) via %s\n",
+                  ntohs (hdr->type),
+                  GNUNET_i2s (&dv->target),
+                  i + 1,
+                  num_dvhs + 1,
+                  path);
+      GNUNET_free (path);
+    }
+
+    memcpy (&dhops[num_hops], enc, sizeof (enc));
+    use (use_cls,
+         dvh->next_hop,
+         (const struct GNUNET_MessageHeader *) buf,
+         options);
+  }
+}
+
+
+/**
+ * Wrapper around #route_via_neighbour() that matches the
+ * #DVMessageHandler structure.
+ *
+ * @param cls unused
+ * @param next_hop where to send next
+ * @param hdr header of the message to send
+ * @param options message options for queue selection
+ */
+static void
+send_dv_to_neighbour (void *cls,
+                      struct Neighbour *next_hop,
+                      const struct GNUNET_MessageHeader *hdr,
+                      enum RouteMessageOptions options)
+{
+  (void) cls;
+  route_via_neighbour (next_hop, hdr, options);
+}
+
+
+/**
+ * We need to transmit @a hdr to @a target.  If necessary, this may
+ * involve DV routing.
+ *
+ * @param target peer to receive @a hdr
+ * @param hdr header of the message to route and #GNUNET_free()
+ * @param options which transmission channels are allowed
+ */
+static void
+route_message (const struct GNUNET_PeerIdentity *target,
+               const struct GNUNET_MessageHeader *hdr,
+               enum RouteMessageOptions options)
+{
+  struct VirtualLink *vl;
+  struct Neighbour *n;
+  struct DistanceVector *dv;
+
+  vl = lookup_virtual_link (target);
+  n = vl->n;
+  dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
+  if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
+  {
+    /* if confirmed is required, and we do not have anything
+       confirmed, drop respective options */
+    if (NULL == n)
+      n = lookup_neighbour (target);
+    if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
+      dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target);
+  }
+  if ((NULL == n) && (NULL == dv))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Cannot route message of type %u to %s: no route\n",
+                ntohs (hdr->type),
+                GNUNET_i2s (target));
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# Messages dropped in routing: no acceptable method",
+                              1,
+                              GNUNET_NO);
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Routing message of type %u to %s with options %X\n",
+              ntohs (hdr->type),
+              GNUNET_i2s (target),
+              (unsigned int) options);
+  /* If both dv and n are possible and we must choose:
+     flip a coin for the choice between the two; for now 50/50 */
+  if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
+  {
+    if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 2))
+      n = NULL;
+    else
+      dv = NULL;
+  }
+  if ((NULL != n) && (NULL != dv))
+    options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
+                                  enough for redunancy, so clear the flag. */
+  if (NULL != n)
+  {
+    route_via_neighbour (n, hdr, options);
+  }
+  if (NULL != dv)
+  {
+    struct DistanceVectorHop *hops[2];
+    unsigned int res;
+
+    res = pick_random_dv_hops (dv,
+                               options,
+                               hops,
+                               (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
+    if (0 == res)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                  "Failed to route message, could not determine DV path\n");
+      return;
+    }
+    encapsulate_for_dv (dv,
+                        res,
+                        hops,
+                        hdr,
+                        &send_dv_to_neighbour,
+                        NULL,
+                        options & (~RMO_REDUNDANT));
+  }
+}
+
+
 /**
  * Communicator requests backchannel transmission.  Process the request.
+ * Just repacks it into our `struct TransportBackchannelEncapsulationMessage *`
+ * (which for now has exactly the same format, only a different message type)
+ * and passes it on for routing.
  *
  * @param cls the client
  * @param cb the send message that was sent
@@ -4620,62 +4852,33 @@ handle_communicator_backchannel (
   const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
 {
   struct TransportClient *tc = cls;
-  struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
-  struct GNUNET_TIME_Absolute monotime;
-  struct TransportBackchannelEncapsulationMessage *enc;
-  struct TransportBackchannelRequestPayloadP ppay;
-  struct BackchannelKeyState key;
-  char *mpos;
-  uint16_t msize;
-
-  {
-    const struct GNUNET_MessageHeader *inbox;
-    const char *is;
-
-    inbox = (const struct GNUNET_MessageHeader *) &cb[1];
-    /* 0-termination of 'is' was checked already in
-       #check_communicator_backchannel() */
-    is = (const char *) &cb[1];
-    is += ntohs (inbox->size);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Preparing backchannel transmission to %s:%s of type %u\n",
-                GNUNET_i2s (&cb->pid),
-                is,
-                ntohs (inbox->size));
-  }
+  const struct GNUNET_MessageHeader *inbox =
+    (const struct GNUNET_MessageHeader *) &cb[1];
+  uint16_t isize = ntohs (inbox->size);
+  const char *is = ((const char *) &cb[1]) + isize;
+  char
+    mbuf[isize +
+         sizeof (struct TransportBackchannelEncapsulationMessage)] GNUNET_ALIGN;
+  struct TransportBackchannelEncapsulationMessage *be =
+    (struct TransportBackchannelEncapsulationMessage *) mbuf;
+
+  /* 0-termination of 'is' was checked already in
+     #check_communicator_backchannel() */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Preparing backchannel transmission to %s:%s of type %u\n",
+              GNUNET_i2s (&cb->pid),
+              is,
+              ntohs (inbox->size));
   /* encapsulate and encrypt message */
-
-  /* FIXME: this should be done with the DV logic for all
-     DV messages, NOT here only for backchannel! */
-  msize = ntohs (cb->header.size) - sizeof (*cb) +
-          sizeof (struct TransportBackchannelRequestPayloadP);
-  enc = GNUNET_malloc (sizeof (*enc) + msize);
-  enc->header.type =
+  be->header.type =
     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
-  enc->header.size = htons (sizeof (*enc) + msize);
-  enc->target = cb->pid;
-  lookup_ephemeral (&cb->pid,
-                    &private_key,
-                    &enc->ephemeral_key,
-                    &ppay.sender_sig,
-                    &monotime);
-  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
-                              &enc->iv,
-                              sizeof (enc->iv));
-  dh_key_derive_eph_pid (&private_key, &cb->pid, &enc->iv, &key);
-  ppay.monotonic_time = GNUNET_TIME_absolute_hton (monotime);
-  mpos = (char *) &enc[1];
-  bc_encrypt (&key, &ppay, mpos, sizeof (ppay));
-  bc_encrypt (&key,
-              &cb[1],
-              &mpos[sizeof (ppay)],
-              ntohs (cb->header.size) - sizeof (*cb));
-  bc_hmac (&key,
-           &enc->hmac,
-           mpos,
-           sizeof (ppay) + ntohs (cb->header.size) - sizeof (*cb));
-  bc_key_clean (&key);
-  route_message (&cb->pid, &enc->header, RMO_DV_ALLOWED);
+  be->header.size = htons (sizeof (mbuf));
+  memcpy (&be[1], inbox, isize);
+  memcpy (&mbuf[sizeof (struct TransportBackchannelEncapsulationMessage) +
+                isize],
+          is,
+          strlen (is) + 1);
+  route_message (&cb->pid, &be->header, RMO_DV_ALLOWED);
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
@@ -4871,6 +5074,34 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
                       const struct GNUNET_MessageHeader *msg);
 
 
+/**
+ * Function called when we are done giving a message of a certain
+ * size to CORE and should thus decrement the number of bytes of
+ * RAM reserved for that peer's MQ.
+ *
+ * @param cls a `struct CoreSentContext`
+ */
+static void
+core_env_sent_cb (void *cls)
+{
+  struct CoreSentContext *ctx = cls;
+  struct VirtualLink *vl = ctx->vl;
+
+  if (NULL == vl)
+  {
+    /* lost the link in the meantime, ignore */
+    GNUNET_free (ctx);
+    return;
+  }
+  GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, ctx);
+  GNUNET_assert (vl->incoming_fc_window_size_ram >= ctx->size);
+  vl->incoming_fc_window_size_ram -= ctx->size;
+  vl->incoming_fc_window_size_used += ctx->isize;
+  /* TODO-M1 */
+  GNUNET_free (ctx);
+}
+
+
 /**
  * Communicator gave us an unencapsulated message to pass as-is to
  * CORE.  Process the request.
@@ -4897,7 +5128,7 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
     GNUNET_SERVICE_client_drop (client);
     return;
   }
-  vl = GNUNET_CONTAINER_multipeermap_get (links, &cmc->im.sender);
+  vl = lookup_virtual_link (&cmc->im.sender);
   if (NULL == vl)
   {
     /* FIXME: sender is giving us messages for CORE but we don't have
@@ -4916,30 +5147,65 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
     finish_cmc_handling (cmc);
     return;
   }
+  if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# CORE messages droped (FC arithmetic overflow)",
+                              1,
+                              GNUNET_NO);
+
+    finish_cmc_handling (cmc);
+    return;
+  }
+  if (vl->incoming_fc_window_size_ram + size > vl->available_fc_window_size)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# CORE messages droped (FC window overflow)",
+                              1,
+                              GNUNET_NO);
+    finish_cmc_handling (cmc);
+    return;
+  }
+
   /* Forward to all CORE clients */
   have_core = GNUNET_NO;
   for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
   {
     struct GNUNET_MQ_Envelope *env;
     struct InboundMessage *im;
+    struct CoreSentContext *ctx;
 
     if (CT_CORE != tc->type)
       continue;
-    have_core = GNUNET_YES;
+    vl->incoming_fc_window_size_ram += size;
     env = GNUNET_MQ_msg_extra (im, size, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+    ctx = GNUNET_new (struct CoreSentContext);
+    ctx->vl = vl;
+    ctx->size = size;
+    ctx->isize = (GNUNET_NO == have_core) ? size : 0;
+    have_core = GNUNET_YES;
+    GNUNET_CONTAINER_DLL_insert (vl->csc_head, vl->csc_tail, ctx);
+    GNUNET_MQ_notify_sent (env, &core_env_sent_cb, ctx);
     im->peer = cmc->im.sender;
     memcpy (&im[1], mh, size);
     GNUNET_MQ_send (tc->mq, env);
+    vl->core_recv_window--;
   }
-  vl->core_recv_window--;
   if (GNUNET_NO == have_core)
+  {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "Dropped message to CORE: no CORE client connected!\n");
-  else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Delivered message from %s of type %u to CORE\n",
-                GNUNET_i2s (&cmc->im.sender),
-                ntohs (mh->type));
+    /* Nevertheless, count window as used, as it is from the
+       perspective of the other peer! */
+    vl->incoming_fc_window_size_used += size;
+    /* TODO-M1 */
+    finish_cmc_handling (cmc);
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Delivered message from %s of type %u to CORE\n",
+              GNUNET_i2s (&cmc->im.sender),
+              ntohs (mh->type));
   if (vl->core_recv_window > 0)
   {
     finish_cmc_handling (cmc);
@@ -5012,7 +5278,11 @@ static void
 transmit_cummulative_ack_cb (void *cls)
 {
   struct AcknowledgementCummulator *ac = cls;
-  struct TransportReliabilityAckMessage *ack;
+  char buf[sizeof (struct TransportReliabilityAckMessage) +
+           ac->ack_counter *
+             sizeof (struct TransportCummulativeAckPayloadP)] GNUNET_ALIGN;
+  struct TransportReliabilityAckMessage *ack =
+    (struct TransportReliabilityAckMessage *) buf;
   struct TransportCummulativeAckPayloadP *ap;
 
   ac->task = NULL;
@@ -5021,9 +5291,6 @@ transmit_cummulative_ack_cb (void *cls)
               ac->ack_counter,
               GNUNET_i2s (&ac->target));
   GNUNET_assert (0 < ac->ack_counter);
-  ack = GNUNET_malloc (sizeof (*ack) +
-                       ac->ack_counter *
-                         sizeof (struct TransportCummulativeAckPayloadP));
   ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
   ack->header.size =
     htons (sizeof (*ack) +
@@ -5436,47 +5703,73 @@ update_dvh_performance (struct DistanceVectorHop *dvh,
 
 
 /**
- * The @a pa was acknowledged, process the acknowledgement.
+ * We have completed transmission of @a pm, remove it from
+ * the transmission queues (and if it is a fragment, continue
+ * up the tree as necessary).
  *
- * @param pa the pending acknowledgement that was satisfied
- * @param ack_delay artificial delay from cummulative acks created by the
- * other peer
+ * @param pm pending message that was transmitted
  */
 static void
-handle_acknowledged (struct PendingAcknowledgement *pa,
-                     struct GNUNET_TIME_Relative ack_delay)
+completed_pending_message (struct PendingMessage *pm)
 {
-  struct PendingMessage *pm = pa->pm;
-  struct GNUNET_TIME_Relative delay;
+  struct PendingMessage *pos;
 
-  delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time);
-  if (delay.rel_value_us > ack_delay.rel_value_us)
-    delay = GNUNET_TIME_UNIT_ZERO;
+  switch (pm->pmt)
+  {
+  case PMT_CORE:
+  case PMT_RELIABILITY_BOX:
+    /* Full message sent, we are done */
+    client_send_response (pm);
+    return;
+  case PMT_FRAGMENT_BOX:
+    /* Fragment sent over reliabile channel */
+    free_fragment_tree (pm);
+    pos = pm->frag_parent;
+    GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
+    GNUNET_free (pm);
+    /* check if subtree is done */
+    while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
+           (pos != pm))
+    {
+      pm = pos;
+      pos = pm->frag_parent;
+      GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
+      GNUNET_free (pm);
+    }
+
+    /* Was this the last applicable fragmment? */
+    if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
+        (pos->frag_off == pos->bytes_msg))
+      client_send_response (pos);
+    return;
+  }
+}
+
+
+/**
+ * The @a pa was acknowledged, process the acknowledgement.
+ *
+ * @param pa the pending acknowledgement that was satisfied
+ * @param ack_delay artificial delay from cummulative acks created by the
+ * other peer
+ */
+static void
+handle_acknowledged (struct PendingAcknowledgement *pa,
+                     struct GNUNET_TIME_Relative ack_delay)
+{
+  struct GNUNET_TIME_Relative delay;
+
+  delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time);
+  if (delay.rel_value_us > ack_delay.rel_value_us)
+    delay = GNUNET_TIME_UNIT_ZERO;
   else
     delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
   if (NULL != pa->queue)
     update_queue_performance (pa->queue, delay, pa->message_size);
   if (NULL != pa->dvh)
     update_dvh_performance (pa->dvh, delay, pa->message_size);
-  if (NULL != pm)
-  {
-    if (NULL != pm->frag_parent)
-    {
-      pm = pm->frag_parent;
-      free_fragment_tree (pa->pm);
-    }
-    while ((NULL != pm->frag_parent) && (NULL == pm->head_frag))
-    {
-      struct PendingMessage *parent = pm->frag_parent;
-
-      free_fragment_tree (pm);
-      pm = parent;
-    }
-    if (NULL != pm->head_frag)
-      pm = NULL; /* we are done, otherwise free 'pm' below */
-  }
-  if (NULL != pm)
-    free_pending_message (pm);
+  if (NULL != pa->pm)
+    completed_pending_message (pa->pm);
   free_pending_acknowledgement (pa);
 }
 
@@ -5576,12 +5869,22 @@ check_backchannel_encapsulation (
   void *cls,
   const struct TransportBackchannelEncapsulationMessage *be)
 {
-  uint16_t size = ntohs (be->header.size);
+  uint16_t size = ntohs (be->header.size) - sizeof (*be);
+  const struct GNUNET_MessageHeader *inbox =
+    (const struct GNUNET_MessageHeader *) &be[1];
+  const char *is;
+  uint16_t isize;
 
   (void) cls;
-  if ((size - sizeof (*be)) <
-      (sizeof (struct TransportBackchannelRequestPayloadP) +
-       sizeof (struct GNUNET_MessageHeader)))
+  if (ntohs (inbox->size) >= size)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  isize = ntohs (inbox->size);
+  is = ((const char *) inbox) + isize;
+  size -= isize;
+  if ('\0' != is[size - 1])
   {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
@@ -5591,40 +5894,27 @@ check_backchannel_encapsulation (
 
 
 /**
- * We received the plaintext @a msg from backtalker @a b. Forward
- * it to the respective communicator.
+ * Communicator gave us a backchannel encapsulation.  Process the request.
+ * (We are the destination of the backchannel here.)
  *
- * @param b a backtalker
- * @param msg a message, consisting of a `struct GNUNET_MessageHeader`
- *        followed by the target name of the communicator
- * @param msg_size number of bytes in @a msg
+ * @param cls a `struct CommunicatorMessageContext` (must call
+ * #finish_cmc_handling() when done)
+ * @param be the message that was received
  */
 static void
-forward_backchannel_payload (struct Backtalker *b,
-                             const void *msg,
-                             size_t msg_size)
+handle_backchannel_encapsulation (
+  void *cls,
+  const struct TransportBackchannelEncapsulationMessage *be)
 {
+  struct CommunicatorMessageContext *cmc = cls;
   struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
   struct GNUNET_MQ_Envelope *env;
   struct TransportClient *tc;
-  const struct GNUNET_MessageHeader *mh;
-  const char *target_communicator;
-  uint16_t mhs;
+  const struct GNUNET_MessageHeader *inbox =
+    (const struct GNUNET_MessageHeader *) &be[1];
+  uint16_t isize = ntohs (inbox->size);
+  const char *target_communicator = ((const char *) inbox) + isize;
 
-  /* Determine target_communicator and check @a msg is well-formed */
-  mh = msg;
-  mhs = ntohs (mh->size);
-  if (mhs <= msg_size)
-  {
-    GNUNET_break_op (0);
-    return;
-  }
-  target_communicator = &((const char *) msg)[ntohs (mh->size)];
-  if ('\0' != target_communicator[msg_size - mhs - 1])
-  {
-    GNUNET_break_op (0);
-    return;
-  }
   /* Find client providing this communicator */
   for (tc = clients_head; NULL != tc; tc = tc->next)
     if ((CT_COMMUNICATOR == tc->type) &&
@@ -5646,1471 +5936,1709 @@ forward_backchannel_payload (struct Backtalker *b,
   /* Finally, deliver backchannel message to communicator */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Delivering backchannel message from %s of type %u to %s\n",
-              GNUNET_i2s (&b->pid),
-              ntohs (mh->type),
+              GNUNET_i2s (&cmc->im.sender),
+              ntohs (inbox->type),
               target_communicator);
   env = GNUNET_MQ_msg_extra (
     cbi,
-    msg_size,
+    isize,
     GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING);
-  cbi->pid = b->pid;
-  memcpy (&cbi[1], msg, msg_size);
+  cbi->pid = cmc->im.sender;
+  memcpy (&cbi[1], inbox, isize);
   GNUNET_MQ_send (tc->mq, env);
 }
 
 
 /**
- * Free data structures associated with @a b.
+ * Task called when we should check if any of the DV paths
+ * we have learned to a target are due for garbage collection.
  *
- * @param b data structure to release
+ * Collects stale paths, and possibly frees the entire DV
+ * entry if no paths are left. Otherwise re-schedules itself.
+ *
+ * @param cls a `struct DistanceVector`
  */
 static void
-free_backtalker (struct Backtalker *b)
+path_cleanup_cb (void *cls)
 {
-  if (NULL != b->get)
-  {
-    GNUNET_PEERSTORE_iterate_cancel (b->get);
-    b->get = NULL;
-    GNUNET_assert (NULL != b->cmc);
-    finish_cmc_handling (b->cmc);
-    b->cmc = NULL;
-  }
-  if (NULL != b->task)
+  struct DistanceVector *dv = cls;
+  struct DistanceVectorHop *pos;
+
+  dv->timeout_task = NULL;
+  while (NULL != (pos = dv->dv_head))
   {
-    GNUNET_SCHEDULER_cancel (b->task);
-    b->task = NULL;
+    GNUNET_assert (dv == pos->dv);
+    if (GNUNET_TIME_absolute_get_remaining (pos->timeout).rel_value_us > 0)
+      break;
+    free_distance_vector_hop (pos);
   }
-  if (NULL != b->sc)
+  if (NULL == pos)
   {
-    GNUNET_PEERSTORE_store_cancel (b->sc);
-    b->sc = NULL;
+    free_dv_route (dv);
+    return;
   }
-  GNUNET_assert (
-    GNUNET_YES ==
-    GNUNET_CONTAINER_multipeermap_remove (backtalkers, &b->pid, b));
-  GNUNET_free (b);
-}
-
-
-/**
- * Callback to free backtalker records.
- *
- * @param cls NULL
- * @param pid unused
- * @param value a `struct Backtalker`
- * @return #GNUNET_OK (always)
- */
-static int
-free_backtalker_cb (void *cls,
-                    const struct GNUNET_PeerIdentity *pid,
-                    void *value)
-{
-  struct Backtalker *b = value;
-
-  (void) cls;
-  (void) pid;
-  free_backtalker (b);
-  return GNUNET_OK;
+  dv->timeout_task =
+    GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv);
 }
 
 
 /**
- * Function called when it is time to clean up a backtalker.
+ * The @a hop is a validated path to the respective target
+ * peer and we should tell core about it -- and schedule
+ * a job to revoke the state.
  *
- * @param cls a `struct Backtalker`
+ * @param hop a path to some peer that is the reason for activation
  */
 static void
-backtalker_timeout_cb (void *cls)
+activate_core_visible_dv_path (struct DistanceVectorHop *hop)
 {
-  struct Backtalker *b = cls;
+  struct DistanceVector *dv = hop->dv;
+  struct VirtualLink *vl;
 
-  b->task = NULL;
-  if (0 != GNUNET_TIME_absolute_get_remaining (b->timeout).rel_value_us)
+  vl = lookup_virtual_link (&dv->target);
+  if (NULL != vl)
   {
-    b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+    /* Link was already up, remember dv is also now available and we are done */
+    vl->dv = dv;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Virtual link to %s could now also use DV!\n",
+                GNUNET_i2s (&dv->target));
     return;
   }
-  GNUNET_assert (NULL == b->sc);
-  free_backtalker (b);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Creating new virtual link to %s using DV!\n",
+              GNUNET_i2s (&dv->target));
+  vl = GNUNET_new (struct VirtualLink);
+  vl->message_uuid_ctr =
+    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
+  vl->target = dv->target;
+  vl->dv = dv;
+  dv->vl = vl;
+  vl->core_recv_window = RECV_WINDOW_SIZE;
+  vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
+  vl->visibility_task =
+    GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
+  GNUNET_break (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_put (
+                  links,
+                  &vl->target,
+                  vl,
+                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  /* We lacked a confirmed connection to the target
+     before, so tell CORE about it (finally!) */
+  cores_send_connect_info (&dv->target);
 }
 
 
 /**
- * Function called with the monotonic time of a backtalker
- * by PEERSTORE. Updates the time and continues processing.
+ * We have learned a @a path through the network to some other peer, add it to
+ * our DV data structure (returning #GNUNET_YES on success).
  *
- * @param cls a `struct Backtalker`
- * @param record the information found, NULL for the last call
- * @param emsg error message
+ * We do not add paths if we have a sufficient number of shorter
+ * paths to this target already (returning #GNUNET_NO).
+ *
+ * We also do not add problematic paths, like those where we lack the first
+ * hop in our neighbour list (i.e. due to a topology change) or where some
+ * non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
+ *
+ * @param path the path we learned, path[0] should be us,
+ *             and then path contains a valid path from us to
+ * `path[path_len-1]` path[1] should be a direct neighbour (we should check!)
+ * @param path_len number of entries on the @a path, at least three!
+ * @param network_latency how long does the message take from us to
+ * `path[path_len-1]`? set to "forever" if unknown
+ * @param path_valid_until how long is this path considered validated? Maybe
+ * be zero.
+ * @return #GNUNET_YES on success,
+ *         #GNUNET_NO if we have better path(s) to the target
+ *         #GNUNET_SYSERR if the path is useless and/or invalid
+ *                         (i.e. path[1] not a direct neighbour
+ *                        or path[i+1] is a direct neighbour for i>0)
  */
-static void
-backtalker_monotime_cb (void *cls,
-                        const struct GNUNET_PEERSTORE_Record *record,
-                        const char *emsg)
+static int
+learn_dv_path (const struct GNUNET_PeerIdentity *path,
+               unsigned int path_len,
+               struct GNUNET_TIME_Relative network_latency,
+               struct GNUNET_TIME_Absolute path_valid_until)
 {
-  struct Backtalker *b = cls;
-  struct GNUNET_TIME_AbsoluteNBO *mtbe;
-  struct GNUNET_TIME_Absolute mt;
+  struct DistanceVectorHop *hop;
+  struct DistanceVector *dv;
+  struct Neighbour *next_hop;
+  unsigned int shorter_distance;
 
-  (void) emsg;
-  if (NULL == record)
-  {
-    /* we're done with #backtalker_monotime_cb() invocations,
-       continue normal processing */
-    b->get = NULL;
-    GNUNET_assert (NULL != b->cmc);
-    finish_cmc_handling (b->cmc);
-    b->cmc = NULL;
-    if (0 != b->body_size)
-      forward_backchannel_payload (b, &b[1], b->body_size);
-    return;
-  }
-  if (sizeof (*mtbe) != record->value_size)
+  if (path_len < 3)
   {
+    /* what a boring path! not allowed! */
     GNUNET_break (0);
-    return;
+    return GNUNET_SYSERR;
   }
-  mtbe = record->value;
-  mt = GNUNET_TIME_absolute_ntoh (*mtbe);
-  if (mt.abs_value_us > b->monotonic_time.abs_value_us)
+  GNUNET_assert (0 == GNUNET_memcmp (&GST_my_identity, &path[0]));
+  next_hop = lookup_neighbour (&path[1]);
+  if (NULL == next_hop)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Backtalker message from %s dropped, monotime in the past\n",
-                GNUNET_i2s (&b->pid));
-    GNUNET_STATISTICS_update (
-      GST_stats,
-      "# Backchannel messages dropped: monotonic time not increasing",
-      1,
-      GNUNET_NO);
-    b->monotonic_time = mt;
-    /* Setting body_size to 0 prevents call to #forward_backchannel_payload()
-     */
-    b->body_size = 0;
-    return;
+    /* next hop must be a neighbour, otherwise this whole thing is useless! */
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
-}
-
+  for (unsigned int i = 2; i < path_len; i++)
+    if (NULL != lookup_neighbour (&path[i]))
+    {
+      /* Useless path: we have a direct connection to some hop
+         in the middle of the path, so this one is not even
+         terribly useful for redundancy */
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                  "Path of %u hops useless: directly link to hop %u (%s)\n",
+                  path_len,
+                  i,
+                  GNUNET_i2s (&path[i]));
+      GNUNET_STATISTICS_update (GST_stats,
+                                "# Useless DV path ignored: hop is neighbour",
+                                1,
+                                GNUNET_NO);
+      return GNUNET_SYSERR;
+    }
+  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &path[path_len - 1]);
+  if (NULL == dv)
+  {
+    dv = GNUNET_new (struct DistanceVector);
+    dv->target = path[path_len - 1];
+    dv->timeout_task = GNUNET_SCHEDULER_add_delayed (DV_PATH_VALIDITY_TIMEOUT,
+                                                     &path_cleanup_cb,
+                                                     dv);
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multipeermap_put (
+                     dv_routes,
+                     &dv->target,
+                     dv,
+                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  }
+  /* Check if we have this path already! */
+  shorter_distance = 0;
+  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+       pos = pos->next_dv)
+  {
+    if (pos->distance < path_len - 2)
+      shorter_distance++;
+    /* Note that the distances in 'pos' excludes us (path[0]) and
+       the next_hop (path[1]), so we need to subtract two
+       and check next_hop explicitly */
+    if ((pos->distance == path_len - 2) && (pos->next_hop == next_hop))
+    {
+      int match = GNUNET_YES;
 
-/**
- * Function called by PEERSTORE when the store operation of
- * a backtalker's monotonic time is complete.
- *
- * @param cls the `struct Backtalker`
- * @param success #GNUNET_OK on success
- */
-static void
-backtalker_monotime_store_cb (void *cls, int success)
-{
-  struct Backtalker *b = cls;
+      for (unsigned int i = 0; i < pos->distance; i++)
+      {
+        if (0 != GNUNET_memcmp (&pos->path[i], &path[i + 2]))
+        {
+          match = GNUNET_NO;
+          break;
+        }
+      }
+      if (GNUNET_YES == match)
+      {
+        struct GNUNET_TIME_Relative last_timeout;
 
-  if (GNUNET_OK != success)
+        /* Re-discovered known path, update timeout */
+        GNUNET_STATISTICS_update (GST_stats,
+                                  "# Known DV path refreshed",
+                                  1,
+                                  GNUNET_NO);
+        last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
+        pos->timeout =
+          GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
+        pos->path_valid_until =
+          GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
+        GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
+        GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
+        if (0 <
+            GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
+          activate_core_visible_dv_path (pos);
+        if (last_timeout.rel_value_us <
+            GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
+                                           DV_PATH_DISCOVERY_FREQUENCY)
+              .rel_value_us)
+        {
+          /* Some peer send DV learn messages too often, we are learning
+             the same path faster than it would be useful; do not forward! */
+          GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                      "Rediscovered path too quickly, not forwarding further\n");
+          return GNUNET_NO;
+        }
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "Refreshed known path to %s, forwarding further\n",
+                    GNUNET_i2s (&dv->target));
+        return GNUNET_YES;
+      }
+    }
+  }
+  /* Count how many shorter paths we have (incl. direct
+     neighbours) before simply giving up on this one! */
+  if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Failed to store backtalker's monotonic time in PEERSTORE!\n");
+    /* We have a shorter path already! */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Have many shorter DV paths %s, not forwarding further\n",
+                GNUNET_i2s (&dv->target));
+    return GNUNET_NO;
   }
-  b->sc = NULL;
-  b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+  /* create new DV path entry */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Discovered new DV path to %s\n",
+              GNUNET_i2s (&dv->target));
+  hop = GNUNET_malloc (sizeof (struct DistanceVectorHop) +
+                       sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
+  hop->next_hop = next_hop;
+  hop->dv = dv;
+  hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
+  memcpy (&hop[1],
+          &path[2],
+          sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
+  hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
+  hop->path_valid_until = path_valid_until;
+  hop->distance = path_len - 2;
+  hop->pd.aged_rtt = network_latency;
+  GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop);
+  GNUNET_CONTAINER_MDLL_insert (neighbour,
+                                next_hop->dv_head,
+                                next_hop->dv_tail,
+                                hop);
+  if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
+    activate_core_visible_dv_path (hop);
+  return GNUNET_YES;
 }
 
 
 /**
- * The backtalker @a b monotonic time changed. Update PEERSTORE.
+ * Communicator gave us a DV learn message.  Check the message.
  *
- * @param b a backtalker with updated monotonic time
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param dvl the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
  */
-static void
-update_backtalker_monotime (struct Backtalker *b)
+static int
+check_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
 {
-  struct GNUNET_TIME_AbsoluteNBO mtbe;
+  uint16_t size = ntohs (dvl->header.size);
+  uint16_t num_hops = ntohs (dvl->num_hops);
+  const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
 
-  if (NULL != b->sc)
+  (void) cls;
+  if (size != sizeof (*dvl) + num_hops * sizeof (struct DVPathEntryP))
   {
-    GNUNET_PEERSTORE_store_cancel (b->sc);
-    b->sc = NULL;
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
   }
-  else
+  if (num_hops > MAX_DV_HOPS_ALLOWED)
   {
-    GNUNET_SCHEDULER_cancel (b->task);
-    b->task = NULL;
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
   }
-  mtbe = GNUNET_TIME_absolute_hton (b->monotonic_time);
-  b->sc =
-    GNUNET_PEERSTORE_store (peerstore,
-                            "transport",
-                            &b->pid,
-                            GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
-                            &mtbe,
-                            sizeof (mtbe),
-                            GNUNET_TIME_UNIT_FOREVER_ABS,
-                            GNUNET_PEERSTORE_STOREOPTION_REPLACE,
-                            &backtalker_monotime_store_cb,
-                            b);
+  for (unsigned int i = 0; i < num_hops; i++)
+  {
+    if (0 == GNUNET_memcmp (&dvl->initiator, &hops[i].hop))
+    {
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }
+    if (0 == GNUNET_memcmp (&GST_my_identity, &hops[i].hop))
+    {
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }
+  }
+  return GNUNET_YES;
 }
 
 
 /**
- * Communicator gave us a backchannel encapsulation.  Process the request.
- * (We are not the origin of the backchannel here, the communicator simply
- * received a backchannel message and we are expected to forward it.)
+ * Build and forward a DV learn message to @a next_hop.
  *
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
- * @param be the message that was received
+ * @param next_hop peer to send the message to
+ * @param msg message received
+ * @param bi_history bitmask specifying hops on path that were bidirectional
+ * @param nhops length of the @a hops array
+ * @param hops path the message traversed so far
+ * @param in_time when did we receive the message, used to calculate network
+ * delay
  */
 static void
-handle_backchannel_encapsulation (
-  void *cls,
-  const struct TransportBackchannelEncapsulationMessage *be)
+forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
+                  const struct TransportDVLearnMessage *msg,
+                  uint16_t bi_history,
+                  uint16_t nhops,
+                  const struct DVPathEntryP *hops,
+                  struct GNUNET_TIME_Absolute in_time)
 {
-  struct CommunicatorMessageContext *cmc = cls;
-  struct BackchannelKeyState key;
-  struct GNUNET_HashCode hmac;
-  const char *hdr;
-  size_t hdr_len;
+  struct DVPathEntryP *dhops;
+  char buf[sizeof (struct TransportDVLearnMessage) +
+           (nhops + 1) * sizeof (struct DVPathEntryP)] GNUNET_ALIGN;
+  struct TransportDVLearnMessage *fwd = (struct TransportDVLearnMessage *) buf;
+  struct GNUNET_TIME_Relative nnd;
 
-  if (0 != GNUNET_memcmp (&be->target, &GST_my_identity))
+  /* compute message for forwarding */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Forwarding DV learn message originating from %s to %s\n",
+              GNUNET_i2s (&msg->initiator),
+              GNUNET_i2s2 (next_hop));
+  GNUNET_assert (nhops < MAX_DV_HOPS_ALLOWED);
+  fwd->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
+  fwd->header.size = htons (sizeof (struct TransportDVLearnMessage) +
+                            (nhops + 1) * sizeof (struct DVPathEntryP));
+  fwd->num_hops = htons (nhops + 1);
+  fwd->bidirectional = htons (bi_history);
+  nnd = GNUNET_TIME_relative_add (GNUNET_TIME_absolute_get_duration (in_time),
+                                  GNUNET_TIME_relative_ntoh (
+                                    msg->non_network_delay));
+  fwd->non_network_delay = GNUNET_TIME_relative_hton (nnd);
+  fwd->init_sig = msg->init_sig;
+  fwd->initiator = msg->initiator;
+  fwd->challenge = msg->challenge;
+  dhops = (struct DVPathEntryP *) &fwd[1];
+  GNUNET_memcpy (dhops, hops, sizeof (struct DVPathEntryP) * nhops);
+  dhops[nhops].hop = GST_my_identity;
   {
-    /* not for me, try to route to target */
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Forwarding backtalk to %s\n",
-                GNUNET_i2s (&be->target));
-    route_message (&be->target,
-                   GNUNET_copy_message (&be->header),
-                   RMO_DV_ALLOWED);
-    finish_cmc_handling (cmc);
-    return;
-  }
-  /* FIXME: this should be done when decrypting _any_ DV
-     message, not only for backchannels! */
-  dh_key_derive_eph_pub (&be->ephemeral_key, &be->iv, &key);
-  hdr = (const char *) &be[1];
-  hdr_len = ntohs (be->header.size) - sizeof (*be);
-  bc_hmac (&key, &hmac, hdr, hdr_len);
-  if (0 != GNUNET_memcmp (&hmac, &be->hmac))
-  {
-    /* HMAC missmatch, disard! */
-    GNUNET_break_op (0);
-    finish_cmc_handling (cmc);
-    return;
-  }
-  /* begin actual decryption */
-  {
-    struct Backtalker *b;
-    struct GNUNET_TIME_Absolute monotime;
-    struct TransportBackchannelRequestPayloadP ppay;
-    char body[hdr_len - sizeof (ppay)];
-
-    GNUNET_assert (hdr_len >=
-                   sizeof (ppay) + sizeof (struct GNUNET_MessageHeader));
-    bc_decrypt (&key, &ppay, hdr, sizeof (ppay));
-    bc_decrypt (&key, &body, &hdr[sizeof (ppay)], hdr_len - sizeof (ppay));
-    bc_key_clean (&key);
-    monotime = GNUNET_TIME_absolute_ntoh (ppay.monotonic_time);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Decrypted backtalk from %s\n",
-                GNUNET_i2s (&ppay.sender));
-    b = GNUNET_CONTAINER_multipeermap_get (backtalkers, &ppay.sender);
-    if ((NULL != b) && (monotime.abs_value_us < b->monotonic_time.abs_value_us))
-    {
-      GNUNET_STATISTICS_update (
-        GST_stats,
-        "# Backchannel messages dropped: monotonic time not increasing",
-        1,
-        GNUNET_NO);
-      finish_cmc_handling (cmc);
-      return;
-    }
-    if ((NULL == b) ||
-        (0 != GNUNET_memcmp (&b->last_ephemeral, &be->ephemeral_key)))
-    {
-      /* Check signature */
-      struct EphemeralConfirmationPS ec;
-
-      ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
-      ec.purpose.size = htonl (sizeof (ec));
-      ec.target = GST_my_identity;
-      ec.ephemeral_key = be->ephemeral_key;
-      if (
-        GNUNET_OK !=
-        GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL,
-                                    &ec.purpose,
-                                    &ppay.sender_sig,
-                                    &ppay.sender.public_key))
-      {
-        /* Signature invalid, disard! */
-        GNUNET_break_op (0);
-        finish_cmc_handling (cmc);
-        return;
-      }
-    }
-    if (NULL != b)
-    {
-      /* update key cache and mono time */
-      b->last_ephemeral = be->ephemeral_key;
-      b->monotonic_time = monotime;
-      update_backtalker_monotime (b);
-      forward_backchannel_payload (b, body, sizeof (body));
-      b->timeout =
-        GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
-      finish_cmc_handling (cmc);
-      return;
-    }
-    /* setup data structure to cache signature AND check
-       monotonic time with PEERSTORE before forwarding backchannel payload */
-    b = GNUNET_malloc (sizeof (struct Backtalker) + sizeof (body));
-    b->pid = ppay.sender;
-    b->body_size = sizeof (body);
-    memcpy (&b[1], body, sizeof (body));
-    GNUNET_assert (GNUNET_YES ==
-                   GNUNET_CONTAINER_multipeermap_put (
-                     backtalkers,
-                     &b->pid,
-                     b,
-                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-    b->monotonic_time = monotime; /* NOTE: to be checked still! */
-    b->cmc = cmc;
-    b->timeout =
-      GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
-    b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
-    b->get =
-      GNUNET_PEERSTORE_iterate (peerstore,
-                                "transport",
-                                &b->pid,
-                                GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
-                                &backtalker_monotime_cb,
-                                b);
+    struct DvHopPS dhp = {.purpose.purpose =
+                            htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
+                          .purpose.size = htonl (sizeof (dhp)),
+                          .pred = dhops[nhops - 1].hop,
+                          .succ = *next_hop,
+                          .challenge = msg->challenge};
+
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+                                             &dhp.purpose,
+                                             &dhops[nhops].hop_sig));
   }
+  route_message (next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED);
 }
 
 
 /**
- * Task called when we should check if any of the DV paths
- * we have learned to a target are due for garbage collection.
- *
- * Collects stale paths, and possibly frees the entire DV
- * entry if no paths are left. Otherwise re-schedules itself.
+ * Check signature of type #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
  *
- * @param cls a `struct DistanceVector`
+ * @param sender_monotonic_time monotonic time of the initiator
+ * @param init the signer
+ * @param challenge the challenge that was signed
+ * @param init_sig signature presumably by @a init
+ * @return #GNUNET_OK if the signature is valid
  */
-static void
-path_cleanup_cb (void *cls)
+static int
+validate_dv_initiator_signature (
+  struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time,
+  const struct GNUNET_PeerIdentity *init,
+  const struct ChallengeNonceP *challenge,
+  const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
 {
-  struct DistanceVector *dv = cls;
-  struct DistanceVectorHop *pos;
+  struct DvInitPS ip = {.purpose.purpose = htonl (
+                          GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
+                        .purpose.size = htonl (sizeof (ip)),
+                        .monotonic_time = sender_monotonic_time,
+                        .challenge = *challenge};
 
-  dv->timeout_task = NULL;
-  while (NULL != (pos = dv->dv_head))
-  {
-    GNUNET_assert (dv == pos->dv);
-    if (GNUNET_TIME_absolute_get_remaining (pos->timeout).rel_value_us > 0)
-      break;
-    free_distance_vector_hop (pos);
-  }
-  if (NULL == pos)
+  if (
+    GNUNET_OK !=
+    GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
+                                &ip.purpose,
+                                init_sig,
+                                &init->public_key))
   {
-    free_dv_route (dv);
-    return;
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
   }
-  dv->timeout_task =
-    GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv);
+  return GNUNET_OK;
 }
 
 
 /**
- * The @a hop is a validated path to the respective target
- * peer and we should tell core about it -- and schedule
- * a job to revoke the state.
- *
- * @param hop a path to some peer that is the reason for activation
+ * Closure for #dv_neighbour_selection and #dv_neighbour_transmission.
  */
-static void
-activate_core_visible_dv_path (struct DistanceVectorHop *hop)
+struct NeighbourSelectionContext
 {
-  struct DistanceVector *dv = hop->dv;
-  struct VirtualLink *vl;
+  /**
+   * Original message we received.
+   */
+  const struct TransportDVLearnMessage *dvl;
 
-  vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target);
-  if (NULL != vl)
-  {
-    /* Link was already up, remember dv is also now available and we are done */
-    vl->dv = dv;
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Virtual link to %s could now also use DV!\n",
-                GNUNET_i2s (&dv->target));
-    return;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Creating new virtual link to %s using DV!\n",
-              GNUNET_i2s (&dv->target));
-  vl = GNUNET_new (struct VirtualLink);
-  vl->target = dv->target;
-  vl->dv = dv;
-  vl->core_recv_window = RECV_WINDOW_SIZE;
-  vl->visibility_task =
-    GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
-  GNUNET_break (GNUNET_YES ==
-                GNUNET_CONTAINER_multipeermap_put (
-                  links,
-                  &vl->target,
-                  vl,
-                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  /* We lacked a confirmed connection to the target
-     before, so tell CORE about it (finally!) */
-  cores_send_connect_info (&dv->target);
-}
+  /**
+   * The hops taken.
+   */
+  const struct DVPathEntryP *hops;
+
+  /**
+   * Time we received the message.
+   */
+  struct GNUNET_TIME_Absolute in_time;
+
+  /**
+   * Offsets of the selected peers.
+   */
+  uint32_t selections[MAX_DV_DISCOVERY_SELECTION];
+
+  /**
+   * Number of peers eligible for selection.
+   */
+  unsigned int num_eligible;
+
+  /**
+   * Number of peers that were selected for forwarding.
+   */
+  unsigned int num_selections;
+
+  /**
+   * Number of hops in @e hops
+   */
+  uint16_t nhops;
+
+  /**
+   * Bitmap of bidirectional connections encountered.
+   */
+  uint16_t bi_history;
+};
 
 
 /**
- * We have learned a @a path through the network to some other peer, add it to
- * our DV data structure (returning #GNUNET_YES on success).
- *
- * We do not add paths if we have a sufficient number of shorter
- * paths to this target already (returning #GNUNET_NO).
- *
- * We also do not add problematic paths, like those where we lack the first
- * hop in our neighbour list (i.e. due to a topology change) or where some
- * non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
+ * Function called for each neighbour during #handle_dv_learn.
  *
- * @param path the path we learned, path[0] should be us,
- *             and then path contains a valid path from us to
- * `path[path_len-1]` path[1] should be a direct neighbour (we should check!)
- * @param path_len number of entries on the @a path, at least three!
- * @param network_latency how long does the message take from us to
- * `path[path_len-1]`? set to "forever" if unknown
- * @param path_valid_until how long is this path considered validated? Maybe
- * be zero.
- * @return #GNUNET_YES on success,
- *         #GNUNET_NO if we have better path(s) to the target
- *         #GNUNET_SYSERR if the path is useless and/or invalid
- *                         (i.e. path[1] not a direct neighbour
- *                        or path[i+1] is a direct neighbour for i>0)
+ * @param cls a `struct NeighbourSelectionContext *`
+ * @param pid identity of the peer
+ * @param value a `struct Neighbour`
+ * @return #GNUNET_YES (always)
  */
 static int
-learn_dv_path (const struct GNUNET_PeerIdentity *path,
-               unsigned int path_len,
-               struct GNUNET_TIME_Relative network_latency,
-               struct GNUNET_TIME_Absolute path_valid_until)
+dv_neighbour_selection (void *cls,
+                        const struct GNUNET_PeerIdentity *pid,
+                        void *value)
 {
-  struct DistanceVectorHop *hop;
-  struct DistanceVector *dv;
-  struct Neighbour *next_hop;
-  unsigned int shorter_distance;
+  struct NeighbourSelectionContext *nsc = cls;
 
-  if (path_len < 3)
-  {
-    /* what a boring path! not allowed! */
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
-  GNUNET_assert (0 == GNUNET_memcmp (&GST_my_identity, &path[0]));
-  next_hop = lookup_neighbour (&path[1]);
-  if (NULL == next_hop)
+  (void) value;
+  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
+    return GNUNET_YES; /* skip initiator */
+  for (unsigned int i = 0; i < nsc->nhops; i++)
+    if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
+      return GNUNET_YES; /* skip peers on path */
+  nsc->num_eligible++;
+  return GNUNET_YES;
+}
+
+
+/**
+ * Function called for each neighbour during #handle_dv_learn.
+ * We call #forward_dv_learn() on the neighbour(s) selected
+ * during #dv_neighbour_selection().
+ *
+ * @param cls a `struct NeighbourSelectionContext *`
+ * @param pid identity of the peer
+ * @param value a `struct Neighbour`
+ * @return #GNUNET_YES (always)
+ */
+static int
+dv_neighbour_transmission (void *cls,
+                           const struct GNUNET_PeerIdentity *pid,
+                           void *value)
+{
+  struct NeighbourSelectionContext *nsc = cls;
+
+  (void) value;
+  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
+    return GNUNET_YES; /* skip initiator */
+  for (unsigned int i = 0; i < nsc->nhops; i++)
+    if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
+      return GNUNET_YES; /* skip peers on path */
+  for (unsigned int i = 0; i < nsc->num_selections; i++)
+  {
+    if (nsc->selections[i] == nsc->num_eligible)
+    {
+      forward_dv_learn (pid,
+                        nsc->dvl,
+                        nsc->bi_history,
+                        nsc->nhops,
+                        nsc->hops,
+                        nsc->in_time);
+      break;
+    }
+  }
+  nsc->num_eligible++;
+  return GNUNET_YES;
+}
+
+
+/**
+ * Computes the number of neighbours we should forward a DVInit
+ * message to given that it has so far taken @a hops_taken hops
+ * though the network and that the number of neighbours we have
+ * in total is @a neighbour_count, out of which @a eligible_count
+ * are not yet on the path.
+ *
+ * NOTE: technically we might want to include NSE in the formula to
+ * get a better grip on the overall network size. However, for now
+ * using NSE here would create a dependency issue in the build system.
+ * => Left for later, hardcoded to 50 for now.
+ *
+ * The goal of the fomula is that we want to reach a total of LOG(NSE)
+ * peers via DV (`target_total`).  We want the reach to be spread out
+ * over various distances to the origin, with a bias towards shorter
+ * distances.
+ *
+ * We make the strong assumption that the network topology looks
+ * "similar" at other hops, in particular the @a neighbour_count
+ * should be comparable at other hops.
+ *
+ * If the local neighbourhood is densely connected, we expect that @a
+ * eligible_count is close to @a neighbour_count minus @a hops_taken
+ * as a lot of the path is already known. In that case, we should
+ * forward to few(er) peers to try to find a path out of the
+ * neighbourhood. OTOH, if @a eligible_count is close to @a
+ * neighbour_count, we should forward to many peers as we are either
+ * still close to the origin (i.e.  @a hops_taken is small) or because
+ * we managed to get beyond a local cluster.  We express this as
+ * the `boost_factor` using the square of the fraction of eligible
+ * neighbours (so if only 50% are eligible, we boost by 1/4, but if
+ * 99% are eligible, the 'boost' will be almost 1).
+ *
+ * Second, the more hops we have taken, the larger the problem of an
+ * exponential traffic explosion gets.  So we take the `target_total`,
+ * and compute our degree such that at each distance d 2^{-d} peers
+ * are selected (corrected by the `boost_factor`).
+ *
+ * @param hops_taken number of hops DVInit has travelled so far
+ * @param neighbour_count number of neighbours we have in total
+ * @param eligible_count number of neighbours we could in
+ *        theory forward to
+ */
+static unsigned int
+calculate_fork_degree (unsigned int hops_taken,
+                       unsigned int neighbour_count,
+                       unsigned int eligible_count)
+{
+  double target_total = 50.0; /* FIXME: use LOG(NSE)? */
+  double eligible_ratio =
+    ((double) eligible_count) / ((double) neighbour_count);
+  double boost_factor = eligible_ratio * eligible_ratio;
+  unsigned int rnd;
+  double left;
+
+  if (hops_taken >= 64)
   {
-    /* next hop must be a neighbour, otherwise this whole thing is useless! */
     GNUNET_break (0);
-    return GNUNET_SYSERR;
+    return 0; /* precaution given bitshift below */
   }
-  for (unsigned int i = 2; i < path_len; i++)
-    if (NULL != lookup_neighbour (&path[i]))
+  for (unsigned int i = 1; i < hops_taken; i++)
+  {
+    /* For each hop, subtract the expected number of targets
+       reached at distance d (so what remains divided by 2^d) */
+    target_total -= (target_total * boost_factor / (1LLU << i));
+  }
+  rnd =
+    (unsigned int) floor (target_total * boost_factor / (1LLU << hops_taken));
+  /* round up or down probabilistically depending on how close we were
+     when floor()ing to rnd */
+  left = target_total - (double) rnd;
+  if (UINT32_MAX * left >
+      GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX))
+    rnd++; /* round up */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Forwarding DV learn message of %u hops %u(/%u/%u) times\n",
+              hops_taken,
+              rnd,
+              eligible_count,
+              neighbour_count);
+  return rnd;
+}
+
+
+/**
+ * Function called when peerstore is done storing a DV monotonic time.
+ *
+ * @param cls a `struct Neighbour`
+ * @param success #GNUNET_YES if peerstore was successful
+ */
+static void
+neighbour_store_dvmono_cb (void *cls, int success)
+{
+  struct Neighbour *n = cls;
+
+  n->sc = NULL;
+  if (GNUNET_YES != success)
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to store other peer's monotonic time in peerstore!\n");
+}
+
+
+/**
+ * Communicator gave us a DV learn message.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call
+ * #finish_cmc_handling() when done)
+ * @param dvl the message that was received
+ */
+static void
+handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
+  int bi_hop;
+  uint16_t nhops;
+  uint16_t bi_history;
+  const struct DVPathEntryP *hops;
+  int do_fwd;
+  int did_initiator;
+  struct GNUNET_TIME_Absolute in_time;
+  struct Neighbour *n;
+
+  nhops = ntohs (dvl->bidirectional); /* 0 = sender is initiator */
+  bi_history = ntohs (dvl->bidirectional);
+  hops = (const struct DVPathEntryP *) &dvl[1];
+  if (0 == nhops)
+  {
+    /* sanity check */
+    if (0 != GNUNET_memcmp (&dvl->initiator, &cmc->im.sender))
+    {
+      GNUNET_break (0);
+      finish_cmc_handling (cmc);
+      return;
+    }
+  }
+  else
+  {
+    /* sanity check */
+    if (0 != GNUNET_memcmp (&hops[nhops - 1].hop, &cmc->im.sender))
+    {
+      GNUNET_break (0);
+      finish_cmc_handling (cmc);
+      return;
+    }
+  }
+
+  GNUNET_assert (CT_COMMUNICATOR == cmc->tc->type);
+  cc = cmc->tc->details.communicator.cc;
+  bi_hop = (GNUNET_TRANSPORT_CC_RELIABLE ==
+            cc); // FIXME: add bi-directional flag to cc?
+  in_time = GNUNET_TIME_absolute_get ();
+
+  /* continue communicator here, everything else can happen asynchronous! */
+  finish_cmc_handling (cmc);
+
+  n = lookup_neighbour (&dvl->initiator);
+  if (NULL != n)
+  {
+    if ((n->dv_monotime_available == GNUNET_YES) &&
+        (GNUNET_TIME_absolute_ntoh (dvl->monotonic_time).abs_value_us <
+         n->last_dv_learn_monotime.abs_value_us))
     {
-      /* Useless path: we have a direct connection to some hop
-         in the middle of the path, so this one is not even
-         terribly useful for redundancy */
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                  "Path of %u hops useless: directly link to hop %u (%s)\n",
-                  path_len,
-                  i,
-                  GNUNET_i2s (&path[i]));
       GNUNET_STATISTICS_update (GST_stats,
-                                "# Useless DV path ignored: hop is neighbour",
+                                "# DV learn discarded due to time travel",
                                 1,
                                 GNUNET_NO);
-      return GNUNET_SYSERR;
+      return;
     }
-  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &path[path_len - 1]);
-  if (NULL == dv)
+    if (GNUNET_OK != validate_dv_initiator_signature (dvl->monotonic_time,
+                                                      &dvl->initiator,
+                                                      &dvl->challenge,
+                                                      &dvl->init_sig))
+    {
+      GNUNET_break_op (0);
+      return;
+    }
+    n->last_dv_learn_monotime = GNUNET_TIME_absolute_ntoh (dvl->monotonic_time);
+    if (GNUNET_YES == n->dv_monotime_available)
+    {
+      if (NULL != n->sc)
+        GNUNET_PEERSTORE_store_cancel (n->sc);
+      n->sc =
+        GNUNET_PEERSTORE_store (peerstore,
+                                "transport",
+                                &dvl->initiator,
+                                GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
+                                &dvl->monotonic_time,
+                                sizeof (dvl->monotonic_time),
+                                GNUNET_TIME_UNIT_FOREVER_ABS,
+                                GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+                                &neighbour_store_dvmono_cb,
+                                n);
+    }
+  }
+  /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
+     If signature verification load too high, implement random drop strategy */
+  for (unsigned int i = 0; i < nhops; i++)
   {
-    dv = GNUNET_new (struct DistanceVector);
-    dv->target = path[path_len - 1];
-    dv->timeout_task = GNUNET_SCHEDULER_add_delayed (DV_PATH_VALIDITY_TIMEOUT,
-                                                     &path_cleanup_cb,
-                                                     dv);
-    GNUNET_assert (GNUNET_OK ==
-                   GNUNET_CONTAINER_multipeermap_put (
-                     dv_routes,
-                     &dv->target,
-                     dv,
-                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    struct DvHopPS dhp = {.purpose.purpose =
+                            htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
+                          .purpose.size = htonl (sizeof (dhp)),
+                          .pred = (0 == i) ? dvl->initiator : hops[i - 1].hop,
+                          .succ = (nhops == i + 1) ? GST_my_identity
+                                                   : hops[i + 1].hop,
+                          .challenge = dvl->challenge};
+
+    if (GNUNET_OK !=
+        GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP,
+                                    &dhp.purpose,
+                                    &hops[i].hop_sig,
+                                    &hops[i].hop.public_key))
+    {
+      GNUNET_break_op (0);
+      return;
+    }
   }
-  /* Check if we have this path already! */
-  shorter_distance = 0;
-  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
-       pos = pos->next_dv)
+
+  if (GNUNET_EXTRA_LOGGING > 0)
   {
-    if (pos->distance < path_len - 2)
-      shorter_distance++;
-    /* Note that the distances in 'pos' excludes us (path[0]) and
-       the next_hop (path[1]), so we need to subtract two
-       and check next_hop explicitly */
-    if ((pos->distance == path_len - 2) && (pos->next_hop == next_hop))
+    char *path;
+
+    path = GNUNET_strdup (GNUNET_i2s (&dvl->initiator));
+    for (unsigned int i = 0; i < nhops; i++)
     {
-      int match = GNUNET_YES;
+      char *tmp;
 
-      for (unsigned int i = 0; i < pos->distance; i++)
+      GNUNET_asprintf (&tmp,
+                       "%s%s%s",
+                       path,
+                       (bi_history & (1 << (nhops - i))) ? "<->" : "-->",
+                       GNUNET_i2s (&hops[i].hop));
+      GNUNET_free (path);
+      path = tmp;
+    }
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Received DVInit via %s%s%s\n",
+                path,
+                bi_hop ? "<->" : "-->",
+                GNUNET_i2s (&GST_my_identity));
+    GNUNET_free (path);
+  }
+
+  do_fwd = GNUNET_YES;
+  if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
+  {
+    struct GNUNET_PeerIdentity path[nhops + 1];
+    struct GNUNET_TIME_Relative host_latency_sum;
+    struct GNUNET_TIME_Relative latency;
+    struct GNUNET_TIME_Relative network_latency;
+
+    /* We initiated this, learn the forward path! */
+    path[0] = GST_my_identity;
+    path[1] = hops[0].hop;
+    host_latency_sum = GNUNET_TIME_relative_ntoh (dvl->non_network_delay);
+
+    // Need also something to lookup initiation time
+    // to compute RTT! -> add RTT argument here?
+    latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
+    // (based on dvl->challenge, we can identify time of origin!)
+
+    network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum);
+    /* assumption: latency on all links is the same */
+    network_latency = GNUNET_TIME_relative_divide (network_latency, nhops);
+
+    for (unsigned int i = 2; i <= nhops; i++)
+    {
+      struct GNUNET_TIME_Relative ilat;
+
+      /* assumption: linear latency increase per hop */
+      ilat = GNUNET_TIME_relative_multiply (network_latency, i);
+      path[i] = hops[i - 1].hop;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Learned path with %u hops to %s with latency %s\n",
+                  i,
+                  GNUNET_i2s (&path[i]),
+                  GNUNET_STRINGS_relative_time_to_string (ilat, GNUNET_YES));
+      learn_dv_path (path,
+                     i,
+                     ilat,
+                     GNUNET_TIME_relative_to_absolute (
+                       ADDRESS_VALIDATION_LIFETIME));
+    }
+    /* as we initiated, do not forward again (would be circular!) */
+    do_fwd = GNUNET_NO;
+    return;
+  }
+  if (bi_hop)
+  {
+    /* last hop was bi-directional, we could learn something here! */
+    struct GNUNET_PeerIdentity path[nhops + 2];
+
+    path[0] = GST_my_identity;
+    path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
+    for (unsigned int i = 0; i < nhops; i++)
+    {
+      int iret;
+
+      if (0 == (bi_history & (1 << i)))
+        break; /* i-th hop not bi-directional, stop learning! */
+      if (i == nhops)
       {
-        if (0 != GNUNET_memcmp (&pos->path[i], &path[i + 2]))
-        {
-          match = GNUNET_NO;
-          break;
-        }
+        path[i + 2] = dvl->initiator;
       }
-      if (GNUNET_YES == match)
+      else
       {
-        struct GNUNET_TIME_Relative last_timeout;
+        path[i + 2] = hops[nhops - i - 2].hop;
+      }
 
-        /* Re-discovered known path, update timeout */
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Learned inverse path with %u hops to %s\n",
+                  i + 1,
+                  GNUNET_i2s (&path[i + 2]));
+      iret = learn_dv_path (path,
+                            i + 2,
+                            GNUNET_TIME_UNIT_FOREVER_REL,
+                            GNUNET_TIME_UNIT_ZERO_ABS);
+      if (GNUNET_SYSERR == iret)
+      {
+        /* path invalid or too long to be interesting for US, thus should also
+           not be interesting to our neighbours, cut path when forwarding to
+           'i' hops, except of course for the one that goes back to the
+           initiator */
         GNUNET_STATISTICS_update (GST_stats,
-                                  "# Known DV path refreshed",
+                                  "# DV learn not forwarded due invalidity of path",
                                   1,
                                   GNUNET_NO);
-        last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
-        pos->timeout =
-          GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
-        pos->path_valid_until =
-          GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
-        GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
-        GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
-        if (0 <
-            GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
-          activate_core_visible_dv_path (pos);
-        if (last_timeout.rel_value_us <
-            GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
-                                           DV_PATH_DISCOVERY_FREQUENCY)
-              .rel_value_us)
-        {
-          /* Some peer send DV learn messages too often, we are learning
-             the same path faster than it would be useful; do not forward! */
-          GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                      "Rediscovered path too quickly, not forwarding further\n");
-          return GNUNET_NO;
-        }
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "Refreshed known path to %s, forwarding further\n",
-                    GNUNET_i2s (&dv->target));
-        return GNUNET_YES;
+        do_fwd = GNUNET_NO;
+        break;
+      }
+      if ((GNUNET_NO == iret) && (nhops == i + 1))
+      {
+        /* we have better paths, and this is the longest target,
+           so there cannot be anything interesting later */
+        GNUNET_STATISTICS_update (GST_stats,
+                                  "# DV learn not forwarded, got better paths",
+                                  1,
+                                  GNUNET_NO);
+        do_fwd = GNUNET_NO;
+        break;
       }
     }
   }
-  /* Count how many shorter paths we have (incl. direct
-     neighbours) before simply giving up on this one! */
-  if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
+
+  if (MAX_DV_HOPS_ALLOWED == nhops)
+  {
+    /* At limit, we're out of here! */
+    finish_cmc_handling (cmc);
+    return;
+  }
+
+  /* Forward to initiator, if path non-trivial and possible */
+  bi_history = (bi_history << 1) | (bi_hop ? 1 : 0);
+  did_initiator = GNUNET_NO;
+  if ((1 < nhops) &&
+      (GNUNET_YES ==
+       GNUNET_CONTAINER_multipeermap_contains (neighbours, &dvl->initiator)))
+  {
+    /* send back to origin! */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sending DVL back to initiator %s\n",
+                GNUNET_i2s (&dvl->initiator));
+    forward_dv_learn (&dvl->initiator, dvl, bi_history, nhops, hops, in_time);
+    did_initiator = GNUNET_YES;
+  }
+  /* We forward under two conditions: either we still learned something
+     ourselves (do_fwd), or the path was darn short and thus the initiator is
+     likely to still be very interested in this (and we did NOT already
+     send it back to the initiator) */
+  if ((do_fwd) || ((nhops < MIN_DV_PATH_LENGTH_FOR_INITIATOR) &&
+                   (GNUNET_NO == did_initiator)))
   {
-    /* We have a shorter path already! */
+    /* Pick random neighbours that are not yet on the path */
+    struct NeighbourSelectionContext nsc;
+    unsigned int n_cnt;
+
+    n_cnt = GNUNET_CONTAINER_multipeermap_size (neighbours);
+    nsc.nhops = nhops;
+    nsc.dvl = dvl;
+    nsc.bi_history = bi_history;
+    nsc.hops = hops;
+    nsc.in_time = in_time;
+    nsc.num_eligible = 0;
+    GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+                                           &dv_neighbour_selection,
+                                           &nsc);
+    if (0 == nsc.num_eligible)
+      return; /* done here, cannot forward to anyone else */
+    nsc.num_selections = calculate_fork_degree (nhops, n_cnt, nsc.num_eligible);
+    nsc.num_selections =
+      GNUNET_MIN (MAX_DV_DISCOVERY_SELECTION, nsc.num_selections);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Have many shorter DV paths %s, not forwarding further\n",
-                GNUNET_i2s (&dv->target));
-    return GNUNET_NO;
+                "Forwarding DVL to %u other peers\n",
+                nsc.num_selections);
+    for (unsigned int i = 0; i < nsc.num_selections; i++)
+      nsc.selections[i] =
+        (nsc.num_selections == n_cnt)
+          ? i /* all were selected, avoid collisions by chance */
+          : GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, n_cnt);
+    nsc.num_eligible = 0;
+    GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+                                           &dv_neighbour_transmission,
+                                           &nsc);
   }
-  /* create new DV path entry */
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Discovered new DV path to %s\n",
-              GNUNET_i2s (&dv->target));
-  hop = GNUNET_malloc (sizeof (struct DistanceVectorHop) +
-                       sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
-  hop->next_hop = next_hop;
-  hop->dv = dv;
-  hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
-  memcpy (&hop[1],
-          &path[2],
-          sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
-  hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
-  hop->path_valid_until = path_valid_until;
-  hop->distance = path_len - 2;
-  hop->pd.aged_rtt = network_latency;
-  GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop);
-  GNUNET_CONTAINER_MDLL_insert (neighbour,
-                                next_hop->dv_head,
-                                next_hop->dv_tail,
-                                hop);
-  if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
-    activate_core_visible_dv_path (hop);
-  return GNUNET_YES;
 }
 
 
 /**
- * Communicator gave us a DV learn message.  Check the message.
+ * Communicator gave us a DV box.  Check the message.
  *
  * @param cls a `struct CommunicatorMessageContext`
- * @param dvl the send message that was sent
+ * @param dvb the send message that was sent
  * @return #GNUNET_YES if message is well-formed
  */
 static int
-check_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
+check_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
 {
-  uint16_t size = ntohs (dvl->header.size);
-  uint16_t num_hops = ntohs (dvl->num_hops);
-  const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
+  uint16_t size = ntohs (dvb->header.size);
+  uint16_t num_hops = ntohs (dvb->num_hops);
+  const struct GNUNET_PeerIdentity *hops =
+    (const struct GNUNET_PeerIdentity *) &dvb[1];
 
   (void) cls;
-  if (size != sizeof (*dvl) + num_hops * sizeof (struct DVPathEntryP))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (num_hops > MAX_DV_HOPS_ALLOWED)
+  if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) +
+               sizeof (struct GNUNET_MessageHeader))
   {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
+  /* This peer must not be on the path */
   for (unsigned int i = 0; i < num_hops; i++)
-  {
-    if (0 == GNUNET_memcmp (&dvl->initiator, &hops[i].hop))
+    if (0 == GNUNET_memcmp (&hops[i], &GST_my_identity))
     {
       GNUNET_break_op (0);
       return GNUNET_SYSERR;
     }
-    if (0 == GNUNET_memcmp (&GST_my_identity, &hops[i].hop))
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-  }
   return GNUNET_YES;
 }
 
 
 /**
- * Build and forward a DV learn message to @a next_hop.
+ * Create a DV Box message and queue it for transmission to
+ * @ea next_hop.
  *
- * @param next_hop peer to send the message to
- * @param msg message received
- * @param bi_history bitmask specifying hops on path that were bidirectional
- * @param nhops length of the @a hops array
- * @param hops path the message traversed so far
- * @param in_time when did we receive the message, used to calculate network
- * delay
+ * @param next_hop peer to receive the message next
+ * @param total_hops how many hops did the message take so far
+ * @param num_hops length of the @a hops array
+ * @param origin origin of the message
+ * @param hops next peer(s) to the destination, including destination
+ * @param payload payload of the box
+ * @param payload_size number of bytes in @a payload
  */
 static void
-forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
-                  const struct TransportDVLearnMessage *msg,
-                  uint16_t bi_history,
-                  uint16_t nhops,
-                  const struct DVPathEntryP *hops,
-                  struct GNUNET_TIME_Absolute in_time)
+forward_dv_box (struct Neighbour *next_hop,
+                const struct TransportDVBoxMessage *hdr,
+                uint16_t total_hops,
+                uint16_t num_hops,
+                const struct GNUNET_PeerIdentity *hops,
+                const void *enc_payload,
+                uint16_t enc_payload_size)
 {
-  struct DVPathEntryP *dhops;
-  struct TransportDVLearnMessage *fwd;
-  struct GNUNET_TIME_Relative nnd;
-
-  /* compute message for forwarding */
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Forwarding DV learn message originating from %s to %s\n",
-              GNUNET_i2s (&msg->initiator),
-              GNUNET_i2s2 (next_hop));
-  GNUNET_assert (nhops < MAX_DV_HOPS_ALLOWED);
-  fwd = GNUNET_malloc (sizeof (struct TransportDVLearnMessage) +
-                       (nhops + 1) * sizeof (struct DVPathEntryP));
-  fwd->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
-  fwd->header.size = htons (sizeof (struct TransportDVLearnMessage) +
-                            (nhops + 1) * sizeof (struct DVPathEntryP));
-  fwd->num_hops = htons (nhops + 1);
-  fwd->bidirectional = htons (bi_history);
-  nnd = GNUNET_TIME_relative_add (GNUNET_TIME_absolute_get_duration (in_time),
-                                  GNUNET_TIME_relative_ntoh (
-                                    msg->non_network_delay));
-  fwd->non_network_delay = GNUNET_TIME_relative_hton (nnd);
-  fwd->init_sig = msg->init_sig;
-  fwd->initiator = msg->initiator;
-  fwd->challenge = msg->challenge;
-  dhops = (struct DVPathEntryP *) &fwd[1];
-  GNUNET_memcpy (dhops, hops, sizeof (struct DVPathEntryP) * nhops);
-  dhops[nhops].hop = GST_my_identity;
-  {
-    struct DvHopPS dhp = {.purpose.purpose =
-                            htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
-                          .purpose.size = htonl (sizeof (dhp)),
-                          .pred = dhops[nhops - 1].hop,
-                          .succ = *next_hop,
-                          .challenge = msg->challenge};
+  char buf[sizeof (struct TransportDVBoxMessage) +
+           num_hops * sizeof (struct GNUNET_PeerIdentity) + enc_payload_size];
+  struct GNUNET_PeerIdentity *dhops =
+    (struct GNUNET_PeerIdentity *) &buf[sizeof (struct TransportDVBoxMessage)];
 
-    GNUNET_assert (GNUNET_OK ==
-                   GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
-                                             &dhp.purpose,
-                                             &dhops[nhops].hop_sig));
-  }
-  route_message (next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED);
+  memcpy (buf, hdr, sizeof (*hdr));
+  memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity));
+  memcpy (&dhops[num_hops], enc_payload, enc_payload_size);
+  route_message (&next_hop->pid,
+                 (const struct GNUNET_MessageHeader *) buf,
+                 RMO_NONE);
 }
 
 
 /**
- * Check signature of type #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
+ * Free data structures associated with @a b.
  *
- * @param sender_monotonic_time monotonic time of the initiator
- * @param init the signer
- * @param challenge the challenge that was signed
- * @param init_sig signature presumably by @a init
- * @return #GNUNET_OK if the signature is valid
- */
-static int
-validate_dv_initiator_signature (
-  struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time,
-  const struct GNUNET_PeerIdentity *init,
-  const struct ChallengeNonceP *challenge,
-  const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
-{
-  struct DvInitPS ip = {.purpose.purpose = htonl (
-                          GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
-                        .purpose.size = htonl (sizeof (ip)),
-                        .monotonic_time = sender_monotonic_time,
-                        .challenge = *challenge};
-
-  if (
-    GNUNET_OK !=
-    GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
-                                &ip.purpose,
-                                init_sig,
-                                &init->public_key))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  return GNUNET_OK;
-}
-
-
-/**
- * Closure for #dv_neighbour_selection and #dv_neighbour_transmission.
+ * @param b data structure to release
  */
-struct NeighbourSelectionContext
-{
-  /**
-   * Original message we received.
-   */
-  const struct TransportDVLearnMessage *dvl;
-
-  /**
-   * The hops taken.
-   */
-  const struct DVPathEntryP *hops;
-
-  /**
-   * Time we received the message.
-   */
-  struct GNUNET_TIME_Absolute in_time;
-
-  /**
-   * Offsets of the selected peers.
-   */
-  uint32_t selections[MAX_DV_DISCOVERY_SELECTION];
-
-  /**
-   * Number of peers eligible for selection.
-   */
-  unsigned int num_eligible;
-
-  /**
-   * Number of peers that were selected for forwarding.
-   */
-  unsigned int num_selections;
-
-  /**
-   * Number of hops in @e hops
-   */
-  uint16_t nhops;
-
-  /**
-   * Bitmap of bidirectional connections encountered.
-   */
-  uint16_t bi_history;
-};
+static void
+free_backtalker (struct Backtalker *b)
+{
+  if (NULL != b->get)
+  {
+    GNUNET_PEERSTORE_iterate_cancel (b->get);
+    b->get = NULL;
+    GNUNET_assert (NULL != b->cmc);
+    finish_cmc_handling (b->cmc);
+    b->cmc = NULL;
+  }
+  if (NULL != b->task)
+  {
+    GNUNET_SCHEDULER_cancel (b->task);
+    b->task = NULL;
+  }
+  if (NULL != b->sc)
+  {
+    GNUNET_PEERSTORE_store_cancel (b->sc);
+    b->sc = NULL;
+  }
+  GNUNET_assert (
+    GNUNET_YES ==
+    GNUNET_CONTAINER_multipeermap_remove (backtalkers, &b->pid, b));
+  GNUNET_free (b);
+}
 
 
 /**
- * Function called for each neighbour during #handle_dv_learn.
+ * Callback to free backtalker records.
  *
- * @param cls a `struct NeighbourSelectionContext *`
- * @param pid identity of the peer
- * @param value a `struct Neighbour`
- * @return #GNUNET_YES (always)
+ * @param cls NULL
+ * @param pid unused
+ * @param value a `struct Backtalker`
+ * @return #GNUNET_OK (always)
  */
 static int
-dv_neighbour_selection (void *cls,
-                        const struct GNUNET_PeerIdentity *pid,
-                        void *value)
+free_backtalker_cb (void *cls,
+                    const struct GNUNET_PeerIdentity *pid,
+                    void *value)
 {
-  struct NeighbourSelectionContext *nsc = cls;
+  struct Backtalker *b = value;
 
-  (void) value;
-  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
-    return GNUNET_YES; /* skip initiator */
-  for (unsigned int i = 0; i < nsc->nhops; i++)
-    if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
-      return GNUNET_YES; /* skip peers on path */
-  nsc->num_eligible++;
-  return GNUNET_YES;
+  (void) cls;
+  (void) pid;
+  free_backtalker (b);
+  return GNUNET_OK;
 }
 
 
 /**
- * Function called for each neighbour during #handle_dv_learn.
- * We call #forward_dv_learn() on the neighbour(s) selected
- * during #dv_neighbour_selection().
+ * Function called when it is time to clean up a backtalker.
  *
- * @param cls a `struct NeighbourSelectionContext *`
- * @param pid identity of the peer
- * @param value a `struct Neighbour`
- * @return #GNUNET_YES (always)
+ * @param cls a `struct Backtalker`
  */
-static int
-dv_neighbour_transmission (void *cls,
-                           const struct GNUNET_PeerIdentity *pid,
-                           void *value)
+static void
+backtalker_timeout_cb (void *cls)
 {
-  struct NeighbourSelectionContext *nsc = cls;
+  struct Backtalker *b = cls;
 
-  (void) value;
-  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
-    return GNUNET_YES; /* skip initiator */
-  for (unsigned int i = 0; i < nsc->nhops; i++)
-    if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
-      return GNUNET_YES; /* skip peers on path */
-  for (unsigned int i = 0; i < nsc->num_selections; i++)
+  b->task = NULL;
+  if (0 != GNUNET_TIME_absolute_get_remaining (b->timeout).rel_value_us)
   {
-    if (nsc->selections[i] == nsc->num_eligible)
-    {
-      forward_dv_learn (pid,
-                        nsc->dvl,
-                        nsc->bi_history,
-                        nsc->nhops,
-                        nsc->hops,
-                        nsc->in_time);
-      break;
-    }
+    b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+    return;
   }
-  nsc->num_eligible++;
-  return GNUNET_YES;
+  GNUNET_assert (NULL == b->sc);
+  free_backtalker (b);
 }
 
 
 /**
- * Computes the number of neighbours we should forward a DVInit
- * message to given that it has so far taken @a hops_taken hops
- * though the network and that the number of neighbours we have
- * in total is @a neighbour_count, out of which @a eligible_count
- * are not yet on the path.
- *
- * NOTE: technically we might want to include NSE in the formula to
- * get a better grip on the overall network size. However, for now
- * using NSE here would create a dependency issue in the build system.
- * => Left for later, hardcoded to 50 for now.
- *
- * The goal of the fomula is that we want to reach a total of LOG(NSE)
- * peers via DV (`target_total`).  We want the reach to be spread out
- * over various distances to the origin, with a bias towards shorter
- * distances.
- *
- * We make the strong assumption that the network topology looks
- * "similar" at other hops, in particular the @a neighbour_count
- * should be comparable at other hops.
- *
- * If the local neighbourhood is densely connected, we expect that @a
- * eligible_count is close to @a neighbour_count minus @a hops_taken
- * as a lot of the path is already known. In that case, we should
- * forward to few(er) peers to try to find a path out of the
- * neighbourhood. OTOH, if @a eligible_count is close to @a
- * neighbour_count, we should forward to many peers as we are either
- * still close to the origin (i.e.  @a hops_taken is small) or because
- * we managed to get beyond a local cluster.  We express this as
- * the `boost_factor` using the square of the fraction of eligible
- * neighbours (so if only 50% are eligible, we boost by 1/4, but if
- * 99% are eligible, the 'boost' will be almost 1).
- *
- * Second, the more hops we have taken, the larger the problem of an
- * exponential traffic explosion gets.  So we take the `target_total`,
- * and compute our degree such that at each distance d 2^{-d} peers
- * are selected (corrected by the `boost_factor`).
+ * Function called with the monotonic time of a backtalker
+ * by PEERSTORE. Updates the time and continues processing.
  *
- * @param hops_taken number of hops DVInit has travelled so far
- * @param neighbour_count number of neighbours we have in total
- * @param eligible_count number of neighbours we could in
- *        theory forward to
+ * @param cls a `struct Backtalker`
+ * @param record the information found, NULL for the last call
+ * @param emsg error message
  */
-static unsigned int
-calculate_fork_degree (unsigned int hops_taken,
-                       unsigned int neighbour_count,
-                       unsigned int eligible_count)
+static void
+backtalker_monotime_cb (void *cls,
+                        const struct GNUNET_PEERSTORE_Record *record,
+                        const char *emsg)
 {
-  double target_total = 50.0; /* FIXME: use LOG(NSE)? */
-  double eligible_ratio =
-    ((double) eligible_count) / ((double) neighbour_count);
-  double boost_factor = eligible_ratio * eligible_ratio;
-  unsigned int rnd;
-  double left;
+  struct Backtalker *b = cls;
+  struct GNUNET_TIME_AbsoluteNBO *mtbe;
+  struct GNUNET_TIME_Absolute mt;
 
-  if (hops_taken >= 64)
+  (void) emsg;
+  if (NULL == record)
+  {
+    /* we're done with #backtalker_monotime_cb() invocations,
+       continue normal processing */
+    b->get = NULL;
+    GNUNET_assert (NULL != b->cmc);
+    if (0 != b->body_size)
+      demultiplex_with_cmc (b->cmc,
+                            (const struct GNUNET_MessageHeader *) &b[1]);
+    else
+      finish_cmc_handling (b->cmc);
+    b->cmc = NULL;
+    return;
+  }
+  if (sizeof (*mtbe) != record->value_size)
   {
     GNUNET_break (0);
-    return 0; /* precaution given bitshift below */
+    return;
   }
-  for (unsigned int i = 1; i < hops_taken; i++)
+  mtbe = record->value;
+  mt = GNUNET_TIME_absolute_ntoh (*mtbe);
+  if (mt.abs_value_us > b->monotonic_time.abs_value_us)
   {
-    /* For each hop, subtract the expected number of targets
-       reached at distance d (so what remains divided by 2^d) */
-    target_total -= (target_total * boost_factor / (1LLU << i));
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Backtalker message from %s dropped, monotime in the past\n",
+                GNUNET_i2s (&b->pid));
+    GNUNET_STATISTICS_update (
+      GST_stats,
+      "# Backchannel messages dropped: monotonic time not increasing",
+      1,
+      GNUNET_NO);
+    b->monotonic_time = mt;
+    /* Setting body_size to 0 prevents call to #forward_backchannel_payload()
+     */
+    b->body_size = 0;
+    return;
   }
-  rnd =
-    (unsigned int) floor (target_total * boost_factor / (1LLU << hops_taken));
-  /* round up or down probabilistically depending on how close we were
-     when floor()ing to rnd */
-  left = target_total - (double) rnd;
-  if (UINT32_MAX * left >
-      GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX))
-    rnd++; /* round up */
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Forwarding DV learn message of %u hops %u(/%u/%u) times\n",
-              hops_taken,
-              rnd,
-              eligible_count,
-              neighbour_count);
-  return rnd;
 }
 
 
 /**
- * Function called when peerstore is done storing a DV monotonic time.
+ * Function called by PEERSTORE when the store operation of
+ * a backtalker's monotonic time is complete.
  *
- * @param cls a `struct Neighbour`
- * @param success #GNUNET_YES if peerstore was successful
+ * @param cls the `struct Backtalker`
+ * @param success #GNUNET_OK on success
  */
 static void
-neighbour_store_dvmono_cb (void *cls, int success)
+backtalker_monotime_store_cb (void *cls, int success)
 {
-  struct Neighbour *n = cls;
+  struct Backtalker *b = cls;
 
-  n->sc = NULL;
-  if (GNUNET_YES != success)
+  if (GNUNET_OK != success)
+  {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Failed to store other peer's monotonic time in peerstore!\n");
+                "Failed to store backtalker's monotonic time in PEERSTORE!\n");
+  }
+  b->sc = NULL;
+  b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+}
+
+
+/**
+ * The backtalker @a b monotonic time changed. Update PEERSTORE.
+ *
+ * @param b a backtalker with updated monotonic time
+ */
+static void
+update_backtalker_monotime (struct Backtalker *b)
+{
+  struct GNUNET_TIME_AbsoluteNBO mtbe;
+
+  if (NULL != b->sc)
+  {
+    GNUNET_PEERSTORE_store_cancel (b->sc);
+    b->sc = NULL;
+  }
+  else
+  {
+    GNUNET_SCHEDULER_cancel (b->task);
+    b->task = NULL;
+  }
+  mtbe = GNUNET_TIME_absolute_hton (b->monotonic_time);
+  b->sc =
+    GNUNET_PEERSTORE_store (peerstore,
+                            "transport",
+                            &b->pid,
+                            GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
+                            &mtbe,
+                            sizeof (mtbe),
+                            GNUNET_TIME_UNIT_FOREVER_ABS,
+                            GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+                            &backtalker_monotime_store_cb,
+                            b);
 }
 
 
 /**
- * Communicator gave us a DV learn message.  Process the request.
+ * Communicator gave us a DV box.  Process the request.
  *
  * @param cls a `struct CommunicatorMessageContext` (must call
  * #finish_cmc_handling() when done)
- * @param dvl the message that was received
+ * @param dvb the message that was received
  */
 static void
-handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
+handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
 {
   struct CommunicatorMessageContext *cmc = cls;
-  enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
-  int bi_hop;
-  uint16_t nhops;
-  uint16_t bi_history;
-  const struct DVPathEntryP *hops;
-  int do_fwd;
-  int did_initiator;
-  struct GNUNET_TIME_Absolute in_time;
-  struct Neighbour *n;
-
-  nhops = ntohs (dvl->bidirectional); /* 0 = sender is initiator */
-  bi_history = ntohs (dvl->bidirectional);
-  hops = (const struct DVPathEntryP *) &dvl[1];
-  if (0 == nhops)
-  {
-    /* sanity check */
-    if (0 != GNUNET_memcmp (&dvl->initiator, &cmc->im.sender))
-    {
-      GNUNET_break (0);
-      finish_cmc_handling (cmc);
-      return;
-    }
-  }
-  else
-  {
-    /* sanity check */
-    if (0 != GNUNET_memcmp (&hops[nhops - 1].hop, &cmc->im.sender))
-    {
-      GNUNET_break (0);
-      finish_cmc_handling (cmc);
-      return;
-    }
-  }
-
-  GNUNET_assert (CT_COMMUNICATOR == cmc->tc->type);
-  cc = cmc->tc->details.communicator.cc;
-  bi_hop = (GNUNET_TRANSPORT_CC_RELIABLE ==
-            cc); // FIXME: add bi-directional flag to cc?
-  in_time = GNUNET_TIME_absolute_get ();
-
-  /* continue communicator here, everything else can happen asynchronous! */
-  finish_cmc_handling (cmc);
-
-  n = lookup_neighbour (&dvl->initiator);
-  if (NULL != n)
-  {
-    if ((n->dv_monotime_available == GNUNET_YES) &&
-        (GNUNET_TIME_absolute_ntoh (dvl->monotonic_time).abs_value_us <
-         n->last_dv_learn_monotime.abs_value_us))
-    {
-      GNUNET_STATISTICS_update (GST_stats,
-                                "# DV learn discarded due to time travel",
-                                1,
-                                GNUNET_NO);
-      return;
-    }
-    if (GNUNET_OK != validate_dv_initiator_signature (dvl->monotonic_time,
-                                                      &dvl->initiator,
-                                                      &dvl->challenge,
-                                                      &dvl->init_sig))
-    {
-      GNUNET_break_op (0);
-      return;
-    }
-    n->last_dv_learn_monotime = GNUNET_TIME_absolute_ntoh (dvl->monotonic_time);
-    if (GNUNET_YES == n->dv_monotime_available)
-    {
-      if (NULL != n->sc)
-        GNUNET_PEERSTORE_store_cancel (n->sc);
-      n->sc =
-        GNUNET_PEERSTORE_store (peerstore,
-                                "transport",
-                                &dvl->initiator,
-                                GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
-                                &dvl->monotonic_time,
-                                sizeof (dvl->monotonic_time),
-                                GNUNET_TIME_UNIT_FOREVER_ABS,
-                                GNUNET_PEERSTORE_STOREOPTION_REPLACE,
-                                &neighbour_store_dvmono_cb,
-                                n);
-    }
-  }
-  /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
-     If signature verification load too high, implement random drop strategy */
-  for (unsigned int i = 0; i < nhops; i++)
-  {
-    struct DvHopPS dhp = {.purpose.purpose =
-                            htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
-                          .purpose.size = htonl (sizeof (dhp)),
-                          .pred = (0 == i) ? dvl->initiator : hops[i - 1].hop,
-                          .succ = (nhops - 1 == i) ? GST_my_identity
-                                                   : hops[i + 1].hop,
-                          .challenge = dvl->challenge};
-
-    if (GNUNET_OK !=
-        GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP,
-                                    &dhp.purpose,
-                                    &hops[i].hop_sig,
-                                    &hops[i].hop.public_key))
-    {
-      GNUNET_break_op (0);
-      return;
-    }
-  }
+  uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
+  uint16_t num_hops = ntohs (dvb->num_hops);
+  const struct GNUNET_PeerIdentity *hops =
+    (const struct GNUNET_PeerIdentity *) &dvb[1];
+  const char *enc_payload = (const char *) &hops[num_hops];
+  uint16_t enc_payload_size =
+    size - (num_hops * sizeof (struct GNUNET_PeerIdentity));
+  struct DVKeyState key;
+  struct GNUNET_HashCode hmac;
+  const char *hdr;
+  size_t hdr_len;
 
   if (GNUNET_EXTRA_LOGGING > 0)
   {
     char *path;
 
-    path = GNUNET_strdup (GNUNET_i2s (&dvl->initiator));
-    for (unsigned int i = 0; i < nhops; i++)
+    path = GNUNET_strdup (GNUNET_i2s (&GST_my_identity));
+    for (unsigned int i = 0; i < num_hops; i++)
     {
       char *tmp;
 
-      GNUNET_asprintf (&tmp,
-                       "%s%s%s",
-                       path,
-                       (bi_history & (1 << (nhops - i))) ? "<->" : "-->",
-                       GNUNET_i2s (&hops[i].hop));
+      GNUNET_asprintf (&tmp, "%s->%s", path, GNUNET_i2s (&hops[i]));
       GNUNET_free (path);
       path = tmp;
     }
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Received DVInit via %s%s%s\n",
-                path,
-                bi_hop ? "<->" : "-->",
-                GNUNET_i2s (&GST_my_identity));
+                "Received DVBox with remainig path %s\n",
+                path);
     GNUNET_free (path);
   }
 
-  do_fwd = GNUNET_YES;
-  if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
-  {
-    struct GNUNET_PeerIdentity path[nhops + 1];
-    struct GNUNET_TIME_Relative host_latency_sum;
-    struct GNUNET_TIME_Relative latency;
-    struct GNUNET_TIME_Relative network_latency;
-
-    /* We initiated this, learn the forward path! */
-    path[0] = GST_my_identity;
-    path[1] = hops[0].hop;
-    host_latency_sum = GNUNET_TIME_relative_ntoh (dvl->non_network_delay);
-
-    // Need also something to lookup initiation time
-    // to compute RTT! -> add RTT argument here?
-    latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
-    // (based on dvl->challenge, we can identify time of origin!)
-
-    network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum);
-    /* assumption: latency on all links is the same */
-    network_latency = GNUNET_TIME_relative_divide (network_latency, nhops);
-
-    for (unsigned int i = 2; i <= nhops; i++)
-    {
-      struct GNUNET_TIME_Relative ilat;
-
-      /* assumption: linear latency increase per hop */
-      ilat = GNUNET_TIME_relative_multiply (network_latency, i);
-      path[i] = hops[i - 1].hop;
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Learned path with %u hops to %s with latency %s\n",
-                  i,
-                  GNUNET_i2s (&path[i]),
-                  GNUNET_STRINGS_relative_time_to_string (ilat, GNUNET_YES));
-      learn_dv_path (path,
-                     i,
-                     ilat,
-                     GNUNET_TIME_relative_to_absolute (
-                       ADDRESS_VALIDATION_LIFETIME));
-    }
-    /* as we initiated, do not forward again (would be circular!) */
-    do_fwd = GNUNET_NO;
-    return;
-  }
-  if (bi_hop)
+  if (num_hops > 0)
   {
-    /* last hop was bi-directional, we could learn something here! */
-    struct GNUNET_PeerIdentity path[nhops + 2];
-
-    path[0] = GST_my_identity;
-    path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
-    for (unsigned int i = 0; i < nhops; i++)
+    /* We're trying from the end of the hops array, as we may be
+       able to find a shortcut unknown to the origin that way */
+    for (int i = num_hops - 1; i >= 0; i--)
     {
-      int iret;
+      struct Neighbour *n;
 
-      if (0 == (bi_history & (1 << i)))
-        break; /* i-th hop not bi-directional, stop learning! */
-      if (i == nhops)
-      {
-        path[i + 2] = dvl->initiator;
-      }
-      else
+      if (0 == GNUNET_memcmp (&hops[i], &GST_my_identity))
       {
-        path[i + 2] = hops[nhops - i - 2].hop;
+        GNUNET_break_op (0);
+        finish_cmc_handling (cmc);
+        return;
       }
-
+      n = lookup_neighbour (&hops[i]);
+      if (NULL == n)
+        continue;
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Learned inverse path with %u hops to %s\n",
-                  i + 1,
-                  GNUNET_i2s (&path[i + 2]));
-      iret = learn_dv_path (path,
-                            i + 2,
-                            GNUNET_TIME_UNIT_FOREVER_REL,
-                            GNUNET_TIME_UNIT_ZERO_ABS);
-      if (GNUNET_SYSERR == iret)
-      {
-        /* path invalid or too long to be interesting for US, thus should also
-           not be interesting to our neighbours, cut path when forwarding to
-           'i' hops, except of course for the one that goes back to the
-           initiator */
-        GNUNET_STATISTICS_update (GST_stats,
-                                  "# DV learn not forwarded due invalidity of path",
-                                  1,
-                                  GNUNET_NO);
-        do_fwd = GNUNET_NO;
-        break;
-      }
-      if ((GNUNET_NO == iret) && (nhops == i + 1))
-      {
-        /* we have better paths, and this is the longest target,
-           so there cannot be anything interesting later */
-        GNUNET_STATISTICS_update (GST_stats,
-                                  "# DV learn not forwarded, got better paths",
-                                  1,
-                                  GNUNET_NO);
-        do_fwd = GNUNET_NO;
-        break;
-      }
+                  "Skipping %u/%u hops ahead while routing DV Box\n",
+                  i,
+                  num_hops);
+      forward_dv_box (n,
+                      dvb,
+                      ntohs (dvb->total_hops) + 1,
+                      num_hops - i - 1, /* number of hops left */
+                      &hops[i + 1], /* remaining hops */
+                      enc_payload,
+                      enc_payload_size);
+      GNUNET_STATISTICS_update (GST_stats,
+                                "# DV hops skipped routing boxes",
+                                i,
+                                GNUNET_NO);
+      GNUNET_STATISTICS_update (GST_stats,
+                                "# DV boxes routed (total)",
+                                1,
+                                GNUNET_NO);
+      finish_cmc_handling (cmc);
+      return;
     }
+    /* Woopsie, next hop not in neighbours, drop! */
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# DV Boxes dropped: next hop unknown",
+                              1,
+                              GNUNET_NO);
+    finish_cmc_handling (cmc);
+    return;
   }
+  /* We are the target. Unbox and handle message. */
+  GNUNET_STATISTICS_update (GST_stats,
+                            "# DV boxes opened (ultimate target)",
+                            1,
+                            GNUNET_NO);
+  cmc->total_hops = ntohs (dvb->total_hops);
 
-  if (MAX_DV_HOPS_ALLOWED == nhops)
+  dh_key_derive_eph_pub (&dvb->ephemeral_key, &dvb->iv, &key);
+  hdr = (const char *) &dvb[1];
+  hdr_len = ntohs (dvb->header.size) - sizeof (*dvb);
+  dv_hmac (&key, &hmac, hdr, hdr_len);
+  if (0 != GNUNET_memcmp (&hmac, &dvb->hmac))
   {
-    /* At limit, we're out of here! */
+    /* HMAC missmatch, disard! */
+    GNUNET_break_op (0);
     finish_cmc_handling (cmc);
     return;
   }
-
-  /* Forward to initiator, if path non-trivial and possible */
-  bi_history = (bi_history << 1) | (bi_hop ? 1 : 0);
-  did_initiator = GNUNET_NO;
-  if ((1 < nhops) &&
-      (GNUNET_YES ==
-       GNUNET_CONTAINER_multipeermap_contains (neighbours, &dvl->initiator)))
+  /* begin actual decryption */
   {
-    /* send back to origin! */
+    struct Backtalker *b;
+    struct GNUNET_TIME_Absolute monotime;
+    struct TransportDVBoxPayloadP ppay;
+    char body[hdr_len - sizeof (ppay)] GNUNET_ALIGN;
+    const struct GNUNET_MessageHeader *mh =
+      (const struct GNUNET_MessageHeader *) body;
+
+    GNUNET_assert (hdr_len >=
+                   sizeof (ppay) + sizeof (struct GNUNET_MessageHeader));
+    dv_decrypt (&key, &ppay, hdr, sizeof (ppay));
+    dv_decrypt (&key, &body, &hdr[sizeof (ppay)], hdr_len - sizeof (ppay));
+    dv_key_clean (&key);
+    if (ntohs (mh->size) != sizeof (body))
+    {
+      GNUNET_break_op (0);
+      finish_cmc_handling (cmc);
+      return;
+    }
+    /* need to prevent box-in-a-box (and DV_LEARN) so check inbox type! */
+    switch (ntohs (mh->type))
+    {
+    case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX:
+      GNUNET_break_op (0);
+      finish_cmc_handling (cmc);
+      return;
+    case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN:
+      GNUNET_break_op (0);
+      finish_cmc_handling (cmc);
+      return;
+    default:
+      /* permitted, continue */
+      break;
+    }
+    monotime = GNUNET_TIME_absolute_ntoh (ppay.monotonic_time);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Sending DVL back to initiator %s\n",
-                GNUNET_i2s (&dvl->initiator));
-    forward_dv_learn (&dvl->initiator, dvl, bi_history, nhops, hops, in_time);
-    did_initiator = GNUNET_YES;
-  }
-  /* We forward under two conditions: either we still learned something
-     ourselves (do_fwd), or the path was darn short and thus the initiator is
-     likely to still be very interested in this (and we did NOT already
-     send it back to the initiator) */
-  if ((do_fwd) || ((nhops < MIN_DV_PATH_LENGTH_FOR_INITIATOR) &&
-                   (GNUNET_NO == did_initiator)))
-  {
-    /* Pick random neighbours that are not yet on the path */
-    struct NeighbourSelectionContext nsc;
-    unsigned int n_cnt;
+                "Decrypted backtalk from %s\n",
+                GNUNET_i2s (&ppay.sender));
+    b = GNUNET_CONTAINER_multipeermap_get (backtalkers, &ppay.sender);
+    if ((NULL != b) && (monotime.abs_value_us < b->monotonic_time.abs_value_us))
+    {
+      GNUNET_STATISTICS_update (
+        GST_stats,
+        "# Backchannel messages dropped: monotonic time not increasing",
+        1,
+        GNUNET_NO);
+      finish_cmc_handling (cmc);
+      return;
+    }
+    if ((NULL == b) ||
+        (0 != GNUNET_memcmp (&b->last_ephemeral, &dvb->ephemeral_key)))
+    {
+      /* Check signature */
+      struct EphemeralConfirmationPS ec;
 
-    n_cnt = GNUNET_CONTAINER_multipeermap_size (neighbours);
-    nsc.nhops = nhops;
-    nsc.dvl = dvl;
-    nsc.bi_history = bi_history;
-    nsc.hops = hops;
-    nsc.in_time = in_time;
-    nsc.num_eligible = 0;
-    GNUNET_CONTAINER_multipeermap_iterate (neighbours,
-                                           &dv_neighbour_selection,
-                                           &nsc);
-    if (0 == nsc.num_eligible)
-      return; /* done here, cannot forward to anyone else */
-    nsc.num_selections = calculate_fork_degree (nhops, n_cnt, nsc.num_eligible);
-    nsc.num_selections =
-      GNUNET_MIN (MAX_DV_DISCOVERY_SELECTION, nsc.num_selections);
+      ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
+      ec.purpose.size = htonl (sizeof (ec));
+      ec.target = GST_my_identity;
+      ec.ephemeral_key = dvb->ephemeral_key;
+      if (
+        GNUNET_OK !=
+        GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL,
+                                    &ec.purpose,
+                                    &ppay.sender_sig,
+                                    &ppay.sender.public_key))
+      {
+        /* Signature invalid, disard! */
+        GNUNET_break_op (0);
+        finish_cmc_handling (cmc);
+        return;
+      }
+    }
+    /* Update sender, we now know the real origin! */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Forwarding DVL to %u other peers\n",
-                nsc.num_selections);
-    for (unsigned int i = 0; i < nsc.num_selections; i++)
-      nsc.selections[i] =
-        (nsc.num_selections == n_cnt)
-          ? i /* all were selected, avoid collisions by chance */
-          : GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, n_cnt);
-    nsc.num_eligible = 0;
-    GNUNET_CONTAINER_multipeermap_iterate (neighbours,
-                                           &dv_neighbour_transmission,
-                                           &nsc);
-  }
+                "DVBox received for me from %s via %s\n",
+                GNUNET_i2s2 (&ppay.sender),
+                GNUNET_i2s (&cmc->im.sender));
+    cmc->im.sender = ppay.sender;
+
+    if (NULL != b)
+    {
+      /* update key cache and mono time */
+      b->last_ephemeral = dvb->ephemeral_key;
+      b->monotonic_time = monotime;
+      update_backtalker_monotime (b);
+      b->timeout =
+        GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
+
+      demultiplex_with_cmc (cmc, mh);
+      return;
+    }
+    /* setup data structure to cache signature AND check
+       monotonic time with PEERSTORE before forwarding backchannel payload */
+    b = GNUNET_malloc (sizeof (struct Backtalker) + sizeof (body));
+    b->pid = ppay.sender;
+    b->body_size = sizeof (body);
+    memcpy (&b[1], body, sizeof (body));
+    GNUNET_assert (GNUNET_YES ==
+                   GNUNET_CONTAINER_multipeermap_put (
+                     backtalkers,
+                     &b->pid,
+                     b,
+                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    b->monotonic_time = monotime; /* NOTE: to be checked still! */
+    b->cmc = cmc;
+    b->timeout =
+      GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
+    b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+    b->get =
+      GNUNET_PEERSTORE_iterate (peerstore,
+                                "transport",
+                                &b->pid,
+                                GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
+                                &backtalker_monotime_cb,
+                                b);
+  } /* end actual decryption */
 }
 
 
 /**
- * Communicator gave us a DV box.  Check the message.
+ * Client notified us about transmission from a peer.  Process the request.
  *
- * @param cls a `struct CommunicatorMessageContext`
- * @param dvb the send message that was sent
+ * @param cls a `struct TransportClient` which sent us the message
+ * @param obm the send message that was sent
  * @return #GNUNET_YES if message is well-formed
  */
 static int
-check_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
+check_incoming_msg (void *cls,
+                    const struct GNUNET_TRANSPORT_IncomingMessage *im)
 {
-  uint16_t size = ntohs (dvb->header.size);
-  uint16_t num_hops = ntohs (dvb->num_hops);
-  const struct GNUNET_PeerIdentity *hops =
-    (const struct GNUNET_PeerIdentity *) &dvb[1];
-  const struct GNUNET_MessageHeader *inbox =
-    (const struct GNUNET_MessageHeader *) &hops[num_hops];
-  uint16_t isize;
-  uint16_t itype;
+  struct TransportClient *tc = cls;
 
-  (void) cls;
-  if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) +
-               sizeof (struct GNUNET_MessageHeader))
+  if (CT_COMMUNICATOR != tc->type)
   {
-    GNUNET_break_op (0);
+    GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-  isize = ntohs (inbox->size);
-  if (size !=
-      sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+  GNUNET_MQ_check_boxed_message (im);
+  return GNUNET_OK;
+}
+
+
+#if 0
+/**
+ * We received a @a challenge from another peer, check if we can
+ * increase the flow control window to that peer.
+ *
+ * @param vl virtual link
+ * @param challenge the challenge we received
+ * @param sender_time when did the peer send the message?
+ * @param last_window_consum_limit maximum number of kb the sender
+ *        promises to use of the previous window (if any)
+ */
+static void
+update_fc_window (struct VirtualLink *vl,
+                  struct GNUNET_TIME_Absolute sender_time,
+                  uint32_t last_window_consum_limit)
+{
+  // FIXME: update to new FC logic
+  if (0 == GNUNET_memcmp (challenge, &vl->n_challenge))
+  {
+    uint32_t avail;
+
+    /* Challenge identical to last one, update
+       @a last_window_consum_limit (to minimum) */
+    vl->last_fc_window_size_remaining =
+      GNUNET_MIN (last_window_consum_limit, vl->last_fc_window_size_remaining);
+    /* window could have shrunk! */
+    if (vl->available_fc_window_size > vl->last_fc_window_size_remaining)
+      avail = vl->available_fc_window_size - vl->last_fc_window_size_remaining;
+    else
+      avail = 0;
+    /* guard against integer overflow */
+    if (vl->incoming_fc_window_size_used + avail >=
+        vl->incoming_fc_window_size_used)
+      vl->incoming_fc_window_size = vl->incoming_fc_window_size_used + avail;
+    else
+      vl->incoming_fc_window_size = UINT32_MAX;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Updated window to %u/%u kb (%u used) for virtual link to %s!\n",
+                vl->incoming_fc_window_size,
+                vl->available_fc_window_size,
+                vl->incoming_fc_window_size_used,
+                GNUNET_i2s (&vl->target));
+    return;
   }
-  itype = ntohs (inbox->type);
-  if ((GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
-      (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype))
+  if (vl->n_challenge_time.abs_value_us >= sender_time.abs_value_us)
   {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# Challenges ignored: sender time not increasing",
+                              1,
+                              GNUNET_NO);
+    return;
   }
-  if (0 == GNUNET_memcmp (&dvb->origin, &GST_my_identity))
+  /* new challenge! */
+  if (vl->incoming_fc_window_size_used > last_window_consum_limit)
   {
+    /* lying peer: it already used more than it promised it would ever use! */
     GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  return GNUNET_YES;
+    last_window_consum_limit = vl->incoming_fc_window_size_used;
+  }
+  /* What remains is at most the difference between what we already processed
+     and what the sender promises to limit itself to. */
+  vl->last_fc_window_size_remaining =
+    last_window_consum_limit - vl->incoming_fc_window_size_used;
+  vl->n_challenge = *challenge;
+  vl->n_challenge_time = sender_time;
+  vl->incoming_fc_window_size_used = 0;
+  /* window could have shrunk! */
+  if (vl->available_fc_window_size > vl->last_fc_window_size_remaining)
+    vl->incoming_fc_window_size =
+      vl->available_fc_window_size - vl->last_fc_window_size_remaining;
+  else
+    vl->incoming_fc_window_size = 0;
+  GNUNET_log (
+    GNUNET_ERROR_TYPE_DEBUG,
+    "New window at %u/%u kb (%u left on previous) for virtual link to %s!\n",
+    vl->incoming_fc_window_size,
+    vl->available_fc_window_size,
+    vl->last_fc_window_size_remaining,
+    GNUNET_i2s (&vl->target));
+}
+#endif
+
+
+/**
+ * Closure for #check_known_address.
+ */
+struct CheckKnownAddressContext
+{
+  /**
+   * Set to the address we are looking for.
+   */
+  const char *address;
+
+  /**
+   * Set to a matching validation state, if one was found.
+   */
+  struct ValidationState *vs;
+};
+
+
+/**
+ * Test if the validation state in @a value matches the
+ * address from @a cls.
+ *
+ * @param cls a `struct CheckKnownAddressContext`
+ * @param pid unused (must match though)
+ * @param value a `struct ValidationState`
+ * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
+ */
+static int
+check_known_address (void *cls,
+                     const struct GNUNET_PeerIdentity *pid,
+                     void *value)
+{
+  struct CheckKnownAddressContext *ckac = cls;
+  struct ValidationState *vs = value;
+
+  (void) pid;
+  if (0 != strcmp (vs->address, ckac->address))
+    return GNUNET_OK;
+  ckac->vs = vs;
+  return GNUNET_NO;
 }
 
 
 /**
- * Create a DV Box message and queue it for transmission to
- * @ea next_hop.
+ * Task run periodically to validate some address based on #validation_heap.
+ *
+ * @param cls NULL
+ */
+static void
+validation_start_cb (void *cls);
+
+
+/**
+ * Set the time for next_challenge of @a vs to @a new_time.
+ * Updates the heap and if necessary reschedules the job.
  *
- * @param next_hop peer to receive the message next
- * @param total_hops how many hops did the message take so far
- * @param num_hops length of the @a hops array
- * @param origin origin of the message
- * @param hops next peer(s) to the destination, including destination
- * @param payload payload of the box
- * @param payload_size number of bytes in @a payload
+ * @param vs validation state to update
+ * @param new_time new time for revalidation
  */
 static void
-forward_dv_box (struct Neighbour *next_hop,
-                uint16_t total_hops,
-                uint16_t num_hops,
-                const struct GNUNET_PeerIdentity *origin,
-                const struct GNUNET_PeerIdentity *hops,
-                const void *payload,
-                uint16_t payload_size)
+update_next_challenge_time (struct ValidationState *vs,
+                            struct GNUNET_TIME_Absolute new_time)
 {
-  struct TransportDVBoxMessage *dvb;
+  struct GNUNET_TIME_Relative delta;
 
-  dvb = create_dv_box (total_hops,
-                       origin,
-                       &hops[num_hops - 1] /* == target */,
-                       num_hops - 1 /* do not count target twice */,
-                       hops,
-                       payload,
-                       payload_size);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Routing DV Box of %u bytes from %s at %u/%u hops via %s\n",
-              payload_size,
-              GNUNET_i2s (origin),
-              (unsigned int) num_hops,
-              (unsigned int) total_hops,
-              GNUNET_i2s2 (&next_hop->pid));
-  route_message (&next_hop->pid, &dvb->header, RMO_NONE);
-  GNUNET_free (dvb);
+  if (new_time.abs_value_us == vs->next_challenge.abs_value_us)
+    return; /* be lazy */
+  vs->next_challenge = new_time;
+  if (NULL == vs->hn)
+    vs->hn =
+      GNUNET_CONTAINER_heap_insert (validation_heap, vs, new_time.abs_value_us);
+  else
+    GNUNET_CONTAINER_heap_update_cost (vs->hn, new_time.abs_value_us);
+  if ((vs != GNUNET_CONTAINER_heap_peek (validation_heap)) &&
+      (NULL != validation_task))
+    return;
+  if (NULL != validation_task)
+    GNUNET_SCHEDULER_cancel (validation_task);
+  /* randomize a bit */
+  delta.rel_value_us =
+    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+                              MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
+  new_time = GNUNET_TIME_absolute_add (new_time, delta);
+  validation_task =
+    GNUNET_SCHEDULER_add_at (new_time, &validation_start_cb, NULL);
 }
 
 
 /**
- * Communicator gave us a DV box.  Process the request.
+ * Start address validation.
  *
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
- * @param dvb the message that was received
+ * @param pid peer the @a address is for
+ * @param address an address to reach @a pid (presumably)
  */
 static void
-handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
+start_address_validation (const struct GNUNET_PeerIdentity *pid,
+                          const char *address)
 {
-  struct CommunicatorMessageContext *cmc = cls;
-  uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
-  uint16_t num_hops = ntohs (dvb->num_hops);
-  const struct GNUNET_PeerIdentity *hops =
-    (const struct GNUNET_PeerIdentity *) &dvb[1];
-  const struct GNUNET_MessageHeader *inbox =
-    (const struct GNUNET_MessageHeader *) &hops[num_hops];
-
-  if (GNUNET_EXTRA_LOGGING > 0)
-  {
-    char *path;
-
-    path = GNUNET_strdup (GNUNET_i2s (&GST_my_identity));
-    for (unsigned int i = 0; i < num_hops; i++)
-    {
-      char *tmp;
-
-      GNUNET_asprintf (&tmp, "%s->%s", path, GNUNET_i2s (&hops[i]));
-      GNUNET_free (path);
-      path = tmp;
-    }
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Received DVBox with remainig path %s\n",
-                path);
-    GNUNET_free (path);
-  }
+  struct GNUNET_TIME_Absolute now;
+  struct ValidationState *vs;
+  struct CheckKnownAddressContext ckac = {.address = address, .vs = NULL};
 
-  if (num_hops > 0)
+  (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
+                                                     pid,
+                                                     &check_known_address,
+                                                     &ckac);
+  if (NULL != (vs = ckac.vs))
   {
-    /* We're trying from the end of the hops array, as we may be
-       able to find a shortcut unknown to the origin that way */
-    for (int i = num_hops - 1; i >= 0; i--)
+    /* if 'vs' is not currently valid, we need to speed up retrying the
+     * validation */
+    if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
     {
-      struct Neighbour *n;
-
-      if (0 == GNUNET_memcmp (&hops[i], &GST_my_identity))
-      {
-        GNUNET_break_op (0);
-        finish_cmc_handling (cmc);
-        return;
-      }
-      n = lookup_neighbour (&hops[i]);
-      if (NULL == n)
-        continue;
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Skipping %u/%u hops ahead while routing DV Box\n",
-                  i,
-                  num_hops);
-      forward_dv_box (n,
-                      ntohs (dvb->total_hops) + 1,
-                      num_hops - i - 1, /* number of hops left */
-                      &dvb->origin,
-                      &hops[i + 1], /* remaining hops */
-                      (const void *) &dvb[1],
-                      size);
-      GNUNET_STATISTICS_update (GST_stats,
-                                "# DV hops skipped routing boxes",
-                                i,
-                                GNUNET_NO);
-      GNUNET_STATISTICS_update (GST_stats,
-                                "# DV boxes routed (total)",
-                                1,
-                                GNUNET_NO);
-      finish_cmc_handling (cmc);
-      return;
+      /* reduce backoff as we got a fresh advertisement */
+      vs->challenge_backoff =
+        GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
+                                  GNUNET_TIME_relative_divide (vs->challenge_backoff,
+                                                               2));
+      update_next_challenge_time (vs,
+                                  GNUNET_TIME_relative_to_absolute (
+                                    vs->challenge_backoff));
     }
-    /* Woopsie, next hop not in neighbours, drop! */
-    GNUNET_STATISTICS_update (GST_stats,
-                              "# DV Boxes dropped: next hop unknown",
-                              1,
-                              GNUNET_NO);
-    finish_cmc_handling (cmc);
     return;
   }
-  /* We are the target. Unbox and handle message. */
+  now = GNUNET_TIME_absolute_get ();
+  vs = GNUNET_new (struct ValidationState);
+  vs->pid = *pid;
+  vs->valid_until =
+    GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME);
+  vs->first_challenge_use = now;
+  vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
+  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+                              &vs->challenge,
+                              sizeof (vs->challenge));
+  vs->address = GNUNET_strdup (address);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "DVBox received for me from %s\n",
-              GNUNET_i2s (&dvb->origin));
-  GNUNET_STATISTICS_update (GST_stats,
-                            "# DV boxes opened (ultimate target)",
-                            1,
-                            GNUNET_NO);
-  cmc->im.sender = dvb->origin;
-  cmc->total_hops = ntohs (dvb->total_hops);
-  // FIXME: should *decrypt* inbox here; needs BackchannelEncapsulation!
-  // FIXME: need to prevent box-in-a-box, so check inbox type!
-  demultiplex_with_cmc (cmc, inbox);
+              "Starting address validation `%s' of peer %s using challenge %s\n",
+              address,
+              GNUNET_i2s (pid),
+              GNUNET_sh2s (&vs->challenge.value));
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multipeermap_put (
+                   validation_map,
+                   &vs->pid,
+                   vs,
+                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  update_next_challenge_time (vs, now);
 }
 
 
 /**
- * Client notified us about transmission from a peer.  Process the request.
+ * Function called by PEERSTORE for each matching record.
  *
- * @param cls a `struct TransportClient` which sent us the message
- * @param obm the send message that was sent
- * @return #GNUNET_YES if message is well-formed
+ * @param cls closure, a `struct IncomingRequest`
+ * @param record peerstore record information
+ * @param emsg error message, or NULL if no errors
  */
-static int
-check_incoming_msg (void *cls,
-                    const struct GNUNET_TRANSPORT_IncomingMessage *im)
+static void
+handle_hello_for_incoming (void *cls,
+                           const struct GNUNET_PEERSTORE_Record *record,
+                           const char *emsg)
 {
-  struct TransportClient *tc = cls;
+  struct IncomingRequest *ir = cls;
+  const char *val;
 
-  if (CT_COMMUNICATOR != tc->type)
+  if (NULL != emsg)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Got failure from PEERSTORE: %s\n",
+                emsg);
+    return;
+  }
+  val = record->value;
+  if ((0 == record->value_size) || ('\0' != val[record->value_size - 1]))
   {
     GNUNET_break (0);
-    return GNUNET_SYSERR;
+    return;
   }
-  GNUNET_MQ_check_boxed_message (im);
-  return GNUNET_OK;
+  start_address_validation (&ir->pid, (const char *) record->value);
 }
 
 
@@ -7128,41 +7656,82 @@ handle_validation_challenge (
   const struct TransportValidationChallengeMessage *tvc)
 {
   struct CommunicatorMessageContext *cmc = cls;
-  struct TransportValidationResponseMessage *tvr;
+  struct TransportValidationResponseMessage tvr;
+  struct VirtualLink *vl;
+  struct GNUNET_TIME_RelativeNBO validity_duration;
+  struct IncomingRequest *ir;
+  struct Neighbour *n;
 
+  /* DV-routed messages are not allowed for validation challenges */
   if (cmc->total_hops > 0)
   {
-    /* DV routing is not allowed for validation challenges! */
     GNUNET_break_op (0);
     finish_cmc_handling (cmc);
     return;
   }
+  validity_duration = cmc->im.expected_address_validity;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received address validation challenge %s\n",
               GNUNET_sh2s (&tvc->challenge.value));
-  tvr = GNUNET_new (struct TransportValidationResponseMessage);
-  tvr->header.type =
+  /* If we have a virtual link, we use this mechanism to signal the
+     size of the flow control window, and to allow the sender
+     to ask for increases. If for us the virtual link is still down,
+     we will always give a window size of zero. */
+  tvr.header.type =
     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE);
-  tvr->header.size = htons (sizeof (*tvr));
-  tvr->challenge = tvc->challenge;
-  tvr->origin_time = tvc->sender_time;
-  tvr->validity_duration = cmc->im.expected_address_validity;
+  tvr.header.size = htons (sizeof (tvr));
+  tvr.reserved = htonl (0);
+  tvr.challenge = tvc->challenge;
+  tvr.origin_time = tvc->sender_time;
+  tvr.validity_duration = validity_duration;
   {
     /* create signature */
     struct TransportValidationPS tvp =
       {.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
        .purpose.size = htonl (sizeof (tvp)),
-       .validity_duration = tvr->validity_duration,
+       .validity_duration = validity_duration,
        .challenge = tvc->challenge};
 
     GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
                                                           &tvp.purpose,
-                                                          &tvr->signature));
+                                                          &tvr.signature));
   }
   route_message (&cmc->im.sender,
-                 &tvr->header,
+                 &tvr.header,
                  RMO_ANYTHING_GOES | RMO_REDUNDANT);
   finish_cmc_handling (cmc);
+
+  vl = lookup_virtual_link (&cmc->im.sender);
+  if (NULL != vl)
+    return;
+
+  /* For us, the link is still down, but we need bi-directional
+     connections (for flow-control and for this to be useful for
+     CORE), so we must try to bring the link up! */
+
+  /* (1) Check existing queues, if any, we may be lucky! */
+  n = lookup_neighbour (&cmc->im.sender);
+  if (NULL != n)
+    for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
+      start_address_validation (&cmc->im.sender, q->address);
+  /* (2) Also try to see if we have addresses in PEERSTORE for this peer
+     we could use */
+  for (ir = ir_head; NULL != ir; ir = ir->next)
+    if (0 == GNUNET_memcmp (&ir->pid, &cmc->im.sender))
+      return; /* we are already trying */
+  ir = GNUNET_new (struct IncomingRequest);
+  ir->pid = cmc->im.sender;
+  GNUNET_CONTAINER_DLL_insert (ir_head, ir_tail, ir);
+  ir->wc = GNUNET_PEERSTORE_watch (peerstore,
+                                   "transport",
+                                   &ir->pid,
+                                   GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
+                                   &handle_hello_for_incoming,
+                                   ir);
+  ir_total++;
+  /* Bound attempts we do in parallel here, might otherwise get excessive */
+  while (ir_total > MAX_INCOMING_REQUEST)
+    free_incoming_request (ir_head);
 }
 
 
@@ -7218,60 +7787,15 @@ check_known_challenge (void *cls,
 static void
 peerstore_store_validation_cb (void *cls, int success)
 {
-  struct ValidationState *vs = cls;
-
-  vs->sc = NULL;
-  if (GNUNET_YES == success)
-    return;
-  GNUNET_STATISTICS_update (GST_stats,
-                            "# Peerstore failed to store foreign address",
-                            1,
-                            GNUNET_NO);
-}
-
-
-/**
- * Task run periodically to validate some address based on #validation_heap.
- *
- * @param cls NULL
- */
-static void
-validation_start_cb (void *cls);
-
-
-/**
- * Set the time for next_challenge of @a vs to @a new_time.
- * Updates the heap and if necessary reschedules the job.
- *
- * @param vs validation state to update
- * @param new_time new time for revalidation
- */
-static void
-update_next_challenge_time (struct ValidationState *vs,
-                            struct GNUNET_TIME_Absolute new_time)
-{
-  struct GNUNET_TIME_Relative delta;
-
-  if (new_time.abs_value_us == vs->next_challenge.abs_value_us)
-    return; /* be lazy */
-  vs->next_challenge = new_time;
-  if (NULL == vs->hn)
-    vs->hn =
-      GNUNET_CONTAINER_heap_insert (validation_heap, vs, new_time.abs_value_us);
-  else
-    GNUNET_CONTAINER_heap_update_cost (vs->hn, new_time.abs_value_us);
-  if ((vs != GNUNET_CONTAINER_heap_peek (validation_heap)) &&
-      (NULL != validation_task))
+  struct ValidationState *vs = cls;
+
+  vs->sc = NULL;
+  if (GNUNET_YES == success)
     return;
-  if (NULL != validation_task)
-    GNUNET_SCHEDULER_cancel (validation_task);
-  /* randomize a bit */
-  delta.rel_value_us =
-    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
-                              MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
-  new_time = GNUNET_TIME_absolute_add (new_time, delta);
-  validation_task =
-    GNUNET_SCHEDULER_add_at (new_time, &validation_start_cb, NULL);
+  GNUNET_STATISTICS_update (GST_stats,
+                            "# Peerstore failed to store foreign address",
+                            1,
+                            GNUNET_NO);
 }
 
 
@@ -7425,14 +7949,22 @@ handle_validation_response (
   q->validated_until = vs->validated_until;
   q->pd.aged_rtt = vs->validation_rtt;
   n = q->neighbour;
-  vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid);
+  vl = lookup_virtual_link (&vs->pid);
   if (NULL != vl)
   {
     /* Link was already up, remember n is also now available and we are done */
-    vl->n = n;
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Virtual link to %s could now also direct neighbour!\n",
-                GNUNET_i2s (&vs->pid));
+    if (NULL == vl->n)
+    {
+      vl->n = n;
+      n->vl = vl;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Virtual link to %s could now also direct neighbour!\n",
+                  GNUNET_i2s (&vs->pid));
+    }
+    else
+    {
+      GNUNET_assert (n == vl->n);
+    }
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -7441,7 +7973,10 @@ handle_validation_response (
   vl = GNUNET_new (struct VirtualLink);
   vl->target = n->pid;
   vl->n = n;
+  n->vl = vl;
   vl->core_recv_window = RECV_WINDOW_SIZE;
+  vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
+  vl->my_challenge = tvr->challenge;
   vl->visibility_task =
     GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
   GNUNET_break (GNUNET_YES ==
@@ -7579,7 +8114,7 @@ set_pending_message_uuid (struct PendingMessage *pm)
 {
   if (pm->msg_uuid_set)
     return;
-  pm->msg_uuid.uuid = pm->target->message_uuid_ctr++;
+  pm->msg_uuid.uuid = pm->vl->message_uuid_ctr++;
   pm->msg_uuid_set = GNUNET_YES;
 }
 
@@ -7655,7 +8190,7 @@ fragment_message (struct Queue *queue,
               "Fragmenting message %llu <%llu> to %s for MTU %u\n",
               (unsigned long long) pm->msg_uuid.uuid,
               pm->logging_uuid,
-              GNUNET_i2s (&pm->target->pid),
+              GNUNET_i2s (&pm->vl->target),
               (unsigned int) mtu);
   pa = prepare_pending_acknowledgement (queue, dvh, pm);
 
@@ -7702,7 +8237,7 @@ fragment_message (struct Queue *queue,
       GNUNET_malloc (sizeof (struct PendingMessage) +
                      sizeof (struct TransportFragmentBoxMessage) + fragsize);
     frag->logging_uuid = logging_uuid_gen++;
-    frag->target = pm->target;
+    frag->vl = pm->vl;
     frag->frag_parent = ff;
     frag->timeout = pm->timeout;
     frag->bytes_msg = sizeof (struct TransportFragmentBoxMessage) + fragsize;
@@ -7773,14 +8308,14 @@ reliability_box_message (struct Queue *queue,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Preparing reliability box for message <%llu> to %s on queue %s\n",
               pm->logging_uuid,
-              GNUNET_i2s (&pm->target->pid),
+              GNUNET_i2s (&pm->vl->target),
               queue->address);
   pa = prepare_pending_acknowledgement (queue, dvh, pm);
 
   bpm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (rbox) +
                        pm->bytes_msg);
   bpm->logging_uuid = logging_uuid_gen++;
-  bpm->target = pm->target;
+  bpm->vl = pm->vl;
   bpm->frag_parent = pm;
   GNUNET_CONTAINER_MDLL_insert (frag, pm->head_frag, pm->tail_frag, bpm);
   bpm->timeout = pm->timeout;
@@ -7812,7 +8347,7 @@ static void
 update_pm_next_attempt (struct PendingMessage *pm,
                         struct GNUNET_TIME_Absolute next_attempt)
 {
-  struct Neighbour *neighbour = pm->target;
+  struct VirtualLink *vl = pm->vl;
 
   pm->next_attempt = next_attempt;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -7825,17 +8360,17 @@ update_pm_next_attempt (struct PendingMessage *pm,
     struct PendingMessage *pos;
 
     /* re-insert sort in neighbour list */
-    GNUNET_CONTAINER_MDLL_remove (neighbour,
-                                  neighbour->pending_msg_head,
-                                  neighbour->pending_msg_tail,
+    GNUNET_CONTAINER_MDLL_remove (vl,
+                                  vl->pending_msg_head,
+                                  vl->pending_msg_tail,
                                   pm);
-    pos = neighbour->pending_msg_tail;
+    pos = vl->pending_msg_tail;
     while ((NULL != pos) &&
            (next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
-      pos = pos->prev_neighbour;
-    GNUNET_CONTAINER_MDLL_insert_after (neighbour,
-                                        neighbour->pending_msg_head,
-                                        neighbour->pending_msg_tail,
+      pos = pos->prev_vl;
+    GNUNET_CONTAINER_MDLL_insert_after (vl,
+                                        vl->pending_msg_head,
+                                        vl->pending_msg_tail,
                                         pos,
                                         pm);
   }
@@ -7860,10 +8395,169 @@ update_pm_next_attempt (struct PendingMessage *pm,
 
 
 /**
- * We believe we are ready to transmit a message on a queue.
- * Gives the message to the
- * communicator for transmission (updating the tracker, and re-scheduling
- * itself if applicable).
+ * Context for #select_best_pending_from_link().
+ */
+struct PendingMessageScoreContext
+{
+  /**
+   * Set to the best message that was found, NULL for none.
+   */
+  struct PendingMessage *best;
+
+  /**
+   * DVH that @e best should take, or NULL for direct transmission.
+   */
+  struct DistanceVectorHop *dvh;
+
+  /**
+   * What is the estimated total overhead for this message?
+   */
+  size_t real_overhead;
+
+  /**
+   * Number of pending messages we seriously considered this time.
+   */
+  unsigned int consideration_counter;
+
+  /**
+   * Did we have to fragment?
+   */
+  int frag;
+
+  /**
+   * Did we have to reliability box?
+   */
+  int relb;
+};
+
+
+/**
+ * Select the best pending message from @a vl for transmission
+ * via @a queue.
+ *
+ * @param sc[in,out] best message so far (NULL for none), plus scoring data
+ * @param queue the queue that will be used for transmission
+ * @param vl the virtual link providing the messages
+ * @param dvh path we are currently considering, or NULL for none
+ * @param overhead number of bytes of overhead to be expected
+ *        from DV encapsulation (0 for without DV)
+ */
+static void
+select_best_pending_from_link (struct PendingMessageScoreContext *sc,
+                               struct Queue *queue,
+                               struct VirtualLink *vl,
+                               struct DistanceVectorHop *dvh,
+                               size_t overhead)
+{
+  struct GNUNET_TIME_Absolute now;
+
+  now = GNUNET_TIME_absolute_get ();
+  for (struct PendingMessage *pos = vl->pending_msg_head; NULL != pos;
+       pos = pos->next_vl)
+  {
+    size_t real_overhead = overhead;
+    int frag;
+    int relb;
+
+    if (pos->next_attempt.abs_value_us > now.abs_value_us)
+      break; /* too early for all messages, they are sorted by next_attempt */
+    if (NULL != pos->qe)
+      continue; /* not eligible */
+    sc->consideration_counter++;
+    /* determine if we have to reliability-box, if so add reliability box
+       overhead */
+    relb = GNUNET_NO;
+    if ((GNUNET_NO == frag) &&
+        (0 == (pos->prefs & GNUNET_MQ_PREF_UNRELIABLE)) &&
+        (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc))
+    {
+      relb = GNUNET_YES;
+      real_overhead += sizeof (struct TransportReliabilityBoxMessage);
+    }
+    /* determine if we have to fragment, if so add fragmentation
+       overhead! */
+    frag = GNUNET_NO;
+    if ( ( (0 != queue->mtu) &&
+           (pos->bytes_msg + real_overhead > queue->mtu) ) ||
+         (pos->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
+         (NULL != pos->head_frag /* fragments already exist, should
+                                    respect that even if MTU is 0 for
+                                    this queue */) )
+    {
+      frag = GNUNET_YES;
+      relb = GNUNET_NO; /* if we fragment, we never also reliability box */
+      if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)
+      {
+        /* FIXME-OPTIMIZE: we could use an optimized, shorter fragmentation
+           header without the ACK UUID when using a *reliable* channel! */
+      }
+      real_overhead = overhead + sizeof (struct TransportFragmentBoxMessage);
+    }
+
+    /* Finally, compare to existing 'best' in sc to see if this 'pos' pending
+       message would beat it! */
+    if (NULL != sc->best)
+    {
+      /* CHECK if pos fits queue BETTER (=smaller) than pm, if not: continue;
+         OPTIMIZE-ME: This is a heuristic, which so far has NOT been
+         experimentally validated. There may be some huge potential for
+         improvement here. Also, we right now only compare how well the
+         given message fits _this_ queue, and do not consider how well other
+         queues might suit the message. Taking other queues into consideration
+         may further improve the result, but could also be expensive
+         in terms of CPU time.  */
+      long long sc_score = sc->frag * 40 + sc->relb * 20 + sc->real_overhead;
+      long long pm_score = frag * 40 + relb * 20 + real_overhead;
+      long long time_delta =
+        (sc->best->next_attempt.abs_value_us - pos->next_attempt.abs_value_us) /
+        1000LL;
+
+      /* "time_delta" considers which message has been 'ready' for transmission
+         for longer, if a message has a preference for low latency, increase
+         the weight of the time_delta by 10x if it is favorable for that message */
+      if ((0 != (pos->prefs & GNUNET_MQ_PREF_LOW_LATENCY)) &&
+          (0 != (sc->best->prefs & GNUNET_MQ_PREF_LOW_LATENCY)))
+        time_delta *= 10; /* increase weight (always, both are low latency) */
+      else if ((0 != (pos->prefs & GNUNET_MQ_PREF_LOW_LATENCY)) &&
+               (time_delta > 0))
+        time_delta *=
+          10; /* increase weight, favors 'pos', which is low latency */
+      else if ((0 != (sc->best->prefs & GNUNET_MQ_PREF_LOW_LATENCY)) &&
+               (time_delta < 0))
+        time_delta *=
+          10; /* increase weight, favors 'sc->best', which is low latency */
+      if (0 != queue->mtu)
+      {
+        /* Grant bonus if we are bellow MTU, larger bonus the closer we will
+           be to the MTU */
+        if (queue->mtu > sc->real_overhead + sc->best->bytes_msg)
+          sc_score -= queue->mtu - (sc->real_overhead + sc->best->bytes_msg);
+        if (queue->mtu > real_overhead + pos->bytes_msg)
+          pm_score -= queue->mtu - (real_overhead + pos->bytes_msg);
+      }
+      if (sc_score + time_delta > pm_score)
+        continue; /* sc_score larger, keep sc->best */
+    }
+    sc->best = pos;
+    sc->dvh = dvh;
+    sc->frag = frag;
+    sc->relb = relb;
+  }
+}
+
+
+/**
+ * We believe we are ready to transmit a `struct PendingMessage` on a
+ * queue, the big question is which one!  We need to see if there is
+ * one pending that is allowed by flow control and congestion control
+ * and (ideally) matches our queue's performance profile.
+ *
+ * If such a message is found, we give the message to the communicator
+ * for transmission (updating the tracker, and re-scheduling ourselves
+ * if applicable).
+ *
+ * If no such message is found, the queue's `idle` field must be set
+ * to #GNUNET_YES.
  *
  * @param cls the `struct Queue` to process transmissions for
  */
@@ -7872,125 +8566,98 @@ transmit_on_queue (void *cls)
 {
   struct Queue *queue = cls;
   struct Neighbour *n = queue->neighbour;
+  struct PendingMessageScoreContext sc;
   struct PendingMessage *pm;
-  struct PendingMessage *s;
-  uint32_t overhead;
 
   queue->transmit_task = NULL;
-  if (NULL == (pm = n->pending_msg_head))
+  if (NULL == n->vl)
   {
-    /* no message pending, nothing to do here! */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "No messages waiting on queue %s to %s, going to sleep\n",
-                queue->address,
-                GNUNET_i2s (&n->pid));
+                "Virtual link `%s' is down, cannot have PM for queue `%s'\n",
+                GNUNET_i2s (&n->pid),
+                queue->address);
+    queue->idle = GNUNET_YES;
     return;
   }
-  if (NULL != pm->qe)
+  memset (&sc, 0, sizeof (sc));
+  select_best_pending_from_link (&sc, queue, n->vl, NULL, 0);
+  if (NULL == sc.best)
+  {
+    /* Also look at DVH that have the n as first hop! */
+    for (struct DistanceVectorHop *dvh = n->dv_head; NULL != dvh;
+         dvh = dvh->next_neighbour)
+    {
+      select_best_pending_from_link (&sc,
+                                     queue,
+                                     dvh->dv->vl,
+                                     dvh,
+                                     sizeof (struct GNUNET_PeerIdentity) *
+                                         (1 + dvh->distance) +
+                                       sizeof (struct TransportDVBoxMessage) +
+                                       sizeof (struct TransportDVBoxPayloadP));
+    }
+  }
+  if (NULL == sc.best)
   {
-    /* message still pending with communciator!
-       LOGGING-FIXME: Use stats? Should this not be rare? */
+    /* no message pending, nothing to do here! */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Waiting on communicator for queue %s to %s, going to sleep\n",
+                "No pending messages, queue `%s' to %s now idle\n",
                 queue->address,
                 GNUNET_i2s (&n->pid));
+    queue->idle = GNUNET_YES;
     return;
   }
-  schedule_transmit_on_queue (queue, GNUNET_YES);
-  if (NULL != queue->transmit_task)
+
+  /* Given selection in `sc`, do transmission */
+  pm = sc.best;
+  if (GNUNET_YES == sc.frag)
   {
-    GNUNET_log (
-      GNUNET_ERROR_TYPE_DEBUG,
-      "Scheduled transmission on queue %s to %s for later, going to sleep\n",
-      queue->address,
-      GNUNET_i2s (&n->pid));
-    return; /* do it later */
-  }
-  overhead = 0;
-  if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
-    overhead += sizeof (struct TransportReliabilityBoxMessage);
-  s = pm;
-  if ( ( (0 != queue->mtu) &&
-        (pm->bytes_msg + overhead > queue->mtu) ) ||
-       (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
-       (NULL != pm->head_frag /* fragments already exist, should
-                                respect that even if MTU is 0 for
-                                this queue */) )
-    s = fragment_message (queue, pm->dvh, s);
-  if (NULL == s)
-  {
-    /* Fragmentation failed, try next message... */
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Fragmentation failed queue %s to %s for <%llu>, trying again\n",
-                queue->address,
-                GNUNET_i2s (&n->pid),
-                pm->logging_uuid);
-    schedule_transmit_on_queue (queue, GNUNET_NO);
-    return;
+    pm = fragment_message (queue, sc.dvh, sc.best);
+    if (NULL == pm)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Fragmentation failed queue %s to %s for <%llu>, trying again\n",
+                  queue->address,
+                  GNUNET_i2s (&n->pid),
+                  pm->logging_uuid);
+      schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+    }
   }
-  if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
-    // FIXME-OPTIMIZE: and if reliability was requested for 's' by core!
-    s = reliability_box_message (queue, pm->dvh, s);
-  if (NULL == s)
+  else if (GNUNET_YES == sc.relb)
   {
-    /* Reliability boxing failed, try next message... */
-    GNUNET_log (
-      GNUNET_ERROR_TYPE_DEBUG,
-      "Reliability boxing failed queue %s to %s for <%llu>, trying again\n",
-      queue->address,
-      GNUNET_i2s (&n->pid),
-      pm->logging_uuid);
-    schedule_transmit_on_queue (queue, GNUNET_NO);
-    return;
+    pm = reliability_box_message (queue, sc.dvh, sc.best);
+    if (NULL == pm)
+    {
+      /* Reliability boxing failed, try next message... */
+      GNUNET_log (
+        GNUNET_ERROR_TYPE_DEBUG,
+        "Reliability boxing failed queue %s to %s for <%llu>, trying again\n",
+        queue->address,
+        GNUNET_i2s (&n->pid),
+        pm->logging_uuid);
+      schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+      return;
+    }
   }
+  else
+    pm = sc.best; /* no boxing required */
 
-  /* Pass 's' for transission to the communicator */
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Passing message <%llu> to queue %s for peer %s\n",
-              s->logging_uuid,
-              queue->address,
-              GNUNET_i2s (&n->pid));
-  queue_send_msg (queue, s, &s[1], s->bytes_msg);
-  // FIXME: do something similar to the logic below
-  // in defragmentation / reliability ACK handling!
+  /* Pass 'pm' for transission to the communicator */
+  GNUNET_log (
+    GNUNET_ERROR_TYPE_DEBUG,
+    "Passing message <%llu> to queue %s for peer %s (considered %u others)\n",
+    pm->logging_uuid,
+    queue->address,
+    GNUNET_i2s (&n->pid),
+    sc.consideration_counter);
+  queue_send_msg (queue, pm, &pm[1], pm->bytes_msg);
 
   /* Check if this transmission somehow conclusively finished handing 'pm'
      even without any explicit ACKs */
-  if ((PMT_CORE == s->pmt) &&
+  if ((PMT_CORE == pm->pmt) ||
       (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
   {
-    /* Full message sent, and over reliabile channel */
-    client_send_response (pm);
-  }
-  else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
-            queue->tc->details.communicator.cc) &&
-           (PMT_FRAGMENT_BOX == s->pmt))
-  {
-    struct PendingMessage *pos;
-
-    /* Fragment sent over reliabile channel */
-    free_fragment_tree (s);
-    pos = s->frag_parent;
-    GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
-    GNUNET_free (s);
-    /* check if subtree is done */
-    while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
-           (pos != pm))
-    {
-      s = pos;
-      pos = s->frag_parent;
-      GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
-      GNUNET_free (s);
-    }
-
-    /* Was this the last applicable fragmment? */
-    if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
-      client_send_response (pm);
-  }
-  else if (PMT_CORE != pm->pmt)
-  {
-    /* This was an acknowledgement of some type, always free */
-    free_pending_message (pm);
+    completed_pending_message (pm);
   }
   else
   {
@@ -7999,17 +8666,18 @@ transmit_on_queue (void *cls)
        characteristics (i.e. RTT); it takes one RTT for the message to
        arrive and the ACK to come back in the best case; but the other
        side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
-       retransmitting.  Note that in the future this heuristic should
-       likely be improved further (measure RTT stability, consider
-       message urgency and size when delaying ACKs, etc.) */
-    update_pm_next_attempt (s,
+       retransmitting.
+
+       OPTIMIZE: Note that in the future this heuristic should likely
+       be improved further (measure RTT stability, consider message
+       urgency and size when delaying ACKs, etc.) */
+    update_pm_next_attempt (pm,
                             GNUNET_TIME_relative_to_absolute (
                               GNUNET_TIME_relative_multiply (queue->pd.aged_rtt,
                                                              4)));
   }
-
   /* finally, re-schedule queue transmission task itself */
-  schedule_transmit_on_queue (queue, GNUNET_NO);
+  schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
 }
 
 
@@ -8124,7 +8792,7 @@ handle_send_message_ack (void *cls,
     for (struct Queue *queue = tc->details.communicator.queue_head;
          NULL != queue;
          queue = queue->next_client)
-      schedule_transmit_on_queue (queue, GNUNET_NO);
+      schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   }
   else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
   {
@@ -8133,33 +8801,21 @@ handle_send_message_ack (void *cls,
                               "# Transmission throttled due to queue queue limit",
                               -1,
                               GNUNET_NO);
-    schedule_transmit_on_queue (qe->queue, GNUNET_NO);
+    schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   }
 
   if (NULL != (pm = qe->pm))
   {
-    struct Neighbour *n;
+    struct VirtualLink *vl;
 
     GNUNET_assert (qe == pm->qe);
     pm->qe = NULL;
     /* If waiting for this communicator may have blocked transmission
        of pm on other queues for this neighbour, force schedule
        transmit on queue for queues of the neighbour */
-    n = pm->target;
-    if (n->pending_msg_head == pm)
-    {
-      for (struct Queue *queue = n->queue_head; NULL != queue;
-           queue = queue->next_neighbour)
-        schedule_transmit_on_queue (queue, GNUNET_NO);
-    }
-    if (GNUNET_OK != ntohl (sma->status))
-    {
-      GNUNET_log (
-        GNUNET_ERROR_TYPE_INFO,
-        "Queue failed in transmission <%llu>, will try retransmission immediately\n",
-        pm->logging_uuid);
-      update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS);
-    }
+    vl = pm->vl;
+    if (vl->pending_msg_head == pm)
+      check_vl_transmission (vl);
   }
   GNUNET_free (qe);
 }
@@ -8313,7 +8969,7 @@ validation_transmit_on_queue (struct Queue *q, struct ValidationState *vs)
 {
   struct TransportValidationChallengeMessage tvc;
 
-  vs->last_challenge_use = GNUNET_TIME_absolute_get ();
+  vs->last_challenge_use = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
   tvc.header.type =
     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE);
   tvc.header.size = htons (sizeof (tvc));
@@ -8651,8 +9307,6 @@ handle_add_queue_message (void *cls,
   if (NULL == neighbour)
   {
     neighbour = GNUNET_new (struct Neighbour);
-    neighbour->message_uuid_ctr =
-      GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
     neighbour->pid = aqm->receiver;
     GNUNET_assert (GNUNET_OK ==
                    GNUNET_CONTAINER_multipeermap_put (
@@ -8684,6 +9338,7 @@ handle_add_queue_message (void *cls,
   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
   queue->neighbour = neighbour;
+  queue->idle = GNUNET_YES;
   memcpy (&queue[1], addr, addr_len);
   /* notify monitors about new queue */
   {
@@ -8705,6 +9360,8 @@ handle_add_queue_message (void *cls,
                                                 &aqm->receiver,
                                                 &check_validation_request_pending,
                                                 queue);
+  /* look for traffic for this queue */
+  schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   /* might be our first queue, try launching DV learning */
   if (NULL == dvlearn_task)
     dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL);
@@ -8806,120 +9463,17 @@ handle_suggest_cancel (void *cls, const struct ExpressPreferenceMessage *msg)
 }
 
 
-/**
- * Closure for #check_known_address.
- */
-struct CheckKnownAddressContext
-{
-  /**
-   * Set to the address we are looking for.
-   */
-  const char *address;
-
-  /**
-   * Set to a matching validation state, if one was found.
-   */
-  struct ValidationState *vs;
-};
-
-
-/**
- * Test if the validation state in @a value matches the
- * address from @a cls.
- *
- * @param cls a `struct CheckKnownAddressContext`
- * @param pid unused (must match though)
- * @param value a `struct ValidationState`
- * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
- */
-static int
-check_known_address (void *cls,
-                     const struct GNUNET_PeerIdentity *pid,
-                     void *value)
-{
-  struct CheckKnownAddressContext *ckac = cls;
-  struct ValidationState *vs = value;
-
-  (void) pid;
-  if (0 != strcmp (vs->address, ckac->address))
-    return GNUNET_OK;
-  ckac->vs = vs;
-  return GNUNET_NO;
-}
-
-
-/**
- * Start address validation.
- *
- * @param pid peer the @a address is for
- * @param address an address to reach @a pid (presumably)
- */
-static void
-start_address_validation (const struct GNUNET_PeerIdentity *pid,
-                          const char *address)
-{
-  struct GNUNET_TIME_Absolute now;
-  struct ValidationState *vs;
-  struct CheckKnownAddressContext ckac = {.address = address, .vs = NULL};
-
-  (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
-                                                     pid,
-                                                     &check_known_address,
-                                                     &ckac);
-  if (NULL != (vs = ckac.vs))
-  {
-    /* if 'vs' is not currently valid, we need to speed up retrying the
-     * validation */
-    if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
-    {
-      /* reduce backoff as we got a fresh advertisement */
-      vs->challenge_backoff =
-        GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
-                                  GNUNET_TIME_relative_divide (vs->challenge_backoff,
-                                                               2));
-      update_next_challenge_time (vs,
-                                  GNUNET_TIME_relative_to_absolute (
-                                    vs->challenge_backoff));
-    }
-    return;
-  }
-  now = GNUNET_TIME_absolute_get ();
-  vs = GNUNET_new (struct ValidationState);
-  vs->pid = *pid;
-  vs->valid_until =
-    GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME);
-  vs->first_challenge_use = now;
-  vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
-  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
-                              &vs->challenge,
-                              sizeof (vs->challenge));
-  vs->address = GNUNET_strdup (address);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Starting address validation `%s' of peer %s using challenge %s\n",
-              address,
-              GNUNET_i2s (pid),
-              GNUNET_sh2s (&vs->challenge.value));
-  GNUNET_assert (GNUNET_YES ==
-                 GNUNET_CONTAINER_multipeermap_put (
-                   validation_map,
-                   &vs->pid,
-                   vs,
-                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  update_next_challenge_time (vs, now);
-}
-
-
 /**
  * Function called by PEERSTORE for each matching record.
  *
- * @param cls closure
+ * @param cls closure, a `struct PeerRequest`
  * @param record peerstore record information
  * @param emsg error message, or NULL if no errors
  */
 static void
-handle_hello (void *cls,
-              const struct GNUNET_PEERSTORE_Record *record,
-              const char *emsg)
+handle_hello_for_client (void *cls,
+                         const struct GNUNET_PEERSTORE_Record *record,
+                         const char *emsg)
 {
   struct PeerRequest *pr = cls;
   const char *val;
@@ -8991,7 +9545,7 @@ handle_suggest (void *cls, const struct ExpressPreferenceMessage *msg)
                                    "transport",
                                    &pr->pid,
                                    GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
-                                   &handle_hello,
+                                   &handle_hello_for_client,
                                    pr);
   GNUNET_SERVICE_client_continue (tc->client);
 }
@@ -9080,28 +9634,6 @@ free_dv_routes_cb (void *cls,
 }
 
 
-/**
- * Free ephemeral entry.
- *
- * @param cls NULL
- * @param pid unused
- * @param value a `struct EphemeralCacheEntry`
- * @return #GNUNET_OK (always)
- */
-static int
-free_ephemeral_cb (void *cls,
-                   const struct GNUNET_PeerIdentity *pid,
-                   void *value)
-{
-  struct EphemeralCacheEntry *ece = value;
-
-  (void) cls;
-  (void) pid;
-  free_ephemeral (ece);
-  return GNUNET_OK;
-}
-
-
 /**
  * Free validation state.
  *
@@ -9180,11 +9712,6 @@ do_shutdown (void *cls)
   struct LearnLaunchEntry *lle;
   (void) cls;
 
-  if (NULL != ephemeral_task)
-  {
-    GNUNET_SCHEDULER_cancel (ephemeral_task);
-    ephemeral_task = NULL;
-  }
   GNUNET_CONTAINER_multipeermap_iterate (neighbours, &free_neighbour_cb, NULL);
   if (NULL != peerstore)
   {
@@ -9227,6 +9754,9 @@ do_shutdown (void *cls)
                                          NULL);
   GNUNET_CONTAINER_multipeermap_destroy (validation_map);
   validation_map = NULL;
+  while (NULL != ir_head)
+    free_incoming_request (ir_head);
+  GNUNET_assert (0 == ir_total);
   while (NULL != (lle = lle_head))
   {
     GNUNET_CONTAINER_DLL_remove (lle_head, lle_tail, lle);
@@ -9239,13 +9769,6 @@ do_shutdown (void *cls)
   GNUNET_CONTAINER_multipeermap_iterate (dv_routes, &free_dv_routes_cb, NULL);
   GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
   dv_routes = NULL;
-  GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
-                                         &free_ephemeral_cb,
-                                         NULL);
-  GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
-  ephemeral_map = NULL;
-  GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
-  ephemeral_heap = NULL;
 }
 
 
@@ -9272,9 +9795,6 @@ run (void *cls,
   neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
   links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES);
   dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
-  ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES);
-  ephemeral_heap =
-    GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
   dvlearn_map = GNUNET_CONTAINER_multishortmap_create (2 * MAX_DV_LEARN_PENDING,
                                                        GNUNET_YES);
   validation_map = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);