tolerate additional IPv4 address now available for gnunet.org
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
index 45d8086ef8fcaa964c1f2637ba437fcb6dca5450..a8f70986b3277d46f4be100c5ea10256d42487f1 100644 (file)
  * @author Christian Grothoff
  *
  * TODO:
- * - figure out how to transmit (selective) ACKs in case of uni-directional
- *   communicators (with/without core? DV-only?) When do we use ACKs?
- *   => communicators use selective ACKs for flow control
- *   => transport uses message-level ACKs for RTT, fragment confirmation
- *   => integrate DV into transport, use neither core nor communicators
- *      but rather give communicators transport-encapsulated messages
- *      (which could be core-data, background-channel traffic, or
- *       transport-to-transport traffic)
- *
  * Implement next:
- * - backchannel message encryption & decryption
- * - DV data structures:
- *   + using DV routes!
- *     - handling of DV-boxed messages that need to be forwarded
- *     - route_message implementation, including using DV data structures
- *       (but not when routing certain message types, like DV learn,
- *        MUST pay attention to content here -- or pass extra flags?)
- * - retransmission
- * - track RTT, distance, loss, etc. => requires extra data structures!
- *
- * Later:
- * - change transport-core API to provide proper flow control in both
- *   directions, allow multiple messages per peer simultaneously (tag
- *   confirmations with unique message ID), and replace quota-out with
- *   proper flow control;
- * - if messages are below MTU, consider adding ACKs and other stuff
- *   (requires planning at receiver, and additional MST-style demultiplex
- *    at receiver!)
+ * - add (more) logging
+ * - change transport-core API to specify transmission preferences (latency,
+ *   reliability, etc.) per message!
+ * - review retransmission logic, right now there is no smartness there!
+ *   => congestion control, flow control, etc [PERFORMANCE-BASICS]
+ *
+ * Optimizations:
+ * - AcknowledgementUUIDPs are overkill with 256 bits (128 would do)
+ *   => Need 128 bit hash map though! [BANDWIDTH, MEMORY]
+ * - queue_send_msg and route_message both by API design have to make copies
+ *   of the payload, and route_message on top of that requires a malloc/free.
+ *   Change design to approximate "zero" copy better... [CPU]
  * - could avoid copying body of message into each fragment and keep
  *   fragments as just pointers into the original message and only
  *   fully build fragments just before transmission (optimization, should
- *   reduce CPU and memory use)
- *
- * FIXME (without marks in the code!):
- * - proper use/initialization of timestamps in messages exchanged
- *   during DV learning
- * -
- *
- * Optimizations:
- * - use shorthashmap on msg_uuid's when matching reliability/fragment ACKs
- *   against our pending message queue (requires additional per neighbour
- *   hash map to be maintained, avoids possible linear scan on pending msgs)
+ *   reduce CPU and memory use) [CPU, MEMORY]
+ * - if messages are below MTU, consider adding ACKs and other stuff
+ *   to the same transmission to avoid tiny messages (requires planning at
+ *   receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT]
+ * - When we passively learned DV (with unconfirmed freshness), we
+ *   right now add the path to our list but with a zero path_valid_until
+ *   time and only use it for unconfirmed routes.  However, we could consider
+ *   triggering an explicit validation mechansim ourselves, specifically routing
+ *   a challenge-response message over the path [ROUTING]
+ * - Track ACK losses based on ACK-counter [ROUTING]
  *
  * Design realizations / discussion:
  * - communicators do flow control by calling MQ "notify sent"
 #include "gnunet_signatures.h"
 #include "transport.h"
 
+/**
+ * Maximum number of messages we acknowledge together in one
+ * cummulative ACK.  Larger values may save a bit of bandwidth.
+ */
+#define MAX_CUMMULATIVE_ACKS 64
 
 /**
  * What is the size we assume for a read operation in the
  */
 #define IN_PACKET_SIZE_WITHOUT_MTU 128
 
+/**
+ * Number of slots we keep of historic data for computation of
+ * goodput / message loss ratio.
+ */
+#define GOODPUT_AGING_SLOTS 4
+
+/**
+ * Maximum number of peers we select for forwarding DVInit
+ * messages at the same time (excluding initiator).
+ */
+#define MAX_DV_DISCOVERY_SELECTION 16
+
+/**
+ * Window size. How many messages to the same target do we pass
+ * to CORE without a RECV_OK in between? Small values limit
+ * thoughput, large values will increase latency.
+ *
+ * FIXME-OPTIMIZE: find out what good values are experimentally,
+ * maybe set adaptively (i.e. to observed available bandwidth).
+ */
+#define RECV_WINDOW_SIZE 4
+
 /**
  * Minimum number of hops we should forward DV learn messages
  * even if they are NOT useful for us in hope of looping
 #define DV_PATH_VALIDITY_TIMEOUT \
   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
 
+/**
+ * How long do we cache backchannel (struct Backtalker) information
+ * after a backchannel goes inactive?
+ */
+#define BACKCHANNEL_INACTIVITY_TIMEOUT \
+  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
+
 /**
  * How long before paths expire would we like to (re)discover DV paths? Should
  * be below #DV_PATH_VALIDITY_TIMEOUT.
 #define MAX_VALIDATION_CHALLENGE_FREQ \
   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
 
+/**
+ * How long until we forget about historic accumulators and thus
+ * reset the ACK counter? Should exceed the maximum time an
+ * active connection experiences without an ACK.
+ */
+#define ACK_CUMMULATOR_TIMEOUT \
+  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
+
 /**
  * What is the non-randomized base frequency at which we
  * would initiate DV learn messages?
  */
 #define MAX_ADDRESS_VALID_UNTIL \
   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
+
 /**
  * How long do we consider an address valid if we just checked?
  */
 
 GNUNET_NETWORK_STRUCT_BEGIN
 
+/**
+ * Unique identifier we attach to a message.
+ */
+struct MessageUUIDP
+{
+  /**
+   * Unique value, generated by incrementing the
+   * `message_uuid_ctr` of `struct Neighbour`.
+   */
+  uint64_t uuid GNUNET_PACKED;
+};
+
+
+/**
+ * Unique identifier to map an acknowledgement to a transmission.
+ */
+struct AcknowledgementUUIDP
+{
+  /**
+   * The UUID value.  Not actually a hash, but a random value.
+   */
+  struct GNUNET_ShortHashCode value;
+};
+
+
+/**
+ * Unique identifier we attach to a message.
+ */
+struct FragmentUUIDP
+{
+  /**
+   * Unique value identifying a fragment, in NBO.
+   */
+  uint32_t uuid GNUNET_PACKED;
+};
+
+
+/**
+ * Type of a nonce used for challenges.
+ */
+struct ChallengeNonceP
+{
+  /**
+   * The value of the nonce.  Note that this is NOT a hash.
+   */
+  struct GNUNET_ShortHashCode value;
+};
+
+
 /**
  * Outer layer of an encapsulated backchannel message.
  */
@@ -256,12 +332,9 @@ struct TransportBackchannelEncapsulationMessage
   struct GNUNET_MessageHeader header;
 
   /**
-   * Distance the backchannel message has traveled, to be updated at
-   * each hop.  Used to bound the number of hops in case a backchannel
-   * message is broadcast and thus travels without routing
-   * information (during initial backchannel discovery).
+   * Reserved, always zero.
    */
-  uint32_t distance;
+  uint32_t reserved GNUNET_PACKED;
 
   /**
    * Target's peer identity (as backchannels may be transmitted
@@ -295,7 +368,7 @@ struct TransportBackchannelEncapsulationMessage
 /**
  * Body by which a peer confirms that it is using an ephemeral key.
  */
-struct EphemeralConfirmation
+struct EphemeralConfirmationPS
 {
 
   /**
@@ -317,7 +390,7 @@ struct EphemeralConfirmation
    * communicators must protect against replay attacks when using backchannel
    * communication!
    */
-  struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
+  struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time;
 
   /**
    * Target's peer identity.
@@ -336,7 +409,7 @@ struct EphemeralConfirmation
  * Plaintext of the variable-size payload that is encrypted
  * within a `struct TransportBackchannelEncapsulationMessage`
  */
-struct TransportBackchannelRequestPayload
+struct TransportBackchannelRequestPayloadP
 {
 
   /**
@@ -350,22 +423,6 @@ struct TransportBackchannelRequestPayload
    */
   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
 
-  /**
-   * How long is this signature over the ephemeral key valid?
-   *
-   * Note that the receiver MUST IGNORE the absolute time, and only interpret
-   * the value as a mononic time and reject "older" values than the last one
-   * observed.  This is necessary as we do not want to require synchronized
-   * clocks and may not have a bidirectional communication channel.
-   *
-   * Even with this, there is no real guarantee against replay achieved here,
-   * unless the latest timestamp is persisted.  While persistence should be
-   * provided via PEERSTORE, we do not consider the mechanism reliable!  Thus,
-   * communicators must protect against replay attacks when using backchannel
-   * communication!
-   */
-  struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
-
   /**
    * Current monotonic time of the sending transport service.  Used to
    * detect replayed messages.  Note that the receiver should remember
@@ -390,7 +447,7 @@ struct TransportBackchannelRequestPayload
  * Outer layer of an encapsulated unfragmented application message sent
  * over an unreliable channel.
  */
-struct TransportReliabilityBox
+struct TransportReliabilityBoxMessage
 {
   /**
    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
@@ -410,7 +467,27 @@ struct TransportReliabilityBox
    * messages sent over possibly unreliable channels.  Should
    * be a random.
    */
-  struct GNUNET_ShortHashCode msg_uuid;
+  struct AcknowledgementUUIDP ack_uuid;
+};
+
+
+/**
+ * Acknowledgement payload.
+ */
+struct TransportCummulativeAckPayloadP
+{
+  /**
+   * How long was the ACK delayed for generating cummulative ACKs?
+   * Used to calculate the correct network RTT by taking the receipt
+   * time of the ack minus the transmission time of the sender minus
+   * this value.
+   */
+  struct GNUNET_TIME_RelativeNBO ack_delay;
+
+  /**
+   * UUID of a message being acknowledged.
+   */
+  struct AcknowledgementUUIDP ack_uuid;
 };
 
 
@@ -429,19 +506,12 @@ struct TransportReliabilityAckMessage
   struct GNUNET_MessageHeader header;
 
   /**
-   * Reserved. Zero.
+   * Counter of ACKs transmitted by the sender to us. Incremented
+   * by one for each ACK, used to detect how many ACKs were lost.
    */
-  uint32_t reserved GNUNET_PACKED;
-
-  /**
-   * How long was the ACK delayed relative to the average time of
-   * receipt of the messages being acknowledged?  Used to calculate
-   * the average RTT by taking the receipt time of the ack minus the
-   * average transmission time of the sender minus this value.
-   */
-  struct GNUNET_TIME_RelativeNBO avg_ack_delay;
+  uint32_t ack_counter GNUNET_PACKED;
 
-  /* followed by any number of `struct GNUNET_ShortHashCode`
+  /* followed by any number of `struct TransportCummulativeAckPayloadP`
      messages providing ACKs */
 };
 
@@ -449,7 +519,7 @@ struct TransportReliabilityAckMessage
 /**
  * Outer layer of an encapsulated fragmented application message.
  */
-struct TransportFragmentBox
+struct TransportFragmentBoxMessage
 {
   /**
    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
@@ -459,18 +529,17 @@ struct TransportFragmentBox
   /**
    * Unique ID of this fragment (and fragment transmission!). Will
    * change even if a fragement is retransmitted to make each
-   * transmission attempt unique! Should be incremented by one for
-   * each fragment transmission. If a client receives a duplicate
-   * fragment (same @e frag_off), it must send
-   * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
+   * transmission attempt unique! If a client receives a duplicate
+   * fragment (same @e frag_off for same @a msg_uuid, it must send
+   * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK immediately.
    */
-  uint32_t frag_uuid GNUNET_PACKED;
+  struct AcknowledgementUUIDP ack_uuid;
 
   /**
-   * Original message ID for of the message that all the1
-   * fragments belong to.  Must be the same for all fragments.
+   * Original message ID for of the message that all the fragments
+   * belong to.  Must be the same for all fragments.
    */
-  struct GNUNET_ShortHashCode msg_uuid;
+  struct MessageUUIDP msg_uuid;
 
   /**
    * Offset of this fragment in the overall message.
@@ -484,54 +553,6 @@ struct TransportFragmentBox
 };
 
 
-/**
- * Outer layer of an fragmented application message sent over a queue
- * with finite MTU.  When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
- * received, the receiver has two RTTs or 64 further fragments with
- * the same basic message time to send an acknowledgement, possibly
- * acknowledging up to 65 fragments in one ACK.  ACKs must also be
- * sent immediately once all fragments were sent.
- */
-struct TransportFragmentAckMessage
-{
-  /**
-   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
-   */
-  struct GNUNET_MessageHeader header;
-
-  /**
-   * Unique ID of the lowest fragment UUID being acknowledged.
-   */
-  uint32_t frag_uuid GNUNET_PACKED;
-
-  /**
-   * Bitfield of up to 64 additional fragments following the
-   * @e msg_uuid being acknowledged by this message.
-   */
-  uint64_t extra_acks GNUNET_PACKED;
-
-  /**
-   * Original message ID for of the message that all the
-   * fragments belong to.
-   */
-  struct GNUNET_ShortHashCode msg_uuid;
-
-  /**
-   * How long was the ACK delayed relative to the average time of
-   * receipt of the fragments being acknowledged?  Used to calculate
-   * the average RTT by taking the receipt time of the ack minus the
-   * average transmission time of the sender minus this value.
-   */
-  struct GNUNET_TIME_RelativeNBO avg_ack_delay;
-
-  /**
-   * How long until the receiver will stop trying reassembly
-   * of this message?
-   */
-  struct GNUNET_TIME_RelativeNBO reassembly_timeout;
-};
-
-
 /**
  * Content signed by the initator during DV learning.
  *
@@ -573,7 +594,7 @@ struct DvInitPS
   /**
    * Challenge value used by the initiator to re-identify the path.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 };
 
 
@@ -613,13 +634,13 @@ struct DvHopPS
   /**
    * Challenge value used by the initiator to re-identify the path.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 };
 
 
 /**
  * An entry describing a peer on a path in a
- * `struct TransportDVLearn` message.
+ * `struct TransportDVLearnMessage` message.
  */
 struct DVPathEntryP
 {
@@ -649,7 +670,7 @@ struct DVPathEntryP
  * zero, peers that can forward to the initator should always try to
  * forward to the initiator.
  */
-struct TransportDVLearn
+struct TransportDVLearnMessage
 {
   /**
    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
@@ -679,6 +700,20 @@ struct TransportDVLearn
    */
   struct GNUNET_TIME_RelativeNBO non_network_delay;
 
+  /**
+   * Time at the initiator when generating the signature.
+   *
+   * Note that the receiver MUST IGNORE the absolute time, and only interpret
+   * the value as a mononic time and reject "older" values than the last one
+   * observed.  This is necessary as we do not want to require synchronized
+   * clocks and may not have a bidirectional communication channel.
+   *
+   * Even with this, there is no real guarantee against replay achieved here,
+   * unless the latest timestamp is persisted.  Persistence should be
+   * provided via PEERSTORE if possible.
+   */
+  struct GNUNET_TIME_AbsoluteNBO monotonic_time;
+
   /**
    * Signature of this hop over the path, of purpose
    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
@@ -693,7 +728,7 @@ struct TransportDVLearn
   /**
    * Challenge value used by the initiator to re-identify the path.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 
   /* Followed by @e num_hops `struct DVPathEntryP` values,
      excluding the initiator of the DV trace; the last entry is the
@@ -716,7 +751,7 @@ struct TransportDVLearn
  *
  * If a peer finds itself still on the list, it must drop the message.
  */
-struct TransportDVBox
+struct TransportDVBoxMessage
 {
   /**
    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
@@ -754,7 +789,7 @@ struct TransportDVBox
  * Message send to another peer to validate that it can indeed
  * receive messages at a particular address.
  */
-struct TransportValidationChallenge
+struct TransportValidationChallengeMessage
 {
 
   /**
@@ -770,7 +805,7 @@ struct TransportValidationChallenge
   /**
    * Challenge to be signed by the receiving peer.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 
   /**
    * Timestamp of the sender, to be copied into the reply
@@ -801,7 +836,7 @@ struct TransportValidationPS
   /**
    * Challenge signed by the receiving peer.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 };
 
 
@@ -809,7 +844,7 @@ struct TransportValidationPS
  * Message send to a peer to respond to a
  * #GNUNET_MESSAGE_TYPE_ADDRESS_VALIDATION_CHALLENGE
  */
-struct TransportValidationResponse
+struct TransportValidationResponseMessage
 {
 
   /**
@@ -831,7 +866,7 @@ struct TransportValidationResponse
   /**
    * The challenge that was signed by the receiving peer.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 
   /**
    * Original timestamp of the sender (was @code{sender_time}),
@@ -853,7 +888,8 @@ GNUNET_NETWORK_STRUCT_END
 /**
  * What type of client is the `struct TransportClient` about?
  */
-enum ClientType {
+enum ClientType
+{
   /**
    * We do not know yet (client is fresh).
    */
@@ -881,6 +917,41 @@ enum ClientType {
 };
 
 
+/**
+ * Which transmission options are allowable for transmission?
+ * Interpreted bit-wise!
+ */
+enum RouteMessageOptions
+{
+  /**
+   * Only confirmed, non-DV direct neighbours.
+   */
+  RMO_NONE = 0,
+
+  /**
+   * We are allowed to use DV routing for this @a hdr
+   */
+  RMO_DV_ALLOWED = 1,
+
+  /**
+   * We are allowed to use unconfirmed queues or DV routes for this message
+   */
+  RMO_UNCONFIRMED_ALLOWED = 2,
+
+  /**
+   * Reliable and unreliable, DV and non-DV are all acceptable.
+   */
+  RMO_ANYTHING_GOES = (RMO_DV_ALLOWED | RMO_UNCONFIRMED_ALLOWED),
+
+  /**
+   * If we have multiple choices, it is OK to send this message
+   * over multiple channels at the same time to improve loss tolerance.
+   * (We do at most 2 transmissions.)
+   */
+  RMO_REDUNDANT = 4
+};
+
+
 /**
  * When did we launch this DV learning activity?
  */
@@ -900,7 +971,7 @@ struct LearnLaunchEntry
   /**
    * Challenge that uniquely identifies this activity.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 
   /**
    * When did we transmit the DV learn message (used to calculate RTT) and
@@ -937,6 +1008,11 @@ struct EphemeralCacheEntry
    */
   struct GNUNET_TIME_Absolute ephemeral_validity;
 
+  /**
+   * What time was @e sender_sig created
+   */
+  struct GNUNET_TIME_Absolute monotime;
+
   /**
    * Our ephemeral key.
    */
@@ -955,124 +1031,396 @@ struct EphemeralCacheEntry
 };
 
 
+/**
+ * Information we keep per #GOODPUT_AGING_SLOTS about historic
+ * (or current) transmission performance.
+ */
+struct TransmissionHistoryEntry
+{
+  /**
+   * Number of bytes actually sent in the interval.
+   */
+  uint64_t bytes_sent;
+
+  /**
+   * Number of bytes received and acknowledged by the other peer in
+   * the interval.
+   */
+  uint64_t bytes_received;
+};
+
+
+/**
+ * Performance data for a transmission possibility.
+ */
+struct PerformanceData
+{
+  /**
+   * Weighted average for the RTT.
+   */
+  struct GNUNET_TIME_Relative aged_rtt;
+
+  /**
+   * Historic performance data, using a ring buffer of#GOODPUT_AGING_SLOTS
+   * entries.
+   */
+  struct TransmissionHistoryEntry the[GOODPUT_AGING_SLOTS];
+
+  /**
+   * What was the last age when we wrote to @e the? Used to clear
+   * old entries when the age advances.
+   */
+  unsigned int last_age;
+};
+
+
 /**
  * Client connected to the transport service.
  */
 struct TransportClient;
 
-
 /**
  * A neighbour that at least one communicator is connected to.
  */
 struct Neighbour;
 
-
 /**
  * Entry in our #dv_routes table, representing a (set of) distance
  * vector routes to a particular peer.
  */
 struct DistanceVector;
 
+/**
+ * A queue is a message queue provided by a communicator
+ * via which we can reach a particular neighbour.
+ */
+struct Queue;
+
+/**
+ * Message awaiting transmission. See detailed comments below.
+ */
+struct PendingMessage;
+
 /**
  * One possible hop towards a DV target.
  */
-struct DistanceVectorHop
+struct DistanceVectorHop;
+
+
+/**
+ * Context from #handle_incoming_msg().  Closure for many
+ * message handlers below.
+ */
+struct CommunicatorMessageContext
 {
 
   /**
-   * Kept in a MDLL, sorted by @e timeout.
+   * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+   * flow control to unchoke.
    */
-  struct DistanceVectorHop *next_dv;
+  struct CommunicatorMessageContext *next;
 
   /**
-   * Kept in a MDLL, sorted by @e timeout.
+   * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+   * flow control to unchoke.
    */
-  struct DistanceVectorHop *prev_dv;
+  struct CommunicatorMessageContext *prev;
 
   /**
-   * Kept in a MDLL.
+   * Which communicator provided us with the message.
    */
-  struct DistanceVectorHop *next_neighbour;
+  struct TransportClient *tc;
 
   /**
-   * Kept in a MDLL.
+   * Additional information for flow control and about the sender.
    */
-  struct DistanceVectorHop *prev_neighbour;
+  struct GNUNET_TRANSPORT_IncomingMessage im;
 
   /**
-   * What would be the next hop to @e target?
+   * Number of hops the message has travelled (if DV-routed).
+   * FIXME: make use of this in ACK handling!
    */
-  struct Neighbour *next_hop;
+  uint16_t total_hops;
+};
 
+
+/**
+ * A virtual link is another reachable peer that is known to CORE.  It
+ * can be either a `struct Neighbour` with at least one confirmed
+ * `struct Queue`, or a `struct DistanceVector` with at least one
+ * confirmed `struct DistanceVectorHop`.  With a virtual link we track
+ * data that is per neighbour that is not specific to how the
+ * connectivity is established.
+ */
+struct VirtualLink
+{
   /**
-   * Distance vector entry this hop belongs with.
+   * Identity of the peer at the other end of the link.
    */
-  struct DistanceVector *dv;
+  struct GNUNET_PeerIdentity target;
 
   /**
-   * Array of @e distance hops to the target, excluding @e next_hop.
-   * NULL if the entire path is us to @e next_hop to `target`. Allocated
-   * at the end of this struct.
+   * Communicators blocked for receiving on @e target as we are waiting
+   * on the @e core_recv_window to increase.
    */
-  const struct GNUNET_PeerIdentity *path;
+  struct CommunicatorMessageContext *cmc_head;
 
   /**
-   * At what time do we forget about this path unless we see it again
-   * while learning?
+   * Communicators blocked for receiving on @e target as we are waiting
+   * on the @e core_recv_window to increase.
    */
-  struct GNUNET_TIME_Absolute timeout;
+  struct CommunicatorMessageContext *cmc_tail;
 
   /**
-   * After what time do we know for sure that the path must have existed?
-   * Set to ZERO if the path is learned by snooping on DV learn messages
-   * initiated by other peers, and to the time at which we generated the
-   * challenge for DV learn operations this peer initiated.
+   * Task scheduled to possibly notfiy core that this peer is no
+   * longer counting as confirmed.  Runs the #core_visibility_check(),
+   * which checks that some DV-path or a queue exists that is still
+   * considered confirmed.
    */
-  struct GNUNET_TIME_Absolute freshness;
+  struct GNUNET_SCHEDULER_Task *visibility_task;
 
   /**
-   * How many hops in total to the `target` (excluding @e next_hop and `target`
-   * itself), thus 0 still means a distance of 2 hops (to @e next_hop and then
-   * to `target`)?
+   * Neighbour used by this virtual link, NULL if @e dv is used.
    */
-  unsigned int distance;
+  struct Neighbour *n;
+
+  /**
+   * Distance vector used by this virtual link, NULL if @e n is used.
+   */
+  struct DistanceVector *dv;
+
+  /**
+   * How many more messages can we send to core before we exhaust
+   * the receive window of CORE for this peer? If this hits zero,
+   * we must tell communicators to stop providing us more messages
+   * for this peer.  In fact, the window can go negative as we
+   * have multiple communicators, so per communicator we can go
+   * down by one into the negative range.
+   */
+  int core_recv_window;
 };
 
 
 /**
- * Entry in our #dv_routes table, representing a (set of) distance
- * vector routes to a particular peer.
+ * Data structure kept when we are waiting for an acknowledgement.
  */
-struct DistanceVector
+struct PendingAcknowledgement
 {
 
   /**
-   * To which peer is this a route?
+   * If @e pm is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to its pending message.
    */
-  struct GNUNET_PeerIdentity target;
+  struct PendingAcknowledgement *next_pm;
 
   /**
-   * Known paths to @e target.
+   * If @e pm is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to its pending message.
    */
-  struct DistanceVectorHop *dv_head;
+  struct PendingAcknowledgement *prev_pm;
 
   /**
-   * Known paths to @e target.
+   * If @e queue is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to the queue that was used to transmit the
+   * @a pm.
    */
-  struct DistanceVectorHop *dv_tail;
+  struct PendingAcknowledgement *next_queue;
 
   /**
-   * Task scheduled to purge expired paths from @e dv_head MDLL.
+   * If @e queue is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to the queue that was used to transmit the
+   * @a pm.
    */
-  struct GNUNET_SCHEDULER_Task *timeout_task;
-};
+  struct PendingAcknowledgement *prev_queue;
 
+  /**
+   * If @e dvh is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to the DVH that was used to transmit the
+   * @a pm.
+   */
+  struct PendingAcknowledgement *next_dvh;
 
-/**
- * A queue is a message queue provided by a communicator
- * via which we can reach a particular neighbour.
+  /**
+   * If @e dvh is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to the DVH that was used to transmit the
+   * @a pm.
+   */
+  struct PendingAcknowledgement *prev_dvh;
+
+  /**
+   * Pointers for the DLL of all pending acknowledgements.
+   * This list is sorted by @e transmission time.  If the list gets too
+   * long, the oldest entries are discarded.
+   */
+  struct PendingAcknowledgement *next_pa;
+
+  /**
+   * Pointers for the DLL of all pending acknowledgements.
+   * This list is sorted by @e transmission time.  If the list gets too
+   * long, the oldest entries are discarded.
+   */
+  struct PendingAcknowledgement *prev_pa;
+
+  /**
+   * Unique identifier for this transmission operation.
+   */
+  struct AcknowledgementUUIDP ack_uuid;
+
+  /**
+   * Message that was transmitted, may be NULL if the message was ACKed
+   * via another channel.
+   */
+  struct PendingMessage *pm;
+
+  /**
+   * Distance vector path chosen for this transmission, NULL if transmission
+   * was to a direct neighbour OR if the path was forgotten in the meantime.
+   */
+  struct DistanceVectorHop *dvh;
+
+  /**
+   * Queue used for transmission, NULL if the queue has been destroyed
+   * (which may happen before we get an acknowledgement).
+   */
+  struct Queue *queue;
+
+  /**
+   * Time of the transmission, for RTT calculation.
+   */
+  struct GNUNET_TIME_Absolute transmission_time;
+
+  /**
+   * Number of bytes of the original message (to calculate bandwidth).
+   */
+  uint16_t message_size;
+};
+
+
+/**
+ * One possible hop towards a DV target.
  */
-struct Queue;
+struct DistanceVectorHop
+{
+
+  /**
+   * Kept in a MDLL, sorted by @e timeout.
+   */
+  struct DistanceVectorHop *next_dv;
+
+  /**
+   * Kept in a MDLL, sorted by @e timeout.
+   */
+  struct DistanceVectorHop *prev_dv;
+
+  /**
+   * Kept in a MDLL.
+   */
+  struct DistanceVectorHop *next_neighbour;
+
+  /**
+   * Kept in a MDLL.
+   */
+  struct DistanceVectorHop *prev_neighbour;
+
+  /**
+   * Head of MDLL of messages routed via this path.
+   */
+  struct PendingMessage *pending_msg_head;
+
+  /**
+   * Tail of MDLL of messages routed via this path.
+   */
+  struct PendingMessage *pending_msg_tail;
+
+  /**
+   * Head of DLL of PAs that used our @a path.
+   */
+  struct PendingAcknowledgement *pa_head;
+
+  /**
+   * Tail of DLL of PAs that used our @a path.
+   */
+  struct PendingAcknowledgement *pa_tail;
+
+  /**
+   * What would be the next hop to @e target?
+   */
+  struct Neighbour *next_hop;
+
+  /**
+   * Distance vector entry this hop belongs with.
+   */
+  struct DistanceVector *dv;
+
+  /**
+   * Array of @e distance hops to the target, excluding @e next_hop.
+   * NULL if the entire path is us to @e next_hop to `target`. Allocated
+   * at the end of this struct. Excludes the target itself!
+   */
+  const struct GNUNET_PeerIdentity *path;
+
+  /**
+   * At what time do we forget about this path unless we see it again
+   * while learning?
+   */
+  struct GNUNET_TIME_Absolute timeout;
+
+  /**
+   * For how long is the validation of this path considered
+   * valid?
+   * Set to ZERO if the path is learned by snooping on DV learn messages
+   * initiated by other peers, and to the time at which we generated the
+   * challenge for DV learn operations this peer initiated.
+   */
+  struct GNUNET_TIME_Absolute path_valid_until;
+
+  /**
+   * Performance data for this transmission possibility.
+   */
+  struct PerformanceData pd;
+
+  /**
+   * Number of hops in total to the `target` (excluding @e next_hop and `target`
+   * itself). Thus 0 still means a distance of 2 hops (to @e next_hop and then
+   * to `target`).
+   */
+  unsigned int distance;
+};
+
+
+/**
+ * Entry in our #dv_routes table, representing a (set of) distance
+ * vector routes to a particular peer.
+ */
+struct DistanceVector
+{
+
+  /**
+   * To which peer is this a route?
+   */
+  struct GNUNET_PeerIdentity target;
+
+  /**
+   * Known paths to @e target.
+   */
+  struct DistanceVectorHop *dv_head;
+
+  /**
+   * Known paths to @e target.
+   */
+  struct DistanceVectorHop *dv_tail;
+
+  /**
+   * Task scheduled to purge expired paths from @e dv_head MDLL.
+   */
+  struct GNUNET_SCHEDULER_Task *timeout_task;
+
+  /**
+   * Do we have a confirmed working queue and are thus visible to
+   * CORE?  If so, this is the virtual link, otherwise NULL.
+   */
+  struct VirtualLink *link;
+};
 
 
 /**
@@ -1102,6 +1450,11 @@ struct QueueEntry
    */
   struct Queue *queue;
 
+  /**
+   * Pending message this entry is for, or NULL for none.
+   */
+  struct PendingMessage *pm;
+
   /**
    * Message ID used for this message with the queue used for transmission.
    */
@@ -1135,6 +1488,16 @@ struct Queue
    */
   struct Queue *next_client;
 
+  /**
+   * Head of DLL of PAs that used this queue.
+   */
+  struct PendingAcknowledgement *pa_head;
+
+  /**
+   * Tail of DLL of PAs that used this queue.
+   */
+  struct PendingAcknowledgement *pa_tail;
+
   /**
    * Head of DLL of unacked transmission requests.
    */
@@ -1162,17 +1525,27 @@ struct Queue
 
   /**
    * Task scheduled for the time when this queue can (likely) transmit the
-   * next message. Still needs to check with the @e tracker_out to be sure.
+   * next message.
    */
   struct GNUNET_SCHEDULER_Task *transmit_task;
 
   /**
-   * Our current RTT estimate for this queue.
+   * How long do *we* consider this @e address to be valid?  In the past or
+   * zero if we have not yet validated it.  Can be updated based on
+   * challenge-response validations (via address validation logic), or when we
+   * receive ACKs that we can definitively map to transmissions via this
+   * queue.
    */
-  struct GNUNET_TIME_Relative rtt;
+  struct GNUNET_TIME_Absolute validated_until;
+
+  /**
+   * Performance data for this queue.
+   */
+  struct PerformanceData pd;
 
   /**
-   * Message ID generator for transmissions on this queue.
+   * Message ID generator for transmissions on this queue to the
+   * communicator.
    */
   uint64_t mid_gen;
 
@@ -1186,12 +1559,6 @@ struct Queue
    */
   uint32_t mtu;
 
-  /**
-   * Distance to the target of this queue.
-   * FIXME: needed? DV is done differently these days...
-   */
-  uint32_t distance;
-
   /**
    * Messages pending.
    */
@@ -1216,16 +1583,6 @@ struct Queue
    * Connection status for this queue.
    */
   enum GNUNET_TRANSPORT_ConnectionStatus cs;
-
-  /**
-   * How much outbound bandwidth do we have available for this queue?
-   */
-  struct GNUNET_BANDWIDTH_Tracker tracker_out;
-
-  /**
-   * How much inbound bandwidth do we have available for this queue?
-   */
-  struct GNUNET_BANDWIDTH_Tracker tracker_in;
 };
 
 
@@ -1236,10 +1593,10 @@ struct ReassemblyContext
 {
 
   /**
-   * Original message ID for of the message that all the
-   * fragments belong to.
+   * Original message ID for of the message that all the fragments
+   * belong to.
    */
-  struct GNUNET_ShortHashCode msg_uuid;
+  struct MessageUUIDP msg_uuid;
 
   /**
    * Which neighbour is this context for?
@@ -1272,36 +1629,12 @@ struct ReassemblyContext
    */
   struct GNUNET_TIME_Absolute reassembly_timeout;
 
-  /**
-   * Average delay of all acks in @e extra_acks and @e frag_uuid.
-   * Should be reset to zero when @e num_acks is set to 0.
-   */
-  struct GNUNET_TIME_Relative avg_ack_delay;
-
   /**
    * Time we received the last fragment.  @e avg_ack_delay must be
    * incremented by now - @e last_frag multiplied by @e num_acks.
    */
   struct GNUNET_TIME_Absolute last_frag;
 
-  /**
-   * Bitfield of up to 64 additional fragments following @e frag_uuid
-   * to be acknowledged in the next cummulative ACK.
-   */
-  uint64_t extra_acks;
-
-  /**
-   * Unique ID of the lowest fragment UUID to be acknowledged in the
-   * next cummulative ACK.  Only valid if @e num_acks > 0.
-   */
-  uint32_t frag_uuid;
-
-  /**
-   * Number of ACKs we have accumulated so far.  Reset to 0
-   * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
-   */
-  unsigned int num_acks;
-
   /**
    * How big is the message we are reassembling in total?
    */
@@ -1336,7 +1669,7 @@ struct Neighbour
    * reassembly. May be NULL if we currently have no fragments from
    * this @e pid (lazy initialization).
    */
-  struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
+  struct GNUNET_CONTAINER_MultiHashMap32 *reassembly_map;
 
   /**
    * Heap with `struct ReassemblyContext` structs for fragments under
@@ -1383,25 +1716,34 @@ struct Neighbour
   struct Queue *queue_tail;
 
   /**
-   * Task run to cleanup pending messages that have exceeded their timeout.
+   * Handle for an operation to fetch @e last_dv_learn_monotime information from
+   * the PEERSTORE, or NULL.
    */
-  struct GNUNET_SCHEDULER_Task *timeout_task;
+  struct GNUNET_PEERSTORE_IterateContext *get;
 
   /**
-   * Quota at which CORE is allowed to transmit to this peer.
-   *
-   * FIXME: not yet used, tricky to get right given multiple queues!
-   *        (=> Idea: measure???)
-   * FIXME: how do we set this value initially when we tell CORE?
-   *    Options: start at a minimum value or at literally zero?
-   *         (=> Current thought: clean would be zero!)
+   * Handle to a PEERSTORE store operation to store this @e pid's @e
+   * @e last_dv_learn_monotime.  NULL if no PEERSTORE operation is pending.
+   */
+  struct GNUNET_PEERSTORE_StoreContext *sc;
+
+  /**
+   * Do we have a confirmed working queue and are thus visible to
+   * CORE?  If so, this is the virtual link, otherwise NULL.
+   */
+  struct VirtualLink *link;
+
+  /**
+   * Latest DVLearn monotonic time seen from this peer.  Initialized only
+   * if @e dl_monotime_available is #GNUNET_YES.
    */
-  struct GNUNET_BANDWIDTH_Value32NBO quota_out;
+  struct GNUNET_TIME_Absolute last_dv_learn_monotime;
 
   /**
-   * What is the earliest timeout of any message in @e pending_msg_tail?
+   * Do we have the lastest value for @e last_dv_learn_monotime from
+   * PEERSTORE yet, or are we still waiting for a reply of PEERSTORE?
    */
-  struct GNUNET_TIME_Absolute earliest_timeout;
+  int dv_monotime_available;
 };
 
 
@@ -1441,7 +1783,8 @@ struct PeerRequest
 /**
  * Types of different pending messages.
  */
-enum PendingMessageType {
+enum PendingMessageType
+{
 
   /**
    * Ordinary message received from the CORE service.
@@ -1461,7 +1804,12 @@ enum PendingMessageType {
   /**
    * Any type of acknowledgement.
    */
-  PMT_ACKNOWLEDGEMENT = 3
+  PMT_ACKNOWLEDGEMENT = 3,
+
+  /**
+   * Control traffic generated by the TRANSPORT service itself.
+   */
+  PMT_CONTROL = 4
 
 };
 
@@ -1526,6 +1874,28 @@ struct PendingMessage
    */
   struct PendingMessage *prev_frag;
 
+  /**
+   * Kept in a MDLL of messages using this @a dvh (if @e dvh is
+   * non-NULL).
+   */
+  struct PendingMessage *next_dvh;
+
+  /**
+   * Kept in a MDLL of messages using this @a dvh (if @e dvh is
+   * non-NULL).
+   */
+  struct PendingMessage *prev_dvh;
+
+  /**
+   * Head of DLL of PAs for this pending message.
+   */
+  struct PendingAcknowledgement *pa_head;
+
+  /**
+   * Tail of DLL of PAs for this pending message.
+   */
+  struct PendingAcknowledgement *pa_tail;
+
   /**
    * This message, reliability boxed. Only possibly available if @e pmt is
    * #PMT_CORE.
@@ -1533,10 +1903,27 @@ struct PendingMessage
   struct PendingMessage *bpm;
 
   /**
-   * Target of the request.
+   * Target of the request (for transmission, may not be ultimate
+   * destination!).
    */
   struct Neighbour *target;
 
+  /**
+   * Distance vector path selected for this message, or
+   * NULL if transmitted directly.
+   */
+  struct DistanceVectorHop *dvh;
+
+  /**
+   * Set to non-NULL value if this message is currently being given to a
+   * communicator and we are awaiting that communicator's acknowledgement.
+   * Note that we must not retransmit a pending message while we're still
+   * in the process of giving it to a communicator. If a pending message
+   * is free'd while this entry is non-NULL, the @e qe reference to us
+   * should simply be set to NULL.
+   */
+  struct QueueEntry *qe;
+
   /**
    * Client that issued the transmission request, if @e pmt is #PMT_CORE.
    */
@@ -1571,12 +1958,7 @@ struct PendingMessage
    * UUID to use for this message (used for reassembly of fragments, only
    * initialized if @e msg_uuid_set is #GNUNET_YES).
    */
-  struct GNUNET_ShortHashCode msg_uuid;
-
-  /**
-   * Counter incremented per generated fragment.
-   */
-  uint32_t frag_uuidgen;
+  struct MessageUUIDP msg_uuid;
 
   /**
    * Type of the pending message.
@@ -1602,6 +1984,66 @@ struct PendingMessage
 };
 
 
+/**
+ * Acknowledgement payload.
+ */
+struct TransportCummulativeAckPayload
+{
+  /**
+   * When did we receive the message we are ACKing?  Used to calculate
+   * the delay we introduced by cummulating ACKs.
+   */
+  struct GNUNET_TIME_Absolute receive_time;
+
+  /**
+   * UUID of a message being acknowledged.
+   */
+  struct AcknowledgementUUIDP ack_uuid;
+};
+
+
+/**
+ * Data structure in which we track acknowledgements still to
+ * be sent to the
+ */
+struct AcknowledgementCummulator
+{
+  /**
+   * Target peer for which we are accumulating ACKs here.
+   */
+  struct GNUNET_PeerIdentity target;
+
+  /**
+   * ACK data being accumulated.  Only @e num_acks slots are valid.
+   */
+  struct TransportCummulativeAckPayload ack_uuids[MAX_CUMMULATIVE_ACKS];
+
+  /**
+   * Task scheduled either to transmit the cummulative ACK message,
+   * or to clean up this data structure after extended periods of
+   * inactivity (if @e num_acks is zero).
+   */
+  struct GNUNET_SCHEDULER_Task *task;
+
+  /**
+   * When is @e task run (only used if @e num_acks is non-zero)?
+   */
+  struct GNUNET_TIME_Absolute min_transmission_time;
+
+  /**
+   * Counter to produce the `ack_counter` in the `struct
+   * TransportReliabilityAckMessage`.  Allows the receiver to detect
+   * lost ACK messages.  Incremented by @e num_acks upon transmission.
+   */
+  uint32_t ack_counter;
+
+  /**
+   * Number of entries used in @e ack_uuids.  Reset to 0 upon transmission.
+   */
+  unsigned int num_acks;
+};
+
+
 /**
  * One of the addresses of this peer.
  */
@@ -1870,7 +2312,7 @@ struct ValidationState
    * (We must not rotate more often as otherwise we may discard valid answers
    * due to packet losses, latency and reorderings on the network).
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 
   /**
    * Claimed address of the peer.
@@ -1899,12 +2341,71 @@ struct ValidationState
 
 
 /**
- * Head of linked list of all clients to this service.
+ * A Backtalker is a peer sending us backchannel messages. We use this
+ * struct to detect monotonic time violations, cache ephemeral key
+ * material (to avoid repeatedly checking signatures), and to synchronize
+ * monotonic time with the PEERSTORE.
  */
-static struct TransportClient *clients_head;
-
-/**
- * Tail of linked list of all clients to this service.
+struct Backtalker
+{
+  /**
+   * Peer this is about.
+   */
+  struct GNUNET_PeerIdentity pid;
+
+  /**
+   * Last (valid) monotonic time received from this sender.
+   */
+  struct GNUNET_TIME_Absolute monotonic_time;
+
+  /**
+   * When will this entry time out?
+   */
+  struct GNUNET_TIME_Absolute timeout;
+
+  /**
+   * Last (valid) ephemeral key received from this sender.
+   */
+  struct GNUNET_CRYPTO_EcdhePublicKey last_ephemeral;
+
+  /**
+   * Task associated with this backtalker. Can be for timeout,
+   * or other asynchronous operations.
+   */
+  struct GNUNET_SCHEDULER_Task *task;
+
+  /**
+   * Communicator context waiting on this backchannel's @e get, or NULL.
+   */
+  struct CommunicatorMessageContext *cmc;
+
+  /**
+   * Handle for an operation to fetch @e monotonic_time information from the
+   * PEERSTORE, or NULL.
+   */
+  struct GNUNET_PEERSTORE_IterateContext *get;
+
+  /**
+   * Handle to a PEERSTORE store operation for this @e pid's @e
+   * monotonic_time.  NULL if no PEERSTORE operation is pending.
+   */
+  struct GNUNET_PEERSTORE_StoreContext *sc;
+
+  /**
+   * Number of bytes of the original message body that follows after this
+   * struct.
+   */
+  size_t body_size;
+};
+
+
+/**
+ * Head of linked list of all clients to this service.
+ */
+static struct TransportClient *clients_head;
+
+/**
+ * Tail of linked list of all clients to this service.
  */
 static struct TransportClient *clients_tail;
 
@@ -1934,6 +2435,24 @@ static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
  */
 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
 
+/**
+ * Map from PIDs to `struct Backtalker` entries.  A peer is
+ * a backtalker if it recently send us backchannel messages.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *backtalkers;
+
+/**
+ * Map from PIDs to `struct AcknowledgementCummulator`s.
+ * Here we track the cummulative ACKs for transmission.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *ack_cummulators;
+
+/**
+ * Map of pending acknowledgements, mapping `struct AcknowledgementUUID` to
+ * a `struct PendingAcknowledgement`.
+ */
+static struct GNUNET_CONTAINER_MultiShortmap *pending_acks;
+
 /**
  * Map from PIDs to `struct DistanceVector` entries describing
  * known paths to the peer.
@@ -1946,6 +2465,12 @@ static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
  */
 static struct GNUNET_CONTAINER_MultiPeerMap *validation_map;
 
+/**
+ * Map from PIDs to `struct VirtualLink` entries describing
+ * links CORE knows to exist.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *links;
+
 /**
  * Map from challenges to `struct LearnLaunchEntry` values.
  */
@@ -2003,6 +2528,81 @@ static struct GNUNET_SCHEDULER_Task *dvlearn_task;
  */
 static struct GNUNET_SCHEDULER_Task *validation_task;
 
+/**
+ * The most recent PA we have created, head of DLL.
+ * The length of the DLL is kept in #pa_count.
+ */
+static struct PendingAcknowledgement *pa_head;
+
+/**
+ * The oldest PA we have created, tail of DLL.
+ * The length of the DLL is kept in #pa_count.
+ */
+static struct PendingAcknowledgement *pa_tail;
+
+/**
+ * Number of entries in the #pa_head/#pa_tail DLL.  Used to
+ * limit the size of the data structure.
+ */
+static unsigned int pa_count;
+
+
+/**
+ * Get an offset into the transmission history buffer for `struct
+ * PerformanceData`.  Note that the caller must perform the required
+ * modulo #GOODPUT_AGING_SLOTS operation before indexing into the
+ * array!
+ *
+ * An 'age' lasts 15 minute slots.
+ *
+ * @return current age of the world
+ */
+static unsigned int
+get_age ()
+{
+  struct GNUNET_TIME_Absolute now;
+
+  now = GNUNET_TIME_absolute_get ();
+  return now.abs_value_us / GNUNET_TIME_UNIT_MINUTES.rel_value_us / 15;
+}
+
+
+/**
+ * Release @a pa data structure.
+ *
+ * @param pa data structure to release
+ */
+static void
+free_pending_acknowledgement (struct PendingAcknowledgement *pa)
+{
+  struct Queue *q = pa->queue;
+  struct PendingMessage *pm = pa->pm;
+  struct DistanceVectorHop *dvh = pa->dvh;
+
+  GNUNET_CONTAINER_MDLL_remove (pa, pa_head, pa_tail, pa);
+  pa_count--;
+  if (NULL != q)
+  {
+    GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
+    pa->queue = NULL;
+  }
+  if (NULL != pm)
+  {
+    GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
+    pa->pm = NULL;
+  }
+  if (NULL != dvh)
+  {
+    GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
+    pa->queue = NULL;
+  }
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multishortmap_remove (pending_acks,
+                                                        &pa->ack_uuid.value,
+                                                        pa));
+  GNUNET_free (pa);
+}
+
 
 /**
  * Free cached ephemeral key.
@@ -2018,6 +2618,26 @@ free_ephemeral (struct EphemeralCacheEntry *ece)
 }
 
 
+/**
+ * Free virtual link.
+ *
+ * @param vl link data to free
+ */
+static void
+free_virtual_link (struct VirtualLink *vl)
+{
+  GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl);
+  if (NULL != vl->visibility_task)
+  {
+    GNUNET_SCHEDULER_cancel (vl->visibility_task);
+    vl->visibility_task = NULL;
+  }
+  GNUNET_break (NULL == vl->n);
+  GNUNET_break (NULL == vl->dv);
+  GNUNET_free (vl);
+}
+
+
 /**
  * Free validation state.
  *
@@ -2029,7 +2649,8 @@ free_validation_state (struct ValidationState *vs)
   GNUNET_CONTAINER_multipeermap_remove (validation_map, &vs->pid, vs);
   GNUNET_CONTAINER_heap_remove_node (vs->hn);
   vs->hn = NULL;
-  if (NULL != vs->sc) {
+  if (NULL != vs->sc)
+  {
     GNUNET_PEERSTORE_store_cancel (vs->sc);
     vs->sc = NULL;
   }
@@ -2098,7 +2719,22 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh)
 {
   struct Neighbour *n = dvh->next_hop;
   struct DistanceVector *dv = dvh->dv;
+  struct PendingAcknowledgement *pa;
+  struct PendingMessage *pm;
 
+  while (NULL != (pm = dvh->pending_msg_head))
+  {
+    GNUNET_CONTAINER_MDLL_remove (dvh,
+                                  dvh->pending_msg_head,
+                                  dvh->pending_msg_tail,
+                                  pm);
+    pm->dvh = NULL;
+  }
+  while (NULL != (pa = dvh->pa_head))
+  {
+    GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
+    pa->dvh = NULL;
+  }
   GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
   GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, dvh);
   GNUNET_free (dvh);
@@ -2118,10 +2754,11 @@ free_dv_route (struct DistanceVector *dv)
 
   while (NULL != (dvh = dv->dv_head))
     free_distance_vector_hop (dvh);
-  if (NULL == dv->dv_head) {
+  if (NULL == dv->dv_head)
+  {
     GNUNET_assert (
-      GNUNET_YES
-      == GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
+      GNUNET_YES ==
+      GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
     if (NULL != dv->timeout_task)
       GNUNET_SCHEDULER_cancel (dv->timeout_task);
     GNUNET_free (dv);
@@ -2156,13 +2793,13 @@ notify_monitor (struct TransportClient *tc,
   env = GNUNET_MQ_msg_extra (md,
                              addr_len,
                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
-  md->nt = htonl ((uint32_t)nt);
+  md->nt = htonl ((uint32_t) nt);
   md->peer = *peer;
   md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
   md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
   md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
   md->rtt = GNUNET_TIME_relative_hton (me->rtt);
-  md->cs = htonl ((uint32_t)me->cs);
+  md->cs = htonl ((uint32_t) me->cs);
   md->num_msg_pending = htonl (me->num_msg_pending);
   md->num_bytes_pending = htonl (me->num_bytes_pending);
   memcpy (&md[1], address, addr_len);
@@ -2185,13 +2822,14 @@ notify_monitors (const struct GNUNET_PeerIdentity *peer,
                  enum GNUNET_NetworkType nt,
                  const struct MonitorEvent *me)
 {
-  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) {
+  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
+  {
     if (CT_MONITOR != tc->type)
       continue;
     if (tc->details.monitor.one_shot)
       continue;
-    if ((0 != GNUNET_is_zero (&tc->details.monitor.peer))
-        && (0 != GNUNET_memcmp (&tc->details.monitor.peer, peer)))
+    if ((0 != GNUNET_is_zero (&tc->details.monitor.peer)) &&
+        (0 != GNUNET_memcmp (&tc->details.monitor.peer, peer)))
       continue;
     notify_monitor (tc, peer, address, nt, me);
   }
@@ -2214,7 +2852,7 @@ client_connect_cb (void *cls,
 {
   struct TransportClient *tc;
 
-  (void)cls;
+  (void) cls;
   tc = GNUNET_new (struct TransportClient);
   tc->client = client;
   tc->mq = mq;
@@ -2235,10 +2873,10 @@ free_reassembly_context (struct ReassemblyContext *rc)
   struct Neighbour *n = rc->neighbour;
 
   GNUNET_assert (rc == GNUNET_CONTAINER_heap_remove_node (rc->hn));
-  GNUNET_assert (GNUNET_OK
-                 == GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
-                                                           &rc->msg_uuid,
-                                                           rc));
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CONTAINER_multihashmap32_remove (n->reassembly_map,
+                                                         rc->msg_uuid.uuid,
+                                                         rc));
   GNUNET_free (rc);
 }
 
@@ -2255,18 +2893,19 @@ reassembly_cleanup_task (void *cls)
   struct ReassemblyContext *rc;
 
   n->reassembly_timeout_task = NULL;
-  while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap))) {
-    if (0
-        == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout)
-             .rel_value_us) {
+  while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
+  {
+    if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout)
+               .rel_value_us)
+    {
       free_reassembly_context (rc);
       continue;
     }
     GNUNET_assert (NULL == n->reassembly_timeout_task);
-    n->reassembly_timeout_task
-      GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
-                                 &reassembly_cleanup_task,
-                                 n);
+    n->reassembly_timeout_task =
+      GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
+                               &reassembly_cleanup_task,
+                               n);
     return;
   }
 }
@@ -2281,14 +2920,12 @@ reassembly_cleanup_task (void *cls)
  * @return #GNUNET_OK (continue iteration)
  */
 static int
-free_reassembly_cb (void *cls,
-                    const struct GNUNET_ShortHashCode *key,
-                    void *value)
+free_reassembly_cb (void *cls, uint32_t key, void *value)
 {
   struct ReassemblyContext *rc = value;
-  (void)cls;
-  (void)key;
 
+  (void) cls;
+  (void) key;
   free_reassembly_context (rc);
   return GNUNET_OK;
 }
@@ -2305,22 +2942,22 @@ free_neighbour (struct Neighbour *neighbour)
   struct DistanceVectorHop *dvh;
 
   GNUNET_assert (NULL == neighbour->queue_head);
-  GNUNET_assert (GNUNET_YES
-                 == GNUNET_CONTAINER_multipeermap_remove (neighbours,
-                                                          &neighbour->pid,
-                                                          neighbour));
-  if (NULL != neighbour->timeout_task)
-    GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
-  if (NULL != neighbour->reassembly_map) {
-    GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
-                                            &free_reassembly_cb,
-                                            NULL);
-    GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multipeermap_remove (neighbours,
+                                                       &neighbour->pid,
+                                                       neighbour));
+  if (NULL != neighbour->reassembly_map)
+  {
+    GNUNET_CONTAINER_multihashmap32_iterate (neighbour->reassembly_map,
+                                             &free_reassembly_cb,
+                                             NULL);
+    GNUNET_CONTAINER_multihashmap32_destroy (neighbour->reassembly_map);
     neighbour->reassembly_map = NULL;
     GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
     neighbour->reassembly_heap = NULL;
   }
-  while (NULL != (dvh = neighbour->dv_head)) {
+  while (NULL != (dvh = neighbour->dv_head))
+  {
     struct DistanceVector *dv = dvh->dv;
 
     free_distance_vector_hop (dvh);
@@ -2328,7 +2965,20 @@ free_neighbour (struct Neighbour *neighbour)
       free_dv_route (dv);
   }
   if (NULL != neighbour->reassembly_timeout_task)
+  {
     GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
+    neighbour->reassembly_timeout_task = NULL;
+  }
+  if (NULL != neighbour->get)
+  {
+    GNUNET_PEERSTORE_iterate_cancel (neighbour->get);
+    neighbour->get = NULL;
+  }
+  if (NULL != neighbour->sc)
+  {
+    GNUNET_PEERSTORE_store_cancel (neighbour->sc);
+    neighbour->sc = NULL;
+  }
   GNUNET_free (neighbour);
 }
 
@@ -2338,19 +2988,16 @@ free_neighbour (struct Neighbour *neighbour)
  *
  * @param tc client to inform (must be CORE client)
  * @param pid peer the connection is for
- * @param quota_out current quota for the peer
  */
 static void
 core_send_connect_info (struct TransportClient *tc,
-                        const struct GNUNET_PeerIdentity *pid,
-                        struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+                        const struct GNUNET_PeerIdentity *pid)
 {
   struct GNUNET_MQ_Envelope *env;
   struct ConnectInfoMessage *cim;
 
   GNUNET_assert (CT_CORE == tc->type);
   env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
-  cim->quota_out = quota_out;
   cim->id = *pid;
   GNUNET_MQ_send (tc->mq, env);
 }
@@ -2360,16 +3007,18 @@ core_send_connect_info (struct TransportClient *tc,
  * Send message to CORE clients that we gained a connection
  *
  * @param pid peer the queue was for
- * @param quota_out current quota for the peer
  */
 static void
-cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
-                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+cores_send_connect_info (const struct GNUNET_PeerIdentity *pid)
 {
-  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) {
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Informing CORE clients about connection to %s\n",
+              GNUNET_i2s (pid));
+  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
+  {
     if (CT_CORE != tc->type)
       continue;
-    core_send_connect_info (tc, pid, quota_out);
+    core_send_connect_info (tc, pid);
   }
 }
 
@@ -2382,7 +3031,11 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
 static void
 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
 {
-  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) {
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Informing CORE clients about disconnect from %s\n",
+              GNUNET_i2s (pid));
+  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
+  {
     struct GNUNET_MQ_Envelope *env;
     struct DisconnectInfoMessage *dim;
 
@@ -2396,10 +3049,9 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
 
 
 /**
- * We believe we are ready to transmit a message on a queue. Double-checks
- * with the queue's "tracker_out" and then gives the message to the
- * communicator for transmission (updating the tracker, and re-scheduling
- * itself if applicable).
+ * We believe we are ready to transmit a message on a queue. Gives the
+ * message to the communicator for transmission (updating the tracker,
+ * and re-scheduling itself if applicable).
  *
  * @param cls the `struct Queue` to process transmissions for
  */
@@ -2415,18 +3067,21 @@ transmit_on_queue (void *cls);
  * be called if the message queue is non-empty!
  *
  * @param queue the queue to do scheduling for
+ * @param inside_job set to #GNUNET_YES if called from
+ *            #transmit_on_queue() itself and NOT setting
+ *            the task means running immediately
  */
 static void
-schedule_transmit_on_queue (struct Queue *queue)
+schedule_transmit_on_queue (struct Queue *queue, int inside_job)
 {
   struct Neighbour *n = queue->neighbour;
   struct PendingMessage *pm = n->pending_msg_head;
   struct GNUNET_TIME_Relative out_delay;
-  unsigned int wsize;
 
   GNUNET_assert (NULL != pm);
-  if (queue->tc->details.communicator.total_queue_length
-      >= COMMUNICATOR_TOTAL_QUEUE_LIMIT) {
+  if (queue->tc->details.communicator.total_queue_length >=
+      COMMUNICATOR_TOTAL_QUEUE_LIMIT)
+  {
     GNUNET_STATISTICS_update (
       GST_stats,
       "# Transmission throttled due to communicator queue limit",
@@ -2434,26 +3089,28 @@ schedule_transmit_on_queue (struct Queue *queue)
       GNUNET_NO);
     return;
   }
-  if (queue->queue_length >= QUEUE_LENGTH_LIMIT) {
-    GNUNET_STATISTICS_update (
-      GST_stats,
-      "# Transmission throttled due to queue queue limit",
-      1,
-      GNUNET_NO);
+  if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# Transmission throttled due to queue queue limit",
+                              1,
+                              GNUNET_NO);
     return;
   }
 
-  wsize = (0 == queue->mtu) ? pm->bytes_msg /* FIXME: add overheads? */
-                            : queue->mtu;
-  out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, wsize);
-  out_delay = GNUNET_TIME_relative_max (
-    GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
-    out_delay);
-  if (0 == out_delay.rel_value_us)
+  out_delay = GNUNET_TIME_absolute_get_remaining (pm->next_attempt);
+  if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us))
+  {
+    GNUNET_log (
+      GNUNET_ERROR_TYPE_DEBUG,
+      "Schedule transmission on queue %llu of %s decides to run immediately\n",
+      (unsigned long long) queue->qid,
+      GNUNET_i2s (&n->pid));
     return; /* we should run immediately! */
+  }
   /* queue has changed since we were scheduled, reschedule again */
-  queue->transmit_task
-    GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue);
+  queue->transmit_task =
+    GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue);
   if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "Next transmission on queue `%s' in %s (high delay)\n",
@@ -2467,6 +3124,46 @@ schedule_transmit_on_queue (struct Queue *queue)
 }
 
 
+/**
+ * Task run to check whether the hops of the @a cls still
+ * are validated, or if we need to core about disconnection.
+ *
+ * @param cls a `struct VirtualLink`
+ */
+static void
+check_link_down (void *cls)
+{
+  struct VirtualLink *vl = cls;
+  struct DistanceVector *dv = vl->dv;
+  struct Neighbour *n = vl->n;
+  struct GNUNET_TIME_Absolute dvh_timeout;
+  struct GNUNET_TIME_Absolute q_timeout;
+
+  vl->visibility_task = NULL;
+  dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+       pos = pos->next_dv)
+    dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
+  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+    vl->dv = NULL;
+  q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+  for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
+    q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
+  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+    vl->n = NULL;
+  if ((NULL == vl->n) && (NULL == vl->dv))
+  {
+    cores_send_disconnect_info (&dv->target);
+    free_virtual_link (vl);
+    return;
+  }
+  vl->visibility_task =
+    GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
+                             &check_link_down,
+                             vl);
+}
+
+
 /**
  * Free @a queue.
  *
@@ -2477,15 +3174,24 @@ free_queue (struct Queue *queue)
 {
   struct Neighbour *neighbour = queue->neighbour;
   struct TransportClient *tc = queue->tc;
-  struct MonitorEvent me
-    = {.cs = GNUNET_TRANSPORT_CS_DOWN, .rtt = GNUNET_TIME_UNIT_FOREVER_REL};
+  struct MonitorEvent me = {.cs = GNUNET_TRANSPORT_CS_DOWN,
+                            .rtt = GNUNET_TIME_UNIT_FOREVER_REL};
   struct QueueEntry *qe;
   int maxxed;
+  struct PendingAcknowledgement *pa;
+  struct VirtualLink *vl;
 
-  if (NULL != queue->transmit_task) {
+  if (NULL != queue->transmit_task)
+  {
     GNUNET_SCHEDULER_cancel (queue->transmit_task);
     queue->transmit_task = NULL;
   }
+  while (NULL != (pa = queue->pa_head))
+  {
+    GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
+    pa->queue = NULL;
+  }
+
   GNUNET_CONTAINER_MDLL_remove (neighbour,
                                 neighbour->queue_head,
                                 neighbour->queue_tail,
@@ -2494,18 +3200,24 @@ free_queue (struct Queue *queue)
                                 tc->details.communicator.queue_head,
                                 tc->details.communicator.queue_tail,
                                 queue);
-  maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT
-            >= tc->details.communicator.total_queue_length);
-  while (NULL != (qe = queue->queue_head)) {
+  maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >=
+            tc->details.communicator.total_queue_length);
+  while (NULL != (qe = queue->queue_head))
+  {
     GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
     queue->queue_length--;
     tc->details.communicator.total_queue_length--;
+    if (NULL != qe->pm)
+    {
+      GNUNET_assert (qe == qe->pm->qe);
+      qe->pm->qe = NULL;
+    }
     GNUNET_free (qe);
   }
   GNUNET_assert (0 == queue->queue_length);
-  if ((maxxed)
-      && (COMMUNICATOR_TOTAL_QUEUE_LIMIT
-          < tc->details.communicator.total_queue_length)) {
+  if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT <
+                   tc->details.communicator.total_queue_length))
+  {
     /* Communicator dropped below threshold, resume all queues */
     GNUNET_STATISTICS_update (
       GST_stats,
@@ -2514,14 +3226,19 @@ free_queue (struct Queue *queue)
       GNUNET_NO);
     for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
          s = s->next_client)
-      schedule_transmit_on_queue (s);
+      schedule_transmit_on_queue (s, GNUNET_NO);
   }
   notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
-  GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
-  GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
   GNUNET_free (queue);
-  if (NULL == neighbour->queue_head) {
-    cores_send_disconnect_info (&neighbour->pid);
+
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid);
+  if ((NULL != vl) && (neighbour == vl->n))
+  {
+    GNUNET_SCHEDULER_cancel (vl->visibility_task);
+    check_link_down (vl);
+  }
+  if (NULL == neighbour->queue_head)
+  {
     free_neighbour (neighbour);
   }
 }
@@ -2540,11 +3257,13 @@ free_address_list_entry (struct AddressListEntry *ale)
   GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
                                tc->details.communicator.addr_tail,
                                ale);
-  if (NULL != ale->sc) {
+  if (NULL != ale->sc)
+  {
     GNUNET_PEERSTORE_store_cancel (ale->sc);
     ale->sc = NULL;
   }
-  if (NULL != ale->st) {
+  if (NULL != ale->st)
+  {
     GNUNET_SCHEDULER_cancel (ale->st);
     ale->st = NULL;
   }
@@ -2570,10 +3289,10 @@ stop_peer_request (void *cls,
 
   GNUNET_PEERSTORE_watch_cancel (pr->wc);
   GNUNET_assert (
-    GNUNET_YES
-    == GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
-                                             pid,
-                                             pr));
+    GNUNET_YES ==
+    GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
+                                          pid,
+                                          pr));
   GNUNET_free (pr);
 
   return GNUNET_OK;
@@ -2595,25 +3314,29 @@ client_disconnect_cb (void *cls,
 {
   struct TransportClient *tc = app_ctx;
 
-  (void)cls;
+  (void) cls;
+  (void) client;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Client %p disconnected, cleaning up.\n",
               tc);
   GNUNET_CONTAINER_DLL_remove (clients_head, clients_tail, tc);
-  switch (tc->type) {
+  switch (tc->type)
+  {
   case CT_NONE:
     break;
   case CT_CORE: {
     struct PendingMessage *pm;
 
-    while (NULL != (pm = tc->details.core.pending_msg_head)) {
+    while (NULL != (pm = tc->details.core.pending_msg_head))
+    {
       GNUNET_CONTAINER_MDLL_remove (client,
                                     tc->details.core.pending_msg_head,
                                     tc->details.core.pending_msg_tail,
                                     pm);
       pm->client = NULL;
     }
-  } break;
+  }
+  break;
   case CT_MONITOR:
     break;
   case CT_COMMUNICATOR: {
@@ -2625,7 +3348,8 @@ client_disconnect_cb (void *cls,
     while (NULL != (ale = tc->details.communicator.addr_head))
       free_address_list_entry (ale);
     GNUNET_free (tc->details.communicator.address_prefix);
-  } break;
+  }
+  break;
   case CT_APPLICATION:
     GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
                                            &stop_peer_request,
@@ -2652,9 +3376,12 @@ notify_client_connect_info (void *cls,
                             void *value)
 {
   struct TransportClient *tc = cls;
-  struct Neighbour *neighbour = value;
 
-  core_send_connect_info (tc, pid, neighbour->quota_out);
+  (void) value;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Telling new CORE client about existing connection to %s\n",
+              GNUNET_i2s (pid));
+  core_send_connect_info (tc, pid);
   return GNUNET_OK;
 }
 
@@ -2674,19 +3401,24 @@ handle_client_start (void *cls, const struct StartMessage *start)
   uint32_t options;
 
   options = ntohl (start->options);
-  if ((0 != (1 & options))
-      && (0 != GNUNET_memcmp (&start->self, &GST_my_identity))) {
+  if ((0 != (1 & options)) &&
+      (0 != GNUNET_memcmp (&start->self, &GST_my_identity)))
+  {
     /* client thinks this is a different peer, reject */
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (tc->client);
     return;
   }
-  if (CT_NONE != tc->type) {
+  if (CT_NONE != tc->type)
+  {
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (tc->client);
     return;
   }
   tc->type = CT_CORE;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "New CORE client with PID %s registered\n",
+              GNUNET_i2s (&start->self));
   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
                                          &notify_client_connect_info,
                                          tc);
@@ -2707,17 +3439,20 @@ check_client_send (void *cls, const struct OutboundMessage *obm)
   uint16_t size;
   const struct GNUNET_MessageHeader *obmm;
 
-  if (CT_CORE != tc->type) {
+  if (CT_CORE != tc->type)
+  {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
-  if (size < sizeof (struct GNUNET_MessageHeader)) {
+  if (size < sizeof (struct GNUNET_MessageHeader))
+  {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-  obmm = (const struct GNUNET_MessageHeader *)&obm[1];
-  if (size != ntohs (obmm->size)) {
+  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
+  if (size != ntohs (obmm->size))
+  {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
@@ -2727,6 +3462,9 @@ check_client_send (void *cls, const struct OutboundMessage *obm)
 
 /**
  * Free fragment tree below @e root, excluding @e root itself.
+ * FIXME: this does NOT seem to have the intended semantics
+ * based on how this is called. Seems we generally DO expect
+ * @a root to be free'ed itself as well!
  *
  * @param root root of the tree to free
  */
@@ -2735,8 +3473,16 @@ free_fragment_tree (struct PendingMessage *root)
 {
   struct PendingMessage *frag;
 
-  while (NULL != (frag = root->head_frag)) {
+  while (NULL != (frag = root->head_frag))
+  {
+    struct PendingAcknowledgement *pa;
+
     free_fragment_tree (frag);
+    while (NULL != (pa = frag->pa_head))
+    {
+      GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
+      pa->pm = NULL;
+    }
     GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
     GNUNET_free (frag);
   }
@@ -2755,51 +3501,66 @@ free_pending_message (struct PendingMessage *pm)
 {
   struct TransportClient *tc = pm->client;
   struct Neighbour *target = pm->target;
+  struct DistanceVectorHop *dvh = pm->dvh;
+  struct PendingAcknowledgement *pa;
 
-  if (NULL != tc) {
+  if (NULL != tc)
+  {
     GNUNET_CONTAINER_MDLL_remove (client,
                                   tc->details.core.pending_msg_head,
                                   tc->details.core.pending_msg_tail,
                                   pm);
   }
+  if (NULL != dvh)
+  {
+    GNUNET_CONTAINER_MDLL_remove (dvh,
+                                  dvh->pending_msg_head,
+                                  dvh->pending_msg_tail,
+                                  pm);
+  }
   GNUNET_CONTAINER_MDLL_remove (neighbour,
                                 target->pending_msg_head,
                                 target->pending_msg_tail,
                                 pm);
+  while (NULL != (pa = pm->pa_head))
+  {
+    GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
+    pa->pm = NULL;
+  }
+
   free_fragment_tree (pm);
+  if (NULL != pm->qe)
+  {
+    GNUNET_assert (pm == pm->qe->pm);
+    pm->qe->pm = NULL;
+  }
   GNUNET_free_non_null (pm->bpm);
   GNUNET_free (pm);
 }
 
 
 /**
- * Send a response to the @a pm that we have processed a
- * "send" request with status @a success. We
- * transmitted @a bytes_physical on the actual wire.
- * Sends a confirmation to the "core" client responsible
- * for the original request and free's @a pm.
+ * Send a response to the @a pm that we have processed a "send"
+ * request.  Sends a confirmation to the "core" client responsible for
+ * the original request and free's @a pm.
  *
  * @param pm handle to the original pending message
- * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
- *          for transmission failure
- * @param bytes_physical amount of bandwidth consumed
  */
 static void
-client_send_response (struct PendingMessage *pm,
-                      int success,
-                      uint32_t bytes_physical)
+client_send_response (struct PendingMessage *pm)
 {
   struct TransportClient *tc = pm->client;
   struct Neighbour *target = pm->target;
   struct GNUNET_MQ_Envelope *env;
   struct SendOkMessage *som;
 
-  if (NULL != tc) {
+  if (NULL != tc)
+  {
     env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
-    som->success = htonl ((uint32_t)success);
-    som->bytes_msg = htons (pm->bytes_msg);
-    som->bytes_physical = htonl (bytes_physical);
     som->peer = target->pid;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Confirming transmission to %s\n",
+                GNUNET_i2s (&pm->target->pid));
     GNUNET_MQ_send (tc->mq, env);
   }
   free_pending_message (pm);
@@ -2807,41 +3568,147 @@ client_send_response (struct PendingMessage *pm,
 
 
 /**
- * Checks the message queue for a neighbour for messages that have timed
- * out and purges them.
+ * Create a DV Box message.
+ *
+ * @param total_hops how many hops did the message take so far
+ * @param num_hops length of the @a hops array
+ * @param origin origin of the message
+ * @param hops next peer(s) to the destination, including destination
+ * @param payload payload of the box
+ * @param payload_size number of bytes in @a payload
+ * @return boxed message (caller must #GNUNET_free() it).
+ */
+static struct TransportDVBoxMessage *
+create_dv_box (uint16_t total_hops,
+               const struct GNUNET_PeerIdentity *origin,
+               const struct GNUNET_PeerIdentity *target,
+               uint16_t num_hops,
+               const struct GNUNET_PeerIdentity *hops,
+               const void *payload,
+               uint16_t payload_size)
+{
+  struct TransportDVBoxMessage *dvb;
+  struct GNUNET_PeerIdentity *dhops;
+
+  GNUNET_assert (UINT16_MAX <
+                 sizeof (struct TransportDVBoxMessage) +
+                   sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) +
+                   payload_size);
+  dvb = GNUNET_malloc (sizeof (struct TransportDVBoxMessage) +
+                       sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) +
+                       payload_size);
+  dvb->header.size =
+    htons (sizeof (struct TransportDVBoxMessage) +
+           sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) + payload_size);
+  dvb->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
+  dvb->total_hops = htons (total_hops);
+  dvb->num_hops = htons (num_hops + 1);
+  dvb->origin = *origin;
+  dhops = (struct GNUNET_PeerIdentity *) &dvb[1];
+  memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity));
+  dhops[num_hops] = *target;
+  memcpy (&dhops[num_hops + 1], payload, payload_size);
+
+  if (GNUNET_EXTRA_LOGGING > 0)
+  {
+    char *path;
+
+    path = GNUNET_strdup (GNUNET_i2s (&dvb->origin));
+    for (unsigned int i = 0; i <= num_hops; i++)
+    {
+      char *tmp;
+
+      GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[i]));
+      GNUNET_free (path);
+      path = tmp;
+    }
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Creating DVBox for %u bytes of payload via %s\n",
+                (unsigned int) payload_size,
+                path);
+    GNUNET_free (path);
+  }
+
+  return dvb;
+}
+
+
+/**
+ * Pick @a hops_array_length random DV paths satisfying @a options
  *
- * @param cls a `struct Neighbour`
+ * @param dv data structure to pick paths from
+ * @param options constraints to satisfy
+ * @param hops_array[out] set to the result
+ * @param hops_array_length length of the @a hops_array
+ * @return number of entries set in @a hops_array
  */
-static void
-check_queue_timeouts (void *cls)
+static unsigned int
+pick_random_dv_hops (const struct DistanceVector *dv,
+                     enum RouteMessageOptions options,
+                     struct DistanceVectorHop **hops_array,
+                     unsigned int hops_array_length)
 {
-  struct Neighbour *n = cls;
-  struct PendingMessage *pm;
-  struct GNUNET_TIME_Absolute now;
-  struct GNUNET_TIME_Absolute earliest_timeout;
+  uint64_t choices[hops_array_length];
+  uint64_t num_dv;
+  unsigned int dv_count;
 
-  n->timeout_task = NULL;
-  earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
-  now = GNUNET_TIME_absolute_get ();
-  for (struct PendingMessage *pos = n->pending_msg_head; NULL != pos;
-       pos = pm) {
-    pm = pos->next_neighbour;
-    if (pos->timeout.abs_value_us <= now.abs_value_us) {
-      GNUNET_STATISTICS_update (
-        GST_stats,
-        "# messages dropped (timeout before confirmation)",
-        1,
-        GNUNET_NO);
-      client_send_response (pm, GNUNET_NO, 0);
-      continue;
+  /* Pick random vectors, but weighted by distance, giving more weight
+     to shorter vectors */
+  num_dv = 0;
+  dv_count = 0;
+  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+       pos = pos->next_dv)
+  {
+    if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
+        (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
+           .rel_value_us == 0))
+      continue; /* pos unconfirmed and confirmed required */
+    num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
+    dv_count++;
+  }
+  if (0 == dv_count)
+    return 0;
+  if (dv_count <= hops_array_length)
+  {
+    dv_count = 0;
+    for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+         pos = pos->next_dv)
+      hops_array[dv_count++] = pos;
+    return dv_count;
+  }
+  for (unsigned int i = 0; i < hops_array_length; i++)
+  {
+    int ok = GNUNET_NO;
+    while (GNUNET_NO == ok)
+    {
+      choices[i] =
+        GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, num_dv);
+      ok = GNUNET_YES;
+      for (unsigned int j = 0; j < i; j++)
+        if (choices[i] == choices[j])
+        {
+          ok = GNUNET_NO;
+          break;
+        }
     }
-    earliest_timeout
-      = GNUNET_TIME_absolute_min (earliest_timeout, pos->timeout);
   }
-  n->earliest_timeout = earliest_timeout;
-  if (NULL != n->pending_msg_head)
-    n->timeout_task
-      = GNUNET_SCHEDULER_add_at (earliest_timeout, &check_queue_timeouts, n);
+  dv_count = 0;
+  num_dv = 0;
+  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+       pos = pos->next_dv)
+  {
+    uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
+
+    if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
+        (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
+           .rel_value_us == 0))
+      continue; /* pos unconfirmed and confirmed required */
+    for (unsigned int i = 0; i < hops_array_length; i++)
+      if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
+        hops_array[dv_count++] = pos;
+    num_dv += delta;
+  }
+  return dv_count;
 }
 
 
@@ -2858,27 +3725,25 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
   struct PendingMessage *pm;
   const struct GNUNET_MessageHeader *obmm;
   struct Neighbour *target;
+  struct DistanceVector *dv;
+  struct DistanceVectorHop *dvh;
   uint32_t bytes_msg;
   int was_empty;
+  const void *payload;
+  size_t payload_size;
+  struct TransportDVBoxMessage *dvb;
+  struct VirtualLink *vl;
 
   GNUNET_assert (CT_CORE == tc->type);
-  obmm = (const struct GNUNET_MessageHeader *)&obm[1];
+  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
   bytes_msg = ntohs (obmm->size);
-  target = lookup_neighbour (&obm->peer);
-  if (NULL == target) {
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer);
+  if (NULL == vl)
+  {
     /* Failure: don't have this peer as a neighbour (anymore).
        Might have gone down asynchronously, so this is NOT
        a protocol violation by CORE. Still count the event,
        as this should be rare. */
-    struct GNUNET_MQ_Envelope *env;
-    struct SendOkMessage *som;
-
-    env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
-    som->success = htonl (GNUNET_SYSERR);
-    som->bytes_msg = htonl (bytes_msg);
-    som->bytes_physical = htonl (0);
-    som->peer = obm->peer;
-    GNUNET_MQ_send (tc->mq, env);
     GNUNET_SERVICE_client_continue (tc->client);
     GNUNET_STATISTICS_update (GST_stats,
                               "# messages dropped (neighbour unknown)",
@@ -2886,14 +3751,59 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
                               GNUNET_NO);
     return;
   }
+  target = lookup_neighbour (&obm->peer);
+  if (NULL == target)
+    dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer);
+  else
+    dv = NULL;
+  GNUNET_assert ((NULL != target) || (NULL != dv));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending %u bytes to %s using %s\n",
+              bytes_msg,
+              GNUNET_i2s (&obm->peer),
+              (NULL == target) ? "distance vector path" : "direct queue");
+  if (NULL == target)
+  {
+    unsigned int res;
+    struct DistanceVectorHop *dvh;
+
+    res = pick_random_dv_hops (dv, RMO_NONE, &dvh, 1);
+    GNUNET_assert (1 == res);
+    target = dvh->next_hop;
+    dvb = create_dv_box (0,
+                         &GST_my_identity,
+                         &obm->peer,
+                         dvh->distance,
+                         dvh->path,
+                         &obm[1],
+                         bytes_msg);
+    payload = dvb;
+    payload_size = ntohs (dvb->header.size);
+  }
+  else
+  {
+    dvh = NULL;
+    dvb = NULL;
+    payload = &obm[1];
+    payload_size = bytes_msg;
+  }
+
   was_empty = (NULL == target->pending_msg_head);
-  pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
+  pm = GNUNET_malloc (sizeof (struct PendingMessage) + payload_size);
   pm->client = tc;
   pm->target = target;
-  pm->bytes_msg = bytes_msg;
-  pm->timeout = GNUNET_TIME_relative_to_absolute (
-    GNUNET_TIME_relative_ntoh (obm->timeout));
-  memcpy (&pm[1], &obm[1], bytes_msg);
+  pm->bytes_msg = payload_size;
+  memcpy (&pm[1], payload, payload_size);
+  GNUNET_free_non_null (dvb);
+  dvb = NULL;
+  pm->dvh = dvh;
+  if (NULL != dvh)
+  {
+    GNUNET_CONTAINER_MDLL_insert (dvh,
+                                  dvh->pending_msg_head,
+                                  dvh->pending_msg_tail,
+                                  pm);
+  }
   GNUNET_CONTAINER_MDLL_insert (neighbour,
                                 target->pending_msg_head,
                                 target->pending_msg_tail,
@@ -2902,22 +3812,21 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
                                 tc->details.core.pending_msg_head,
                                 tc->details.core.pending_msg_tail,
                                 pm);
-  if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us) {
-    target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
-    if (NULL != target->timeout_task)
-      GNUNET_SCHEDULER_cancel (target->timeout_task);
-    target->timeout_task = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
-                                                    &check_queue_timeouts,
-                                                    target);
-  }
   if (! was_empty)
     return; /* all queues must already be busy */
   for (struct Queue *queue = target->queue_head; NULL != queue;
-       queue = queue->next_neighbour) {
+       queue = queue->next_neighbour)
+  {
     /* try transmission on any queue that is idle */
     if (NULL == queue->transmit_task)
-      queue->transmit_task
-        = GNUNET_SCHEDULER_add_now (&transmit_on_queue, queue);
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Queue %llu to %s is idle, triggering transmission\n",
+                  (unsigned long long) queue->qid,
+                  GNUNET_i2s (&queue->neighbour->pid));
+      queue->transmit_task =
+        GNUNET_SCHEDULER_add_now (&transmit_on_queue, queue);
+    }
   }
 }
 
@@ -2936,7 +3845,8 @@ check_communicator_available (
   struct TransportClient *tc = cls;
   uint16_t size;
 
-  if (CT_NONE != tc->type) {
+  if (CT_NONE != tc->type)
+  {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
@@ -2950,14 +3860,85 @@ check_communicator_available (
 
 
 /**
- * Communicator started.  Process the request.
+ * Send ACK to communicator (if requested) and free @a cmc.
  *
- * @param cls the client
- * @param cam the send message that was sent
+ * @param cmc context for which we are done handling the message
  */
 static void
-handle_communicator_available (
-  void *cls,
+finish_cmc_handling (struct CommunicatorMessageContext *cmc)
+{
+  if (0 != ntohl (cmc->im.fc_on))
+  {
+    /* send ACK when done to communicator for flow control! */
+    struct GNUNET_MQ_Envelope *env;
+    struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
+
+    env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
+    ack->reserved = htonl (0);
+    ack->fc_id = cmc->im.fc_id;
+    ack->sender = cmc->im.sender;
+    GNUNET_MQ_send (cmc->tc->mq, env);
+  }
+  GNUNET_SERVICE_client_continue (cmc->tc->client);
+  GNUNET_free (cmc);
+}
+
+
+/**
+ * Client confirms that it is done handling message(s) to a particular
+ * peer. We may now provide more messages to CORE for this peer.
+ *
+ * Notifies the respective queues that more messages can now be received.
+ *
+ * @param cls the client
+ * @param rom the message that was sent
+ */
+static void
+handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
+{
+  struct TransportClient *tc = cls;
+  struct VirtualLink *vl;
+  uint32_t delta;
+  struct CommunicatorMessageContext *cmc;
+
+  if (CT_CORE != tc->type)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (tc->client);
+    return;
+  }
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer);
+  if (NULL == vl)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# RECV_OK dropped: virtual link unknown",
+                              1,
+                              GNUNET_NO);
+    GNUNET_SERVICE_client_continue (tc->client);
+    return;
+  }
+  delta = ntohl (rom->increase_window_delta);
+  vl->core_recv_window += delta;
+  if (vl->core_recv_window <= 0)
+    return;
+  /* resume communicators */
+  while (NULL != (cmc = vl->cmc_tail))
+  {
+    GNUNET_CONTAINER_DLL_remove (vl->cmc_head, vl->cmc_tail, cmc);
+    finish_cmc_handling (cmc);
+  }
+}
+
+
+/**
+ * Communicator started.  Process the request.
+ *
+ * @param cls the client
+ * @param cam the send message that was sent
+ */
+static void
+handle_communicator_available (
+  void *cls,
   const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
 {
   struct TransportClient *tc = cls;
@@ -2965,11 +3946,18 @@ handle_communicator_available (
 
   size = ntohs (cam->header.size) - sizeof (*cam);
   if (0 == size)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Receive-only communicator connected\n");
     return; /* receive-only communicator */
-  tc->details.communicator.address_prefix
-    = GNUNET_strdup ((const char *)&cam[1]);
-  tc->details.communicator.cc
-    = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics)ntohl (cam->cc);
+  }
+  tc->details.communicator.address_prefix =
+    GNUNET_strdup ((const char *) &cam[1]);
+  tc->details.communicator.cc =
+    (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Communicator with prefix `%s' connected\n",
+              tc->details.communicator.address_prefix);
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
@@ -2991,25 +3979,28 @@ check_communicator_backchannel (
   uint16_t msize;
   uint16_t isize;
 
-  (void)cls;
+  (void) cls;
   msize = ntohs (cb->header.size) - sizeof (*cb);
-  if (UINT16_MAX - msize
-      > sizeof (struct TransportBackchannelEncapsulationMessage)
-          + sizeof (struct TransportBackchannelRequestPayload)) {
+  if (((size_t) (UINT16_MAX - msize)) >
+      sizeof (struct TransportBackchannelEncapsulationMessage) +
+        sizeof (struct TransportBackchannelRequestPayloadP))
+  {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-  inbox = (const struct GNUNET_MessageHeader *)&cb[1];
+  inbox = (const struct GNUNET_MessageHeader *) &cb[1];
   isize = ntohs (inbox->size);
-  if (isize >= msize) {
+  if (isize >= msize)
+  {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-  is = (const char *)inbox;
+  is = (const char *) inbox;
   is += isize;
   msize -= isize;
   GNUNET_assert (msize > 0);
-  if ('\0' != is[msize - 1]) {
+  if ('\0' != is[msize - 1])
+  {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
@@ -3027,12 +4018,13 @@ expire_ephemerals (void *cls)
 {
   struct EphemeralCacheEntry *ece;
 
-  (void)cls;
+  (void) cls;
   ephemeral_task = NULL;
-  while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap))) {
-    if (0
-        == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity)
-             .rel_value_us) {
+  while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
+  {
+    if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity)
+               .rel_value_us)
+    {
       free_ephemeral (ece);
       continue;
     }
@@ -3045,56 +4037,56 @@ expire_ephemerals (void *cls)
 
 
 /**
- * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
- * one, cache it and return it.
+ * Lookup ephemeral key in our #ephemeral_map. If no valid one exists,
+ * generate one, cache it and return it.
  *
  * @param pid peer to look up ephemeral for
  * @param private_key[out] set to the private key
  * @param ephemeral_key[out] set to the key
  * @param ephemeral_sender_sig[out] set to the signature
- * @param ephemeral_validity[out] set to the validity expiration time
+ * @param monotime[out] set to the monotime used for the signature
  */
 static void
 lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
                   struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
                   struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
                   struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
-                  struct GNUNET_TIME_Absolute *ephemeral_validity)
+                  struct GNUNET_TIME_Absolute *monotime)
 {
   struct EphemeralCacheEntry *ece;
-  struct EphemeralConfirmation ec;
+  struct EphemeralConfirmationPS ec;
 
   ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map, pid);
-  if ((NULL != ece)
-      && (0
-          == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity)
-               .rel_value_us)) {
+  if ((NULL != ece) &&
+      (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity)
+              .rel_value_us))
+  {
     free_ephemeral (ece);
     ece = NULL;
   }
-  if (NULL == ece) {
+  if (NULL == ece)
+  {
     ece = GNUNET_new (struct EphemeralCacheEntry);
     ece->target = *pid;
-    ece->ephemeral_validity
-      = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
-                                  EPHEMERAL_VALIDITY);
-    GNUNET_assert (GNUNET_OK
-                   == GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
+    ece->monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
+    ece->ephemeral_validity =
+      GNUNET_TIME_absolute_add (ece->monotime, EPHEMERAL_VALIDITY);
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
     GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key, &ece->ephemeral_key);
     ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
     ec.purpose.size = htonl (sizeof (ec));
     ec.target = *pid;
     ec.ephemeral_key = ece->ephemeral_key;
-    GNUNET_assert (GNUNET_OK
-                   == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
-                                                &ec.purpose,
-                                                &ece->sender_sig));
-    ece->hn
-      = GNUNET_CONTAINER_heap_insert (ephemeral_heap,
-                                      ece,
-                                      ece->ephemeral_validity.abs_value_us);
-    GNUNET_assert (GNUNET_OK
-                   == GNUNET_CONTAINER_multipeermap_put (
+    GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+                                                          &ec.purpose,
+                                                          &ece->sender_sig));
+    ece->hn =
+      GNUNET_CONTAINER_heap_insert (ephemeral_heap,
+                                    ece,
+                                    ece->ephemeral_validity.abs_value_us);
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multipeermap_put (
                      ephemeral_map,
                      &ece->target,
                      ece,
@@ -3107,32 +4099,240 @@ lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
   *private_key = ece->private_key;
   *ephemeral_key = ece->ephemeral_key;
   *ephemeral_sender_sig = ece->sender_sig;
-  *ephemeral_validity = ece->ephemeral_validity;
+  *monotime = ece->monotime;
+}
+
+
+/**
+ * Send the control message @a payload on @a queue.
+ *
+ * @param queue the queue to use for transmission
+ * @param pm pending message to update once transmission is done, may be NULL!
+ * @param payload the payload to send (encapsulated in a
+ *        #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG).
+ * @param payload_size number of bytes in @a payload
+ */
+static void
+queue_send_msg (struct Queue *queue,
+                struct PendingMessage *pm,
+                const void *payload,
+                size_t payload_size)
+{
+  struct Neighbour *n = queue->neighbour;
+  struct GNUNET_TRANSPORT_SendMessageTo *smt;
+  struct GNUNET_MQ_Envelope *env;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Queueing %u bytes of payload for transmission on queue %llu to %s\n",
+              (unsigned int) payload_size,
+              (unsigned long long) queue->qid,
+              GNUNET_i2s (&queue->neighbour->pid));
+  env = GNUNET_MQ_msg_extra (smt,
+                             payload_size,
+                             GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
+  smt->qid = queue->qid;
+  smt->mid = queue->mid_gen;
+  smt->receiver = n->pid;
+  memcpy (&smt[1], payload, payload_size);
+  {
+    /* Pass the env to the communicator of queue for transmission. */
+    struct QueueEntry *qe;
+
+    qe = GNUNET_new (struct QueueEntry);
+    qe->mid = queue->mid_gen++;
+    qe->queue = queue;
+    if (NULL != pm)
+    {
+      qe->pm = pm;
+      GNUNET_assert (NULL == pm->qe);
+      pm->qe = qe;
+    }
+    GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
+    GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
+    queue->queue_length++;
+    queue->tc->details.communicator.total_queue_length++;
+    GNUNET_MQ_send (queue->tc->mq, env);
+  }
+}
+
+
+// FIXME: improve logging after this point!
+
+/**
+ * Pick a queue of @a n under constraints @a options and schedule
+ * transmission of @a hdr.
+ *
+ * @param n neighbour to send to
+ * @param hdr message to send as payload
+ * @param options whether queues must be confirmed or not,
+ *        and whether we may pick multiple (2) queues
+ */
+static void
+route_via_neighbour (const struct Neighbour *n,
+                     const struct GNUNET_MessageHeader *hdr,
+                     enum RouteMessageOptions options)
+{
+  struct GNUNET_TIME_Absolute now;
+  unsigned int candidates;
+  unsigned int sel1;
+  unsigned int sel2;
+
+  /* Pick one or two 'random' queues from n (under constraints of options) */
+  now = GNUNET_TIME_absolute_get ();
+  /* FIXME-OPTIMIZE: give queues 'weights' and pick proportional to
+     weight in the future; weight could be assigned by observed
+     bandwidth (note: not sure if we should do this for this type
+     of control traffic though). */
+  candidates = 0;
+  for (struct Queue *pos = n->queue_head; NULL != pos;
+       pos = pos->next_neighbour)
+  {
+    if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
+        (pos->validated_until.abs_value_us > now.abs_value_us))
+      candidates++;
+  }
+  if (0 == candidates)
+  {
+    /* This can happen rarely if the last confirmed queue timed
+       out just as we were beginning to process this message. */
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# route selection failed (all no valid queue)",
+                              1,
+                              GNUNET_NO);
+    return;
+  }
+  sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates);
+  if (0 == (options & RMO_REDUNDANT))
+    sel2 = candidates; /* picks none! */
+  else
+    sel2 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates);
+  candidates = 0;
+  for (struct Queue *pos = n->queue_head; NULL != pos;
+       pos = pos->next_neighbour)
+  {
+    if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
+        (pos->validated_until.abs_value_us > now.abs_value_us))
+    {
+      if ((sel1 == candidates) || (sel2 == candidates))
+        queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
+      candidates++;
+    }
+  }
+}
+
+
+/**
+ * Given a distance vector path @a dvh route @a payload to
+ * the ultimate destination respecting @a options.
+ * Sets up the boxed message and queues it at the next hop.
+ *
+ * @param dvh choice of the path for the message
+ * @param payload body to transmit
+ * @param options options to use for control
+ */
+static void
+forward_via_dvh (const struct DistanceVectorHop *dvh,
+                 const struct GNUNET_MessageHeader *payload,
+                 enum RouteMessageOptions options)
+{
+  struct TransportDVBoxMessage *dvb;
+
+  dvb = create_dv_box (0,
+                       &GST_my_identity,
+                       &dvh->dv->target,
+                       dvh->distance,
+                       dvh->path,
+                       payload,
+                       ntohs (payload->size));
+  route_via_neighbour (dvh->next_hop, &dvb->header, options);
+  GNUNET_free (dvb);
+}
+
+
+/**
+ * Pick a path of @a dv under constraints @a options and schedule
+ * transmission of @a hdr.
+ *
+ * @param n neighbour to send to
+ * @param hdr message to send as payload
+ * @param options whether path must be confirmed or not
+ *        and whether we may pick multiple (2) paths
+ */
+static void
+route_via_dv (const struct DistanceVector *dv,
+              const struct GNUNET_MessageHeader *hdr,
+              enum RouteMessageOptions options)
+{
+  struct DistanceVectorHop *hops[2];
+  unsigned int res;
+
+  res = pick_random_dv_hops (dv,
+                             options,
+                             hops,
+                             (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
+  for (unsigned int i = 0; i < res; i++)
+    forward_via_dvh (hops[i], hdr, options & (~RMO_REDUNDANT));
 }
 
 
 /**
  * We need to transmit @a hdr to @a target.  If necessary, this may
- * involve DV routing or even broadcasting and fragmentation.
+ * involve DV routing.
  *
  * @param target peer to receive @a hdr
- * @param hdr header of the message to route
+ * @param hdr header of the message to route and #GNUNET_free()
+ * @param options which transmission channels are allowed
  */
 static void
 route_message (const struct GNUNET_PeerIdentity *target,
-               struct GNUNET_MessageHeader *hdr)
-{
-  // FIXME: this one is tricky:
-  // - we could try a direct, reliable channel
-  // - if that is unavailable / for load balancing, we may try:
-  //   * multiple (?) direct unreliable channels - depending on loss rate?
-  //   * some (?) DV channels - if above unavailable / too lossy?
-  //   * _random_ other peers ("broadcasting") in hope of *discovering*
-  //      a path back! - if all else fails
-  // => need more on DV first!
-
-  // FIXME: send hdr to target, free hdr (possibly using DV, possibly
-  // broadcasting)
+               struct GNUNET_MessageHeader *hdr,
+               enum RouteMessageOptions options)
+{
+  struct VirtualLink *vl;
+  struct Neighbour *n;
+  struct DistanceVector *dv;
+
+  vl = GNUNET_CONTAINER_multipeermap_get (links, target);
+  n = vl->n;
+  dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
+  if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
+  {
+    /* if confirmed is required, and we do not have anything
+       confirmed, drop respective options */
+    if (NULL == n)
+      n = lookup_neighbour (target);
+    if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
+      dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target);
+  }
+  if ((NULL == n) && (NULL == dv))
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# Messages dropped in routing: no acceptable method",
+                              1,
+                              GNUNET_NO);
+    GNUNET_free (hdr);
+    return;
+  }
+  /* If both dv and n are possible and we must choose:
+     flip a coin for the choice between the two; for now 50/50 */
+  if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
+  {
+    if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 2))
+      n = NULL;
+    else
+      dv = NULL;
+  }
+  if ((NULL != n) && (NULL != dv))
+    options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
+                                  enough for redunancy, so clear the flag. */
+  if (NULL != n)
+  {
+    route_via_neighbour (n, hdr, options);
+  }
+  if (NULL != dv)
+  {
+    route_via_dv (dv, hdr, options);
+  }
   GNUNET_free (hdr);
 }
 
@@ -3142,39 +4342,69 @@ route_message (const struct GNUNET_PeerIdentity *target,
  */
 struct BackchannelKeyState
 {
-  // FIXME: actual data types in this struct are likely still totally wrong
   /**
-   *
+   * State of our block cipher.
    */
-  char hdr_key[128];
+  gcry_cipher_hd_t cipher;
 
   /**
-   *
+   * Actual key material.
    */
-  char body_key[128];
+  struct
+  {
 
-  /**
-   *
-   */
-  char hmac_key[128];
+    /**
+     * Key used for HMAC calculations (via #GNUNET_CRYPTO_hmac()).
+     */
+    struct GNUNET_CRYPTO_AuthKey hmac_key;
+
+    /**
+     * Symmetric key to use for encryption.
+     */
+    char aes_key[256 / 8];
+
+    /**
+     * Counter value to use during setup.
+     */
+    char aes_ctr[128 / 8];
+
+  } material;
 };
 
 
+/**
+ * Given the key material in @a km and the initialization vector
+ * @a iv, setup the key material for the backchannel in @a key.
+ *
+ * @param km raw master secret
+ * @param iv initialization vector
+ * @param key[out] symmetric cipher and HMAC state to generate
+ */
 static void
 bc_setup_key_state_from_km (const struct GNUNET_HashCode *km,
-                            const struct GNUNET_HashCode *iv,
+                            const struct GNUNET_ShortHashCode *iv,
                             struct BackchannelKeyState *key)
 {
   /* must match #dh_key_derive_eph_pub */
-  GNUNET_assert (GNUNET_YES
-                 == GNUNET_CRYPTO_kdf (key,
-                                       sizeof (*key),
-                                       "transport-backchannel-key",
-                                       strlen ("transport-backchannel-key"),
-                                       &km,
-                                       sizeof (km),
-                                       iv,
-                                       sizeof (*iv)));
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CRYPTO_kdf (&key->material,
+                                    sizeof (key->material),
+                                    "transport-backchannel-key",
+                                    strlen ("transport-backchannel-key"),
+                                    &km,
+                                    sizeof (km),
+                                    iv,
+                                    sizeof (*iv)));
+  gcry_cipher_open (&key->cipher,
+                    GCRY_CIPHER_AES256 /* low level: go for speed */,
+                    GCRY_CIPHER_MODE_CTR,
+                    0 /* flags */);
+  gcry_cipher_setkey (key->cipher,
+                      &key->material.aes_key,
+                      sizeof (key->material.aes_key));
+  gcry_cipher_setctr (key->cipher,
+                      &key->material.aes_ctr,
+                      sizeof (key->material.aes_ctr));
 }
 
 
@@ -3196,9 +4426,9 @@ dh_key_derive_eph_pid (
 {
   struct GNUNET_HashCode km;
 
-  GNUNET_assert (
-    GNUNET_YES
-    == GNUNET_CRYPTO_ecdsa_ecdh (priv_ephemeral, &target->public_key, &km));
+  GNUNET_assert (GNUNET_YES == GNUNET_CRYPTO_ecdh_eddsa (priv_ephemeral,
+                                                         &target->public_key,
+                                                         &km));
   bc_setup_key_state_from_km (&km, iv, key);
 }
 
@@ -3219,9 +4449,9 @@ dh_key_derive_eph_pub (const struct GNUNET_CRYPTO_EcdhePublicKey *pub_ephemeral,
 {
   struct GNUNET_HashCode km;
 
-  GNUNET_assert (
-    GNUNET_YES
-    == GNUNET_CRYPTO_ecdsa_ecdh (GST_my_private_key, pub_ephemeral, &km));
+  GNUNET_assert (GNUNET_YES == GNUNET_CRYPTO_eddsa_ecdh (GST_my_private_key,
+                                                         pub_ephemeral,
+                                                         &km));
   bc_setup_key_state_from_km (&km, iv, key);
 }
 
@@ -3241,7 +4471,7 @@ bc_hmac (const struct BackchannelKeyState *key,
          const void *data,
          size_t data_size)
 {
-  // FIXME!
+  GNUNET_CRYPTO_hmac (&key->material.hmac_key, data, data_size, hmac);
 }
 
 
@@ -3256,11 +4486,12 @@ bc_hmac (const struct BackchannelKeyState *key,
  */
 static void
 bc_encrypt (struct BackchannelKeyState *key,
-            void *dst,
             const void *in,
+            void *dst,
             size_t in_size)
 {
-  // FIXME!
+  GNUNET_assert (0 ==
+                 gcry_cipher_encrypt (key->cipher, dst, in_size, in, in_size));
 }
 
 
@@ -3275,11 +4506,12 @@ bc_encrypt (struct BackchannelKeyState *key,
  */
 static void
 bc_decrypt (struct BackchannelKeyState *key,
-            const void *ciph,
             void *out,
+            const void *ciph,
             size_t out_size)
 {
-  // FIXME!
+  GNUNET_assert (
+    0 == gcry_cipher_decrypt (key->cipher, out, out_size, ciph, out_size));
 }
 
 
@@ -3291,7 +4523,8 @@ bc_decrypt (struct BackchannelKeyState *key,
 static void
 bc_key_clean (struct BackchannelKeyState *key)
 {
-  // FIXME!
+  gcry_cipher_close (key->cipher);
+  GNUNET_CRYPTO_zero_keys (&key->material, sizeof (key->material));
 }
 
 
@@ -3308,34 +4541,32 @@ handle_communicator_backchannel (
 {
   struct TransportClient *tc = cls;
   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
-  struct GNUNET_TIME_Absolute ephemeral_validity;
+  struct GNUNET_TIME_Absolute monotime;
   struct TransportBackchannelEncapsulationMessage *enc;
-  struct TransportBackchannelRequestPayload ppay;
+  struct TransportBackchannelRequestPayloadP ppay;
   struct BackchannelKeyState key;
   char *mpos;
   uint16_t msize;
 
   /* encapsulate and encrypt message */
-  msize = ntohs (cb->header.size) - sizeof (*cb)
-          + sizeof (struct TransportBackchannelRequestPayload);
+  msize = ntohs (cb->header.size) - sizeof (*cb) +
+          sizeof (struct TransportBackchannelRequestPayloadP);
   enc = GNUNET_malloc (sizeof (*enc) + msize);
-  enc->header.type
-    htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
+  enc->header.type =
+    htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
   enc->header.size = htons (sizeof (*enc) + msize);
   enc->target = cb->pid;
   lookup_ephemeral (&cb->pid,
                     &private_key,
                     &enc->ephemeral_key,
                     &ppay.sender_sig,
-                    &ephemeral_validity);
+                    &monotime);
   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
                               &enc->iv,
                               sizeof (enc->iv));
   dh_key_derive_eph_pid (&private_key, &cb->pid, &enc->iv, &key);
-  ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
-  ppay.monotonic_time
-    = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
-  mpos = (char *)&enc[1];
+  ppay.monotonic_time = GNUNET_TIME_absolute_hton (monotime);
+  mpos = (char *) &enc[1];
   bc_encrypt (&key, &ppay, mpos, sizeof (ppay));
   bc_encrypt (&key,
               &cb[1],
@@ -3346,7 +4577,7 @@ handle_communicator_backchannel (
            mpos,
            sizeof (ppay) + ntohs (cb->header.size) - sizeof (*cb));
   bc_key_clean (&key);
-  route_message (&cb->pid, &enc->header);
+  route_message (&cb->pid, &enc->header, RMO_DV_ALLOWED);
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
@@ -3364,7 +4595,8 @@ check_add_address (void *cls,
 {
   struct TransportClient *tc = cls;
 
-  if (CT_COMMUNICATOR != tc->type) {
+  if (CT_COMMUNICATOR != tc->type)
+  {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
@@ -3400,10 +4632,11 @@ peerstore_store_own_cb (void *cls, int success)
                 ale->address);
   /* refresh period is 1/4 of expiration time, that should be plenty
      without being excessive. */
-  ale->st = GNUNET_SCHEDULER_add_delayed (
-    GNUNET_TIME_relative_divide (ale->expiration, 4ULL),
-    &store_pi,
-    ale);
+  ale->st =
+    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
+                                                               4ULL),
+                                  &store_pi,
+                                  ale);
 }
 
 
@@ -3439,12 +4672,13 @@ store_pi (void *cls)
                                     &peerstore_store_own_cb,
                                     ale);
   GNUNET_free (addr);
-  if (NULL == ale->sc) {
+  if (NULL == ale->sc)
+  {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "Failed to store our address `%s' with peerstore\n",
                 ale->address);
-    ale->st
-      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &store_pi, ale);
+    ale->st =
+      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &store_pi, ale);
   }
 }
 
@@ -3466,10 +4700,10 @@ handle_add_address (void *cls,
   slen = ntohs (aam->header.size) - sizeof (*aam);
   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
   ale->tc = tc;
-  ale->address = (const char *)&ale[1];
+  ale->address = (const char *) &ale[1];
   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
   ale->aid = aam->aid;
-  ale->nt = (enum GNUNET_NetworkType)ntohl (aam->nt);
+  ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
   memcpy (&ale[1], &aam[1], slen);
   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
                                tc->details.communicator.addr_tail,
@@ -3491,14 +4725,16 @@ handle_del_address (void *cls,
 {
   struct TransportClient *tc = cls;
 
-  if (CT_COMMUNICATOR != tc->type) {
+  if (CT_COMMUNICATOR != tc->type)
+  {
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (tc->client);
     return;
   }
   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
        NULL != ale;
-       ale = ale->next) {
+       ale = ale->next)
+  {
     if (dam->aid != ale->aid)
       continue;
     GNUNET_assert (ale->tc == tc);
@@ -3510,30 +4746,6 @@ handle_del_address (void *cls,
 }
 
 
-/**
- * Context from #handle_incoming_msg().  Closure for many
- * message handlers below.
- */
-struct CommunicatorMessageContext
-{
-  /**
-   * Which communicator provided us with the message.
-   */
-  struct TransportClient *tc;
-
-  /**
-   * Additional information for flow control and about the sender.
-   */
-  struct GNUNET_TRANSPORT_IncomingMessage im;
-
-  /**
-   * Number of hops the message has travelled (if DV-routed).
-   * FIXME: make use of this in ACK handling!
-   */
-  uint16_t total_hops;
-};
-
-
 /**
  * Given an inbound message @a msg from a communicator @a cmc,
  * demultiplex it based on the type calling the right handler.
@@ -3546,30 +4758,6 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
                       const struct GNUNET_MessageHeader *msg);
 
 
-/**
- * Send ACK to communicator (if requested) and free @a cmc.
- *
- * @param cmc context for which we are done handling the message
- */
-static void
-finish_cmc_handling (struct CommunicatorMessageContext *cmc)
-{
-  if (0 != ntohl (cmc->im.fc_on)) {
-    /* send ACK when done to communicator for flow control! */
-    struct GNUNET_MQ_Envelope *env;
-    struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
-
-    env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
-    ack->reserved = htonl (0);
-    ack->fc_id = cmc->im.fc_id;
-    ack->sender = cmc->im.sender;
-    GNUNET_MQ_send (cmc->tc->mq, env);
-  }
-  GNUNET_SERVICE_client_continue (cmc->tc->client);
-  GNUNET_free (cmc);
-}
-
-
 /**
  * Communicator gave us an unencapsulated message to pass as-is to
  * CORE.  Process the request.
@@ -3582,10 +4770,12 @@ static void
 handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
 {
   struct CommunicatorMessageContext *cmc = cls;
+  struct VirtualLink *vl;
   uint16_t size = ntohs (mh->size);
 
-  if ((size > UINT16_MAX - sizeof (struct InboundMessage))
-      || (size < sizeof (struct GNUNET_MessageHeader))) {
+  if ((size > UINT16_MAX - sizeof (struct InboundMessage)) ||
+      (size < sizeof (struct GNUNET_MessageHeader)))
+  {
     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
 
     GNUNET_break (0);
@@ -3593,8 +4783,28 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
     GNUNET_SERVICE_client_drop (client);
     return;
   }
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &cmc->im.sender);
+  if (NULL == vl)
+  {
+    /* FIXME: sender is giving us messages for CORE but we don't have
+       the link up yet! I *suspect* this can happen right now (i.e.
+       sender has verified us, but we didn't verify sender), but if
+       we pass this on, CORE would be confused (link down, messages
+       arrive).  We should investigate more if this happens often,
+       or in a persistent manner, and possibly do "something" about
+       it. Thus logging as error for now. */
+    GNUNET_break_op (0);
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# CORE messages droped (virtual link still down)",
+                              1,
+                              GNUNET_NO);
+
+    finish_cmc_handling (cmc);
+    return;
+  }
   /* Forward to all CORE clients */
-  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) {
+  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
+  {
     struct GNUNET_MQ_Envelope *env;
     struct InboundMessage *im;
 
@@ -3605,10 +4815,15 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
     memcpy (&im[1], mh, size);
     GNUNET_MQ_send (tc->mq, env);
   }
-  /* FIXME: consider doing this _only_ once the message
-     was drained from the CORE MQs to extend flow control to CORE!
-     (basically, increment counter in cmc, decrement on MQ send continuation! */
-  finish_cmc_handling (cmc);
+  vl->core_recv_window--;
+  if (vl->core_recv_window > 0)
+  {
+    finish_cmc_handling (cmc);
+    return;
+  }
+  /* Wait with calling #finish_cmc_handling(cmc) until the message
+     was processed by CORE MQs (for CORE flow control)! */
+  GNUNET_CONTAINER_DLL_insert (vl->cmc_head, vl->cmc_tail, cmc);
 }
 
 
@@ -3620,20 +4835,24 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
  * @return #GNUNET_YES if message is well-formed
  */
 static int
-check_fragment_box (void *cls, const struct TransportFragmentBox *fb)
+check_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
 {
   uint16_t size = ntohs (fb->header.size);
   uint16_t bsize = size - sizeof (*fb);
 
-  if (0 == bsize) {
+  (void) cls;
+  if (0 == bsize)
+  {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
-  if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size)) {
+  if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
+  {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
-  if (ntohs (fb->frag_off) >= ntohs (fb->msg_size)) {
+  if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
+  {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
@@ -3642,44 +4861,162 @@ check_fragment_box (void *cls, const struct TransportFragmentBox *fb)
 
 
 /**
- * Generate a fragment acknowledgement for an @a rc.
+ * Clean up an idle cummulative acknowledgement data structure.
  *
- * @param rc context to generate ACK for, @a rc ACK state is reset
+ * @param cls a `struct AcknowledgementCummulator *`
  */
 static void
-send_fragment_ack (struct ReassemblyContext *rc)
+destroy_ack_cummulator (void *cls)
 {
-  struct TransportFragmentAckMessage *ack;
+  struct AcknowledgementCummulator *ac = cls;
 
-  ack = GNUNET_new (struct TransportFragmentAckMessage);
-  ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
-  ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
-  ack->frag_uuid = htonl (rc->frag_uuid);
-  ack->extra_acks = GNUNET_htonll (rc->extra_acks);
-  ack->msg_uuid = rc->msg_uuid;
-  ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
-  if (0 == rc->msg_missing)
-    ack->reassembly_timeout = GNUNET_TIME_relative_hton (
-      GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
-  else
-    ack->reassembly_timeout = GNUNET_TIME_relative_hton (
-      GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
-  route_message (&rc->neighbour->pid, &ack->header);
-  rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
-  rc->num_acks = 0;
-  rc->extra_acks = 0LLU;
+  ac->task = NULL;
+  GNUNET_assert (0 == ac->num_acks);
+  GNUNET_assert (
+    GNUNET_YES ==
+    GNUNET_CONTAINER_multipeermap_remove (ack_cummulators, &ac->target, ac));
+  GNUNET_free (ac);
 }
 
 
 /**
- * Communicator gave us a fragment.  Process the request.
+ * Do the transmission of a cummulative acknowledgement now.
  *
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
+ * @param cls a `struct AcknowledgementCummulator *`
+ */
+static void
+transmit_cummulative_ack_cb (void *cls)
+{
+  struct AcknowledgementCummulator *ac = cls;
+  struct TransportReliabilityAckMessage *ack;
+  struct TransportCummulativeAckPayloadP *ap;
+
+  ac->task = NULL;
+  GNUNET_assert (0 < ac->ack_counter);
+  ack = GNUNET_malloc (sizeof (*ack) +
+                       ac->ack_counter *
+                         sizeof (struct TransportCummulativeAckPayloadP));
+  ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
+  ack->header.size =
+    htons (sizeof (*ack) +
+           ac->ack_counter * sizeof (struct TransportCummulativeAckPayloadP));
+  ack->ack_counter = htonl (ac->ack_counter++);
+  ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
+  for (unsigned int i = 0; i < ac->ack_counter; i++)
+  {
+    ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
+    ap[i].ack_delay = GNUNET_TIME_relative_hton (
+      GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
+  }
+  route_message (&ac->target, &ack->header, RMO_DV_ALLOWED);
+  ac->num_acks = 0;
+  ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT,
+                                           &destroy_ack_cummulator,
+                                           ac);
+}
+
+
+/**
+ * Transmit an acknowledgement for @a ack_uuid to @a pid delaying
+ * transmission by at most @a ack_delay.
+ *
+ * @param pid target peer
+ * @param ack_uuid UUID to ack
+ * @param max_delay how long can the ACK wait
+ */
+static void
+cummulative_ack (const struct GNUNET_PeerIdentity *pid,
+                 const struct AcknowledgementUUIDP *ack_uuid,
+                 struct GNUNET_TIME_Absolute max_delay)
+{
+  struct AcknowledgementCummulator *ac;
+
+  ac = GNUNET_CONTAINER_multipeermap_get (ack_cummulators, pid);
+  if (NULL == ac)
+  {
+    ac = GNUNET_new (struct AcknowledgementCummulator);
+    ac->target = *pid;
+    ac->min_transmission_time = max_delay;
+    GNUNET_assert (GNUNET_YES ==
+                   GNUNET_CONTAINER_multipeermap_put (
+                     ack_cummulators,
+                     &ac->target,
+                     ac,
+                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  }
+  else
+  {
+    if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
+    {
+      /* must run immediately, ack buffer full! */
+      GNUNET_SCHEDULER_cancel (ac->task);
+      transmit_cummulative_ack_cb (ac);
+    }
+    GNUNET_SCHEDULER_cancel (ac->task);
+    ac->min_transmission_time =
+      GNUNET_TIME_absolute_min (ac->min_transmission_time, max_delay);
+  }
+  GNUNET_assert (ac->num_acks < MAX_CUMMULATIVE_ACKS);
+  ac->ack_uuids[ac->num_acks].receive_time = GNUNET_TIME_absolute_get ();
+  ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
+  ac->num_acks++;
+  ac->task = GNUNET_SCHEDULER_add_at (ac->min_transmission_time,
+                                      &transmit_cummulative_ack_cb,
+                                      ac);
+}
+
+
+/**
+ * Closure for #find_by_message_uuid.
+ */
+struct FindByMessageUuidContext
+{
+  /**
+   * UUID to look for.
+   */
+  struct MessageUUIDP message_uuid;
+
+  /**
+   * Set to the reassembly context if found.
+   */
+  struct ReassemblyContext *rc;
+};
+
+
+/**
+ * Iterator called to find a reassembly context by the message UUID in the
+ * multihashmap32.
+ *
+ * @param cls a `struct FindByMessageUuidContext`
+ * @param key a key (unused)
+ * @param value a `struct ReassemblyContext`
+ * @return #GNUNET_YES if not found, #GNUNET_NO if found
+ */
+static int
+find_by_message_uuid (void *cls, uint32_t key, void *value)
+{
+  struct FindByMessageUuidContext *fc = cls;
+  struct ReassemblyContext *rc = value;
+
+  (void) key;
+  if (0 == GNUNET_memcmp (&fc->message_uuid, &rc->msg_uuid))
+  {
+    fc->rc = rc;
+    return GNUNET_NO;
+  }
+  return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a fragment.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call
+ * #finish_cmc_handling() when done)
  * @param fb the message that was received
  */
 static void
-handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
+handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
 {
   struct CommunicatorMessageContext *cmc = cls;
   struct Neighbour *n;
@@ -3688,13 +5025,13 @@ handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
   uint16_t msize;
   uint16_t fsize;
   uint16_t frag_off;
-  uint32_t frag_uuid;
   char *target;
   struct GNUNET_TIME_Relative cdelay;
-  int ack_now;
+  struct FindByMessageUuidContext fc;
 
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
-  if (NULL == n) {
+  n = lookup_neighbour (&cmc->im.sender);
+  if (NULL == n)
+  {
     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
 
     GNUNET_break (0);
@@ -3702,42 +5039,52 @@ handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
     GNUNET_SERVICE_client_drop (client);
     return;
   }
-  if (NULL == n->reassembly_map) {
-    n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8, GNUNET_YES);
-    n->reassembly_heap
-      = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
-    n->reassembly_timeout_task
-      = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION,
-                                      &reassembly_cleanup_task,
-                                      n);
+  if (NULL == n->reassembly_map)
+  {
+    n->reassembly_map = GNUNET_CONTAINER_multihashmap32_create (8);
+    n->reassembly_heap =
+      GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+    n->reassembly_timeout_task =
+      GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION,
+                                    &reassembly_cleanup_task,
+                                    n);
   }
   msize = ntohs (fb->msg_size);
-  rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map, &fb->msg_uuid);
-  if (NULL == rc) {
+  fc.message_uuid = fb->msg_uuid;
+  fc.rc = NULL;
+  GNUNET_CONTAINER_multihashmap32_get_multiple (n->reassembly_map,
+                                                fb->msg_uuid.uuid,
+                                                &find_by_message_uuid,
+                                                &fc);
+  if (NULL == (rc = fc.rc))
+  {
     rc = GNUNET_malloc (sizeof (*rc) + msize + /* reassembly payload buffer */
                         (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
     rc->msg_uuid = fb->msg_uuid;
     rc->neighbour = n;
     rc->msg_size = msize;
-    rc->reassembly_timeout
-      GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION);
+    rc->reassembly_timeout =
+      GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION);
     rc->last_frag = GNUNET_TIME_absolute_get ();
     rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
                                            rc,
                                            rc->reassembly_timeout.abs_value_us);
-    GNUNET_assert (GNUNET_OK
-                   == GNUNET_CONTAINER_multishortmap_put (
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multihashmap32_put (
                      n->reassembly_map,
-                     &rc->msg_uuid,
+                     rc->msg_uuid.uuid,
                      rc,
-                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-    target = (char *)&rc[1];
-    rc->bitfield = (uint8_t *)(target + rc->msg_size);
+                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+    target = (char *) &rc[1];
+    rc->bitfield = (uint8_t *) (target + rc->msg_size);
     rc->msg_missing = rc->msg_size;
-  } else {
-    target = (char *)&rc[1];
   }
-  if (msize != rc->msg_size) {
+  else
+  {
+    target = (char *) &rc[1];
+  }
+  if (msize != rc->msg_size)
+  {
     GNUNET_break (0);
     finish_cmc_handling (cmc);
     return;
@@ -3745,80 +5092,51 @@ handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
 
   /* reassemble */
   fsize = ntohs (fb->header.size) - sizeof (*fb);
+  if (0 == fsize)
+  {
+    GNUNET_break (0);
+    finish_cmc_handling (cmc);
+    return;
+  }
   frag_off = ntohs (fb->frag_off);
   memcpy (&target[frag_off], &fb[1], fsize);
   /* update bitfield and msg_missing */
-  for (unsigned int i = frag_off; i < frag_off + fsize; i++) {
-    if (0 == (rc->bitfield[i / 8] & (1 << (i % 8)))) {
+  for (unsigned int i = frag_off; i < frag_off + fsize; i++)
+  {
+    if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
+    {
       rc->bitfield[i / 8] |= (1 << (i % 8));
       rc->msg_missing--;
     }
   }
 
   /* Compute cummulative ACK */
-  frag_uuid = ntohl (fb->frag_uuid);
   cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
-  cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->num_acks);
+  cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
+  if (0 == rc->msg_missing)
+    cdelay = GNUNET_TIME_UNIT_ZERO;
+  cummulative_ack (&cmc->im.sender,
+                   &fb->ack_uuid,
+                   GNUNET_TIME_relative_to_absolute (cdelay));
   rc->last_frag = GNUNET_TIME_absolute_get ();
-  rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay, cdelay);
-  ack_now = GNUNET_NO;
-  if (0 == rc->num_acks) {
-    /* case one: first ack */
-    rc->frag_uuid = frag_uuid;
-    rc->extra_acks = 0LLU;
-    rc->num_acks = 1;
-  } else if ((frag_uuid >= rc->frag_uuid)
-             && (frag_uuid <= rc->frag_uuid + 64)) {
-    /* case two: ack fits after existing min UUID */
-    if ((frag_uuid == rc->frag_uuid)
-        || (0
-            != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1))))) {
-      /* duplicate fragment, ack now! */
-      ack_now = GNUNET_YES;
-    } else {
-      rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
-      rc->num_acks++;
-    }
-  } else if ((rc->frag_uuid > frag_uuid)
-             && (((rc->frag_uuid == frag_uuid + 64) && (0 == rc->extra_acks))
-                 || ((rc->frag_uuid < frag_uuid + 64)
-                     && (rc->extra_acks
-                         == (rc->extra_acks
-                             & ~((1LLU << (64 - (rc->frag_uuid - frag_uuid)))
-                                 - 1LLU)))))) {
-    /* can fit ack by shifting extra acks and starting at
-       frag_uid, test above esured that the bits we will
-       shift 'extra_acks' by are all zero. */
-    rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
-    rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
-    rc->frag_uuid = frag_uuid;
-    rc->num_acks++;
-  }
-  if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very
-                             aggressive. */
-    ack_now = GNUNET_YES; /* maximum acks received */
-  // FIXME: possibly also ACK based on RTT (but for that we'd need to
-  // determine the queue used for the ACK first!)
-
   /* is reassembly complete? */
-  if (0 != rc->msg_missing) {
-    if (ack_now)
-      send_fragment_ack (rc);
+  if (0 != rc->msg_missing)
+  {
     finish_cmc_handling (cmc);
     return;
   }
   /* reassembly is complete, verify result */
-  msg = (const struct GNUNET_MessageHeader *)&rc[1];
-  if (ntohs (msg->size) != rc->msg_size) {
+  msg = (const struct GNUNET_MessageHeader *) &rc[1];
+  if (ntohs (msg->size) != rc->msg_size)
+  {
     GNUNET_break (0);
     free_reassembly_context (rc);
     finish_cmc_handling (cmc);
     return;
   }
   /* successful reassembly */
-  send_fragment_ack (rc);
   demultiplex_with_cmc (cmc, msg);
-  /* FIXME: really free here? Might be bad if fragments are still
+  /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
      en-route and we forget that we finished this reassembly immediately!
      -> keep around until timeout?
      -> shorten timeout based on ACK? */
@@ -3827,161 +5145,204 @@ handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
 
 
 /**
- * Check the @a fa against the fragments associated with @a pm.
- * If it matches, remove the matching fragments from the transmission
- * list.
+ * Communicator gave us a reliability box.  Check the message.
  *
- * @param pm pending message to check against the ack
- * @param fa the ack that was received
- * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param rb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
  */
 static int
-check_ack_against_pm (struct PendingMessage *pm,
-                      const struct TransportFragmentAckMessage *fa)
-{
-  int match;
-  struct PendingMessage *nxt;
-  uint32_t fs = ntohl (fa->frag_uuid);
-  uint64_t xtra = GNUNET_ntohll (fa->extra_acks);
-
-  match = GNUNET_NO;
-  for (struct PendingMessage *frag = pm->head_frag; NULL != frag; frag = nxt) {
-    const struct TransportFragmentBox *tfb
-      = (const struct TransportFragmentBox *)&pm[1];
-    uint32_t fu = ntohl (tfb->frag_uuid);
-
-    GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt);
-    nxt = frag->next_frag;
-    /* Check for exact match or match in the 'xtra' bitmask */
-    if ((fu == fs)
-        || ((fu > fs) && (fu <= fs + 64)
-            && (0 != (1LLU << (fu - fs - 1) & xtra)))) {
-      match = GNUNET_YES;
-      free_fragment_tree (frag);
-    }
-  }
-  return match;
+check_reliability_box (void *cls,
+                       const struct TransportReliabilityBoxMessage *rb)
+{
+  (void) cls;
+  GNUNET_MQ_check_boxed_message (rb);
+  return GNUNET_YES;
 }
 
 
 /**
- * Communicator gave us a fragment acknowledgement.  Process the request.
+ * Communicator gave us a reliability box.  Process the request.
  *
  * @param cls a `struct CommunicatorMessageContext` (must call
  * #finish_cmc_handling() when done)
- * @param fa the message that was received
+ * @param rb the message that was received
  */
 static void
-handle_fragment_ack (void *cls, const struct TransportFragmentAckMessage *fa)
+handle_reliability_box (void *cls,
+                        const struct TransportReliabilityBoxMessage *rb)
 {
   struct CommunicatorMessageContext *cmc = cls;
-  struct Neighbour *n;
-  int matched;
+  const struct GNUNET_MessageHeader *inbox =
+    (const struct GNUNET_MessageHeader *) &rb[1];
 
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
-  if (NULL == n) {
-    struct GNUNET_SERVICE_Client *client = cmc->tc->client;
+  // FIXME: call cummulative_ack(), have ack_countdown influence max_delay!
+  (void) (0 == ntohl (rb->ack_countdown));
+  /* continue with inner message */
+  demultiplex_with_cmc (cmc, inbox);
+}
 
-    GNUNET_break (0);
-    finish_cmc_handling (cmc);
-    GNUNET_SERVICE_client_drop (client);
-    return;
-  }
-  /* FIXME-OPTIMIZE: maybe use another hash map here? */
-  matched = GNUNET_NO;
-  for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm;
-       pm = pm->prev_neighbour) {
-    if (0 != GNUNET_memcmp (&fa->msg_uuid, &pm->msg_uuid))
-      continue;
-    matched = GNUNET_YES;
-    if (GNUNET_YES == check_ack_against_pm (pm, fa)) {
-      struct GNUNET_TIME_Relative avg_ack_delay
-        = GNUNET_TIME_relative_ntoh (fa->avg_ack_delay);
-      // FIXME: update RTT and other reliability data!
-      // ISSUE: we don't know which of n's queues the message(s)
-      // took (and in fact the different messages might have gone
-      // over different queues and possibly over multiple).
-      // => track queues with PendingMessages, and update RTT only if
-      //    the queue used is unique?
-      //    -> how can we get loss rates?
-      //    -> or, add extra state to Box and ACK to identify queue?
-      // IDEA: generate MULTIPLE frag-uuids per fragment and track
-      //    the queue with the fragment! (-> this logic must
-      //    be moved into check_ack_against_pm!)
-      (void)avg_ack_delay;
-    } else {
-      GNUNET_STATISTICS_update (GST_stats,
-                                "# FRAGMENT_ACKS dropped, no matching fragment",
-                                1,
-                                GNUNET_NO);
-    }
-    if (NULL == pm->head_frag) {
-      // if entire message is ACKed, handle that as well.
-      // => clean up PM, any post actions?
-      free_pending_message (pm);
-    } else {
-      struct GNUNET_TIME_Relative reassembly_timeout
-        = GNUNET_TIME_relative_ntoh (fa->reassembly_timeout);
-      // OPTIMIZE-FIXME: adjust retransmission strategy based on
-      // reassembly_timeout!
-      (void)reassembly_timeout;
-    }
-    break;
-  }
-  if (GNUNET_NO == matched) {
-    GNUNET_STATISTICS_update (
-      GST_stats,
-      "# FRAGMENT_ACKS dropped, no matching pending message",
-      1,
-      GNUNET_NO);
+
+/**
+ * Check if we have advanced to another age since the last time.  If
+ * so, purge ancient statistics (more than GOODPUT_AGING_SLOTS before
+ * the current age)
+ *
+ * @param pd[in,out] data to update
+ * @param age current age
+ */
+static void
+update_pd_age (struct PerformanceData *pd, unsigned int age)
+{
+  unsigned int sage;
+
+  if (age == pd->last_age)
+    return; /* nothing to do */
+  sage = GNUNET_MAX (pd->last_age, age - 2 * GOODPUT_AGING_SLOTS);
+  for (unsigned int i = sage; i <= age - GOODPUT_AGING_SLOTS; i++)
+  {
+    struct TransmissionHistoryEntry *the = &pd->the[i % GOODPUT_AGING_SLOTS];
+
+    the->bytes_sent = 0;
+    the->bytes_received = 0;
   }
-  finish_cmc_handling (cmc);
+  pd->last_age = age;
 }
 
 
 /**
- * Communicator gave us a reliability box.  Check the message.
+ * Update @a pd based on the latest @a rtt and the number of bytes
+ * that were confirmed to be successfully transmitted.
  *
- * @param cls a `struct CommunicatorMessageContext`
- * @param rb the send message that was sent
- * @return #GNUNET_YES if message is well-formed
+ * @param pd[in,out] data to update
+ * @param rtt latest round-trip time
+ * @param bytes_transmitted_ok number of bytes receiver confirmed as received
  */
-static int
-check_reliability_box (void *cls, const struct TransportReliabilityBox *rb)
+static void
+update_performance_data (struct PerformanceData *pd,
+                         struct GNUNET_TIME_Relative rtt,
+                         uint16_t bytes_transmitted_ok)
 {
-  GNUNET_MQ_check_boxed_message (rb);
-  return GNUNET_YES;
+  uint64_t nval = rtt.rel_value_us;
+  uint64_t oval = pd->aged_rtt.rel_value_us;
+  unsigned int age = get_age ();
+  struct TransmissionHistoryEntry *the = &pd->the[age % GOODPUT_AGING_SLOTS];
+
+  if (oval == GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
+    pd->aged_rtt = rtt;
+  else
+    pd->aged_rtt.rel_value_us = (nval + 7 * oval) / 8;
+  update_pd_age (pd, age);
+  the->bytes_received += bytes_transmitted_ok;
 }
 
 
 /**
- * Communicator gave us a reliability box.  Process the request.
+ * We have successfully transmitted data via @a q, update metrics.
  *
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
- * @param rb the message that was received
+ * @param q queue to update
+ * @param rtt round trip time observed
+ * @param bytes_transmitted_ok number of bytes successfully transmitted
  */
 static void
-handle_reliability_box (void *cls, const struct TransportReliabilityBox *rb)
+update_queue_performance (struct Queue *q,
+                          struct GNUNET_TIME_Relative rtt,
+                          uint16_t bytes_transmitted_ok)
 {
-  struct CommunicatorMessageContext *cmc = cls;
-  const struct GNUNET_MessageHeader *inbox
-    = (const struct GNUNET_MessageHeader *)&rb[1];
+  update_performance_data (&q->pd, rtt, bytes_transmitted_ok);
+}
+
+
+/**
+ * We have successfully transmitted data via @a dvh, update metrics.
+ *
+ * @param dvh distance vector path data to update
+ * @param rtt round trip time observed
+ * @param bytes_transmitted_ok number of bytes successfully transmitted
+ */
+static void
+update_dvh_performance (struct DistanceVectorHop *dvh,
+                        struct GNUNET_TIME_Relative rtt,
+                        uint16_t bytes_transmitted_ok)
+{
+  update_performance_data (&dvh->pd, rtt, bytes_transmitted_ok);
+}
 
-  if (0 == ntohl (rb->ack_countdown)) {
-    struct TransportReliabilityAckMessage *ack;
 
-    /* FIXME: implement cummulative ACKs and ack_countdown,
-       then setting the avg_ack_delay field below: */
-    ack = GNUNET_malloc (sizeof (*ack) + sizeof (struct GNUNET_ShortHashCode));
-    ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
-    ack->header.size
-      = htons (sizeof (*ack) + sizeof (struct GNUNET_ShortHashCode));
-    memcpy (&ack[1], &rb->msg_uuid, sizeof (struct GNUNET_ShortHashCode));
-    route_message (&cmc->im.sender, &ack->header);
+/**
+ * The @a pa was acknowledged, process the acknowledgement.
+ *
+ * @param pa the pending acknowledgement that was satisfied
+ * @param ack_delay artificial delay from cummulative acks created by the
+ * other peer
+ */
+static void
+handle_acknowledged (struct PendingAcknowledgement *pa,
+                     struct GNUNET_TIME_Relative ack_delay)
+{
+  struct PendingMessage *pm = pa->pm;
+  struct GNUNET_TIME_Relative delay;
+
+  delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time);
+  if (delay.rel_value_us > ack_delay.rel_value_us)
+    delay = GNUNET_TIME_UNIT_ZERO;
+  else
+    delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
+  if (NULL != pa->queue)
+    update_queue_performance (pa->queue, delay, pa->message_size);
+  if (NULL != pa->dvh)
+    update_dvh_performance (pa->dvh, delay, pa->message_size);
+  if (NULL != pm)
+  {
+    if (NULL != pm->frag_parent)
+    {
+      pm = pm->frag_parent;
+      free_fragment_tree (pa->pm);
+    }
+    while ((NULL != pm->frag_parent) && (NULL == pm->head_frag))
+    {
+      struct PendingMessage *parent = pm->frag_parent;
+
+      free_fragment_tree (pm);
+      pm = parent;
+    }
+    if (NULL != pm->head_frag)
+      pm = NULL; /* we are done, otherwise free 'pm' below */
   }
-  /* continue with inner message */
-  demultiplex_with_cmc (cmc, inbox);
+  if (NULL != pm)
+    free_pending_message (pm);
+  free_pending_acknowledgement (pa);
+}
+
+
+/**
+ * Communicator gave us a reliability ack.  Check it is well-formed.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (unused)
+ * @param ra the message that was received
+ * @return #GNUNET_Ok if @a ra is well-formed
+ */
+static int
+check_reliability_ack (void *cls,
+                       const struct TransportReliabilityAckMessage *ra)
+{
+  unsigned int n_acks;
+
+  (void) cls;
+  n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
+           sizeof (struct TransportCummulativeAckPayloadP);
+  if (0 == n_acks)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if ((ntohs (ra->header.size) - sizeof (*ra)) !=
+      n_acks * sizeof (struct TransportCummulativeAckPayloadP))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
 }
 
 
@@ -3997,97 +5358,318 @@ handle_reliability_ack (void *cls,
                         const struct TransportReliabilityAckMessage *ra)
 {
   struct CommunicatorMessageContext *cmc = cls;
-  struct Neighbour *n;
+  const struct TransportCummulativeAckPayloadP *ack;
+  struct PendingAcknowledgement *pa;
   unsigned int n_acks;
-  const struct GNUNET_ShortHashCode *msg_uuids;
-  struct PendingMessage *nxt;
-  int matched;
+  uint32_t ack_counter;
 
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
-  if (NULL == n) {
-    struct GNUNET_SERVICE_Client *client = cmc->tc->client;
+  n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
+           sizeof (struct TransportCummulativeAckPayloadP);
+  ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
+  for (unsigned int i = 0; i < n_acks; i++)
+  {
+    pa =
+      GNUNET_CONTAINER_multishortmap_get (pending_acks, &ack[i].ack_uuid.value);
+    if (NULL == pa)
+    {
+      GNUNET_STATISTICS_update (
+        GST_stats,
+        "# FRAGMENT_ACKS dropped, no matching pending message",
+        1,
+        GNUNET_NO);
+      continue;
+    }
+    handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
+  }
 
-    GNUNET_break (0);
-    finish_cmc_handling (cmc);
-    GNUNET_SERVICE_client_drop (client);
-    return;
+  ack_counter = htonl (ra->ack_counter);
+  (void) ack_counter; /* silence compiler warning for now */
+  // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
+  // (DV and/or Neighbour?)
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a backchannel encapsulation.  Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param be the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_backchannel_encapsulation (
+  void *cls,
+  const struct TransportBackchannelEncapsulationMessage *be)
+{
+  uint16_t size = ntohs (be->header.size);
+
+  (void) cls;
+  if ((size - sizeof (*be)) <
+      (sizeof (struct TransportBackchannelRequestPayloadP) +
+       sizeof (struct GNUNET_MessageHeader)))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
   }
-  n_acks = (ntohs (ra->header.size) - sizeof (*ra))
-           / sizeof (struct GNUNET_ShortHashCode);
-  msg_uuids = (const struct GNUNET_ShortHashCode *)&ra[1];
+  return GNUNET_YES;
+}
 
-  /* FIXME-OPTIMIZE: maybe use another hash map here? */
-  matched = GNUNET_NO;
-  for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm; pm = nxt) {
-    int in_list;
 
-    nxt = pm->next_neighbour;
-    in_list = GNUNET_NO;
-    for (unsigned int i = 0; i < n_acks; i++) {
-      if (0 != GNUNET_memcmp (&msg_uuids[i], &pm->msg_uuid))
-        continue;
-      in_list = GNUNET_YES;
+/**
+ * We received the plaintext @a msg from backtalker @a b. Forward
+ * it to the respective communicator.
+ *
+ * @param b a backtalker
+ * @param msg a message, consisting of a `struct GNUNET_MessageHeader`
+ *        followed by the target name of the communicator
+ * @param msg_size number of bytes in @a msg
+ */
+static void
+forward_backchannel_payload (struct Backtalker *b,
+                             const void *msg,
+                             size_t msg_size)
+{
+  struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
+  struct GNUNET_MQ_Envelope *env;
+  struct TransportClient *tc;
+  const struct GNUNET_MessageHeader *mh;
+  const char *target_communicator;
+  uint16_t mhs;
+
+  /* Determine target_communicator and check @a msg is well-formed */
+  mh = msg;
+  mhs = ntohs (mh->size);
+  if (mhs <= msg_size)
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  target_communicator = &((const char *) msg)[ntohs (mh->size)];
+  if ('\0' != target_communicator[msg_size - mhs - 1])
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  /* Find client providing this communicator */
+  for (tc = clients_head; NULL != tc; tc = tc->next)
+    if ((CT_COMMUNICATOR == tc->type) &&
+        (0 ==
+         strcmp (tc->details.communicator.address_prefix, target_communicator)))
       break;
-    }
-    if (GNUNET_NO == in_list)
-      continue;
+  if (NULL == tc)
+  {
+    char *stastr;
+
+    GNUNET_asprintf (
+      &stastr,
+      "# Backchannel message dropped: target communicator `%s' unknown",
+      target_communicator);
+    GNUNET_STATISTICS_update (GST_stats, stastr, 1, GNUNET_NO);
+    GNUNET_free (stastr);
+    return;
+  }
+  /* Finally, deliver backchannel message to communicator */
+  env = GNUNET_MQ_msg_extra (
+    cbi,
+    msg_size,
+    GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING);
+  cbi->pid = b->pid;
+  memcpy (&cbi[1], msg, msg_size);
+  GNUNET_MQ_send (tc->mq, env);
+}
 
-    /* this pm was acked! */
-    matched = GNUNET_YES;
-    free_pending_message (pm);
 
-    {
-      struct GNUNET_TIME_Relative avg_ack_delay
-        = GNUNET_TIME_relative_ntoh (ra->avg_ack_delay);
-      // FIXME: update RTT and other reliability data!
-      // ISSUE: we don't know which of n's queues the message(s)
-      // took (and in fact the different messages might have gone
-      // over different queues and possibly over multiple).
-      // => track queues with PendingMessages, and update RTT only if
-      //    the queue used is unique?
-      //    -> how can we get loss rates?
-      //    -> or, add extra state to MSG and ACKs to identify queue?
-      //    -> if we do this, might just do the same for the avg_ack_delay!
-      (void)avg_ack_delay;
-    }
+/**
+ * Free data structures associated with @a b.
+ *
+ * @param b data structure to release
+ */
+static void
+free_backtalker (struct Backtalker *b)
+{
+  if (NULL != b->get)
+  {
+    GNUNET_PEERSTORE_iterate_cancel (b->get);
+    b->get = NULL;
+    GNUNET_assert (NULL != b->cmc);
+    finish_cmc_handling (b->cmc);
+    b->cmc = NULL;
+  }
+  if (NULL != b->task)
+  {
+    GNUNET_SCHEDULER_cancel (b->task);
+    b->task = NULL;
+  }
+  if (NULL != b->sc)
+  {
+    GNUNET_PEERSTORE_store_cancel (b->sc);
+    b->sc = NULL;
+  }
+  GNUNET_assert (
+    GNUNET_YES ==
+    GNUNET_CONTAINER_multipeermap_remove (backtalkers, &b->pid, b));
+  GNUNET_free (b);
+}
+
+
+/**
+ * Callback to free backtalker records.
+ *
+ * @param cls NULL
+ * @param pid unused
+ * @param value a `struct Backtalker`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_backtalker_cb (void *cls,
+                    const struct GNUNET_PeerIdentity *pid,
+                    void *value)
+{
+  struct Backtalker *b = value;
+
+  (void) cls;
+  (void) pid;
+  free_backtalker (b);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Function called when it is time to clean up a backtalker.
+ *
+ * @param cls a `struct Backtalker`
+ */
+static void
+backtalker_timeout_cb (void *cls)
+{
+  struct Backtalker *b = cls;
+
+  b->task = NULL;
+  if (0 != GNUNET_TIME_absolute_get_remaining (b->timeout).rel_value_us)
+  {
+    b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+    return;
+  }
+  GNUNET_assert (NULL == b->sc);
+  free_backtalker (b);
+}
+
+
+/**
+ * Function called with the monotonic time of a backtalker
+ * by PEERSTORE. Updates the time and continues processing.
+ *
+ * @param cls a `struct Backtalker`
+ * @param record the information found, NULL for the last call
+ * @param emsg error message
+ */
+static void
+backtalker_monotime_cb (void *cls,
+                        const struct GNUNET_PEERSTORE_Record *record,
+                        const char *emsg)
+{
+  struct Backtalker *b = cls;
+  struct GNUNET_TIME_AbsoluteNBO *mtbe;
+  struct GNUNET_TIME_Absolute mt;
+
+  (void) emsg;
+  if (NULL == record)
+  {
+    /* we're done with #backtalker_monotime_cb() invocations,
+       continue normal processing */
+    b->get = NULL;
+    GNUNET_assert (NULL != b->cmc);
+    finish_cmc_handling (b->cmc);
+    b->cmc = NULL;
+    if (0 != b->body_size)
+      forward_backchannel_payload (b, &b[1], b->body_size);
+    return;
+  }
+  if (sizeof (*mtbe) != record->value_size)
+  {
+    GNUNET_break (0);
+    return;
   }
-  if (GNUNET_NO == matched) {
+  mtbe = record->value;
+  mt = GNUNET_TIME_absolute_ntoh (*mtbe);
+  if (mt.abs_value_us > b->monotonic_time.abs_value_us)
+  {
     GNUNET_STATISTICS_update (
       GST_stats,
-      "# FRAGMENT_ACKS dropped, no matching pending message",
+      "# Backchannel messages dropped: monotonic time not increasing",
       1,
       GNUNET_NO);
+    b->monotonic_time = mt;
+    /* Setting body_size to 0 prevents call to #forward_backchannel_payload()
+     */
+    b->body_size = 0;
+    return;
   }
-  finish_cmc_handling (cmc);
 }
 
 
 /**
- * Communicator gave us a backchannel encapsulation.  Check the message.
+ * Function called by PEERSTORE when the store operation of
+ * a backtalker's monotonic time is complete.
  *
- * @param cls a `struct CommunicatorMessageContext`
- * @param be the send message that was sent
- * @return #GNUNET_YES if message is well-formed
+ * @param cls the `struct Backtalker`
+ * @param success #GNUNET_OK on success
  */
-static int
-check_backchannel_encapsulation (
-  void *cls,
-  const struct TransportBackchannelEncapsulationMessage *be)
+static void
+backtalker_monotime_store_cb (void *cls, int success)
 {
-  uint16_t size = ntohs (be->header.size);
+  struct Backtalker *b = cls;
 
-  (void)cls;
-  if (size - sizeof (*be) < sizeof (struct TransportBackchannelRequestPayload)
-                              + sizeof (struct GNUNET_MessageHeader)) {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+  if (GNUNET_OK != success)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to store backtalker's monotonic time in PEERSTORE!\n");
   }
-  return GNUNET_YES;
+  b->sc = NULL;
+  b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+}
+
+
+/**
+ * The backtalker @a b monotonic time changed. Update PEERSTORE.
+ *
+ * @param b a backtalker with updated monotonic time
+ */
+static void
+update_backtalker_monotime (struct Backtalker *b)
+{
+  struct GNUNET_TIME_AbsoluteNBO mtbe;
+
+  if (NULL != b->sc)
+  {
+    GNUNET_PEERSTORE_store_cancel (b->sc);
+    b->sc = NULL;
+  }
+  else
+  {
+    GNUNET_SCHEDULER_cancel (b->task);
+    b->task = NULL;
+  }
+  mtbe = GNUNET_TIME_absolute_hton (b->monotonic_time);
+  b->sc =
+    GNUNET_PEERSTORE_store (peerstore,
+                            "transport",
+                            &b->pid,
+                            GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
+                            &mtbe,
+                            sizeof (mtbe),
+                            GNUNET_TIME_UNIT_FOREVER_ABS,
+                            GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+                            &backtalker_monotime_store_cb,
+                            b);
 }
 
 
 /**
  * Communicator gave us a backchannel encapsulation.  Process the request.
+ * (We are not the origin of the backchannel here, the communicator simply
+ * received a backchannel message and we are expected to forward it.)
  *
  * @param cls a `struct CommunicatorMessageContext` (must call
  * #finish_cmc_handling() when done)
@@ -4104,20 +5686,21 @@ handle_backchannel_encapsulation (
   const char *hdr;
   size_t hdr_len;
 
-  if (0 != GNUNET_memcmp (&be->target, &GST_my_identity)) {
+  if (0 != GNUNET_memcmp (&be->target, &GST_my_identity))
+  {
     /* not for me, try to route to target */
-    /* FIXME: someone needs to update be->distance! */
-    /* FIXME: BE routing can be special, should we put all of this
-       on 'route_message'? Maybe at least pass some more arguments? */
-    route_message (&be->target, GNUNET_copy_message (&be->header));
+    route_message (&be->target,
+                   GNUNET_copy_message (&be->header),
+                   RMO_DV_ALLOWED);
     finish_cmc_handling (cmc);
     return;
   }
   dh_key_derive_eph_pub (&be->ephemeral_key, &be->iv, &key);
-  hdr = (const char *)&be[1];
+  hdr = (const char *) &be[1];
   hdr_len = ntohs (be->header.size) - sizeof (*be);
   bc_hmac (&key, &hmac, hdr, hdr_len);
-  if (0 != GNUNET_memcmp (&hmac, &be->hmac)) {
+  if (0 != GNUNET_memcmp (&hmac, &be->hmac))
+  {
     /* HMAC missmatch, disard! */
     GNUNET_break_op (0);
     finish_cmc_handling (cmc);
@@ -4125,23 +5708,88 @@ handle_backchannel_encapsulation (
   }
   /* begin actual decryption */
   {
-    struct TransportBackchannelRequestPayload ppay;
+    struct Backtalker *b;
+    struct GNUNET_TIME_Absolute monotime;
+    struct TransportBackchannelRequestPayloadP ppay;
     char body[hdr_len - sizeof (ppay)];
 
-    GNUNET_assert (hdr_len
-                   >= sizeof (ppay) + sizeof (struct GNUNET_MessageHeader));
+    GNUNET_assert (hdr_len >=
+                   sizeof (ppay) + sizeof (struct GNUNET_MessageHeader));
     bc_decrypt (&key, &ppay, hdr, sizeof (ppay));
     bc_decrypt (&key, &body, &hdr[sizeof (ppay)], hdr_len - sizeof (ppay));
     bc_key_clean (&key);
-    // FIXME: verify signatures in ppay!
-    // => check if ephemeral key is known & valid, if not
-    // => verify sig, cache ephemeral key
-    // => update monotonic_time of sender for replay detection
-
-    // FIXME: forward to specified communicator!
-    // (using GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING)
+    monotime = GNUNET_TIME_absolute_ntoh (ppay.monotonic_time);
+    b = GNUNET_CONTAINER_multipeermap_get (backtalkers, &ppay.sender);
+    if ((NULL != b) && (monotime.abs_value_us < b->monotonic_time.abs_value_us))
+    {
+      GNUNET_STATISTICS_update (
+        GST_stats,
+        "# Backchannel messages dropped: monotonic time not increasing",
+        1,
+        GNUNET_NO);
+      finish_cmc_handling (cmc);
+      return;
+    }
+    if ((NULL == b) ||
+        (0 != GNUNET_memcmp (&b->last_ephemeral, &be->ephemeral_key)))
+    {
+      /* Check signature */
+      struct EphemeralConfirmationPS ec;
+
+      ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
+      ec.purpose.size = htonl (sizeof (ec));
+      ec.target = GST_my_identity;
+      ec.ephemeral_key = be->ephemeral_key;
+      if (
+        GNUNET_OK !=
+        GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL,
+                                    &ec.purpose,
+                                    &ppay.sender_sig,
+                                    &ppay.sender.public_key))
+      {
+        /* Signature invalid, disard! */
+        GNUNET_break_op (0);
+        finish_cmc_handling (cmc);
+        return;
+      }
+    }
+    if (NULL != b)
+    {
+      /* update key cache and mono time */
+      b->last_ephemeral = be->ephemeral_key;
+      b->monotonic_time = monotime;
+      update_backtalker_monotime (b);
+      forward_backchannel_payload (b, body, sizeof (body));
+      b->timeout =
+        GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
+      finish_cmc_handling (cmc);
+      return;
+    }
+    /* setup data structure to cache signature AND check
+       monotonic time with PEERSTORE before forwarding backchannel payload */
+    b = GNUNET_malloc (sizeof (struct Backtalker) + sizeof (body));
+    b->pid = ppay.sender;
+    b->body_size = sizeof (body);
+    memcpy (&b[1], body, sizeof (body));
+    GNUNET_assert (GNUNET_YES ==
+                   GNUNET_CONTAINER_multipeermap_put (
+                     backtalkers,
+                     &b->pid,
+                     b,
+                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    b->monotonic_time = monotime; /* NOTE: to be checked still! */
+    b->cmc = cmc;
+    b->timeout =
+      GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
+    b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+    b->get =
+      GNUNET_PEERSTORE_iterate (peerstore,
+                                "transport",
+                                &b->pid,
+                                GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
+                                &backtalker_monotime_cb,
+                                b);
   }
-  finish_cmc_handling (cmc);
 }
 
 
@@ -4161,18 +5809,58 @@ path_cleanup_cb (void *cls)
   struct DistanceVectorHop *pos;
 
   dv->timeout_task = NULL;
-  while (NULL != (pos = dv->dv_head)) {
+  while (NULL != (pos = dv->dv_head))
+  {
     GNUNET_assert (dv == pos->dv);
     if (GNUNET_TIME_absolute_get_remaining (pos->timeout).rel_value_us > 0)
       break;
     free_distance_vector_hop (pos);
   }
-  if (NULL == pos) {
+  if (NULL == pos)
+  {
     free_dv_route (dv);
     return;
   }
-  dv->timeout_task
-    = GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv);
+  dv->timeout_task =
+    GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv);
+}
+
+
+/**
+ * The @a hop is a validated path to the respective target
+ * peer and we should tell core about it -- and schedule
+ * a job to revoke the state.
+ *
+ * @param hop a path to some peer that is the reason for activation
+ */
+static void
+activate_core_visible_dv_path (struct DistanceVectorHop *hop)
+{
+  struct DistanceVector *dv = hop->dv;
+  struct VirtualLink *vl;
+
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target);
+  if (NULL != vl)
+  {
+    /* Link was already up, remember dv is also now available and we are done */
+    vl->dv = dv;
+    return;
+  }
+  vl = GNUNET_new (struct VirtualLink);
+  vl->target = dv->target;
+  vl->dv = dv;
+  vl->core_recv_window = RECV_WINDOW_SIZE;
+  vl->visibility_task =
+    GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
+  GNUNET_break (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_put (
+                  links,
+                  &vl->target,
+                  vl,
+                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  /* We lacked a confirmed connection to the target
+     before, so tell CORE about it (finally!) */
+  cores_send_connect_info (&dv->target);
 }
 
 
@@ -4188,11 +5876,13 @@ path_cleanup_cb (void *cls)
  * non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
  *
  * @param path the path we learned, path[0] should be us,
- *             and then path contains a valid path from us to `path[path_len-1]`
- *             path[1] should be a direct neighbour (we should check!)
+ *             and then path contains a valid path from us to
+ * `path[path_len-1]` path[1] should be a direct neighbour (we should check!)
  * @param path_len number of entries on the @a path, at least three!
  * @param network_latency how long does the message take from us to
  * `path[path_len-1]`? set to "forever" if unknown
+ * @param path_valid_until how long is this path considered validated? Maybe
+ * be zero.
  * @return #GNUNET_YES on success,
  *         #GNUNET_NO if we have better path(s) to the target
  *         #GNUNET_SYSERR if the path is useless and/or invalid
@@ -4202,41 +5892,46 @@ path_cleanup_cb (void *cls)
 static int
 learn_dv_path (const struct GNUNET_PeerIdentity *path,
                unsigned int path_len,
-               struct GNUNET_TIME_Relative network_latency)
+               struct GNUNET_TIME_Relative network_latency,
+               struct GNUNET_TIME_Absolute path_valid_until)
 {
   struct DistanceVectorHop *hop;
   struct DistanceVector *dv;
   struct Neighbour *next_hop;
   unsigned int shorter_distance;
 
-  if (path_len < 3) {
+  if (path_len < 3)
+  {
     /* what a boring path! not allowed! */
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
   GNUNET_assert (0 == GNUNET_memcmp (&GST_my_identity, &path[0]));
-  next_hop = GNUNET_CONTAINER_multipeermap_get (neighbours, &path[1]);
-  if (NULL == next_hop) {
+  next_hop = lookup_neighbour (&path[1]);
+  if (NULL == next_hop)
+  {
     /* next hop must be a neighbour, otherwise this whole thing is useless! */
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
   for (unsigned int i = 2; i < path_len; i++)
-    if (NULL != GNUNET_CONTAINER_multipeermap_get (neighbours, &path[i])) {
+    if (NULL != lookup_neighbour (&path[i]))
+    {
       /* Useless path, we have a direct connection to some hop
          in the middle of the path, so this one doesn't even
          seem terribly useful for redundancy */
       return GNUNET_SYSERR;
     }
   dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &path[path_len - 1]);
-  if (NULL == dv) {
+  if (NULL == dv)
+  {
     dv = GNUNET_new (struct DistanceVector);
     dv->target = path[path_len - 1];
     dv->timeout_task = GNUNET_SCHEDULER_add_delayed (DV_PATH_VALIDITY_TIMEOUT,
                                                      &path_cleanup_cb,
                                                      dv);
-    GNUNET_assert (GNUNET_OK
-                   == GNUNET_CONTAINER_multipeermap_put (
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multipeermap_put (
                      dv_routes,
                      &dv->target,
                      dv,
@@ -4245,22 +5940,27 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
   /* Check if we have this path already! */
   shorter_distance = 0;
   for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
-       pos = pos->next_dv) {
+       pos = pos->next_dv)
+  {
     if (pos->distance < path_len - 2)
       shorter_distance++;
     /* Note that the distances in 'pos' excludes us (path[0]) and
        the next_hop (path[1]), so we need to subtract two
        and check next_hop explicitly */
-    if ((pos->distance == path_len - 2) && (pos->next_hop == next_hop)) {
+    if ((pos->distance == path_len - 2) && (pos->next_hop == next_hop))
+    {
       int match = GNUNET_YES;
 
-      for (unsigned int i = 0; i < pos->distance; i++) {
-        if (0 != GNUNET_memcmp (&pos->path[i], &path[i + 2])) {
+      for (unsigned int i = 0; i < pos->distance; i++)
+      {
+        if (0 != GNUNET_memcmp (&pos->path[i], &path[i + 2]))
+        {
           match = GNUNET_NO;
           break;
         }
       }
-      if (GNUNET_YES == match) {
+      if (GNUNET_YES == match)
+      {
         struct GNUNET_TIME_Relative last_timeout;
 
         /* Re-discovered known path, update timeout */
@@ -4269,14 +5969,20 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
                                   1,
                                   GNUNET_NO);
         last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
-        pos->timeout
-          = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
+        pos->timeout =
+          GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
+        pos->path_valid_until =
+          GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
         GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
         GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
-        if (last_timeout.rel_value_us
-            < GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
-                                             DV_PATH_DISCOVERY_FREQUENCY)
-                .rel_value_us) {
+        if (0 <
+            GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
+          activate_core_visible_dv_path (pos);
+        if (last_timeout.rel_value_us <
+            GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
+                                           DV_PATH_DISCOVERY_FREQUENCY)
+              .rel_value_us)
+        {
           /* Some peer send DV learn messages too often, we are learning
              the same path faster than it would be useful; do not forward! */
           return GNUNET_NO;
@@ -4287,26 +5993,31 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
   }
   /* Count how many shorter paths we have (incl. direct
      neighbours) before simply giving up on this one! */
-  if (shorter_distance >= MAX_DV_PATHS_TO_TARGET) {
+  if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
+  {
     /* We have a shorter path already! */
     return GNUNET_NO;
   }
   /* create new DV path entry */
-  hop = GNUNET_malloc (sizeof (struct DistanceVectorHop)
-                       sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
+  hop = GNUNET_malloc (sizeof (struct DistanceVectorHop) +
+                       sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
   hop->next_hop = next_hop;
   hop->dv = dv;
-  hop->path = (const struct GNUNET_PeerIdentity *)&hop[1];
+  hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
   memcpy (&hop[1],
           &path[2],
           sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
   hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
+  hop->path_valid_until = path_valid_until;
   hop->distance = path_len - 2;
+  hop->pd.aged_rtt = network_latency;
   GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop);
   GNUNET_CONTAINER_MDLL_insert (neighbour,
                                 next_hop->dv_head,
                                 next_hop->dv_tail,
                                 hop);
+  if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
+    activate_core_visible_dv_path (hop);
   return GNUNET_YES;
 }
 
@@ -4319,27 +6030,32 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
  * @return #GNUNET_YES if message is well-formed
  */
 static int
-check_dv_learn (void *cls, const struct TransportDVLearn *dvl)
+check_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
 {
   uint16_t size = ntohs (dvl->header.size);
   uint16_t num_hops = ntohs (dvl->num_hops);
-  const struct DVPathEntryP *hops = (const struct DVPathEntryP *)&dvl[1];
+  const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
 
-  (void)cls;
-  if (size != sizeof (*dvl) + num_hops * sizeof (struct DVPathEntryP)) {
+  (void) cls;
+  if (size != sizeof (*dvl) + num_hops * sizeof (struct DVPathEntryP))
+  {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
-  if (num_hops > MAX_DV_HOPS_ALLOWED) {
+  if (num_hops > MAX_DV_HOPS_ALLOWED)
+  {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
-  for (unsigned int i = 0; i < num_hops; i++) {
-    if (0 == GNUNET_memcmp (&dvl->initiator, &hops[i].hop)) {
+  for (unsigned int i = 0; i < num_hops; i++)
+  {
+    if (0 == GNUNET_memcmp (&dvl->initiator, &hops[i].hop))
+    {
       GNUNET_break_op (0);
       return GNUNET_SYSERR;
     }
-    if (0 == GNUNET_memcmp (&GST_my_identity, &hops[i].hop)) {
+    if (0 == GNUNET_memcmp (&GST_my_identity, &hops[i].hop))
+    {
       GNUNET_break_op (0);
       return GNUNET_SYSERR;
     }
@@ -4361,55 +6077,56 @@ check_dv_learn (void *cls, const struct TransportDVLearn *dvl)
  */
 static void
 forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
-                  const struct TransportDVLearn *msg,
+                  const struct TransportDVLearnMessage *msg,
                   uint16_t bi_history,
                   uint16_t nhops,
                   const struct DVPathEntryP *hops,
                   struct GNUNET_TIME_Absolute in_time)
 {
   struct DVPathEntryP *dhops;
-  struct TransportDVLearn *fwd;
+  struct TransportDVLearnMessage *fwd;
   struct GNUNET_TIME_Relative nnd;
 
   /* compute message for forwarding */
   GNUNET_assert (nhops < MAX_DV_HOPS_ALLOWED);
-  fwd = GNUNET_malloc (sizeof (struct TransportDVLearn)
-                       (nhops + 1) * sizeof (struct DVPathEntryP));
+  fwd = GNUNET_malloc (sizeof (struct TransportDVLearnMessage) +
+                       (nhops + 1) * sizeof (struct DVPathEntryP));
   fwd->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
-  fwd->header.size = htons (sizeof (struct TransportDVLearn)
-                            (nhops + 1) * sizeof (struct DVPathEntryP));
+  fwd->header.size = htons (sizeof (struct TransportDVLearnMessage) +
+                            (nhops + 1) * sizeof (struct DVPathEntryP));
   fwd->num_hops = htons (nhops + 1);
   fwd->bidirectional = htons (bi_history);
-  nnd = GNUNET_TIME_relative_add (
-    GNUNET_TIME_absolute_get_duration (in_time),
-    GNUNET_TIME_relative_ntoh (msg->non_network_delay));
+  nnd = GNUNET_TIME_relative_add (GNUNET_TIME_absolute_get_duration (in_time),
+                                  GNUNET_TIME_relative_ntoh (
+                                    msg->non_network_delay));
   fwd->non_network_delay = GNUNET_TIME_relative_hton (nnd);
   fwd->init_sig = msg->init_sig;
   fwd->initiator = msg->initiator;
   fwd->challenge = msg->challenge;
-  dhops = (struct DVPathEntryP *)&fwd[1];
+  dhops = (struct DVPathEntryP *) &fwd[1];
   GNUNET_memcpy (dhops, hops, sizeof (struct DVPathEntryP) * nhops);
   dhops[nhops].hop = GST_my_identity;
   {
-    struct DvHopPS dhp
-      = {.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
-         .purpose.size = htonl (sizeof (dhp)),
-         .pred = dhops[nhops - 1].hop,
-         .succ = *next_hop,
-         .challenge = msg->challenge};
-
-    GNUNET_assert (GNUNET_OK
-                   == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
-                                                &dhp.purpose,
-                                                &dhops[nhops].hop_sig));
-  }
-  route_message (next_hop, &fwd->header);
+    struct DvHopPS dhp = {.purpose.purpose =
+                            htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
+                          .purpose.size = htonl (sizeof (dhp)),
+                          .pred = dhops[nhops - 1].hop,
+                          .succ = *next_hop,
+                          .challenge = msg->challenge};
+
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+                                             &dhp.purpose,
+                                             &dhops[nhops].hop_sig));
+  }
+  route_message (next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED);
 }
 
 
 /**
  * Check signature of type #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
  *
+ * @param sender_monotonic_time monotonic time of the initiator
  * @param init the signer
  * @param challenge the challenge that was signed
  * @param init_sig signature presumably by @a init
@@ -4417,21 +6134,24 @@ forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
  */
 static int
 validate_dv_initiator_signature (
+  struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time,
   const struct GNUNET_PeerIdentity *init,
-  const struct GNUNET_ShortHashCode *challenge,
+  const struct ChallengeNonceP *challenge,
   const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
 {
-  struct DvInitPS ip = {
-    .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
-    .purpose.size = htonl (sizeof (ip)),
-    .challenge = *challenge};
-
-  if (GNUNET_OK
-      != GNUNET_CRYPTO_eddsa_verify (
-        GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
-        &ip.purpose,
-        init_sig,
-        &init->public_key)) {
+  struct DvInitPS ip = {.purpose.purpose = htonl (
+                          GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
+                        .purpose.size = htonl (sizeof (ip)),
+                        .monotonic_time = sender_monotonic_time,
+                        .challenge = *challenge};
+
+  if (
+    GNUNET_OK !=
+    GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
+                                &ip.purpose,
+                                init_sig,
+                                &init->public_key))
+  {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
@@ -4439,6 +6159,213 @@ validate_dv_initiator_signature (
 }
 
 
+/**
+ * Closure for #dv_neighbour_selection and #dv_neighbour_transmission.
+ */
+struct NeighbourSelectionContext
+{
+  /**
+   * Original message we received.
+   */
+  const struct TransportDVLearnMessage *dvl;
+
+  /**
+   * The hops taken.
+   */
+  const struct DVPathEntryP *hops;
+
+  /**
+   * Time we received the message.
+   */
+  struct GNUNET_TIME_Absolute in_time;
+
+  /**
+   * Offsets of the selected peers.
+   */
+  uint32_t selections[MAX_DV_DISCOVERY_SELECTION];
+
+  /**
+   * Number of peers eligible for selection.
+   */
+  unsigned int num_eligible;
+
+  /**
+   * Number of peers that were selected for forwarding.
+   */
+  unsigned int num_selections;
+
+  /**
+   * Number of hops in @e hops
+   */
+  uint16_t nhops;
+
+  /**
+   * Bitmap of bidirectional connections encountered.
+   */
+  uint16_t bi_history;
+};
+
+
+/**
+ * Function called for each neighbour during #handle_dv_learn.
+ *
+ * @param cls a `struct NeighbourSelectionContext *`
+ * @param pid identity of the peer
+ * @param value a `struct Neighbour`
+ * @return #GNUNET_YES (always)
+ */
+static int
+dv_neighbour_selection (void *cls,
+                        const struct GNUNET_PeerIdentity *pid,
+                        void *value)
+{
+  struct NeighbourSelectionContext *nsc = cls;
+
+  (void) value;
+  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
+    return GNUNET_YES; /* skip initiator */
+  for (unsigned int i = 0; i < nsc->nhops; i++)
+    if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
+      return GNUNET_YES; /* skip peers on path */
+  nsc->num_eligible++;
+  return GNUNET_YES;
+}
+
+
+/**
+ * Function called for each neighbour during #handle_dv_learn.
+ * We call #forward_dv_learn() on the neighbour(s) selected
+ * during #dv_neighbour_selection().
+ *
+ * @param cls a `struct NeighbourSelectionContext *`
+ * @param pid identity of the peer
+ * @param value a `struct Neighbour`
+ * @return #GNUNET_YES (always)
+ */
+static int
+dv_neighbour_transmission (void *cls,
+                           const struct GNUNET_PeerIdentity *pid,
+                           void *value)
+{
+  struct NeighbourSelectionContext *nsc = cls;
+
+  (void) value;
+  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
+    return GNUNET_YES; /* skip initiator */
+  for (unsigned int i = 0; i < nsc->nhops; i++)
+    if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
+      return GNUNET_YES; /* skip peers on path */
+  for (unsigned int i = 0; i < nsc->num_selections; i++)
+  {
+    if (nsc->selections[i] == nsc->num_eligible)
+    {
+      forward_dv_learn (pid,
+                        nsc->dvl,
+                        nsc->bi_history,
+                        nsc->nhops,
+                        nsc->hops,
+                        nsc->in_time);
+      break;
+    }
+  }
+  nsc->num_eligible++;
+  return GNUNET_YES;
+}
+
+
+/**
+ * Computes the number of neighbours we should forward a DVInit
+ * message to given that it has so far taken @a hops_taken hops
+ * though the network and that the number of neighbours we have
+ * in total is @a neighbour_count, out of which @a eligible_count
+ * are not yet on the path.
+ *
+ * NOTE: technically we might want to include NSE in the formula to
+ * get a better grip on the overall network size. However, for now
+ * using NSE here would create a dependency issue in the build system.
+ * => Left for later, hardcoded to 50 for now.
+ *
+ * The goal of the fomula is that we want to reach a total of LOG(NSE)
+ * peers via DV (`target_total`).  We want the reach to be spread out
+ * over various distances to the origin, with a bias towards shorter
+ * distances.
+ *
+ * We make the strong assumption that the network topology looks
+ * "similar" at other hops, in particular the @a neighbour_count
+ * should be comparable at other hops.
+ *
+ * If the local neighbourhood is densely connected, we expect that @a
+ * eligible_count is close to @a neighbour_count minus @a hops_taken
+ * as a lot of the path is already known. In that case, we should
+ * forward to few(er) peers to try to find a path out of the
+ * neighbourhood. OTOH, if @a eligible_count is close to @a
+ * neighbour_count, we should forward to many peers as we are either
+ * still close to the origin (i.e.  @a hops_taken is small) or because
+ * we managed to get beyond a local cluster.  We express this as
+ * the `boost_factor` using the square of the fraction of eligible
+ * neighbours (so if only 50% are eligible, we boost by 1/4, but if
+ * 99% are eligible, the 'boost' will be almost 1).
+ *
+ * Second, the more hops we have taken, the larger the problem of an
+ * exponential traffic explosion gets.  So we take the `target_total`,
+ * and compute our degree such that at each distance d 2^{-d} peers
+ * are selected (corrected by the `boost_factor`).
+ *
+ * @param hops_taken number of hops DVInit has travelled so far
+ * @param neighbour_count number of neighbours we have in total
+ * @param eligible_count number of neighbours we could in
+ *        theory forward to
+ */
+static unsigned int
+calculate_fork_degree (unsigned int hops_taken,
+                       unsigned int neighbour_count,
+                       unsigned int eligible_count)
+{
+  double target_total = 50.0; /* FIXME: use LOG(NSE)? */
+  double eligible_ratio =
+    ((double) eligible_count) / ((double) neighbour_count);
+  double boost_factor = eligible_ratio * eligible_ratio;
+  unsigned int rnd;
+  double left;
+
+  if (hops_taken >= 64)
+    return 0; /* precaution given bitshift below */
+  for (unsigned int i = 1; i < hops_taken; i++)
+  {
+    /* For each hop, subtract the expected number of targets
+       reached at distance d (so what remains divided by 2^d) */
+    target_total -= (target_total * boost_factor / (1LLU << i));
+  }
+  rnd =
+    (unsigned int) floor (target_total * boost_factor / (1LLU << hops_taken));
+  /* round up or down probabilistically depending on how close we were
+     when floor()ing to rnd */
+  left = target_total - (double) rnd;
+  if (UINT32_MAX * left >
+      GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX))
+    rnd++; /* round up */
+  return rnd;
+}
+
+
+/**
+ * Function called when peerstore is done storing a DV monotonic time.
+ *
+ * @param cls a `struct Neighbour`
+ * @param success #GNUNET_YES if peerstore was successful
+ */
+static void
+neighbour_store_dvmono_cb (void *cls, int success)
+{
+  struct Neighbour *n = cls;
+
+  n->sc = NULL;
+  if (GNUNET_YES != success)
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to store other peer's monotonic time in peerstore!\n");
+}
+
+
 /**
  * Communicator gave us a DV learn message.  Process the request.
  *
@@ -4447,7 +6374,7 @@ validate_dv_initiator_signature (
  * @param dvl the message that was received
  */
 static void
-handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
+handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
 {
   struct CommunicatorMessageContext *cmc = cls;
   enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
@@ -4458,20 +6385,26 @@ handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
   int do_fwd;
   int did_initiator;
   struct GNUNET_TIME_Absolute in_time;
+  struct Neighbour *n;
 
   nhops = ntohs (dvl->bidirectional); /* 0 = sender is initiator */
   bi_history = ntohs (dvl->bidirectional);
-  hops = (const struct DVPathEntryP *)&dvl[1];
-  if (0 == nhops) {
+  hops = (const struct DVPathEntryP *) &dvl[1];
+  if (0 == nhops)
+  {
     /* sanity check */
-    if (0 != GNUNET_memcmp (&dvl->initiator, &cmc->im.sender)) {
+    if (0 != GNUNET_memcmp (&dvl->initiator, &cmc->im.sender))
+    {
       GNUNET_break (0);
       finish_cmc_handling (cmc);
       return;
     }
-  } else {
+  }
+  else
+  {
     /* sanity check */
-    if (0 != GNUNET_memcmp (&hops[nhops - 1].hop, &cmc->im.sender)) {
+    if (0 != GNUNET_memcmp (&hops[nhops - 1].hop, &cmc->im.sender))
+    {
       GNUNET_break (0);
       finish_cmc_handling (cmc);
       return;
@@ -4480,26 +6413,78 @@ handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
 
   GNUNET_assert (CT_COMMUNICATOR == cmc->tc->type);
   cc = cmc->tc->details.communicator.cc;
-  bi_hop = (GNUNET_TRANSPORT_CC_RELIABLE
-            == cc); // FIXME: add bi-directional flag to cc?
+  bi_hop = (GNUNET_TRANSPORT_CC_RELIABLE ==
+            cc); // FIXME: add bi-directional flag to cc?
   in_time = GNUNET_TIME_absolute_get ();
 
   /* continue communicator here, everything else can happen asynchronous! */
   finish_cmc_handling (cmc);
 
-  // FIXME: should we bother to verify _every_ DV initiator signature?
-  if (GNUNET_OK
-      != validate_dv_initiator_signature (&dvl->initiator,
-                                          &dvl->challenge,
-                                          &dvl->init_sig)) {
-    GNUNET_break_op (0);
-    return;
+  n = lookup_neighbour (&dvl->initiator);
+  if (NULL != n)
+  {
+    if ((n->dv_monotime_available == GNUNET_YES) &&
+        (GNUNET_TIME_absolute_ntoh (dvl->monotonic_time).abs_value_us <
+         n->last_dv_learn_monotime.abs_value_us))
+    {
+      GNUNET_STATISTICS_update (GST_stats,
+                                "# DV learn discarded due to time travel",
+                                1,
+                                GNUNET_NO);
+      return;
+    }
+    if (GNUNET_OK != validate_dv_initiator_signature (dvl->monotonic_time,
+                                                      &dvl->initiator,
+                                                      &dvl->challenge,
+                                                      &dvl->init_sig))
+    {
+      GNUNET_break_op (0);
+      return;
+    }
+    n->last_dv_learn_monotime = GNUNET_TIME_absolute_ntoh (dvl->monotonic_time);
+    if (GNUNET_YES == n->dv_monotime_available)
+    {
+      if (NULL != n->sc)
+        GNUNET_PEERSTORE_store_cancel (n->sc);
+      n->sc =
+        GNUNET_PEERSTORE_store (peerstore,
+                                "transport",
+                                &dvl->initiator,
+                                GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
+                                &dvl->monotonic_time,
+                                sizeof (dvl->monotonic_time),
+                                GNUNET_TIME_UNIT_FOREVER_ABS,
+                                GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+                                &neighbour_store_dvmono_cb,
+                                n);
+    }
+  }
+  /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
+     If signature verification load too high, implement random drop strategy */
+  for (unsigned int i = 0; i < nhops; i++)
+  {
+    struct DvHopPS dhp = {.purpose.purpose =
+                            htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
+                          .purpose.size = htonl (sizeof (dhp)),
+                          .pred = (0 == i) ? dvl->initiator : hops[i - 1].hop,
+                          .succ = (nhops - 1 == i) ? GST_my_identity
+                                                   : hops[i + 1].hop,
+                          .challenge = dvl->challenge};
+
+    if (GNUNET_OK !=
+        GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP,
+                                    &dhp.purpose,
+                                    &hops[i].hop_sig,
+                                    &hops[i].hop.public_key))
+    {
+      GNUNET_break_op (0);
+      return;
+    }
   }
-  // FIXME: asynchronously (!) verify hop-by-hop signatures!
-  // => if signature verification load too high, implement random drop strategy!
 
   do_fwd = GNUNET_YES;
-  if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator)) {
+  if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
+  {
     struct GNUNET_PeerIdentity path[nhops + 1];
     struct GNUNET_TIME_Relative host_latency_sum;
     struct GNUNET_TIME_Relative latency;
@@ -4519,49 +6504,64 @@ handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
     /* assumption: latency on all links is the same */
     network_latency = GNUNET_TIME_relative_divide (network_latency, nhops);
 
-    for (unsigned int i = 2; i <= nhops; i++) {
+    for (unsigned int i = 2; i <= nhops; i++)
+    {
       struct GNUNET_TIME_Relative ilat;
 
       /* assumption: linear latency increase per hop */
       ilat = GNUNET_TIME_relative_multiply (network_latency, i);
       path[i] = hops[i - 1].hop;
-      learn_dv_path (path, i, ilat);
+      learn_dv_path (path,
+                     i,
+                     ilat,
+                     GNUNET_TIME_relative_to_absolute (
+                       ADDRESS_VALIDATION_LIFETIME));
     }
     /* as we initiated, do not forward again (would be circular!) */
     do_fwd = GNUNET_NO;
     return;
-  } else if (bi_hop) {
+  }
+  else if (bi_hop)
+  {
     /* last hop was bi-directional, we could learn something here! */
     struct GNUNET_PeerIdentity path[nhops + 2];
 
     path[0] = GST_my_identity;
     path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
-    for (unsigned int i = 0; i < nhops; i++) {
+    for (unsigned int i = 0; i < nhops; i++)
+    {
       int iret;
 
       if (0 == (bi_history & (1 << i)))
         break; /* i-th hop not bi-directional, stop learning! */
-      if (i == nhops) {
+      if (i == nhops)
+      {
         path[i + 2] = dvl->initiator;
-      } else {
+      }
+      else
+      {
         path[i + 2] = hops[nhops - i - 2].hop;
       }
 
-      iret = learn_dv_path (path, i + 2, GNUNET_TIME_UNIT_FOREVER_REL);
-      if (GNUNET_SYSERR == iret) {
+      iret = learn_dv_path (path,
+                            i + 2,
+                            GNUNET_TIME_UNIT_FOREVER_REL,
+                            GNUNET_TIME_UNIT_ZERO_ABS);
+      if (GNUNET_SYSERR == iret)
+      {
         /* path invalid or too long to be interesting for US, thus should also
            not be interesting to our neighbours, cut path when forwarding to
            'i' hops, except of course for the one that goes back to the
            initiator */
-        GNUNET_STATISTICS_update (
-          GST_stats,
-          "# DV learn not forwarded due invalidity of path",
-          1,
-          GNUNET_NO);
+        GNUNET_STATISTICS_update (GST_stats,
+                                  "# DV learn not forwarded due invalidity of path",
+                                  1,
+                                  GNUNET_NO);
         do_fwd = GNUNET_NO;
         break;
       }
-      if ((GNUNET_NO == iret) && (nhops == i + 1)) {
+      if ((GNUNET_NO == iret) && (nhops == i + 1))
+      {
         /* we have better paths, and this is the longest target,
            so there cannot be anything interesting later */
         GNUNET_STATISTICS_update (GST_stats,
@@ -4574,7 +6574,8 @@ handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
     }
   }
 
-  if (MAX_DV_HOPS_ALLOWED == nhops) {
+  if (MAX_DV_HOPS_ALLOWED == nhops)
+  {
     /* At limit, we're out of here! */
     finish_cmc_handling (cmc);
     return;
@@ -4583,10 +6584,10 @@ handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
   /* Forward to initiator, if path non-trivial and possible */
   bi_history = (bi_history << 1) | (bi_hop ? 1 : 0);
   did_initiator = GNUNET_NO;
-  if ((1 < nhops)
-      && (GNUNET_YES
-          == GNUNET_CONTAINER_multipeermap_contains (neighbours,
-                                                     &dvl->initiator))) {
+  if ((1 < nhops) &&
+      (GNUNET_YES ==
+       GNUNET_CONTAINER_multipeermap_contains (neighbours, &dvl->initiator)))
+  {
     /* send back to origin! */
     forward_dv_learn (&dvl->initiator, dvl, bi_history, nhops, hops, in_time);
     did_initiator = GNUNET_YES;
@@ -4595,20 +6596,37 @@ handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
      ourselves (do_fwd), or the path was darn short and thus the initiator is
      likely to still be very interested in this (and we did NOT already
      send it back to the initiator) */
-  if ((do_fwd)
-      || ((nhops < MIN_DV_PATH_LENGTH_FOR_INITIATOR)
-          && (GNUNET_NO == did_initiator))) {
-    /* FIXME: loop over all neighbours, pick those with low
-       queues AND that are not yet on the path; possibly
-       adapt threshold to nhops! */
-#if FIXME
-    forward_dv_learn (NULL, // fill in peer from iterator here!
-                      dvl,
-                      bi_history,
-                      nhops,
-                      hops,
-                      in_time);
-#endif
+  if ((do_fwd) || ((nhops < MIN_DV_PATH_LENGTH_FOR_INITIATOR) &&
+                   (GNUNET_NO == did_initiator)))
+  {
+    /* Pick random neighbours that are not yet on the path */
+    struct NeighbourSelectionContext nsc;
+    unsigned int n_cnt;
+
+    n_cnt = GNUNET_CONTAINER_multipeermap_size (neighbours);
+    nsc.nhops = nhops;
+    nsc.dvl = dvl;
+    nsc.bi_history = bi_history;
+    nsc.hops = hops;
+    nsc.in_time = in_time;
+    nsc.num_eligible = 0;
+    GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+                                           &dv_neighbour_selection,
+                                           &nsc);
+    if (0 == nsc.num_eligible)
+      return; /* done here, cannot forward to anyone else */
+    nsc.num_selections = calculate_fork_degree (nhops, n_cnt, nsc.num_eligible);
+    nsc.num_selections =
+      GNUNET_MIN (MAX_DV_DISCOVERY_SELECTION, nsc.num_selections);
+    for (unsigned int i = 0; i < nsc.num_selections; i++)
+      nsc.selections[i] =
+        (nsc.num_selections == n_cnt)
+          ? i /* all were selected, avoid collisions by chance */
+          : GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, n_cnt);
+    nsc.num_eligible = 0;
+    GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+                                           &dv_neighbour_transmission,
+                                           &nsc);
   }
 }
 
@@ -4621,33 +6639,40 @@ handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
  * @return #GNUNET_YES if message is well-formed
  */
 static int
-check_dv_box (void *cls, const struct TransportDVBox *dvb)
+check_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
 {
   uint16_t size = ntohs (dvb->header.size);
   uint16_t num_hops = ntohs (dvb->num_hops);
-  const struct GNUNET_PeerIdentity *hops
-    = (const struct GNUNET_PeerIdentity *)&dvb[1];
-  const struct GNUNET_MessageHeader *inbox
-    = (const struct GNUNET_MessageHeader *)&hops[num_hops];
+  const struct GNUNET_PeerIdentity *hops =
+    (const struct GNUNET_PeerIdentity *) &dvb[1];
+  const struct GNUNET_MessageHeader *inbox =
+    (const struct GNUNET_MessageHeader *) &hops[num_hops];
   uint16_t isize;
   uint16_t itype;
 
-  (void)cls;
-  if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity)
-               + sizeof (struct GNUNET_MessageHeader)) {
+  (void) cls;
+  if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) +
+               sizeof (struct GNUNET_MessageHeader))
+  {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
   isize = ntohs (inbox->size);
-  if (size
-      != sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity)
-           + isize) {
+  if (size !=
+      sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize)
+  {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
   itype = ntohs (inbox->type);
-  if ((GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype)
-      || (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype)) {
+  if ((GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
+      (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (0 == GNUNET_memcmp (&dvb->origin, &GST_my_identity))
+  {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
@@ -4655,6 +6680,41 @@ check_dv_box (void *cls, const struct TransportDVBox *dvb)
 }
 
 
+/**
+ * Create a DV Box message and queue it for transmission to
+ * @ea next_hop.
+ *
+ * @param next_hop peer to receive the message next
+ * @param total_hops how many hops did the message take so far
+ * @param num_hops length of the @a hops array
+ * @param origin origin of the message
+ * @param hops next peer(s) to the destination, including destination
+ * @param payload payload of the box
+ * @param payload_size number of bytes in @a payload
+ */
+static void
+forward_dv_box (struct Neighbour *next_hop,
+                uint16_t total_hops,
+                uint16_t num_hops,
+                const struct GNUNET_PeerIdentity *origin,
+                const struct GNUNET_PeerIdentity *hops,
+                const void *payload,
+                uint16_t payload_size)
+{
+  struct TransportDVBoxMessage *dvb;
+
+  dvb = create_dv_box (total_hops,
+                       origin,
+                       &hops[num_hops - 1] /* == target */,
+                       num_hops - 1 /* do not count target twice */,
+                       hops,
+                       payload,
+                       payload_size);
+  route_message (&next_hop->pid, &dvb->header, RMO_NONE);
+  GNUNET_free (dvb);
+}
+
+
 /**
  * Communicator gave us a DV box.  Process the request.
  *
@@ -4663,21 +6723,48 @@ check_dv_box (void *cls, const struct TransportDVBox *dvb)
  * @param dvb the message that was received
  */
 static void
-handle_dv_box (void *cls, const struct TransportDVBox *dvb)
+handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
 {
   struct CommunicatorMessageContext *cmc = cls;
   uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
   uint16_t num_hops = ntohs (dvb->num_hops);
-  const struct GNUNET_PeerIdentity *hops
-    = (const struct GNUNET_PeerIdentity *)&dvb[1];
-  const struct GNUNET_MessageHeader *inbox
-    = (const struct GNUNET_MessageHeader *)&hops[num_hops];
-
-  if (num_hops > 0) {
-    // FIXME: if we are not the target, shorten path and forward along.
-    // Try from the _end_ of hops array if we know the given
-    // neighbour (shortening the path!).
-    // NOTE: increment total_hops!
+  const struct GNUNET_PeerIdentity *hops =
+    (const struct GNUNET_PeerIdentity *) &dvb[1];
+  const struct GNUNET_MessageHeader *inbox =
+    (const struct GNUNET_MessageHeader *) &hops[num_hops];
+
+  if (num_hops > 0)
+  {
+    /* We're trying from the end of the hops array, as we may be
+       able to find a shortcut unknown to the origin that way */
+    for (int i = num_hops - 1; i >= 0; i--)
+    {
+      struct Neighbour *n;
+
+      if (0 == GNUNET_memcmp (&hops[i], &GST_my_identity))
+      {
+        GNUNET_break_op (0);
+        finish_cmc_handling (cmc);
+        return;
+      }
+      n = lookup_neighbour (&hops[i]);
+      if (NULL == n)
+        continue;
+      forward_dv_box (n,
+                      ntohs (dvb->total_hops) + 1,
+                      num_hops - i - 1, /* number of hops left */
+                      &dvb->origin,
+                      &hops[i + 1], /* remaining hops */
+                      (const void *) &dvb[1],
+                      size);
+      finish_cmc_handling (cmc);
+      return;
+    }
+    /* Woopsie, next hop not in neighbours, drop! */
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# DV Boxes dropped: next hop unknown",
+                              1,
+                              GNUNET_NO);
     finish_cmc_handling (cmc);
     return;
   }
@@ -4701,7 +6788,8 @@ check_incoming_msg (void *cls,
 {
   struct TransportClient *tc = cls;
 
-  if (CT_COMMUNICATOR != tc->type) {
+  if (CT_COMMUNICATOR != tc->type)
+  {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
@@ -4719,39 +6807,42 @@ check_incoming_msg (void *cls,
  * @param tvc the message that was received
  */
 static void
-handle_validation_challenge (void *cls,
-                             const struct TransportValidationChallenge *tvc)
+handle_validation_challenge (
+  void *cls,
+  const struct TransportValidationChallengeMessage *tvc)
 {
   struct CommunicatorMessageContext *cmc = cls;
-  struct TransportValidationResponse *tvr;
+  struct TransportValidationResponseMessage *tvr;
 
-  if (cmc->total_hops > 0) {
+  if (cmc->total_hops > 0)
+  {
     /* DV routing is not allowed for validation challenges! */
     GNUNET_break_op (0);
     finish_cmc_handling (cmc);
     return;
   }
-  tvr = GNUNET_new (struct TransportValidationResponse);
-  tvr->header.type
-    htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE);
+  tvr = GNUNET_new (struct TransportValidationResponseMessage);
+  tvr->header.type =
+    htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE);
   tvr->header.size = htons (sizeof (*tvr));
   tvr->challenge = tvc->challenge;
   tvr->origin_time = tvc->sender_time;
   tvr->validity_duration = cmc->im.expected_address_validity;
   {
     /* create signature */
-    struct TransportValidationPS tvp = {
-      .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
-      .purpose.size = htonl (sizeof (tvp)),
-      .validity_duration = tvr->validity_duration,
-      .challenge = tvc->challenge};
-
-    GNUNET_assert (GNUNET_OK
-                   == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
-                                                &tvp.purpose,
-                                                &tvr->signature));
-  }
-  route_message (&cmc->im.sender, &tvr->header);
+    struct TransportValidationPS tvp =
+      {.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
+       .purpose.size = htonl (sizeof (tvp)),
+       .validity_duration = tvr->validity_duration,
+       .challenge = tvc->challenge};
+
+    GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+                                                          &tvp.purpose,
+                                                          &tvr->signature));
+  }
+  route_message (&cmc->im.sender,
+                 &tvr->header,
+                 RMO_ANYTHING_GOES | RMO_REDUNDANT);
   finish_cmc_handling (cmc);
 }
 
@@ -4764,7 +6855,7 @@ struct CheckKnownChallengeContext
   /**
    * Set to the challenge we are looking for.
    */
-  const struct GNUNET_ShortHashCode *challenge;
+  const struct ChallengeNonceP *challenge;
 
   /**
    * Set to a matching validation state, if one was found.
@@ -4790,7 +6881,7 @@ check_known_challenge (void *cls,
   struct CheckKnownChallengeContext *ckac = cls;
   struct ValidationState *vs = value;
 
-  (void)pid;
+  (void) pid;
   if (0 != GNUNET_memcmp (&vs->challenge, ckac->challenge))
     return GNUNET_OK;
   ckac->vs = vs;
@@ -4846,23 +6937,47 @@ update_next_challenge_time (struct ValidationState *vs,
     return; /* be lazy */
   vs->next_challenge = new_time;
   if (NULL == vs->hn)
-    vs->hn = GNUNET_CONTAINER_heap_insert (validation_heap,
-                                           vs,
-                                           new_time.abs_value_us);
+    vs->hn =
+      GNUNET_CONTAINER_heap_insert (validation_heap, vs, new_time.abs_value_us);
   else
     GNUNET_CONTAINER_heap_update_cost (vs->hn, new_time.abs_value_us);
-  if ((vs != GNUNET_CONTAINER_heap_peek (validation_heap))
-      && (NULL != validation_task))
+  if ((vs != GNUNET_CONTAINER_heap_peek (validation_heap)) &&
+      (NULL != validation_task))
     return;
   if (NULL != validation_task)
     GNUNET_SCHEDULER_cancel (validation_task);
   /* randomize a bit */
-  delta.rel_value_us
-    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
+  delta.rel_value_us =
+    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+                              MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
   new_time = GNUNET_TIME_absolute_add (new_time, delta);
-  validation_task
-    = GNUNET_SCHEDULER_add_at (new_time, &validation_start_cb, NULL);
+  validation_task =
+    GNUNET_SCHEDULER_add_at (new_time, &validation_start_cb, NULL);
+}
+
+
+/**
+ * Find the queue matching @a pid and @a address.
+ *
+ * @param pid peer the queue must go to
+ * @param address address the queue must use
+ * @return NULL if no such queue exists
+ */
+static struct Queue *
+find_queue (const struct GNUNET_PeerIdentity *pid, const char *address)
+{
+  struct Neighbour *n;
+
+  n = lookup_neighbour (pid);
+  if (NULL == n)
+    return NULL;
+  for (struct Queue *pos = n->queue_head; NULL != pos;
+       pos = pos->next_neighbour)
+  {
+    if (0 == strcmp (pos->address, address))
+      return pos;
+  }
+  return NULL;
 }
 
 
@@ -4875,21 +6990,26 @@ update_next_challenge_time (struct ValidationState *vs,
  * @param tvr the message that was received
  */
 static void
-handle_validation_response (void *cls,
-                            const struct TransportValidationResponse *tvr)
+handle_validation_response (
+  void *cls,
+  const struct TransportValidationResponseMessage *tvr)
 {
   struct CommunicatorMessageContext *cmc = cls;
   struct ValidationState *vs;
-  struct CheckKnownChallengeContext ckac
-    = {.challenge = &tvr->challenge, .vs = NULL};
+  struct CheckKnownChallengeContext ckac = {.challenge = &tvr->challenge,
+                                            .vs = NULL};
   struct GNUNET_TIME_Absolute origin_time;
+  struct Queue *q;
+  struct Neighbour *n;
+  struct VirtualLink *vl;
 
   /* check this is one of our challenges */
-  (void)GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
-                                                    &cmc->im.sender,
-                                                    &check_known_challenge,
-                                                    &ckac);
-  if (NULL == (vs = ckac.vs)) {
+  (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
+                                                     &cmc->im.sender,
+                                                     &check_known_challenge,
+                                                     &ckac);
+  if (NULL == (vs = ckac.vs))
+  {
     /* This can happen simply if we 'forgot' the challenge by now,
        i.e. because we received the validation response twice */
     GNUNET_STATISTICS_update (GST_stats,
@@ -4902,8 +7022,9 @@ handle_validation_response (void *cls,
 
   /* sanity check on origin time */
   origin_time = GNUNET_TIME_absolute_ntoh (tvr->origin_time);
-  if ((origin_time.abs_value_us < vs->first_challenge_use.abs_value_us)
-      || (origin_time.abs_value_us > vs->last_challenge_use.abs_value_us)) {
+  if ((origin_time.abs_value_us < vs->first_challenge_use.abs_value_us) ||
+      (origin_time.abs_value_us > vs->last_challenge_use.abs_value_us))
+  {
     GNUNET_break_op (0);
     finish_cmc_handling (cmc);
     return;
@@ -4911,18 +7032,19 @@ handle_validation_response (void *cls,
 
   {
     /* check signature */
-    struct TransportValidationPS tvp = {
-      .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
-      .purpose.size = htonl (sizeof (tvp)),
-      .validity_duration = tvr->validity_duration,
-      .challenge = tvr->challenge};
-
-    if (GNUNET_OK
-        != GNUNET_CRYPTO_eddsa_verify (
-          GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE,
-          &tvp.purpose,
-          &tvr->signature,
-          &cmc->im.sender.public_key)) {
+    struct TransportValidationPS tvp =
+      {.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
+       .purpose.size = htonl (sizeof (tvp)),
+       .validity_duration = tvr->validity_duration,
+       .challenge = tvr->challenge};
+
+    if (
+      GNUNET_OK !=
+      GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE,
+                                  &tvp.purpose,
+                                  &tvr->signature,
+                                  &cmc->im.sender.public_key))
+    {
       GNUNET_break_op (0);
       finish_cmc_handling (cmc);
       return;
@@ -4931,12 +7053,14 @@ handle_validation_response (void *cls,
 
   /* validity is capped by our willingness to keep track of the
      validation entry and the maximum the other peer allows */
-  vs->valid_until = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_min (
-    GNUNET_TIME_relative_ntoh (tvr->validity_duration),
-    MAX_ADDRESS_VALID_UNTIL));
-  vs->validated_until = GNUNET_TIME_absolute_min (
-    vs->valid_until,
-    GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME));
+  vs->valid_until = GNUNET_TIME_relative_to_absolute (
+    GNUNET_TIME_relative_min (GNUNET_TIME_relative_ntoh (
+                                tvr->validity_duration),
+                              MAX_ADDRESS_VALID_UNTIL));
+  vs->validated_until =
+    GNUNET_TIME_absolute_min (vs->valid_until,
+                              GNUNET_TIME_relative_to_absolute (
+                                ADDRESS_VALIDATION_LIFETIME));
   vs->validation_rtt = GNUNET_TIME_absolute_get_duration (origin_time);
   vs->challenge_backoff = GNUNET_TIME_UNIT_ZERO;
   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
@@ -4946,8 +7070,8 @@ handle_validation_response (void *cls,
     vs->validated_until,
     GNUNET_TIME_relative_multiply (vs->validation_rtt,
                                    VALIDATION_RTT_BUFFER_FACTOR));
-  vs->last_challenge_use
-    GNUNET_TIME_UNIT_ZERO_ABS; /* challenge was not yet used */
+  vs->last_challenge_use =
+    GNUNET_TIME_UNIT_ZERO_ABS; /* challenge was not yet used */
   update_next_challenge_time (vs, vs->first_challenge_use);
   vs->sc = GNUNET_PEERSTORE_store (peerstore,
                                    "transport",
@@ -4959,8 +7083,44 @@ handle_validation_response (void *cls,
                                    GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
                                    &peerstore_store_validation_cb,
                                    vs);
-  // FIXME: should we find the matching queue and update the RTT?
   finish_cmc_handling (cmc);
+
+  /* Finally, we now possibly have a confirmed (!) working queue,
+     update queue status (if queue still is around) */
+  q = find_queue (&vs->pid, vs->address);
+  if (NULL == q)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# Queues lost at time of successful validation",
+                              1,
+                              GNUNET_NO);
+    return;
+  }
+  q->validated_until = vs->validated_until;
+  q->pd.aged_rtt = vs->validation_rtt;
+  n = q->neighbour;
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid);
+  if (NULL != vl)
+  {
+    /* Link was already up, remember n is also now available and we are done */
+    vl->n = n;
+    return;
+  }
+  vl = GNUNET_new (struct VirtualLink);
+  vl->target = n->pid;
+  vl->n = n;
+  vl->core_recv_window = RECV_WINDOW_SIZE;
+  vl->visibility_task =
+    GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
+  GNUNET_break (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_put (
+                  links,
+                  &vl->target,
+                  vl,
+                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  /* We lacked a confirmed connection to the target
+     before, so tell CORE about it (finally!) */
+  cores_send_connect_info (&n->pid);
 }
 
 
@@ -4974,12 +7134,12 @@ handle_incoming_msg (void *cls,
                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
 {
   struct TransportClient *tc = cls;
-  struct CommunicatorMessageContext *cmc
-    GNUNET_new (struct CommunicatorMessageContext);
+  struct CommunicatorMessageContext *cmc =
+    GNUNET_new (struct CommunicatorMessageContext);
 
   cmc->tc = tc;
   cmc->im = *im;
-  demultiplex_with_cmc (cmc, (const struct GNUNET_MessageHeader *)&im[1]);
+  demultiplex_with_cmc (cmc, (const struct GNUNET_MessageHeader *) &im[1]);
 }
 
 
@@ -4994,57 +7154,54 @@ static void
 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
                       const struct GNUNET_MessageHeader *msg)
 {
-  struct GNUNET_MQ_MessageHandler handlers[]
-    = {GNUNET_MQ_hd_var_size (fragment_box,
-                              GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
-                              struct TransportFragmentBox,
-                              &cmc),
-       GNUNET_MQ_hd_fixed_size (fragment_ack,
-                                GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
-                                struct TransportFragmentAckMessage,
-                                &cmc),
-       GNUNET_MQ_hd_var_size (reliability_box,
-                              GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
-                              struct TransportReliabilityBox,
-                              &cmc),
-       GNUNET_MQ_hd_fixed_size (reliability_ack,
-                                GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
-                                struct TransportReliabilityAckMessage,
-                                &cmc),
-       GNUNET_MQ_hd_var_size (
-         backchannel_encapsulation,
-         GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
-         struct TransportBackchannelEncapsulationMessage,
-         &cmc),
-       GNUNET_MQ_hd_var_size (dv_learn,
-                              GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
-                              struct TransportDVLearn,
-                              &cmc),
-       GNUNET_MQ_hd_var_size (dv_box,
-                              GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
-                              struct TransportDVBox,
-                              &cmc),
-       GNUNET_MQ_hd_fixed_size (
-         validation_challenge,
-         GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
-         struct TransportValidationChallenge,
-         &cmc),
-       GNUNET_MQ_hd_fixed_size (
-         validation_response,
-         GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,
-         struct TransportValidationResponse,
-         &cmc),
-       GNUNET_MQ_handler_end ()};
+  struct GNUNET_MQ_MessageHandler handlers[] =
+    {GNUNET_MQ_hd_var_size (fragment_box,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
+                            struct TransportFragmentBoxMessage,
+                            &cmc),
+     GNUNET_MQ_hd_var_size (reliability_box,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
+                            struct TransportReliabilityBoxMessage,
+                            &cmc),
+     GNUNET_MQ_hd_var_size (reliability_ack,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
+                            struct TransportReliabilityAckMessage,
+                            &cmc),
+     GNUNET_MQ_hd_var_size (backchannel_encapsulation,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
+                            struct TransportBackchannelEncapsulationMessage,
+                            &cmc),
+     GNUNET_MQ_hd_var_size (dv_learn,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
+                            struct TransportDVLearnMessage,
+                            &cmc),
+     GNUNET_MQ_hd_var_size (dv_box,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
+                            struct TransportDVBoxMessage,
+                            &cmc),
+     GNUNET_MQ_hd_fixed_size (
+       validation_challenge,
+       GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
+       struct TransportValidationChallengeMessage,
+       &cmc),
+     GNUNET_MQ_hd_fixed_size (
+       validation_response,
+       GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,
+       struct TransportValidationResponseMessage,
+       &cmc),
+     GNUNET_MQ_handler_end ()};
   int ret;
 
   ret = GNUNET_MQ_handle_message (handlers, msg);
-  if (GNUNET_SYSERR == ret) {
+  if (GNUNET_SYSERR == ret)
+  {
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (cmc->tc->client);
     GNUNET_free (cmc);
     return;
   }
-  if (GNUNET_NO == ret) {
+  if (GNUNET_NO == ret)
+  {
     /* unencapsulated 'raw' message */
     handle_raw_message (&cmc, msg);
   }
@@ -5063,7 +7220,8 @@ check_add_queue_message (void *cls,
 {
   struct TransportClient *tc = cls;
 
-  if (CT_COMMUNICATOR != tc->type) {
+  if (CT_COMMUNICATOR != tc->type)
+  {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
@@ -5072,25 +7230,6 @@ check_add_queue_message (void *cls,
 }
 
 
-/**
- * Bandwidth tracker informs us that the delay until we should receive
- * more has changed.
- *
- * @param cls a `struct Queue` for which the delay changed
- */
-static void
-tracker_update_in_cb (void *cls)
-{
-  struct Queue *queue = cls;
-  struct GNUNET_TIME_Relative in_delay;
-  unsigned int rsize;
-
-  rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
-  in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in, rsize);
-  // FIXME: how exactly do we do inbound flow control?
-}
-
-
 /**
  * If necessary, generates the UUID for a @a pm
  *
@@ -5108,39 +7247,90 @@ set_pending_message_uuid (struct PendingMessage *pm)
 }
 
 
+/**
+ * Setup data structure waiting for acknowledgements.
+ *
+ * @param queue queue the @a pm will be sent over
+ * @param dvh path the message will take, may be NULL
+ * @param pm the pending message for transmission
+ * @return corresponding fresh pending acknowledgement
+ */
+static struct PendingAcknowledgement *
+prepare_pending_acknowledgement (struct Queue *queue,
+                                 struct DistanceVectorHop *dvh,
+                                 struct PendingMessage *pm)
+{
+  struct PendingAcknowledgement *pa;
+
+  pa = GNUNET_new (struct PendingAcknowledgement);
+  pa->queue = queue;
+  pa->dvh = dvh;
+  pa->pm = pm;
+  do
+  {
+    GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+                                &pa->ack_uuid,
+                                sizeof (pa->ack_uuid));
+  } while (GNUNET_YES != GNUNET_CONTAINER_multishortmap_put (
+                           pending_acks,
+                           &pa->ack_uuid.value,
+                           pa,
+                           GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  GNUNET_CONTAINER_MDLL_insert (queue, queue->pa_head, queue->pa_tail, pa);
+  GNUNET_CONTAINER_MDLL_insert (pm, pm->pa_head, pm->pa_tail, pa);
+  if (NULL != dvh)
+    GNUNET_CONTAINER_MDLL_insert (dvh, dvh->pa_head, dvh->pa_tail, pa);
+  pa->transmission_time = GNUNET_TIME_absolute_get ();
+  pa->message_size = pm->bytes_msg;
+  return pa;
+}
+
+
 /**
  * Fragment the given @a pm to the given @a mtu.  Adds
  * additional fragments to the neighbour as well. If the
  * @a mtu is too small, generates and error for the @a pm
  * and returns NULL.
  *
+ * @param queue which queue to fragment for
+ * @param dvh path the message will take, or NULL
  * @param pm pending message to fragment for transmission
- * @param mtu MTU to apply
  * @return new message to transmit
  */
 static struct PendingMessage *
-fragment_message (struct PendingMessage *pm, uint16_t mtu)
+fragment_message (struct Queue *queue,
+                  struct DistanceVectorHop *dvh,
+                  struct PendingMessage *pm)
 {
+  struct PendingAcknowledgement *pa;
   struct PendingMessage *ff;
+  uint16_t mtu;
 
+  pa = prepare_pending_acknowledgement (queue, dvh, pm);
+  mtu = (0 == queue->mtu)
+          ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
+          : queue->mtu;
   set_pending_message_uuid (pm);
 
   /* This invariant is established in #handle_add_queue_message() */
-  GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
+  GNUNET_assert (mtu > sizeof (struct TransportFragmentBoxMessage));
 
   /* select fragment for transmission, descending the tree if it has
-     been expanded until we are at a leaf or at a fragment that is small enough
+     been expanded until we are at a leaf or at a fragment that is small
+     enough
    */
   ff = pm;
-  while (((ff->bytes_msg > mtu) || (pm == ff))
-         && (ff->frag_off == ff->bytes_msg) && (NULL != ff->head_frag)) {
+  while (((ff->bytes_msg > mtu) || (pm == ff)) &&
+         (ff->frag_off == ff->bytes_msg) && (NULL != ff->head_frag))
+  {
     ff = ff->head_frag; /* descent into fragmented fragments */
   }
 
-  if (((ff->bytes_msg > mtu) || (pm == ff)) && (pm->frag_off < pm->bytes_msg)) {
+  if (((ff->bytes_msg > mtu) || (pm == ff)) && (pm->frag_off < pm->bytes_msg))
+  {
     /* Did not yet calculate all fragments, calculate next fragment */
     struct PendingMessage *frag;
-    struct TransportFragmentBox tfb;
+    struct TransportFragmentBoxMessage tfb;
     const char *orig;
     char *msg;
     uint16_t fragmax;
@@ -5148,29 +7338,32 @@ fragment_message (struct PendingMessage *pm, uint16_t mtu)
     uint16_t msize;
     uint16_t xoff = 0;
 
-    orig = (const char *)&ff[1];
+    orig = (const char *) &ff[1];
     msize = ff->bytes_msg;
-    if (pm != ff) {
-      const struct TransportFragmentBox *tfbo;
+    if (pm != ff)
+    {
+      const struct TransportFragmentBoxMessage *tfbo;
 
-      tfbo = (const struct TransportFragmentBox *)orig;
-      orig += sizeof (struct TransportFragmentBox);
-      msize -= sizeof (struct TransportFragmentBox);
+      tfbo = (const struct TransportFragmentBoxMessage *) orig;
+      orig += sizeof (struct TransportFragmentBoxMessage);
+      msize -= sizeof (struct TransportFragmentBoxMessage);
       xoff = ntohs (tfbo->frag_off);
     }
-    fragmax = mtu - sizeof (struct TransportFragmentBox);
+    fragmax = mtu - sizeof (struct TransportFragmentBoxMessage);
     fragsize = GNUNET_MIN (msize - ff->frag_off, fragmax);
-    frag = GNUNET_malloc (sizeof (struct PendingMessage)
-                          + sizeof (struct TransportFragmentBox) + fragsize);
+    frag =
+      GNUNET_malloc (sizeof (struct PendingMessage) +
+                     sizeof (struct TransportFragmentBoxMessage) + fragsize);
     frag->target = pm->target;
     frag->frag_parent = ff;
     frag->timeout = pm->timeout;
-    frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
+    frag->bytes_msg = sizeof (struct TransportFragmentBoxMessage) + fragsize;
     frag->pmt = PMT_FRAGMENT_BOX;
-    msg = (char *)&frag[1];
+    msg = (char *) &frag[1];
     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
-    tfb.header.size = htons (sizeof (struct TransportFragmentBox) + fragsize);
-    tfb.frag_uuid = htonl (pm->frag_uuidgen++);
+    tfb.header.size =
+      htons (sizeof (struct TransportFragmentBoxMessage) + fragsize);
+    tfb.ack_uuid = pa->ack_uuid;
     tfb.msg_uuid = pm->msg_uuid;
     tfb.frag_off = htons (ff->frag_off + xoff);
     tfb.msg_size = htons (pm->bytes_msg);
@@ -5201,13 +7394,18 @@ fragment_message (struct PendingMessage *pm, uint16_t mtu)
  * @a pm).  If the @a pm is already fragmented or reliability boxed,
  * or itself an ACK, this function simply returns @a pm.
  *
+ * @param queue which queue to prepare transmission for
+ * @param dvh path the message will take, or NULL
  * @param pm pending message to box for transmission over unreliabile queue
  * @return new message to transmit
  */
 static struct PendingMessage *
-reliability_box_message (struct PendingMessage *pm)
+reliability_box_message (struct Queue *queue,
+                         struct DistanceVectorHop *dvh,
+                         struct PendingMessage *pm)
 {
-  struct TransportReliabilityBox rbox;
+  struct TransportReliabilityBoxMessage rbox;
+  struct PendingAcknowledgement *pa;
   struct PendingMessage *bpm;
   char *msg;
 
@@ -5217,14 +7415,17 @@ reliability_box_message (struct PendingMessage *pm)
   if (NULL != pm->bpm)
     return pm->bpm; /* already computed earlier: do nothing */
   GNUNET_assert (NULL == pm->head_frag);
-  if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX) {
+  if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
+  {
     /* failed hard */
     GNUNET_break (0);
-    client_send_response (pm, GNUNET_NO, 0);
+    client_send_response (pm);
     return NULL;
   }
-  bpm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (rbox)
-                       + pm->bytes_msg);
+  pa = prepare_pending_acknowledgement (queue, dvh, pm);
+
+  bpm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (rbox) +
+                       pm->bytes_msg);
   bpm->target = pm->target;
   bpm->frag_parent = pm;
   GNUNET_CONTAINER_MDLL_insert (frag, pm->head_frag, pm->tail_frag, bpm);
@@ -5235,8 +7436,9 @@ reliability_box_message (struct PendingMessage *pm)
   rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
   rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
   rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
-  rbox.msg_uuid = pm->msg_uuid;
-  msg = (char *)&bpm[1];
+
+  rbox.ack_uuid = pa->ack_uuid;
+  msg = (char *) &bpm[1];
   memcpy (msg, &rbox, sizeof (rbox));
   memcpy (&msg[sizeof (rbox)], &pm[1], pm->bytes_msg);
   pm->bpm = bpm;
@@ -5245,8 +7447,62 @@ reliability_box_message (struct PendingMessage *pm)
 
 
 /**
- * We believe we are ready to transmit a message on a queue. Double-checks
- * with the queue's "tracker_out" and then gives the message to the
+ * Change the value of the `next_attempt` field of @a pm
+ * to @a next_attempt and re-order @a pm in the transmission
+ * list as required by the new timestmap.
+ *
+ * @param pm a pending message to update
+ * @param next_attempt timestamp to use
+ */
+static void
+update_pm_next_attempt (struct PendingMessage *pm,
+                        struct GNUNET_TIME_Absolute next_attempt)
+{
+  struct Neighbour *neighbour = pm->target;
+
+  pm->next_attempt = next_attempt;
+  if (NULL == pm->frag_parent)
+  {
+    struct PendingMessage *pos;
+
+    /* re-insert sort in neighbour list */
+    GNUNET_CONTAINER_MDLL_remove (neighbour,
+                                  neighbour->pending_msg_head,
+                                  neighbour->pending_msg_tail,
+                                  pm);
+    pos = neighbour->pending_msg_tail;
+    while ((NULL != pos) &&
+           (next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
+      pos = pos->prev_neighbour;
+    GNUNET_CONTAINER_MDLL_insert_after (neighbour,
+                                        neighbour->pending_msg_head,
+                                        neighbour->pending_msg_tail,
+                                        pos,
+                                        pm);
+  }
+  else
+  {
+    /* re-insert sort in fragment list */
+    struct PendingMessage *fp = pm->frag_parent;
+    struct PendingMessage *pos;
+
+    GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, pm);
+    pos = fp->tail_frag;
+    while ((NULL != pos) &&
+           (next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
+      pos = pos->prev_frag;
+    GNUNET_CONTAINER_MDLL_insert_after (frag,
+                                        fp->head_frag,
+                                        fp->tail_frag,
+                                        pos,
+                                        pm);
+  }
+}
+
+
+/**
+ * We believe we are ready to transmit a message on a queue.
+ * Gives the message to the
  * communicator for transmission (updating the tracker, and re-scheduling
  * itself if applicable).
  *
@@ -5260,20 +7516,25 @@ transmit_on_queue (void *cls)
   struct PendingMessage *pm;
   struct PendingMessage *s;
   uint32_t overhead;
-  struct GNUNET_TRANSPORT_SendMessageTo *smt;
-  struct GNUNET_MQ_Envelope *env;
 
   queue->transmit_task = NULL;
-  if (NULL == (pm = n->pending_msg_head)) {
+  if (NULL == (pm = n->pending_msg_head))
+  {
     /* no message pending, nothing to do here! */
     return;
   }
-  schedule_transmit_on_queue (queue);
+  if (NULL != pm->qe)
+  {
+    /* message still pending with communciator!
+       LOGGING-FIXME: Use stats? logging? Should this not be rare? */
+    return;
+  }
+  schedule_transmit_on_queue (queue, GNUNET_YES);
   if (NULL != queue->transmit_task)
     return; /* do it later */
   overhead = 0;
   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
-    overhead += sizeof (struct TransportReliabilityBox);
+    overhead += sizeof (struct TransportReliabilityBoxMessage);
   s = pm;
   if ( ( (0 != queue->mtu) &&
         (pm->bytes_msg + overhead > queue->mtu) ) ||
@@ -5281,59 +7542,40 @@ transmit_on_queue (void *cls)
        (NULL != pm->head_frag /* fragments already exist, should
                                 respect that even if MTU is 0 for
                                 this queue */) )
-    s = fragment_message (s,
-                          (0 == queue->mtu)
-                            ? UINT16_MAX
-                                - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
-                            : queue->mtu);
-  if (NULL == s) {
+    s = fragment_message (queue, pm->dvh, s);
+  if (NULL == s)
+  {
     /* Fragmentation failed, try next message... */
-    schedule_transmit_on_queue (queue);
+    schedule_transmit_on_queue (queue, GNUNET_NO);
     return;
   }
   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
-    s = reliability_box_message (s);
-  if (NULL == s) {
+    // FIXME-OPTIMIZE: and if reliability was requested for 's' by core!
+    s = reliability_box_message (queue, pm->dvh, s);
+  if (NULL == s)
+  {
     /* Reliability boxing failed, try next message... */
-    schedule_transmit_on_queue (queue);
+    schedule_transmit_on_queue (queue, GNUNET_NO);
     return;
   }
 
   /* Pass 's' for transission to the communicator */
-  env = GNUNET_MQ_msg_extra (smt,
-                             s->bytes_msg,
-                             GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
-  smt->qid = queue->qid;
-  smt->mid = queue->mid_gen;
-  smt->receiver = n->pid;
-  memcpy (&smt[1], &s[1], s->bytes_msg);
-  {
-    /* Pass the env to the communicator of queue for transmission. */
-    struct QueueEntry *qe;
-
-    qe = GNUNET_new (struct QueueEntry);
-    qe->mid = queue->mid_gen++;
-    qe->queue = queue;
-    // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
-    GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
-    GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
-    queue->queue_length++;
-    queue->tc->details.communicator.total_queue_length++;
-    GNUNET_MQ_send (queue->tc->mq, env);
-  }
-
+  queue_send_msg (queue, s, &s[1], s->bytes_msg);
   // FIXME: do something similar to the logic below
   // in defragmentation / reliability ACK handling!
 
   /* Check if this transmission somehow conclusively finished handing 'pm'
      even without any explicit ACKs */
-  if ((PMT_CORE == s->pmt)
-      && (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)) {
+  if ((PMT_CORE == s->pmt) &&
+      (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
+  {
     /* Full message sent, and over reliabile channel */
-    client_send_response (pm, GNUNET_YES, pm->bytes_msg);
-  } else if ((GNUNET_TRANSPORT_CC_RELIABLE
-              == queue->tc->details.communicator.cc)
-             && (PMT_FRAGMENT_BOX == s->pmt)) {
+    client_send_response (pm);
+  }
+  else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
+            queue->tc->details.communicator.cc) &&
+           (PMT_FRAGMENT_BOX == s->pmt))
+  {
     struct PendingMessage *pos;
 
     /* Fragment sent over reliabile channel */
@@ -5342,8 +7584,9 @@ transmit_on_queue (void *cls)
     GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
     GNUNET_free (s);
     /* check if subtree is done */
-    while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg)
-           && (pos != pm)) {
+    while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
+           (pos != pm))
+    {
       s = pos;
       pos = s->frag_parent;
       GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
@@ -5352,130 +7595,31 @@ transmit_on_queue (void *cls)
 
     /* Was this the last applicable fragmment? */
     if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
-      client_send_response (
-        pm,
-        GNUNET_YES,
-        pm->bytes_msg /* FIXME: calculate and add overheads! */);
-  } else if (PMT_CORE != pm->pmt) {
+      client_send_response (pm);
+  }
+  else if (PMT_CORE != pm->pmt)
+  {
     /* This was an acknowledgement of some type, always free */
     free_pending_message (pm);
-  } else {
-    /* message not finished, waiting for acknowledgement */
-    struct Neighbour *neighbour = pm->target;
-    /* Update time by which we might retransmit 's' based on queue
+  }
+  else
+  {
+    /* Message not finished, waiting for acknowledgement.
+       Update time by which we might retransmit 's' based on queue
        characteristics (i.e. RTT); it takes one RTT for the message to
        arrive and the ACK to come back in the best case; but the other
        side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
        retransmitting.  Note that in the future this heuristic should
        likely be improved further (measure RTT stability, consider
        message urgency and size when delaying ACKs, etc.) */
-    s->next_attempt = GNUNET_TIME_relative_to_absolute (
-      GNUNET_TIME_relative_multiply (queue->rtt, 4));
-    if (s == pm) {
-      struct PendingMessage *pos;
-
-      /* re-insert sort in neighbour list */
-      GNUNET_CONTAINER_MDLL_remove (neighbour,
-                                    neighbour->pending_msg_head,
-                                    neighbour->pending_msg_tail,
-                                    pm);
-      pos = neighbour->pending_msg_tail;
-      while (
-        (NULL != pos)
-        && (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
-        pos = pos->prev_neighbour;
-      GNUNET_CONTAINER_MDLL_insert_after (neighbour,
-                                          neighbour->pending_msg_head,
-                                          neighbour->pending_msg_tail,
-                                          pos,
-                                          pm);
-    } else {
-      /* re-insert sort in fragment list */
-      struct PendingMessage *fp = s->frag_parent;
-      struct PendingMessage *pos;
-
-      GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, s);
-      pos = fp->tail_frag;
-      while ((NULL != pos)
-             && (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
-        pos = pos->prev_frag;
-      GNUNET_CONTAINER_MDLL_insert_after (frag,
-                                          fp->head_frag,
-                                          fp->tail_frag,
-                                          pos,
-                                          s);
-    }
+    update_pm_next_attempt (s,
+                            GNUNET_TIME_relative_to_absolute (
+                              GNUNET_TIME_relative_multiply (queue->pd.aged_rtt,
+                                                             4)));
   }
 
   /* finally, re-schedule queue transmission task itself */
-  schedule_transmit_on_queue (queue);
-}
-
-
-/**
- * Bandwidth tracker informs us that the delay until we
- * can transmit again changed.
- *
- * @param cls a `struct Queue` for which the delay changed
- */
-static void
-tracker_update_out_cb (void *cls)
-{
-  struct Queue *queue = cls;
-  struct Neighbour *n = queue->neighbour;
-
-  if (NULL == n->pending_msg_head) {
-    GNUNET_log (
-      GNUNET_ERROR_TYPE_DEBUG,
-      "Bandwidth allocation updated for empty transmission queue `%s'\n",
-      queue->address);
-    return; /* no message pending, nothing to do here! */
-  }
-  GNUNET_SCHEDULER_cancel (queue->transmit_task);
-  queue->transmit_task = NULL;
-  schedule_transmit_on_queue (queue);
-}
-
-
-/**
- * Bandwidth tracker informs us that excessive outbound bandwidth was
- * allocated which is not being used.
- *
- * @param cls a `struct Queue` for which the excess was noted
- */
-static void
-tracker_excess_out_cb (void *cls)
-{
-  (void)cls;
-
-  /* FIXME: trigger excess bandwidth report to core? Right now,
-     this is done internally within transport_api2_core already,
-     but we probably want to change the logic and trigger it
-     from here via a message instead! */
-  /* TODO: maybe inform someone at this point? */
-  GNUNET_STATISTICS_update (GST_stats,
-                            "# Excess outbound bandwidth reported",
-                            1,
-                            GNUNET_NO);
-}
-
-
-/**
- * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
- * which is not being used.
- *
- * @param cls a `struct Queue` for which the excess was noted
- */
-static void
-tracker_excess_in_cb (void *cls)
-{
-  (void)cls;
-
-  /* TODO: maybe inform somone at this point? */
-  GNUNET_STATISTICS_update (GST_stats,
-                            "# Excess inbound bandwidth reported",
-                            1,
-                            GNUNET_NO);
+  schedule_transmit_on_queue (queue, GNUNET_NO);
 }
 
 
@@ -5491,17 +7635,19 @@ handle_del_queue_message (void *cls,
 {
   struct TransportClient *tc = cls;
 
-  if (CT_COMMUNICATOR != tc->type) {
+  if (CT_COMMUNICATOR != tc->type)
+  {
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (tc->client);
     return;
   }
   for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue;
-       queue = queue->next_client) {
+       queue = queue->next_client)
+  {
     struct Neighbour *neighbour = queue->neighbour;
 
-    if ((dqm->qid != queue->qid)
-        || (0 != GNUNET_memcmp (&dqm->receiver, &neighbour->pid)))
+    if ((dqm->qid != queue->qid) ||
+        (0 != GNUNET_memcmp (&dqm->receiver, &neighbour->pid)))
       continue;
     free_queue (queue);
     GNUNET_SERVICE_client_continue (tc->client);
@@ -5524,8 +7670,10 @@ handle_send_message_ack (void *cls,
 {
   struct TransportClient *tc = cls;
   struct QueueEntry *qe;
+  struct PendingMessage *pm;
 
-  if (CT_COMMUNICATOR != tc->type) {
+  if (CT_COMMUNICATOR != tc->type)
+  {
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (tc->client);
     return;
@@ -5534,11 +7682,13 @@ handle_send_message_ack (void *cls,
   /* find our queue entry matching the ACK */
   qe = NULL;
   for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue;
-       queue = queue->next_client) {
+       queue = queue->next_client)
+  {
     if (0 != GNUNET_memcmp (&queue->neighbour->pid, &sma->receiver))
       continue;
     for (struct QueueEntry *qep = queue->queue_head; NULL != qep;
-         qep = qep->next) {
+         qep = qep->next)
+    {
       if (qep->mid != sma->mid)
         continue;
       qe = qep;
@@ -5546,7 +7696,8 @@ handle_send_message_ack (void *cls,
     }
     break;
   }
-  if (NULL == qe) {
+  if (NULL == qe)
+  {
     /* this should never happen */
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (tc->client);
@@ -5560,9 +7711,11 @@ handle_send_message_ack (void *cls,
   GNUNET_SERVICE_client_continue (tc->client);
 
   /* if applicable, resume transmissions that waited on ACK */
-  if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1
-      == tc->details.communicator.total_queue_length) {
-    /* Communicator dropped below threshold, resume all queues */
+  if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 ==
+      tc->details.communicator.total_queue_length)
+  {
+    /* Communicator dropped below threshold, resume all queues
+       incident with this client! */
     GNUNET_STATISTICS_update (
       GST_stats,
       "# Transmission throttled due to communicator queue limit",
@@ -5571,22 +7724,42 @@ handle_send_message_ack (void *cls,
     for (struct Queue *queue = tc->details.communicator.queue_head;
          NULL != queue;
          queue = queue->next_client)
-      schedule_transmit_on_queue (queue);
-  } else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length) {
+      schedule_transmit_on_queue (queue, GNUNET_NO);
+  }
+  else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
+  {
     /* queue dropped below threshold; only resume this one queue */
-    GNUNET_STATISTICS_update (
-      GST_stats,
-      "# Transmission throttled due to queue queue limit",
-      -1,
-      GNUNET_NO);
-    schedule_transmit_on_queue (qe->queue);
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# Transmission throttled due to queue queue limit",
+                              -1,
+                              GNUNET_NO);
+    schedule_transmit_on_queue (qe->queue, GNUNET_NO);
   }
 
-  /* TODO: we also should react on the status! */
-  // FIXME: this probably requires queue->pm = s assignment!
-  // FIXME: react to communicator status about transmission request. We got:
-  sma->status; // OK success, SYSERR failure
-
+  if (NULL != (pm = qe->pm))
+  {
+    struct Neighbour *n;
+
+    GNUNET_assert (qe == pm->qe);
+    pm->qe = NULL;
+    /* If waiting for this communicator may have blocked transmission
+       of pm on other queues for this neighbour, force schedule
+       transmit on queue for queues of the neighbour */
+    n = pm->target;
+    if (n->pending_msg_head == pm)
+    {
+      for (struct Queue *queue = n->queue_head; NULL != queue;
+           queue = queue->next_neighbour)
+        schedule_transmit_on_queue (queue, GNUNET_NO);
+    }
+    if (GNUNET_OK != ntohl (sma->status))
+    {
+      GNUNET_log (
+        GNUNET_ERROR_TYPE_INFO,
+        "Queue failed in transmission, will try retransmission immediately\n");
+      update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS);
+    }
+  }
   GNUNET_free (qe);
 }
 
@@ -5610,8 +7783,9 @@ notify_client_queues (void *cls,
 
   GNUNET_assert (CT_MONITOR == tc->type);
   for (struct Queue *q = neighbour->queue_head; NULL != q;
-       q = q->next_neighbour) {
-    struct MonitorEvent me = {.rtt = q->rtt,
+       q = q->next_neighbour)
+  {
+    struct MonitorEvent me = {.rtt = q->pd.aged_rtt,
                               .cs = q->cs,
                               .num_msg_pending = q->num_msg_pending,
                               .num_bytes_pending = q->num_bytes_pending};
@@ -5634,7 +7808,8 @@ handle_monitor_start (void *cls,
 {
   struct TransportClient *tc = cls;
 
-  if (CT_NONE != tc->type) {
+  if (CT_NONE != tc->type)
+  {
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (tc->client);
     return;
@@ -5658,7 +7833,8 @@ handle_monitor_start (void *cls,
 static struct TransportClient *
 lookup_communicator (const char *prefix)
 {
-  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) {
+  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
+  {
     if (CT_COMMUNICATOR != tc->type)
       continue;
     if (0 == strcmp (prefix, tc->details.communicator.address_prefix))
@@ -5690,29 +7866,29 @@ suggest_to_connect (const struct GNUNET_PeerIdentity *pid, const char *address)
   size_t alen;
 
   prefix = GNUNET_HELLO_address_to_prefix (address);
-  if (NULL == prefix) {
+  if (NULL == prefix)
+  {
     GNUNET_break (0); /* We got an invalid address!? */
     return;
   }
   tc = lookup_communicator (prefix);
-  if (NULL == tc) {
-    GNUNET_STATISTICS_update (
-      GST_stats,
-      "# Suggestions ignored due to missing communicator",
-      1,
-      GNUNET_NO);
+  if (NULL == tc)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# Suggestions ignored due to missing communicator",
+                              1,
+                              GNUNET_NO);
     return;
   }
   /* forward suggestion for queue creation to communicator */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Request #%u for `%s' communicator to create queue to `%s'\n",
-              (unsigned int)idgen,
+              (unsigned int) idgen,
               prefix,
               address);
   alen = strlen (address) + 1;
-  env = GNUNET_MQ_msg_extra (cqm,
-                             alen,
-                             GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
+  env =
+    GNUNET_MQ_msg_extra (cqm, alen, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
   cqm->request_id = htonl (idgen++);
   cqm->receiver = *pid;
   memcpy (&cqm[1], address, alen);
@@ -5730,19 +7906,16 @@ suggest_to_connect (const struct GNUNET_PeerIdentity *pid, const char *address)
 static void
 validation_transmit_on_queue (struct Queue *q, struct ValidationState *vs)
 {
-  struct GNUNET_MQ_Envelope *env;
-  struct TransportValidationChallenge *tvc;
+  struct TransportValidationChallengeMessage tvc;
 
   vs->last_challenge_use = GNUNET_TIME_absolute_get ();
-  env = GNUNET_MQ_msg (
-    tvc,
-    GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE);
-  tvc->reserved = htonl (0);
-  tvc->challenge = vs->challenge;
-  tvc->sender_time = GNUNET_TIME_absolute_hton (vs->last_challenge_use);
-  // FIXME: not so easy, need to BOX this message
-  // in a transmission request! (mistake also done elsewhere!)
-  GNUNET_MQ_send (q->tc->mq, env);
+  tvc.header.type =
+    htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE);
+  tvc.header.size = htons (sizeof (tvc));
+  tvc.reserved = htonl (0);
+  tvc.challenge = vs->challenge;
+  tvc.sender_time = GNUNET_TIME_absolute_hton (vs->last_challenge_use);
+  queue_send_msg (q, NULL, &tvc, sizeof (tvc));
 }
 
 
@@ -5755,46 +7928,37 @@ static void
 validation_start_cb (void *cls)
 {
   struct ValidationState *vs;
-  struct Neighbour *n;
   struct Queue *q;
 
-  (void)cls;
+  (void) cls;
   validation_task = NULL;
   vs = GNUNET_CONTAINER_heap_peek (validation_heap);
   /* drop validations past their expiration */
   while (
-    (NULL != vs)
-    && (0
-        == GNUNET_TIME_absolute_get_remaining (vs->valid_until).rel_value_us)) {
+    (NULL != vs) &&
+    (0 == GNUNET_TIME_absolute_get_remaining (vs->valid_until).rel_value_us))
+  {
     free_validation_state (vs);
     vs = GNUNET_CONTAINER_heap_peek (validation_heap);
   }
   if (NULL == vs)
     return; /* woopsie, no more addresses known, should only
                happen if we're really a lonely peer */
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours, &vs->pid);
-  q = NULL;
-  if (NULL != n) {
-    for (struct Queue *pos = n->queue_head; NULL != pos;
-         pos = pos->next_neighbour) {
-      if (0 == strcmp (pos->address, vs->address)) {
-        q = pos;
-        break;
-      }
-    }
-  }
-  if (NULL == q) {
+  q = find_queue (&vs->pid, vs->address);
+  if (NULL == q)
+  {
     vs->awaiting_queue = GNUNET_YES;
     suggest_to_connect (&vs->pid, vs->address);
-  } else
+  }
+  else
     validation_transmit_on_queue (q, vs);
   /* Finally, reschedule next attempt */
-  vs->challenge_backoff
-    GNUNET_TIME_randomized_backoff (vs->challenge_backoff,
-                                      MAX_VALIDATION_CHALLENGE_FREQ);
-  update_next_challenge_time (
-    vs,
-    GNUNET_TIME_relative_to_absolute (vs->challenge_backoff));
+  vs->challenge_backoff =
+    GNUNET_TIME_randomized_backoff (vs->challenge_backoff,
+                                    MAX_VALIDATION_CHALLENGE_FREQ);
+  update_next_challenge_time (vs,
+                              GNUNET_TIME_relative_to_absolute (
+                                vs->challenge_backoff));
 }
 
 
@@ -5846,17 +8010,16 @@ check_connection_quality (void *cls,
   struct Neighbour *n = value;
   int do_inc;
 
-  (void)pid;
+  (void) pid;
   do_inc = GNUNET_NO;
-  for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) {
-    if (0 != q->distance)
-      continue; /* DV does not count */
+  for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
+  {
     ctx->num_queues++;
     if (0 == ctx->k--)
       ctx->q = q;
     /* OPTIMIZE-FIXME: in the future, add reliability / goodput
        statistics and consider those as well here? */
-    if (q->rtt.rel_value_us < DV_QUALITY_RTT_THRESHOLD.rel_value_us)
+    if (q->pd.aged_rtt.rel_value_us < DV_QUALITY_RTT_THRESHOLD.rel_value_us)
       do_inc = GNUNET_YES;
   }
   if (GNUNET_YES == do_inc)
@@ -5881,10 +8044,9 @@ start_dv_learn (void *cls)
 {
   struct LearnLaunchEntry *lle;
   struct QueueQualityContext qqc;
-  struct GNUNET_MQ_Envelope *env;
-  struct TransportDVLearn *dvl;
+  struct TransportDVLearnMessage dvl;
 
-  (void)cls;
+  (void) cls;
   dvlearn_task = NULL;
   if (0 == GNUNET_CONTAINER_multipeermap_size (neighbours))
     return; /* lost all connectivity, cannot do learning */
@@ -5893,7 +8055,8 @@ start_dv_learn (void *cls)
   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
                                          &check_connection_quality,
                                          &qqc);
-  if (qqc.quality_count > DV_LEARN_QUALITY_THRESHOLD) {
+  if (qqc.quality_count > DV_LEARN_QUALITY_THRESHOLD)
+  {
     struct GNUNET_TIME_Relative delay;
     unsigned int factor;
 
@@ -5904,13 +8067,14 @@ start_dv_learn (void *cls)
     return;
   }
   /* remove old entries in #dvlearn_map if it has grown too big */
-  while (MAX_DV_LEARN_PENDING
-         >= GNUNET_CONTAINER_multishortmap_size (dvlearn_map)) {
+  while (MAX_DV_LEARN_PENDING >=
+         GNUNET_CONTAINER_multishortmap_size (dvlearn_map))
+  {
     lle = lle_tail;
-    GNUNET_assert (GNUNET_YES
-                   == GNUNET_CONTAINER_multishortmap_remove (dvlearn_map,
-                                                             &lle->challenge,
-                                                             lle));
+    GNUNET_assert (GNUNET_YES ==
+                   GNUNET_CONTAINER_multishortmap_remove (dvlearn_map,
+                                                          &lle->challenge.value,
+                                                          lle));
     GNUNET_CONTAINER_DLL_remove (lle_head, lle_tail, lle);
     GNUNET_free (lle);
   }
@@ -5920,29 +8084,32 @@ start_dv_learn (void *cls)
                               &lle->challenge,
                               sizeof (lle->challenge));
   GNUNET_CONTAINER_DLL_insert (lle_head, lle_tail, lle);
-  GNUNET_break (GNUNET_YES
-                == GNUNET_CONTAINER_multishortmap_put (
+  GNUNET_break (GNUNET_YES ==
+                GNUNET_CONTAINER_multishortmap_put (
                   dvlearn_map,
-                  &lle->challenge,
+                  &lle->challenge.value,
                   lle,
                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  env = GNUNET_MQ_msg (dvl, GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
-  dvl->num_hops = htons (0);
-  dvl->bidirectional = htons (0);
-  dvl->non_network_delay = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
+  dvl.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
+  dvl.header.size = htons (sizeof (dvl));
+  dvl.num_hops = htons (0);
+  dvl.bidirectional = htons (0);
+  dvl.non_network_delay = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
+  dvl.monotonic_time =
+    GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
   {
     struct DvInitPS dvip = {.purpose.purpose = htonl (
                               GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
                             .purpose.size = htonl (sizeof (dvip)),
+                            .monotonic_time = dvl.monotonic_time,
                             .challenge = lle->challenge};
 
-    GNUNET_assert (GNUNET_OK
-                   == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
-                                                &dvip.purpose,
-                                                &dvl->init_sig));
+    GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+                                                          &dvip.purpose,
+                                                          &dvl.init_sig));
   }
-  dvl->initiator = GST_my_identity;
-  dvl->challenge = lle->challenge;
+  dvl.initiator = GST_my_identity;
+  dvl.challenge = lle->challenge;
 
   qqc.quality_count = 0;
   qqc.k = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, qqc.num_queues);
@@ -5955,16 +8122,14 @@ start_dv_learn (void *cls)
 
   /* Do this as close to transmission time as possible! */
   lle->launch_time = GNUNET_TIME_absolute_get ();
-  // FIXME: not so easy, need to BOX this message
-  // in a transmission request! (mistake also done elsewhere!)
-  GNUNET_MQ_send (qqc.q->tc->mq, env);
 
+  queue_send_msg (qqc.q, NULL, &dvl, sizeof (dvl));
   /* reschedule this job, randomizing the time it runs (but no
      actual backoff!) */
-  dvlearn_task = GNUNET_SCHEDULER_add_delayed (
-    GNUNET_TIME_randomize (DV_LEARN_BASE_FREQUENCY),
-    &start_dv_learn,
-    NULL);
+  dvlearn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_randomize (
+                                                 DV_LEARN_BASE_FREQUENCY),
+                                               &start_dv_learn,
+                                               NULL);
 }
 
 
@@ -5985,9 +8150,10 @@ check_validation_request_pending (void *cls,
   struct Queue *q = cls;
   struct ValidationState *vs = value;
 
-  (void)pid;
-  if ((GNUNET_YES == vs->awaiting_queue)
-      && (0 == strcmp (vs->address, q->address))) {
+  (void) pid;
+  if ((GNUNET_YES == vs->awaiting_queue) &&
+      (0 == strcmp (vs->address, q->address)))
+  {
     vs->awaiting_queue = GNUNET_NO;
     validation_transmit_on_queue (q, vs);
     return GNUNET_NO;
@@ -5996,6 +8162,43 @@ check_validation_request_pending (void *cls,
 }
 
 
+/**
+ * Function called with the monotonic time of a DV initiator
+ * by PEERSTORE. Updates the time.
+ *
+ * @param cls a `struct Neighbour`
+ * @param record the information found, NULL for the last call
+ * @param emsg error message
+ */
+static void
+neighbour_dv_monotime_cb (void *cls,
+                          const struct GNUNET_PEERSTORE_Record *record,
+                          const char *emsg)
+{
+  struct Neighbour *n = cls;
+  struct GNUNET_TIME_AbsoluteNBO *mtbe;
+
+  (void) emsg;
+  if (NULL == record)
+  {
+    /* we're done with #neighbour_dv_monotime_cb() invocations,
+       continue normal processing */
+    n->get = NULL;
+    n->dv_monotime_available = GNUNET_YES;
+    return;
+  }
+  if (sizeof (*mtbe) != record->value_size)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  mtbe = record->value;
+  n->last_dv_learn_monotime =
+    GNUNET_TIME_absolute_max (n->last_dv_learn_monotime,
+                              GNUNET_TIME_absolute_ntoh (*mtbe));
+}
+
+
 /**
  * New queue became available.  Process the request.
  *
@@ -6012,7 +8215,8 @@ handle_add_queue_message (void *cls,
   const char *addr;
   uint16_t addr_len;
 
-  if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox)) {
+  if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBoxMessage))
+  {
     /* MTU so small as to be useless for transmissions,
        required for #fragment_message()! */
     GNUNET_break_op (0);
@@ -6020,48 +8224,40 @@ handle_add_queue_message (void *cls,
     return;
   }
   neighbour = lookup_neighbour (&aqm->receiver);
-  if (NULL == neighbour) {
+  if (NULL == neighbour)
+  {
     neighbour = GNUNET_new (struct Neighbour);
-    neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
     neighbour->pid = aqm->receiver;
-    GNUNET_assert (GNUNET_OK
-                   == GNUNET_CONTAINER_multipeermap_put (
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multipeermap_put (
                      neighbours,
                      &neighbour->pid,
                      neighbour,
                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-    cores_send_connect_info (&neighbour->pid, GNUNET_BANDWIDTH_ZERO);
+    neighbour->get =
+      GNUNET_PEERSTORE_iterate (peerstore,
+                                "transport",
+                                &neighbour->pid,
+                                GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
+                                &neighbour_dv_monotime_cb,
+                                neighbour);
   }
   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
-  addr = (const char *)&aqm[1];
+  addr = (const char *) &aqm[1];
 
   queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
   queue->tc = tc;
-  queue->address = (const char *)&queue[1];
-  queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
+  queue->address = (const char *) &queue[1];
+  queue->pd.aged_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
   queue->qid = aqm->qid;
   queue->mtu = ntohl (aqm->mtu);
-  queue->nt = (enum GNUNET_NetworkType)ntohl (aqm->nt);
-  queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus)ntohl (aqm->cs);
+  queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
+  queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
   queue->neighbour = neighbour;
-  GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
-                                  &tracker_update_in_cb,
-                                  queue,
-                                  GNUNET_BANDWIDTH_ZERO,
-                                  GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
-                                  &tracker_excess_in_cb,
-                                  queue);
-  GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
-                                  &tracker_update_out_cb,
-                                  queue,
-                                  GNUNET_BANDWIDTH_ZERO,
-                                  GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
-                                  &tracker_excess_out_cb,
-                                  queue);
   memcpy (&queue[1], addr, addr_len);
   /* notify monitors about new queue */
   {
-    struct MonitorEvent me = {.rtt = queue->rtt, .cs = queue->cs};
+    struct MonitorEvent me = {.rtt = queue->pd.aged_rtt, .cs = queue->cs};
 
     notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
   }
@@ -6074,11 +8270,11 @@ handle_add_queue_message (void *cls,
                                 tc->details.communicator.queue_tail,
                                 queue);
   /* check if valdiations are waiting for the queue */
-  (void)GNUNET_CONTAINER_multipeermap_get_multiple (
-    validation_map,
-    &aqm->receiver,
-    &check_validation_request_pending,
-    queue);
+  (void)
+    GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
+                                                &aqm->receiver,
+                                                &check_validation_request_pending,
+                                                queue);
   /* might be our first queue, try launching DV learning */
   if (NULL == dvlearn_task)
     dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL);
@@ -6099,7 +8295,8 @@ handle_queue_create_ok (void *cls,
 {
   struct TransportClient *tc = cls;
 
-  if (CT_COMMUNICATOR != tc->type) {
+  if (CT_COMMUNICATOR != tc->type)
+  {
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (tc->client);
     return;
@@ -6110,14 +8307,14 @@ handle_queue_create_ok (void *cls,
                             GNUNET_NO);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Request #%u for communicator to create queue succeeded\n",
-              (unsigned int)ntohs (cqr->request_id));
+              (unsigned int) ntohs (cqr->request_id));
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
 
 /**
- * Communicator tells us that our request to create a queue failed. This usually
- * indicates that the provided address is simply invalid or that the
+ * Communicator tells us that our request to create a queue failed. This
+ * usually indicates that the provided address is simply invalid or that the
  * communicator's resources are exhausted.
  *
  * @param cls the `struct TransportClient`
@@ -6130,19 +8327,19 @@ handle_queue_create_fail (
 {
   struct TransportClient *tc = cls;
 
-  if (CT_COMMUNICATOR != tc->type) {
+  if (CT_COMMUNICATOR != tc->type)
+  {
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (tc->client);
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Request #%u for communicator to create queue failed\n",
-              (unsigned int)ntohs (cqr->request_id));
-  GNUNET_STATISTICS_update (
-    GST_stats,
-    "# Suggestions failed in queue creation at communicator",
-    1,
-    GNUNET_NO);
+              (unsigned int) ntohs (cqr->request_id));
+  GNUNET_STATISTICS_update (GST_stats,
+                            "# Suggestions failed in queue creation at communicator",
+                            1,
+                            GNUNET_NO);
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
@@ -6160,19 +8357,21 @@ handle_suggest_cancel (void *cls, const struct ExpressPreferenceMessage *msg)
   struct TransportClient *tc = cls;
   struct PeerRequest *pr;
 
-  if (CT_APPLICATION != tc->type) {
+  if (CT_APPLICATION != tc->type)
+  {
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (tc->client);
     return;
   }
   pr = GNUNET_CONTAINER_multipeermap_get (tc->details.application.requests,
                                           &msg->peer);
-  if (NULL == pr) {
+  if (NULL == pr)
+  {
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (tc->client);
     return;
   }
-  (void)stop_peer_request (tc, &pr->pid, pr);
+  (void) stop_peer_request (tc, &pr->pid, pr);
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
@@ -6190,8 +8389,8 @@ check_address_consider_verify (
   void *cls,
   const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
 {
-  (void)cls;
-  (void)hdr;
+  (void) cls;
+  (void) hdr;
   return GNUNET_OK;
 }
 
@@ -6230,7 +8429,7 @@ check_known_address (void *cls,
   struct CheckKnownAddressContext *ckac = cls;
   struct ValidationState *vs = value;
 
-  (void)pid;
+  (void) pid;
   if (0 != strcmp (vs->address, ckac->address))
     return GNUNET_OK;
   ckac->vs = vs;
@@ -6256,21 +8455,24 @@ start_address_validation (const struct GNUNET_PeerIdentity *pid,
 
   if (0 == GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us)
     return; /* expired */
-  (void)GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
-                                                    pid,
-                                                    &check_known_address,
-                                                    &ckac);
-  if (NULL != (vs = ckac.vs)) {
+  (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
+                                                     pid,
+                                                     &check_known_address,
+                                                     &ckac);
+  if (NULL != (vs = ckac.vs))
+  {
     /* if 'vs' is not currently valid, we need to speed up retrying the
      * validation */
-    if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us) {
+    if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
+    {
       /* reduce backoff as we got a fresh advertisement */
-      vs->challenge_backoff = GNUNET_TIME_relative_min (
-        FAST_VALIDATION_CHALLENGE_FREQ,
-        GNUNET_TIME_relative_divide (vs->challenge_backoff, 2));
-      update_next_challenge_time (
-        vs,
-        GNUNET_TIME_relative_to_absolute (vs->challenge_backoff));
+      vs->challenge_backoff =
+        GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
+                                  GNUNET_TIME_relative_divide (vs->challenge_backoff,
+                                                               2));
+      update_next_challenge_time (vs,
+                                  GNUNET_TIME_relative_to_absolute (
+                                    vs->challenge_backoff));
     }
     return;
   }
@@ -6284,8 +8486,8 @@ start_address_validation (const struct GNUNET_PeerIdentity *pid,
                               &vs->challenge,
                               sizeof (vs->challenge));
   vs->address = GNUNET_strdup (address);
-  GNUNET_assert (GNUNET_YES
-                 == GNUNET_CONTAINER_multipeermap_put (
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multipeermap_put (
                    validation_map,
                    &vs->pid,
                    vs,
@@ -6309,19 +8511,21 @@ handle_hello (void *cls,
   struct PeerRequest *pr = cls;
   const char *val;
 
-  if (NULL != emsg) {
+  if (NULL != emsg)
+  {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "Got failure from PEERSTORE: %s\n",
                 emsg);
     return;
   }
   val = record->value;
-  if ((0 == record->value_size) || ('\0' != val[record->value_size - 1])) {
+  if ((0 == record->value_size) || ('\0' != val[record->value_size - 1]))
+  {
     GNUNET_break (0);
     return;
   }
   start_address_validation (&pr->pid,
-                            (const char *)record->value,
+                            (const char *) record->value,
                             record->expiry);
 }
 
@@ -6339,12 +8543,14 @@ handle_suggest (void *cls, const struct ExpressPreferenceMessage *msg)
   struct TransportClient *tc = cls;
   struct PeerRequest *pr;
 
-  if (CT_NONE == tc->type) {
+  if (CT_NONE == tc->type)
+  {
     tc->type = CT_APPLICATION;
-    tc->details.application.requests
-      GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
+    tc->details.application.requests =
+      GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
   }
-  if (CT_APPLICATION != tc->type) {
+  if (CT_APPLICATION != tc->type)
+  {
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (tc->client);
     return;
@@ -6352,19 +8558,19 @@ handle_suggest (void *cls, const struct ExpressPreferenceMessage *msg)
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Client suggested we talk to %s with preference %d at rate %u\n",
               GNUNET_i2s (&msg->peer),
-              (int)ntohl (msg->pk),
-              (int)ntohl (msg->bw.value__));
+              (int) ntohl (msg->pk),
+              (int) ntohl (msg->bw.value__));
   pr = GNUNET_new (struct PeerRequest);
   pr->tc = tc;
   pr->pid = msg->peer;
   pr->bw = msg->bw;
-  pr->pk = (enum GNUNET_MQ_PreferenceKind)ntohl (msg->pk);
-  if (GNUNET_YES
-      != GNUNET_CONTAINER_multipeermap_put (
-        tc->details.application.requests,
-        &pr->pid,
-        pr,
-        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) {
+  pr->pk = (enum GNUNET_MQ_PreferenceKind) ntohl (msg->pk);
+  if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_put (
+                      tc->details.application.requests,
+                      &pr->pid,
+                      pr,
+                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
+  {
     GNUNET_break (0);
     GNUNET_free (pr);
     GNUNET_SERVICE_client_drop (tc->client);
@@ -6398,17 +8604,19 @@ handle_address_consider_verify (
   enum GNUNET_NetworkType nt;
   struct GNUNET_TIME_Absolute expiration;
 
-  (void)cls;
-  // FIXME: checking that we know this address already should
+  (void) cls;
+  // OPTIMIZE-FIXME: checking that we know this address already should
   //        be done BEFORE checking the signature => HELLO API change!
-  // FIXME: pre-check: rate-limit signature verification / validation?!
-  address
-    = GNUNET_HELLO_extract_address (&hdr[1],
-                                    ntohs (hdr->header.size) - sizeof (*hdr),
-                                    &hdr->peer,
-                                    &nt,
-                                    &expiration);
-  if (NULL == address) {
+  // OPTIMIZE-FIXME: pre-check: rate-limit signature verification /
+  // validation?!
+  address =
+    GNUNET_HELLO_extract_address (&hdr[1],
+                                  ntohs (hdr->header.size) - sizeof (*hdr),
+                                  &hdr->peer,
+                                  &nt,
+                                  &expiration);
+  if (NULL == address)
+  {
     GNUNET_break_op (0);
     return;
   }
@@ -6430,7 +8638,7 @@ static int
 check_request_hello_validation (void *cls,
                                 const struct RequestHelloValidationMessage *m)
 {
-  (void)cls;
+  (void) cls;
   GNUNET_MQ_check_zero_termination (m);
   return GNUNET_OK;
 }
@@ -6450,7 +8658,7 @@ handle_request_hello_validation (void *cls,
   struct TransportClient *tc = cls;
 
   start_address_validation (&m->peer,
-                            (const char *)&m[1],
+                            (const char *) &m[1],
                             GNUNET_TIME_absolute_ntoh (m->expiration));
   GNUNET_SERVICE_client_continue (tc->client);
 }
@@ -6471,8 +8679,8 @@ free_neighbour_cb (void *cls,
 {
   struct Neighbour *neighbour = value;
 
-  (void)cls;
-  (void)pid;
+  (void) cls;
+  (void) pid;
   GNUNET_break (0); // should this ever happen?
   free_neighbour (neighbour);
 
@@ -6495,8 +8703,8 @@ free_dv_routes_cb (void *cls,
 {
   struct DistanceVector *dv = value;
 
-  (void)cls;
-  (void)pid;
+  (void) cls;
+  (void) pid;
   free_dv_route (dv);
 
   return GNUNET_OK;
@@ -6518,8 +8726,8 @@ free_ephemeral_cb (void *cls,
 {
   struct EphemeralCacheEntry *ece = value;
 
-  (void)cls;
-  (void)pid;
+  (void) cls;
+  (void) pid;
   free_ephemeral (ece);
   return GNUNET_OK;
 }
@@ -6540,13 +8748,57 @@ free_validation_state_cb (void *cls,
 {
   struct ValidationState *vs = value;
 
-  (void)cls;
-  (void)pid;
+  (void) cls;
+  (void) pid;
   free_validation_state (vs);
   return GNUNET_OK;
 }
 
 
+/**
+ * Free pending acknowledgement.
+ *
+ * @param cls NULL
+ * @param key unused
+ * @param value a `struct PendingAcknowledgement`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_pending_ack_cb (void *cls,
+                     const struct GNUNET_ShortHashCode *key,
+                     void *value)
+{
+  struct PendingAcknowledgement *pa = value;
+
+  (void) cls;
+  (void) key;
+  free_pending_acknowledgement (pa);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Free acknowledgement cummulator.
+ *
+ * @param cls NULL
+ * @param pid unused
+ * @param value a `struct AcknowledgementCummulator`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_ack_cummulator_cb (void *cls,
+                        const struct GNUNET_PeerIdentity *pid,
+                        void *value)
+{
+  struct AcknowledgementCummulator *ac = value;
+
+  (void) cls;
+  (void) pid;
+  GNUNET_free (ac);
+  return GNUNET_OK;
+}
+
+
 /**
  * Function called when the service shuts down.  Unloads our plugins
  * and cancels pending validations.
@@ -6557,33 +8809,57 @@ static void
 do_shutdown (void *cls)
 {
   struct LearnLaunchEntry *lle;
-  (void)cls;
+  (void) cls;
 
-  if (NULL != ephemeral_task) {
+  if (NULL != ephemeral_task)
+  {
     GNUNET_SCHEDULER_cancel (ephemeral_task);
     ephemeral_task = NULL;
   }
   GNUNET_CONTAINER_multipeermap_iterate (neighbours, &free_neighbour_cb, NULL);
-  if (NULL != peerstore) {
+  if (NULL != peerstore)
+  {
     GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_NO);
     peerstore = NULL;
   }
-  if (NULL != GST_stats) {
+  if (NULL != GST_stats)
+  {
     GNUNET_STATISTICS_destroy (GST_stats, GNUNET_NO);
     GST_stats = NULL;
   }
-  if (NULL != GST_my_private_key) {
+  if (NULL != GST_my_private_key)
+  {
     GNUNET_free (GST_my_private_key);
     GST_my_private_key = NULL;
   }
+  GNUNET_CONTAINER_multipeermap_iterate (ack_cummulators,
+                                         &free_ack_cummulator_cb,
+                                         NULL);
+  GNUNET_CONTAINER_multipeermap_destroy (ack_cummulators);
+  ack_cummulators = NULL;
+  GNUNET_CONTAINER_multishortmap_iterate (pending_acks,
+                                          &free_pending_ack_cb,
+                                          NULL);
+  GNUNET_CONTAINER_multishortmap_destroy (pending_acks);
+  pending_acks = NULL;
+  GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (neighbours));
   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
   neighbours = NULL;
+  GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (links));
+  GNUNET_CONTAINER_multipeermap_destroy (links);
+  links = NULL;
+  GNUNET_CONTAINER_multipeermap_iterate (backtalkers,
+                                         &free_backtalker_cb,
+                                         NULL);
+  GNUNET_CONTAINER_multipeermap_destroy (backtalkers);
+  backtalkers = NULL;
   GNUNET_CONTAINER_multipeermap_iterate (validation_map,
                                          &free_validation_state_cb,
                                          NULL);
   GNUNET_CONTAINER_multipeermap_destroy (validation_map);
   validation_map = NULL;
-  while (NULL != (lle = lle_head)) {
+  while (NULL != (lle = lle_head))
+  {
     GNUNET_CONTAINER_DLL_remove (lle_head, lle_tail, lle);
     GNUNET_free (lle);
   }
@@ -6616,23 +8892,28 @@ run (void *cls,
      const struct GNUNET_CONFIGURATION_Handle *c,
      struct GNUNET_SERVICE_Handle *service)
 {
-  (void)cls;
-  (void)service;
+  (void) cls;
+  (void) service;
   /* setup globals */
   GST_cfg = c;
+  backtalkers = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
+  pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES);
+  ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
   neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
+  links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES);
   dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES);
-  ephemeral_heap
-    GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+  ephemeral_heap =
+    GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
   dvlearn_map = GNUNET_CONTAINER_multishortmap_create (2 * MAX_DV_LEARN_PENDING,
                                                        GNUNET_YES);
   validation_map = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
-  validation_heap
-    = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
-  GST_my_private_key
-    = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
-  if (NULL == GST_my_private_key) {
+  validation_heap =
+    GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+  GST_my_private_key =
+    GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
+  if (NULL == GST_my_private_key)
+  {
     GNUNET_log (
       GNUNET_ERROR_TYPE_ERROR,
       _ (
@@ -6648,7 +8929,8 @@ run (void *cls,
   GST_stats = GNUNET_STATISTICS_create ("transport", GST_cfg);
   GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
-  if (NULL == peerstore) {
+  if (NULL == peerstore)
+  {
     GNUNET_break (0);
     GNUNET_SCHEDULER_shutdown ();
     return;
@@ -6688,6 +8970,10 @@ GNUNET_SERVICE_MAIN (
                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
                          struct OutboundMessage,
                          NULL),
+  GNUNET_MQ_hd_fixed_size (client_recv_ok,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK,
+                           struct RecvOkMessage,
+                           NULL),
   /* communication with communicators */
   GNUNET_MQ_hd_var_size (communicator_available,
                          GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,