tolerate additional IPv4 address now available for gnunet.org
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
index 697c43f0d5b688bab9afa0d4845af64a81796fb0..a8f70986b3277d46f4be100c5ea10256d42487f1 100644 (file)
  *
  * TODO:
  * Implement next:
- * - retransmission logic
- * - track RTT, distance, loss, etc. => requires extra data structures!
- *
- * Later:
- * - change transport-core API to provide proper flow control in both
- *   directions, allow multiple messages per peer simultaneously (tag
- *   confirmations with unique message ID), and replace quota-out with
- *   proper flow control;
- * - if messages are below MTU, consider adding ACKs and other stuff
- *   (requires planning at receiver, and additional MST-style demultiplex
- *    at receiver!)
+ * - add (more) logging
+ * - change transport-core API to specify transmission preferences (latency,
+ *   reliability, etc.) per message!
+ * - review retransmission logic, right now there is no smartness there!
+ *   => congestion control, flow control, etc [PERFORMANCE-BASICS]
+ *
+ * Optimizations:
+ * - AcknowledgementUUIDPs are overkill with 256 bits (128 would do)
+ *   => Need 128 bit hash map though! [BANDWIDTH, MEMORY]
+ * - queue_send_msg and route_message both by API design have to make copies
+ *   of the payload, and route_message on top of that requires a malloc/free.
+ *   Change design to approximate "zero" copy better... [CPU]
  * - could avoid copying body of message into each fragment and keep
  *   fragments as just pointers into the original message and only
  *   fully build fragments just before transmission (optimization, should
- *   reduce CPU and memory use)
+ *   reduce CPU and memory use) [CPU, MEMORY]
+ * - if messages are below MTU, consider adding ACKs and other stuff
+ *   to the same transmission to avoid tiny messages (requires planning at
+ *   receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT]
  * - When we passively learned DV (with unconfirmed freshness), we
  *   right now add the path to our list but with a zero path_valid_until
  *   time and only use it for unconfirmed routes.  However, we could consider
  *   triggering an explicit validation mechansim ourselves, specifically routing
- *   a challenge-response message over the path (OPTIMIZATION-FIXME).
- *
- * FIXME (without marks in the code!):
- * - proper use/initialization of timestamps in messages exchanged
- *   during DV learning
- * - persistence of monotonic time obtained from other peers
- *   in PEERSTORE (by message type)
- *
- * Optimizations:
- * - use shorthashmap on msg_uuid's when matching reliability/fragment ACKs
- *   against our pending message queue (requires additional per neighbour
- *   hash map to be maintained, avoids possible linear scan on pending msgs)
- * - queue_send_msg and route_message both by API design have to make copies
- *   of the payload, and route_message on top of that requires a malloc/free.
- *   Change design to approximate "zero" copy better...
+ *   a challenge-response message over the path [ROUTING]
+ * - Track ACK losses based on ACK-counter [ROUTING]
  *
  * Design realizations / discussion:
  * - communicators do flow control by calling MQ "notify sent"
 #include "gnunet_signatures.h"
 #include "transport.h"
 
+/**
+ * Maximum number of messages we acknowledge together in one
+ * cummulative ACK.  Larger values may save a bit of bandwidth.
+ */
+#define MAX_CUMMULATIVE_ACKS 64
 
 /**
  * What is the size we assume for a read operation in the
  */
 #define IN_PACKET_SIZE_WITHOUT_MTU 128
 
+/**
+ * Number of slots we keep of historic data for computation of
+ * goodput / message loss ratio.
+ */
+#define GOODPUT_AGING_SLOTS 4
+
+/**
+ * Maximum number of peers we select for forwarding DVInit
+ * messages at the same time (excluding initiator).
+ */
+#define MAX_DV_DISCOVERY_SELECTION 16
+
+/**
+ * Window size. How many messages to the same target do we pass
+ * to CORE without a RECV_OK in between? Small values limit
+ * thoughput, large values will increase latency.
+ *
+ * FIXME-OPTIMIZE: find out what good values are experimentally,
+ * maybe set adaptively (i.e. to observed available bandwidth).
+ */
+#define RECV_WINDOW_SIZE 4
+
 /**
  * Minimum number of hops we should forward DV learn messages
  * even if they are NOT useful for us in hope of looping
 #define DV_PATH_VALIDITY_TIMEOUT \
   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
 
+/**
+ * How long do we cache backchannel (struct Backtalker) information
+ * after a backchannel goes inactive?
+ */
+#define BACKCHANNEL_INACTIVITY_TIMEOUT \
+  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
+
 /**
  * How long before paths expire would we like to (re)discover DV paths? Should
  * be below #DV_PATH_VALIDITY_TIMEOUT.
 #define MAX_VALIDATION_CHALLENGE_FREQ \
   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
 
+/**
+ * How long until we forget about historic accumulators and thus
+ * reset the ACK counter? Should exceed the maximum time an
+ * active connection experiences without an ACK.
+ */
+#define ACK_CUMMULATOR_TIMEOUT \
+  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
+
 /**
  * What is the non-randomized base frequency at which we
  * would initiate DV learn messages?
 
 GNUNET_NETWORK_STRUCT_BEGIN
 
+/**
+ * Unique identifier we attach to a message.
+ */
+struct MessageUUIDP
+{
+  /**
+   * Unique value, generated by incrementing the
+   * `message_uuid_ctr` of `struct Neighbour`.
+   */
+  uint64_t uuid GNUNET_PACKED;
+};
+
+
+/**
+ * Unique identifier to map an acknowledgement to a transmission.
+ */
+struct AcknowledgementUUIDP
+{
+  /**
+   * The UUID value.  Not actually a hash, but a random value.
+   */
+  struct GNUNET_ShortHashCode value;
+};
+
+
+/**
+ * Unique identifier we attach to a message.
+ */
+struct FragmentUUIDP
+{
+  /**
+   * Unique value identifying a fragment, in NBO.
+   */
+  uint32_t uuid GNUNET_PACKED;
+};
+
+
+/**
+ * Type of a nonce used for challenges.
+ */
+struct ChallengeNonceP
+{
+  /**
+   * The value of the nonce.  Note that this is NOT a hash.
+   */
+  struct GNUNET_ShortHashCode value;
+};
+
+
 /**
  * Outer layer of an encapsulated backchannel message.
  */
@@ -250,12 +332,9 @@ struct TransportBackchannelEncapsulationMessage
   struct GNUNET_MessageHeader header;
 
   /**
-   * Distance the backchannel message has traveled, to be updated at
-   * each hop.  Used to bound the number of hops in case a backchannel
-   * message is broadcast and thus travels without routing
-   * information (during initial backchannel discovery).
+   * Reserved, always zero.
    */
-  uint32_t distance;
+  uint32_t reserved GNUNET_PACKED;
 
   /**
    * Target's peer identity (as backchannels may be transmitted
@@ -289,7 +368,7 @@ struct TransportBackchannelEncapsulationMessage
 /**
  * Body by which a peer confirms that it is using an ephemeral key.
  */
-struct EphemeralConfirmation
+struct EphemeralConfirmationPS
 {
 
   /**
@@ -311,7 +390,7 @@ struct EphemeralConfirmation
    * communicators must protect against replay attacks when using backchannel
    * communication!
    */
-  struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
+  struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time;
 
   /**
    * Target's peer identity.
@@ -330,7 +409,7 @@ struct EphemeralConfirmation
  * Plaintext of the variable-size payload that is encrypted
  * within a `struct TransportBackchannelEncapsulationMessage`
  */
-struct TransportBackchannelRequestPayload
+struct TransportBackchannelRequestPayloadP
 {
 
   /**
@@ -344,22 +423,6 @@ struct TransportBackchannelRequestPayload
    */
   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
 
-  /**
-   * How long is this signature over the ephemeral key valid?
-   *
-   * Note that the receiver MUST IGNORE the absolute time, and only interpret
-   * the value as a mononic time and reject "older" values than the last one
-   * observed.  This is necessary as we do not want to require synchronized
-   * clocks and may not have a bidirectional communication channel.
-   *
-   * Even with this, there is no real guarantee against replay achieved here,
-   * unless the latest timestamp is persisted.  While persistence should be
-   * provided via PEERSTORE, we do not consider the mechanism reliable!  Thus,
-   * communicators must protect against replay attacks when using backchannel
-   * communication!
-   */
-  struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
-
   /**
    * Current monotonic time of the sending transport service.  Used to
    * detect replayed messages.  Note that the receiver should remember
@@ -384,7 +447,7 @@ struct TransportBackchannelRequestPayload
  * Outer layer of an encapsulated unfragmented application message sent
  * over an unreliable channel.
  */
-struct TransportReliabilityBox
+struct TransportReliabilityBoxMessage
 {
   /**
    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
@@ -404,7 +467,27 @@ struct TransportReliabilityBox
    * messages sent over possibly unreliable channels.  Should
    * be a random.
    */
-  struct GNUNET_ShortHashCode msg_uuid;
+  struct AcknowledgementUUIDP ack_uuid;
+};
+
+
+/**
+ * Acknowledgement payload.
+ */
+struct TransportCummulativeAckPayloadP
+{
+  /**
+   * How long was the ACK delayed for generating cummulative ACKs?
+   * Used to calculate the correct network RTT by taking the receipt
+   * time of the ack minus the transmission time of the sender minus
+   * this value.
+   */
+  struct GNUNET_TIME_RelativeNBO ack_delay;
+
+  /**
+   * UUID of a message being acknowledged.
+   */
+  struct AcknowledgementUUIDP ack_uuid;
 };
 
 
@@ -423,19 +506,12 @@ struct TransportReliabilityAckMessage
   struct GNUNET_MessageHeader header;
 
   /**
-   * Reserved. Zero.
-   */
-  uint32_t reserved GNUNET_PACKED;
-
-  /**
-   * How long was the ACK delayed relative to the average time of
-   * receipt of the messages being acknowledged?  Used to calculate
-   * the average RTT by taking the receipt time of the ack minus the
-   * average transmission time of the sender minus this value.
+   * Counter of ACKs transmitted by the sender to us. Incremented
+   * by one for each ACK, used to detect how many ACKs were lost.
    */
-  struct GNUNET_TIME_RelativeNBO avg_ack_delay;
+  uint32_t ack_counter GNUNET_PACKED;
 
-  /* followed by any number of `struct GNUNET_ShortHashCode`
+  /* followed by any number of `struct TransportCummulativeAckPayloadP`
      messages providing ACKs */
 };
 
@@ -443,7 +519,7 @@ struct TransportReliabilityAckMessage
 /**
  * Outer layer of an encapsulated fragmented application message.
  */
-struct TransportFragmentBox
+struct TransportFragmentBoxMessage
 {
   /**
    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
@@ -453,18 +529,17 @@ struct TransportFragmentBox
   /**
    * Unique ID of this fragment (and fragment transmission!). Will
    * change even if a fragement is retransmitted to make each
-   * transmission attempt unique! Should be incremented by one for
-   * each fragment transmission. If a client receives a duplicate
-   * fragment (same @e frag_off), it must send
-   * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
+   * transmission attempt unique! If a client receives a duplicate
+   * fragment (same @e frag_off for same @a msg_uuid, it must send
+   * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK immediately.
    */
-  uint32_t frag_uuid GNUNET_PACKED;
+  struct AcknowledgementUUIDP ack_uuid;
 
   /**
-   * Original message ID for of the message that all the1
-   * fragments belong to.  Must be the same for all fragments.
+   * Original message ID for of the message that all the fragments
+   * belong to.  Must be the same for all fragments.
    */
-  struct GNUNET_ShortHashCode msg_uuid;
+  struct MessageUUIDP msg_uuid;
 
   /**
    * Offset of this fragment in the overall message.
@@ -478,54 +553,6 @@ struct TransportFragmentBox
 };
 
 
-/**
- * Outer layer of an fragmented application message sent over a queue
- * with finite MTU.  When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
- * received, the receiver has two RTTs or 64 further fragments with
- * the same basic message time to send an acknowledgement, possibly
- * acknowledging up to 65 fragments in one ACK.  ACKs must also be
- * sent immediately once all fragments were sent.
- */
-struct TransportFragmentAckMessage
-{
-  /**
-   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
-   */
-  struct GNUNET_MessageHeader header;
-
-  /**
-   * Unique ID of the lowest fragment UUID being acknowledged.
-   */
-  uint32_t frag_uuid GNUNET_PACKED;
-
-  /**
-   * Bitfield of up to 64 additional fragments following the
-   * @e msg_uuid being acknowledged by this message.
-   */
-  uint64_t extra_acks GNUNET_PACKED;
-
-  /**
-   * Original message ID for of the message that all the
-   * fragments belong to.
-   */
-  struct GNUNET_ShortHashCode msg_uuid;
-
-  /**
-   * How long was the ACK delayed relative to the average time of
-   * receipt of the fragments being acknowledged?  Used to calculate
-   * the average RTT by taking the receipt time of the ack minus the
-   * average transmission time of the sender minus this value.
-   */
-  struct GNUNET_TIME_RelativeNBO avg_ack_delay;
-
-  /**
-   * How long until the receiver will stop trying reassembly
-   * of this message?
-   */
-  struct GNUNET_TIME_RelativeNBO reassembly_timeout;
-};
-
-
 /**
  * Content signed by the initator during DV learning.
  *
@@ -567,7 +594,7 @@ struct DvInitPS
   /**
    * Challenge value used by the initiator to re-identify the path.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 };
 
 
@@ -607,13 +634,13 @@ struct DvHopPS
   /**
    * Challenge value used by the initiator to re-identify the path.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 };
 
 
 /**
  * An entry describing a peer on a path in a
- * `struct TransportDVLearn` message.
+ * `struct TransportDVLearnMessage` message.
  */
 struct DVPathEntryP
 {
@@ -643,7 +670,7 @@ struct DVPathEntryP
  * zero, peers that can forward to the initator should always try to
  * forward to the initiator.
  */
-struct TransportDVLearn
+struct TransportDVLearnMessage
 {
   /**
    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
@@ -673,6 +700,20 @@ struct TransportDVLearn
    */
   struct GNUNET_TIME_RelativeNBO non_network_delay;
 
+  /**
+   * Time at the initiator when generating the signature.
+   *
+   * Note that the receiver MUST IGNORE the absolute time, and only interpret
+   * the value as a mononic time and reject "older" values than the last one
+   * observed.  This is necessary as we do not want to require synchronized
+   * clocks and may not have a bidirectional communication channel.
+   *
+   * Even with this, there is no real guarantee against replay achieved here,
+   * unless the latest timestamp is persisted.  Persistence should be
+   * provided via PEERSTORE if possible.
+   */
+  struct GNUNET_TIME_AbsoluteNBO monotonic_time;
+
   /**
    * Signature of this hop over the path, of purpose
    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
@@ -687,7 +728,7 @@ struct TransportDVLearn
   /**
    * Challenge value used by the initiator to re-identify the path.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 
   /* Followed by @e num_hops `struct DVPathEntryP` values,
      excluding the initiator of the DV trace; the last entry is the
@@ -710,7 +751,7 @@ struct TransportDVLearn
  *
  * If a peer finds itself still on the list, it must drop the message.
  */
-struct TransportDVBox
+struct TransportDVBoxMessage
 {
   /**
    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
@@ -748,7 +789,7 @@ struct TransportDVBox
  * Message send to another peer to validate that it can indeed
  * receive messages at a particular address.
  */
-struct TransportValidationChallenge
+struct TransportValidationChallengeMessage
 {
 
   /**
@@ -764,7 +805,7 @@ struct TransportValidationChallenge
   /**
    * Challenge to be signed by the receiving peer.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 
   /**
    * Timestamp of the sender, to be copied into the reply
@@ -795,7 +836,7 @@ struct TransportValidationPS
   /**
    * Challenge signed by the receiving peer.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 };
 
 
@@ -803,7 +844,7 @@ struct TransportValidationPS
  * Message send to a peer to respond to a
  * #GNUNET_MESSAGE_TYPE_ADDRESS_VALIDATION_CHALLENGE
  */
-struct TransportValidationResponse
+struct TransportValidationResponseMessage
 {
 
   /**
@@ -825,7 +866,7 @@ struct TransportValidationResponse
   /**
    * The challenge that was signed by the receiving peer.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 
   /**
    * Original timestamp of the sender (was @code{sender_time}),
@@ -876,6 +917,41 @@ enum ClientType
 };
 
 
+/**
+ * Which transmission options are allowable for transmission?
+ * Interpreted bit-wise!
+ */
+enum RouteMessageOptions
+{
+  /**
+   * Only confirmed, non-DV direct neighbours.
+   */
+  RMO_NONE = 0,
+
+  /**
+   * We are allowed to use DV routing for this @a hdr
+   */
+  RMO_DV_ALLOWED = 1,
+
+  /**
+   * We are allowed to use unconfirmed queues or DV routes for this message
+   */
+  RMO_UNCONFIRMED_ALLOWED = 2,
+
+  /**
+   * Reliable and unreliable, DV and non-DV are all acceptable.
+   */
+  RMO_ANYTHING_GOES = (RMO_DV_ALLOWED | RMO_UNCONFIRMED_ALLOWED),
+
+  /**
+   * If we have multiple choices, it is OK to send this message
+   * over multiple channels at the same time to improve loss tolerance.
+   * (We do at most 2 transmissions.)
+   */
+  RMO_REDUNDANT = 4
+};
+
+
 /**
  * When did we launch this DV learning activity?
  */
@@ -895,7 +971,7 @@ struct LearnLaunchEntry
   /**
    * Challenge that uniquely identifies this activity.
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 
   /**
    * When did we transmit the DV learn message (used to calculate RTT) and
@@ -932,6 +1008,11 @@ struct EphemeralCacheEntry
    */
   struct GNUNET_TIME_Absolute ephemeral_validity;
 
+  /**
+   * What time was @e sender_sig created
+   */
+  struct GNUNET_TIME_Absolute monotime;
+
   /**
    * Our ephemeral key.
    */
@@ -950,84 +1031,356 @@ struct EphemeralCacheEntry
 };
 
 
+/**
+ * Information we keep per #GOODPUT_AGING_SLOTS about historic
+ * (or current) transmission performance.
+ */
+struct TransmissionHistoryEntry
+{
+  /**
+   * Number of bytes actually sent in the interval.
+   */
+  uint64_t bytes_sent;
+
+  /**
+   * Number of bytes received and acknowledged by the other peer in
+   * the interval.
+   */
+  uint64_t bytes_received;
+};
+
+
+/**
+ * Performance data for a transmission possibility.
+ */
+struct PerformanceData
+{
+  /**
+   * Weighted average for the RTT.
+   */
+  struct GNUNET_TIME_Relative aged_rtt;
+
+  /**
+   * Historic performance data, using a ring buffer of#GOODPUT_AGING_SLOTS
+   * entries.
+   */
+  struct TransmissionHistoryEntry the[GOODPUT_AGING_SLOTS];
+
+  /**
+   * What was the last age when we wrote to @e the? Used to clear
+   * old entries when the age advances.
+   */
+  unsigned int last_age;
+};
+
+
 /**
  * Client connected to the transport service.
  */
 struct TransportClient;
 
-
 /**
  * A neighbour that at least one communicator is connected to.
  */
 struct Neighbour;
 
-
 /**
  * Entry in our #dv_routes table, representing a (set of) distance
  * vector routes to a particular peer.
  */
 struct DistanceVector;
 
+/**
+ * A queue is a message queue provided by a communicator
+ * via which we can reach a particular neighbour.
+ */
+struct Queue;
+
+/**
+ * Message awaiting transmission. See detailed comments below.
+ */
+struct PendingMessage;
+
 /**
  * One possible hop towards a DV target.
  */
-struct DistanceVectorHop
+struct DistanceVectorHop;
+
+
+/**
+ * Context from #handle_incoming_msg().  Closure for many
+ * message handlers below.
+ */
+struct CommunicatorMessageContext
 {
 
   /**
-   * Kept in a MDLL, sorted by @e timeout.
+   * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+   * flow control to unchoke.
    */
-  struct DistanceVectorHop *next_dv;
+  struct CommunicatorMessageContext *next;
 
   /**
-   * Kept in a MDLL, sorted by @e timeout.
+   * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+   * flow control to unchoke.
    */
-  struct DistanceVectorHop *prev_dv;
+  struct CommunicatorMessageContext *prev;
 
   /**
-   * Kept in a MDLL.
+   * Which communicator provided us with the message.
    */
-  struct DistanceVectorHop *next_neighbour;
+  struct TransportClient *tc;
 
   /**
-   * Kept in a MDLL.
+   * Additional information for flow control and about the sender.
    */
-  struct DistanceVectorHop *prev_neighbour;
+  struct GNUNET_TRANSPORT_IncomingMessage im;
 
   /**
-   * What would be the next hop to @e target?
+   * Number of hops the message has travelled (if DV-routed).
+   * FIXME: make use of this in ACK handling!
    */
-  struct Neighbour *next_hop;
+  uint16_t total_hops;
+};
 
+
+/**
+ * A virtual link is another reachable peer that is known to CORE.  It
+ * can be either a `struct Neighbour` with at least one confirmed
+ * `struct Queue`, or a `struct DistanceVector` with at least one
+ * confirmed `struct DistanceVectorHop`.  With a virtual link we track
+ * data that is per neighbour that is not specific to how the
+ * connectivity is established.
+ */
+struct VirtualLink
+{
   /**
-   * Distance vector entry this hop belongs with.
+   * Identity of the peer at the other end of the link.
    */
-  struct DistanceVector *dv;
+  struct GNUNET_PeerIdentity target;
 
   /**
-   * Array of @e distance hops to the target, excluding @e next_hop.
-   * NULL if the entire path is us to @e next_hop to `target`. Allocated
-   * at the end of this struct. Excludes the target itself!
+   * Communicators blocked for receiving on @e target as we are waiting
+   * on the @e core_recv_window to increase.
    */
-  const struct GNUNET_PeerIdentity *path;
+  struct CommunicatorMessageContext *cmc_head;
 
   /**
-   * At what time do we forget about this path unless we see it again
-   * while learning?
+   * Communicators blocked for receiving on @e target as we are waiting
+   * on the @e core_recv_window to increase.
    */
-  struct GNUNET_TIME_Absolute timeout;
+  struct CommunicatorMessageContext *cmc_tail;
 
   /**
-   * For how long is the validation of this path considered
-   * valid?
-   * Set to ZERO if the path is learned by snooping on DV learn messages
-   * initiated by other peers, and to the time at which we generated the
-   * challenge for DV learn operations this peer initiated.
+   * Task scheduled to possibly notfiy core that this peer is no
+   * longer counting as confirmed.  Runs the #core_visibility_check(),
+   * which checks that some DV-path or a queue exists that is still
+   * considered confirmed.
    */
-  struct GNUNET_TIME_Absolute path_valid_until;
+  struct GNUNET_SCHEDULER_Task *visibility_task;
 
   /**
-   * Number of hops in total to the `target` (excluding @e next_hop and `target`
+   * Neighbour used by this virtual link, NULL if @e dv is used.
+   */
+  struct Neighbour *n;
+
+  /**
+   * Distance vector used by this virtual link, NULL if @e n is used.
+   */
+  struct DistanceVector *dv;
+
+  /**
+   * How many more messages can we send to core before we exhaust
+   * the receive window of CORE for this peer? If this hits zero,
+   * we must tell communicators to stop providing us more messages
+   * for this peer.  In fact, the window can go negative as we
+   * have multiple communicators, so per communicator we can go
+   * down by one into the negative range.
+   */
+  int core_recv_window;
+};
+
+
+/**
+ * Data structure kept when we are waiting for an acknowledgement.
+ */
+struct PendingAcknowledgement
+{
+
+  /**
+   * If @e pm is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to its pending message.
+   */
+  struct PendingAcknowledgement *next_pm;
+
+  /**
+   * If @e pm is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to its pending message.
+   */
+  struct PendingAcknowledgement *prev_pm;
+
+  /**
+   * If @e queue is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to the queue that was used to transmit the
+   * @a pm.
+   */
+  struct PendingAcknowledgement *next_queue;
+
+  /**
+   * If @e queue is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to the queue that was used to transmit the
+   * @a pm.
+   */
+  struct PendingAcknowledgement *prev_queue;
+
+  /**
+   * If @e dvh is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to the DVH that was used to transmit the
+   * @a pm.
+   */
+  struct PendingAcknowledgement *next_dvh;
+
+  /**
+   * If @e dvh is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to the DVH that was used to transmit the
+   * @a pm.
+   */
+  struct PendingAcknowledgement *prev_dvh;
+
+  /**
+   * Pointers for the DLL of all pending acknowledgements.
+   * This list is sorted by @e transmission time.  If the list gets too
+   * long, the oldest entries are discarded.
+   */
+  struct PendingAcknowledgement *next_pa;
+
+  /**
+   * Pointers for the DLL of all pending acknowledgements.
+   * This list is sorted by @e transmission time.  If the list gets too
+   * long, the oldest entries are discarded.
+   */
+  struct PendingAcknowledgement *prev_pa;
+
+  /**
+   * Unique identifier for this transmission operation.
+   */
+  struct AcknowledgementUUIDP ack_uuid;
+
+  /**
+   * Message that was transmitted, may be NULL if the message was ACKed
+   * via another channel.
+   */
+  struct PendingMessage *pm;
+
+  /**
+   * Distance vector path chosen for this transmission, NULL if transmission
+   * was to a direct neighbour OR if the path was forgotten in the meantime.
+   */
+  struct DistanceVectorHop *dvh;
+
+  /**
+   * Queue used for transmission, NULL if the queue has been destroyed
+   * (which may happen before we get an acknowledgement).
+   */
+  struct Queue *queue;
+
+  /**
+   * Time of the transmission, for RTT calculation.
+   */
+  struct GNUNET_TIME_Absolute transmission_time;
+
+  /**
+   * Number of bytes of the original message (to calculate bandwidth).
+   */
+  uint16_t message_size;
+};
+
+
+/**
+ * One possible hop towards a DV target.
+ */
+struct DistanceVectorHop
+{
+
+  /**
+   * Kept in a MDLL, sorted by @e timeout.
+   */
+  struct DistanceVectorHop *next_dv;
+
+  /**
+   * Kept in a MDLL, sorted by @e timeout.
+   */
+  struct DistanceVectorHop *prev_dv;
+
+  /**
+   * Kept in a MDLL.
+   */
+  struct DistanceVectorHop *next_neighbour;
+
+  /**
+   * Kept in a MDLL.
+   */
+  struct DistanceVectorHop *prev_neighbour;
+
+  /**
+   * Head of MDLL of messages routed via this path.
+   */
+  struct PendingMessage *pending_msg_head;
+
+  /**
+   * Tail of MDLL of messages routed via this path.
+   */
+  struct PendingMessage *pending_msg_tail;
+
+  /**
+   * Head of DLL of PAs that used our @a path.
+   */
+  struct PendingAcknowledgement *pa_head;
+
+  /**
+   * Tail of DLL of PAs that used our @a path.
+   */
+  struct PendingAcknowledgement *pa_tail;
+
+  /**
+   * What would be the next hop to @e target?
+   */
+  struct Neighbour *next_hop;
+
+  /**
+   * Distance vector entry this hop belongs with.
+   */
+  struct DistanceVector *dv;
+
+  /**
+   * Array of @e distance hops to the target, excluding @e next_hop.
+   * NULL if the entire path is us to @e next_hop to `target`. Allocated
+   * at the end of this struct. Excludes the target itself!
+   */
+  const struct GNUNET_PeerIdentity *path;
+
+  /**
+   * At what time do we forget about this path unless we see it again
+   * while learning?
+   */
+  struct GNUNET_TIME_Absolute timeout;
+
+  /**
+   * For how long is the validation of this path considered
+   * valid?
+   * Set to ZERO if the path is learned by snooping on DV learn messages
+   * initiated by other peers, and to the time at which we generated the
+   * challenge for DV learn operations this peer initiated.
+   */
+  struct GNUNET_TIME_Absolute path_valid_until;
+
+  /**
+   * Performance data for this transmission possibility.
+   */
+  struct PerformanceData pd;
+
+  /**
+   * Number of hops in total to the `target` (excluding @e next_hop and `target`
    * itself). Thus 0 still means a distance of 2 hops (to @e next_hop and then
    * to `target`).
    */
@@ -1063,45 +1416,13 @@ struct DistanceVector
   struct GNUNET_SCHEDULER_Task *timeout_task;
 
   /**
-   * Task scheduled to possibly notfiy core that this queue is no longer
-   * counting as confirmed.  Runs the #core_queue_visibility_check().
-   */
-  struct GNUNET_SCHEDULER_Task *visibility_task;
-
-  /**
-   * Quota at which CORE is allowed to transmit to this peer
-   * (note that the value CORE should actually be told is this
-   *  value plus the respective value in `struct Neighbour`).
-   * Should match the sum of the quotas of all of the paths.
-   *
-   * FIXME: not yet set, tricky to get right given multiple paths,
-   *        many of which may be inactive! (=> Idea: measure???)
-   * FIXME: how do we set this value initially when we tell CORE?
-   *    Options: start at a minimum value or at literally zero?
-   *         (=> Current thought: clean would be zero!)
-   */
-  struct GNUNET_BANDWIDTH_Value32NBO quota_out;
-
-  /**
-   * Is one of the DV paths in this struct 'confirmed' and thus
-   * the cause for CORE to see this peer as connected? (Note that
-   * the same may apply to a `struct Neighbour` at the same time.)
+   * Do we have a confirmed working queue and are thus visible to
+   * CORE?  If so, this is the virtual link, otherwise NULL.
    */
-  int core_visible;
+  struct VirtualLink *link;
 };
 
 
-/**
- * A queue is a message queue provided by a communicator
- * via which we can reach a particular neighbour.
- */
-struct Queue;
-
-/**
- * Message awaiting transmission. See detailed comments below.
- */
-struct PendingMessage;
-
 /**
  * Entry identifying transmission in one of our `struct
  * Queue` which still awaits an ACK.  This is used to
@@ -1167,6 +1488,16 @@ struct Queue
    */
   struct Queue *next_client;
 
+  /**
+   * Head of DLL of PAs that used this queue.
+   */
+  struct PendingAcknowledgement *pa_head;
+
+  /**
+   * Tail of DLL of PAs that used this queue.
+   */
+  struct PendingAcknowledgement *pa_tail;
+
   /**
    * Head of DLL of unacked transmission requests.
    */
@@ -1194,21 +1525,10 @@ struct Queue
 
   /**
    * Task scheduled for the time when this queue can (likely) transmit the
-   * next message. Still needs to check with the @e tracker_out to be sure.
+   * next message.
    */
   struct GNUNET_SCHEDULER_Task *transmit_task;
 
-  /**
-   * Task scheduled to possibly notfiy core that this queue is no longer
-   * counting as confirmed.  Runs the #core_queue_visibility_check().
-   */
-  struct GNUNET_SCHEDULER_Task *visibility_task;
-
-  /**
-   * Our current RTT estimate for this queue.
-   */
-  struct GNUNET_TIME_Relative rtt;
-
   /**
    * How long do *we* consider this @e address to be valid?  In the past or
    * zero if we have not yet validated it.  Can be updated based on
@@ -1219,7 +1539,13 @@ struct Queue
   struct GNUNET_TIME_Absolute validated_until;
 
   /**
-   * Message ID generator for transmissions on this queue.
+   * Performance data for this queue.
+   */
+  struct PerformanceData pd;
+
+  /**
+   * Message ID generator for transmissions on this queue to the
+   * communicator.
    */
   uint64_t mid_gen;
 
@@ -1257,16 +1583,6 @@ struct Queue
    * Connection status for this queue.
    */
   enum GNUNET_TRANSPORT_ConnectionStatus cs;
-
-  /**
-   * How much outbound bandwidth do we have available for this queue?
-   */
-  struct GNUNET_BANDWIDTH_Tracker tracker_out;
-
-  /**
-   * How much inbound bandwidth do we have available for this queue?
-   */
-  struct GNUNET_BANDWIDTH_Tracker tracker_in;
 };
 
 
@@ -1277,10 +1593,10 @@ struct ReassemblyContext
 {
 
   /**
-   * Original message ID for of the message that all the
-   * fragments belong to.
+   * Original message ID for of the message that all the fragments
+   * belong to.
    */
-  struct GNUNET_ShortHashCode msg_uuid;
+  struct MessageUUIDP msg_uuid;
 
   /**
    * Which neighbour is this context for?
@@ -1313,36 +1629,12 @@ struct ReassemblyContext
    */
   struct GNUNET_TIME_Absolute reassembly_timeout;
 
-  /**
-   * Average delay of all acks in @e extra_acks and @e frag_uuid.
-   * Should be reset to zero when @e num_acks is set to 0.
-   */
-  struct GNUNET_TIME_Relative avg_ack_delay;
-
   /**
    * Time we received the last fragment.  @e avg_ack_delay must be
    * incremented by now - @e last_frag multiplied by @e num_acks.
    */
   struct GNUNET_TIME_Absolute last_frag;
 
-  /**
-   * Bitfield of up to 64 additional fragments following @e frag_uuid
-   * to be acknowledged in the next cummulative ACK.
-   */
-  uint64_t extra_acks;
-
-  /**
-   * Unique ID of the lowest fragment UUID to be acknowledged in the
-   * next cummulative ACK.  Only valid if @e num_acks > 0.
-   */
-  uint32_t frag_uuid;
-
-  /**
-   * Number of ACKs we have accumulated so far.  Reset to 0
-   * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
-   */
-  unsigned int num_acks;
-
   /**
    * How big is the message we are reassembling in total?
    */
@@ -1377,7 +1669,7 @@ struct Neighbour
    * reassembly. May be NULL if we currently have no fragments from
    * this @e pid (lazy initialization).
    */
-  struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
+  struct GNUNET_CONTAINER_MultiHashMap32 *reassembly_map;
 
   /**
    * Heap with `struct ReassemblyContext` structs for fragments under
@@ -1424,34 +1716,34 @@ struct Neighbour
   struct Queue *queue_tail;
 
   /**
-   * Task run to cleanup pending messages that have exceeded their timeout.
+   * Handle for an operation to fetch @e last_dv_learn_monotime information from
+   * the PEERSTORE, or NULL.
    */
-  struct GNUNET_SCHEDULER_Task *timeout_task;
+  struct GNUNET_PEERSTORE_IterateContext *get;
 
   /**
-   * Quota at which CORE is allowed to transmit to this peer
-   * (note that the value CORE should actually be told is this
-   *  value plus the respective value in `struct DistanceVector`).
-   * Should match the sum of the quotas of all of the queues.
-   *
-   * FIXME: not yet set, tricky to get right given multiple queues!
-   *        (=> Idea: measure???)
-   * FIXME: how do we set this value initially when we tell CORE?
-   *    Options: start at a minimum value or at literally zero?
-   *         (=> Current thought: clean would be zero!)
+   * Handle to a PEERSTORE store operation to store this @e pid's @e
+   * @e last_dv_learn_monotime.  NULL if no PEERSTORE operation is pending.
+   */
+  struct GNUNET_PEERSTORE_StoreContext *sc;
+
+  /**
+   * Do we have a confirmed working queue and are thus visible to
+   * CORE?  If so, this is the virtual link, otherwise NULL.
    */
-  struct GNUNET_BANDWIDTH_Value32NBO quota_out;
+  struct VirtualLink *link;
 
   /**
-   * What is the earliest timeout of any message in @e pending_msg_tail?
+   * Latest DVLearn monotonic time seen from this peer.  Initialized only
+   * if @e dl_monotime_available is #GNUNET_YES.
    */
-  struct GNUNET_TIME_Absolute earliest_timeout;
+  struct GNUNET_TIME_Absolute last_dv_learn_monotime;
 
   /**
-   * Do we have a confirmed working queue and are thus visible to
-   * CORE?
+   * Do we have the lastest value for @e last_dv_learn_monotime from
+   * PEERSTORE yet, or are we still waiting for a reply of PEERSTORE?
    */
-  int core_visible;
+  int dv_monotime_available;
 };
 
 
@@ -1582,6 +1874,28 @@ struct PendingMessage
    */
   struct PendingMessage *prev_frag;
 
+  /**
+   * Kept in a MDLL of messages using this @a dvh (if @e dvh is
+   * non-NULL).
+   */
+  struct PendingMessage *next_dvh;
+
+  /**
+   * Kept in a MDLL of messages using this @a dvh (if @e dvh is
+   * non-NULL).
+   */
+  struct PendingMessage *prev_dvh;
+
+  /**
+   * Head of DLL of PAs for this pending message.
+   */
+  struct PendingAcknowledgement *pa_head;
+
+  /**
+   * Tail of DLL of PAs for this pending message.
+   */
+  struct PendingAcknowledgement *pa_tail;
+
   /**
    * This message, reliability boxed. Only possibly available if @e pmt is
    * #PMT_CORE.
@@ -1589,10 +1903,17 @@ struct PendingMessage
   struct PendingMessage *bpm;
 
   /**
-   * Target of the request.
+   * Target of the request (for transmission, may not be ultimate
+   * destination!).
    */
   struct Neighbour *target;
 
+  /**
+   * Distance vector path selected for this message, or
+   * NULL if transmitted directly.
+   */
+  struct DistanceVectorHop *dvh;
+
   /**
    * Set to non-NULL value if this message is currently being given to a
    * communicator and we are awaiting that communicator's acknowledgement.
@@ -1637,12 +1958,7 @@ struct PendingMessage
    * UUID to use for this message (used for reassembly of fragments, only
    * initialized if @e msg_uuid_set is #GNUNET_YES).
    */
-  struct GNUNET_ShortHashCode msg_uuid;
-
-  /**
-   * Counter incremented per generated fragment.
-   */
-  uint32_t frag_uuidgen;
+  struct MessageUUIDP msg_uuid;
 
   /**
    * Type of the pending message.
@@ -1669,63 +1985,123 @@ struct PendingMessage
 
 
 /**
- * One of the addresses of this peer.
+ * Acknowledgement payload.
  */
-struct AddressListEntry
+struct TransportCummulativeAckPayload
 {
-
   /**
-   * Kept in a DLL.
+   * When did we receive the message we are ACKing?  Used to calculate
+   * the delay we introduced by cummulating ACKs.
    */
-  struct AddressListEntry *next;
+  struct GNUNET_TIME_Absolute receive_time;
 
   /**
-   * Kept in a DLL.
+   * UUID of a message being acknowledged.
    */
-  struct AddressListEntry *prev;
+  struct AcknowledgementUUIDP ack_uuid;
+};
 
-  /**
-   * Which communicator provides this address?
-   */
-  struct TransportClient *tc;
 
+/**
+ * Data structure in which we track acknowledgements still to
+ * be sent to the
+ */
+struct AcknowledgementCummulator
+{
   /**
-   * The actual address.
+   * Target peer for which we are accumulating ACKs here.
    */
-  const char *address;
+  struct GNUNET_PeerIdentity target;
 
   /**
-   * Current context for storing this address in the peerstore.
+   * ACK data being accumulated.  Only @e num_acks slots are valid.
    */
-  struct GNUNET_PEERSTORE_StoreContext *sc;
+  struct TransportCummulativeAckPayload ack_uuids[MAX_CUMMULATIVE_ACKS];
 
   /**
-   * Task to periodically do @e st operation.
+   * Task scheduled either to transmit the cummulative ACK message,
+   * or to clean up this data structure after extended periods of
+   * inactivity (if @e num_acks is zero).
    */
-  struct GNUNET_SCHEDULER_Task *st;
+  struct GNUNET_SCHEDULER_Task *task;
 
   /**
-   * What is a typical lifetime the communicator expects this
-   * address to have? (Always from now.)
+   * When is @e task run (only used if @e num_acks is non-zero)?
    */
-  struct GNUNET_TIME_Relative expiration;
+  struct GNUNET_TIME_Absolute min_transmission_time;
 
   /**
-   * Address identifier used by the communicator.
+   * Counter to produce the `ack_counter` in the `struct
+   * TransportReliabilityAckMessage`.  Allows the receiver to detect
+   * lost ACK messages.  Incremented by @e num_acks upon transmission.
    */
-  uint32_t aid;
+  uint32_t ack_counter;
 
   /**
-   * Network type offered by this address.
+   * Number of entries used in @e ack_uuids.  Reset to 0 upon transmission.
    */
-  enum GNUNET_NetworkType nt;
+  unsigned int num_acks;
 };
 
 
 /**
- * Client connected to the transport service.
+ * One of the addresses of this peer.
  */
-struct TransportClient
+struct AddressListEntry
+{
+
+  /**
+   * Kept in a DLL.
+   */
+  struct AddressListEntry *next;
+
+  /**
+   * Kept in a DLL.
+   */
+  struct AddressListEntry *prev;
+
+  /**
+   * Which communicator provides this address?
+   */
+  struct TransportClient *tc;
+
+  /**
+   * The actual address.
+   */
+  const char *address;
+
+  /**
+   * Current context for storing this address in the peerstore.
+   */
+  struct GNUNET_PEERSTORE_StoreContext *sc;
+
+  /**
+   * Task to periodically do @e st operation.
+   */
+  struct GNUNET_SCHEDULER_Task *st;
+
+  /**
+   * What is a typical lifetime the communicator expects this
+   * address to have? (Always from now.)
+   */
+  struct GNUNET_TIME_Relative expiration;
+
+  /**
+   * Address identifier used by the communicator.
+   */
+  uint32_t aid;
+
+  /**
+   * Network type offered by this address.
+   */
+  enum GNUNET_NetworkType nt;
+};
+
+
+/**
+ * Client connected to the transport service.
+ */
+struct TransportClient
 {
 
   /**
@@ -1936,7 +2312,7 @@ struct ValidationState
    * (We must not rotate more often as otherwise we may discard valid answers
    * due to packet losses, latency and reorderings on the network).
    */
-  struct GNUNET_ShortHashCode challenge;
+  struct ChallengeNonceP challenge;
 
   /**
    * Claimed address of the peer.
@@ -1964,6 +2340,65 @@ struct ValidationState
 };
 
 
+/**
+ * A Backtalker is a peer sending us backchannel messages. We use this
+ * struct to detect monotonic time violations, cache ephemeral key
+ * material (to avoid repeatedly checking signatures), and to synchronize
+ * monotonic time with the PEERSTORE.
+ */
+struct Backtalker
+{
+  /**
+   * Peer this is about.
+   */
+  struct GNUNET_PeerIdentity pid;
+
+  /**
+   * Last (valid) monotonic time received from this sender.
+   */
+  struct GNUNET_TIME_Absolute monotonic_time;
+
+  /**
+   * When will this entry time out?
+   */
+  struct GNUNET_TIME_Absolute timeout;
+
+  /**
+   * Last (valid) ephemeral key received from this sender.
+   */
+  struct GNUNET_CRYPTO_EcdhePublicKey last_ephemeral;
+
+  /**
+   * Task associated with this backtalker. Can be for timeout,
+   * or other asynchronous operations.
+   */
+  struct GNUNET_SCHEDULER_Task *task;
+
+  /**
+   * Communicator context waiting on this backchannel's @e get, or NULL.
+   */
+  struct CommunicatorMessageContext *cmc;
+
+  /**
+   * Handle for an operation to fetch @e monotonic_time information from the
+   * PEERSTORE, or NULL.
+   */
+  struct GNUNET_PEERSTORE_IterateContext *get;
+
+  /**
+   * Handle to a PEERSTORE store operation for this @e pid's @e
+   * monotonic_time.  NULL if no PEERSTORE operation is pending.
+   */
+  struct GNUNET_PEERSTORE_StoreContext *sc;
+
+  /**
+   * Number of bytes of the original message body that follows after this
+   * struct.
+   */
+  size_t body_size;
+};
+
+
 /**
  * Head of linked list of all clients to this service.
  */
@@ -2000,6 +2435,24 @@ static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
  */
 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
 
+/**
+ * Map from PIDs to `struct Backtalker` entries.  A peer is
+ * a backtalker if it recently send us backchannel messages.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *backtalkers;
+
+/**
+ * Map from PIDs to `struct AcknowledgementCummulator`s.
+ * Here we track the cummulative ACKs for transmission.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *ack_cummulators;
+
+/**
+ * Map of pending acknowledgements, mapping `struct AcknowledgementUUID` to
+ * a `struct PendingAcknowledgement`.
+ */
+static struct GNUNET_CONTAINER_MultiShortmap *pending_acks;
+
 /**
  * Map from PIDs to `struct DistanceVector` entries describing
  * known paths to the peer.
@@ -2012,6 +2465,12 @@ static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
  */
 static struct GNUNET_CONTAINER_MultiPeerMap *validation_map;
 
+/**
+ * Map from PIDs to `struct VirtualLink` entries describing
+ * links CORE knows to exist.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *links;
+
 /**
  * Map from challenges to `struct LearnLaunchEntry` values.
  */
@@ -2069,6 +2528,81 @@ static struct GNUNET_SCHEDULER_Task *dvlearn_task;
  */
 static struct GNUNET_SCHEDULER_Task *validation_task;
 
+/**
+ * The most recent PA we have created, head of DLL.
+ * The length of the DLL is kept in #pa_count.
+ */
+static struct PendingAcknowledgement *pa_head;
+
+/**
+ * The oldest PA we have created, tail of DLL.
+ * The length of the DLL is kept in #pa_count.
+ */
+static struct PendingAcknowledgement *pa_tail;
+
+/**
+ * Number of entries in the #pa_head/#pa_tail DLL.  Used to
+ * limit the size of the data structure.
+ */
+static unsigned int pa_count;
+
+
+/**
+ * Get an offset into the transmission history buffer for `struct
+ * PerformanceData`.  Note that the caller must perform the required
+ * modulo #GOODPUT_AGING_SLOTS operation before indexing into the
+ * array!
+ *
+ * An 'age' lasts 15 minute slots.
+ *
+ * @return current age of the world
+ */
+static unsigned int
+get_age ()
+{
+  struct GNUNET_TIME_Absolute now;
+
+  now = GNUNET_TIME_absolute_get ();
+  return now.abs_value_us / GNUNET_TIME_UNIT_MINUTES.rel_value_us / 15;
+}
+
+
+/**
+ * Release @a pa data structure.
+ *
+ * @param pa data structure to release
+ */
+static void
+free_pending_acknowledgement (struct PendingAcknowledgement *pa)
+{
+  struct Queue *q = pa->queue;
+  struct PendingMessage *pm = pa->pm;
+  struct DistanceVectorHop *dvh = pa->dvh;
+
+  GNUNET_CONTAINER_MDLL_remove (pa, pa_head, pa_tail, pa);
+  pa_count--;
+  if (NULL != q)
+  {
+    GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
+    pa->queue = NULL;
+  }
+  if (NULL != pm)
+  {
+    GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
+    pa->pm = NULL;
+  }
+  if (NULL != dvh)
+  {
+    GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
+    pa->queue = NULL;
+  }
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multishortmap_remove (pending_acks,
+                                                        &pa->ack_uuid.value,
+                                                        pa));
+  GNUNET_free (pa);
+}
+
 
 /**
  * Free cached ephemeral key.
@@ -2084,6 +2618,26 @@ free_ephemeral (struct EphemeralCacheEntry *ece)
 }
 
 
+/**
+ * Free virtual link.
+ *
+ * @param vl link data to free
+ */
+static void
+free_virtual_link (struct VirtualLink *vl)
+{
+  GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl);
+  if (NULL != vl->visibility_task)
+  {
+    GNUNET_SCHEDULER_cancel (vl->visibility_task);
+    vl->visibility_task = NULL;
+  }
+  GNUNET_break (NULL == vl->n);
+  GNUNET_break (NULL == vl->dv);
+  GNUNET_free (vl);
+}
+
+
 /**
  * Free validation state.
  *
@@ -2165,7 +2719,22 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh)
 {
   struct Neighbour *n = dvh->next_hop;
   struct DistanceVector *dv = dvh->dv;
+  struct PendingAcknowledgement *pa;
+  struct PendingMessage *pm;
 
+  while (NULL != (pm = dvh->pending_msg_head))
+  {
+    GNUNET_CONTAINER_MDLL_remove (dvh,
+                                  dvh->pending_msg_head,
+                                  dvh->pending_msg_tail,
+                                  pm);
+    pm->dvh = NULL;
+  }
+  while (NULL != (pa = dvh->pa_head))
+  {
+    GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
+    pa->dvh = NULL;
+  }
   GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
   GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, dvh);
   GNUNET_free (dvh);
@@ -2190,8 +2759,6 @@ free_dv_route (struct DistanceVector *dv)
     GNUNET_assert (
       GNUNET_YES ==
       GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
-    if (NULL != dv->visibility_task)
-      GNUNET_SCHEDULER_cancel (dv->visibility_task);
     if (NULL != dv->timeout_task)
       GNUNET_SCHEDULER_cancel (dv->timeout_task);
     GNUNET_free (dv);
@@ -2307,9 +2874,9 @@ free_reassembly_context (struct ReassemblyContext *rc)
 
   GNUNET_assert (rc == GNUNET_CONTAINER_heap_remove_node (rc->hn));
   GNUNET_assert (GNUNET_OK ==
-                 GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
-                                                        &rc->msg_uuid,
-                                                        rc));
+                 GNUNET_CONTAINER_multihashmap32_remove (n->reassembly_map,
+                                                         rc->msg_uuid.uuid,
+                                                         rc));
   GNUNET_free (rc);
 }
 
@@ -2353,9 +2920,7 @@ reassembly_cleanup_task (void *cls)
  * @return #GNUNET_OK (continue iteration)
  */
 static int
-free_reassembly_cb (void *cls,
-                    const struct GNUNET_ShortHashCode *key,
-                    void *value)
+free_reassembly_cb (void *cls, uint32_t key, void *value)
 {
   struct ReassemblyContext *rc = value;
 
@@ -2381,14 +2946,12 @@ free_neighbour (struct Neighbour *neighbour)
                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
                                                        &neighbour->pid,
                                                        neighbour));
-  if (NULL != neighbour->timeout_task)
-    GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
   if (NULL != neighbour->reassembly_map)
   {
-    GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
-                                            &free_reassembly_cb,
-                                            NULL);
-    GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
+    GNUNET_CONTAINER_multihashmap32_iterate (neighbour->reassembly_map,
+                                             &free_reassembly_cb,
+                                             NULL);
+    GNUNET_CONTAINER_multihashmap32_destroy (neighbour->reassembly_map);
     neighbour->reassembly_map = NULL;
     GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
     neighbour->reassembly_heap = NULL;
@@ -2402,7 +2965,20 @@ free_neighbour (struct Neighbour *neighbour)
       free_dv_route (dv);
   }
   if (NULL != neighbour->reassembly_timeout_task)
+  {
     GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
+    neighbour->reassembly_timeout_task = NULL;
+  }
+  if (NULL != neighbour->get)
+  {
+    GNUNET_PEERSTORE_iterate_cancel (neighbour->get);
+    neighbour->get = NULL;
+  }
+  if (NULL != neighbour->sc)
+  {
+    GNUNET_PEERSTORE_store_cancel (neighbour->sc);
+    neighbour->sc = NULL;
+  }
   GNUNET_free (neighbour);
 }
 
@@ -2412,19 +2988,16 @@ free_neighbour (struct Neighbour *neighbour)
  *
  * @param tc client to inform (must be CORE client)
  * @param pid peer the connection is for
- * @param quota_out current quota for the peer
  */
 static void
 core_send_connect_info (struct TransportClient *tc,
-                        const struct GNUNET_PeerIdentity *pid,
-                        struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+                        const struct GNUNET_PeerIdentity *pid)
 {
   struct GNUNET_MQ_Envelope *env;
   struct ConnectInfoMessage *cim;
 
   GNUNET_assert (CT_CORE == tc->type);
   env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
-  cim->quota_out = quota_out;
   cim->id = *pid;
   GNUNET_MQ_send (tc->mq, env);
 }
@@ -2434,17 +3007,18 @@ core_send_connect_info (struct TransportClient *tc,
  * Send message to CORE clients that we gained a connection
  *
  * @param pid peer the queue was for
- * @param quota_out current quota for the peer
  */
 static void
-cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
-                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+cores_send_connect_info (const struct GNUNET_PeerIdentity *pid)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Informing CORE clients about connection to %s\n",
+              GNUNET_i2s (pid));
   for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
   {
     if (CT_CORE != tc->type)
       continue;
-    core_send_connect_info (tc, pid, quota_out);
+    core_send_connect_info (tc, pid);
   }
 }
 
@@ -2457,6 +3031,9 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
 static void
 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Informing CORE clients about disconnect from %s\n",
+              GNUNET_i2s (pid));
   for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
   {
     struct GNUNET_MQ_Envelope *env;
@@ -2472,10 +3049,9 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
 
 
 /**
- * We believe we are ready to transmit a message on a queue. Double-checks
- * with the queue's "tracker_out" and then gives the message to the
- * communicator for transmission (updating the tracker, and re-scheduling
- * itself if applicable).
+ * We believe we are ready to transmit a message on a queue. Gives the
+ * message to the communicator for transmission (updating the tracker,
+ * and re-scheduling itself if applicable).
  *
  * @param cls the `struct Queue` to process transmissions for
  */
@@ -2501,7 +3077,6 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
   struct Neighbour *n = queue->neighbour;
   struct PendingMessage *pm = n->pending_msg_head;
   struct GNUNET_TIME_Relative out_delay;
-  unsigned int wsize;
 
   GNUNET_assert (NULL != pm);
   if (queue->tc->details.communicator.total_queue_length >=
@@ -2523,14 +3098,16 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
     return;
   }
 
-  wsize = (0 == queue->mtu) ? pm->bytes_msg /* FIXME: add overheads? */
-                            : queue->mtu;
-  out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, wsize);
-  out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (
-                                          pm->next_attempt),
-                                        out_delay);
+  out_delay = GNUNET_TIME_absolute_get_remaining (pm->next_attempt);
   if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us))
+  {
+    GNUNET_log (
+      GNUNET_ERROR_TYPE_DEBUG,
+      "Schedule transmission on queue %llu of %s decides to run immediately\n",
+      (unsigned long long) queue->qid,
+      GNUNET_i2s (&n->pid));
     return; /* we should run immediately! */
+  }
   /* queue has changed since we were scheduled, reschedule again */
   queue->transmit_task =
     GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue);
@@ -2548,13 +3125,43 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
 
 
 /**
- * Check whether the CORE visibility of @a n changed. If so,
- * check whether we need to notify CORE.
+ * Task run to check whether the hops of the @a cls still
+ * are validated, or if we need to core about disconnection.
  *
- * @param n neighbour to perform the check for
+ * @param cls a `struct VirtualLink`
  */
 static void
-update_neighbour_core_visibility (struct Neighbour *n);
+check_link_down (void *cls)
+{
+  struct VirtualLink *vl = cls;
+  struct DistanceVector *dv = vl->dv;
+  struct Neighbour *n = vl->n;
+  struct GNUNET_TIME_Absolute dvh_timeout;
+  struct GNUNET_TIME_Absolute q_timeout;
+
+  vl->visibility_task = NULL;
+  dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+       pos = pos->next_dv)
+    dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
+  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+    vl->dv = NULL;
+  q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+  for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
+    q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
+  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+    vl->n = NULL;
+  if ((NULL == vl->n) && (NULL == vl->dv))
+  {
+    cores_send_disconnect_info (&dv->target);
+    free_virtual_link (vl);
+    return;
+  }
+  vl->visibility_task =
+    GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
+                             &check_link_down,
+                             vl);
+}
 
 
 /**
@@ -2571,17 +3178,20 @@ free_queue (struct Queue *queue)
                             .rtt = GNUNET_TIME_UNIT_FOREVER_REL};
   struct QueueEntry *qe;
   int maxxed;
+  struct PendingAcknowledgement *pa;
+  struct VirtualLink *vl;
 
   if (NULL != queue->transmit_task)
   {
     GNUNET_SCHEDULER_cancel (queue->transmit_task);
     queue->transmit_task = NULL;
   }
-  if (NULL != queue->visibility_task)
+  while (NULL != (pa = queue->pa_head))
   {
-    GNUNET_SCHEDULER_cancel (queue->visibility_task);
-    queue->visibility_task = NULL;
+    GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
+    pa->queue = NULL;
   }
+
   GNUNET_CONTAINER_MDLL_remove (neighbour,
                                 neighbour->queue_head,
                                 neighbour->queue_tail,
@@ -2619,13 +3229,14 @@ free_queue (struct Queue *queue)
       schedule_transmit_on_queue (s, GNUNET_NO);
   }
   notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
-  GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
-  GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
   GNUNET_free (queue);
 
-  update_neighbour_core_visibility (neighbour);
-  cores_send_disconnect_info (&neighbour->pid);
-
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid);
+  if ((NULL != vl) && (neighbour == vl->n))
+  {
+    GNUNET_SCHEDULER_cancel (vl->visibility_task);
+    check_link_down (vl);
+  }
   if (NULL == neighbour->queue_head)
   {
     free_neighbour (neighbour);
@@ -2704,6 +3315,7 @@ client_disconnect_cb (void *cls,
   struct TransportClient *tc = app_ctx;
 
   (void) cls;
+  (void) client;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Client %p disconnected, cleaning up.\n",
               tc);
@@ -2764,9 +3376,12 @@ notify_client_connect_info (void *cls,
                             void *value)
 {
   struct TransportClient *tc = cls;
-  struct Neighbour *neighbour = value;
 
-  core_send_connect_info (tc, pid, neighbour->quota_out);
+  (void) value;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Telling new CORE client about existing connection to %s\n",
+              GNUNET_i2s (pid));
+  core_send_connect_info (tc, pid);
   return GNUNET_OK;
 }
 
@@ -2801,6 +3416,9 @@ handle_client_start (void *cls, const struct StartMessage *start)
     return;
   }
   tc->type = CT_CORE;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "New CORE client with PID %s registered\n",
+              GNUNET_i2s (&start->self));
   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
                                          &notify_client_connect_info,
                                          tc);
@@ -2844,6 +3462,9 @@ check_client_send (void *cls, const struct OutboundMessage *obm)
 
 /**
  * Free fragment tree below @e root, excluding @e root itself.
+ * FIXME: this does NOT seem to have the intended semantics
+ * based on how this is called. Seems we generally DO expect
+ * @a root to be free'ed itself as well!
  *
  * @param root root of the tree to free
  */
@@ -2854,7 +3475,14 @@ free_fragment_tree (struct PendingMessage *root)
 
   while (NULL != (frag = root->head_frag))
   {
+    struct PendingAcknowledgement *pa;
+
     free_fragment_tree (frag);
+    while (NULL != (pa = frag->pa_head))
+    {
+      GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
+      pa->pm = NULL;
+    }
     GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
     GNUNET_free (frag);
   }
@@ -2873,6 +3501,8 @@ free_pending_message (struct PendingMessage *pm)
 {
   struct TransportClient *tc = pm->client;
   struct Neighbour *target = pm->target;
+  struct DistanceVectorHop *dvh = pm->dvh;
+  struct PendingAcknowledgement *pa;
 
   if (NULL != tc)
   {
@@ -2881,10 +3511,23 @@ free_pending_message (struct PendingMessage *pm)
                                   tc->details.core.pending_msg_tail,
                                   pm);
   }
+  if (NULL != dvh)
+  {
+    GNUNET_CONTAINER_MDLL_remove (dvh,
+                                  dvh->pending_msg_head,
+                                  dvh->pending_msg_tail,
+                                  pm);
+  }
   GNUNET_CONTAINER_MDLL_remove (neighbour,
                                 target->pending_msg_head,
                                 target->pending_msg_tail,
                                 pm);
+  while (NULL != (pa = pm->pa_head))
+  {
+    GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
+    pa->pm = NULL;
+  }
+
   free_fragment_tree (pm);
   if (NULL != pm->qe)
   {
@@ -2897,21 +3540,14 @@ free_pending_message (struct PendingMessage *pm)
 
 
 /**
- * Send a response to the @a pm that we have processed a
- * "send" request with status @a success. We
- * transmitted @a bytes_physical on the actual wire.
- * Sends a confirmation to the "core" client responsible
- * for the original request and free's @a pm.
+ * Send a response to the @a pm that we have processed a "send"
+ * request.  Sends a confirmation to the "core" client responsible for
+ * the original request and free's @a pm.
  *
  * @param pm handle to the original pending message
- * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
- *          for transmission failure
- * @param bytes_physical amount of bandwidth consumed
  */
 static void
-client_send_response (struct PendingMessage *pm,
-                      int success,
-                      uint32_t bytes_physical)
+client_send_response (struct PendingMessage *pm)
 {
   struct TransportClient *tc = pm->client;
   struct Neighbour *target = pm->target;
@@ -2921,10 +3557,10 @@ client_send_response (struct PendingMessage *pm,
   if (NULL != tc)
   {
     env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
-    som->success = htonl ((uint32_t) success);
-    som->bytes_msg = htons (pm->bytes_msg);
-    som->bytes_physical = htonl (bytes_physical);
     som->peer = target->pid;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Confirming transmission to %s\n",
+                GNUNET_i2s (&pm->target->pid));
     GNUNET_MQ_send (tc->mq, env);
   }
   free_pending_message (pm);
@@ -2932,79 +3568,182 @@ client_send_response (struct PendingMessage *pm,
 
 
 /**
- * Checks the message queue for a neighbour for messages that have timed
- * out and purges them.
+ * Create a DV Box message.
  *
- * @param cls a `struct Neighbour`
- */
-static void
-check_queue_timeouts (void *cls)
-{
-  struct Neighbour *n = cls;
-  struct PendingMessage *pm;
-  struct GNUNET_TIME_Absolute now;
-  struct GNUNET_TIME_Absolute earliest_timeout;
+ * @param total_hops how many hops did the message take so far
+ * @param num_hops length of the @a hops array
+ * @param origin origin of the message
+ * @param hops next peer(s) to the destination, including destination
+ * @param payload payload of the box
+ * @param payload_size number of bytes in @a payload
+ * @return boxed message (caller must #GNUNET_free() it).
+ */
+static struct TransportDVBoxMessage *
+create_dv_box (uint16_t total_hops,
+               const struct GNUNET_PeerIdentity *origin,
+               const struct GNUNET_PeerIdentity *target,
+               uint16_t num_hops,
+               const struct GNUNET_PeerIdentity *hops,
+               const void *payload,
+               uint16_t payload_size)
+{
+  struct TransportDVBoxMessage *dvb;
+  struct GNUNET_PeerIdentity *dhops;
 
-  n->timeout_task = NULL;
-  earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
-  now = GNUNET_TIME_absolute_get ();
-  for (struct PendingMessage *pos = n->pending_msg_head; NULL != pos; pos = pm)
+  GNUNET_assert (UINT16_MAX <
+                 sizeof (struct TransportDVBoxMessage) +
+                   sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) +
+                   payload_size);
+  dvb = GNUNET_malloc (sizeof (struct TransportDVBoxMessage) +
+                       sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) +
+                       payload_size);
+  dvb->header.size =
+    htons (sizeof (struct TransportDVBoxMessage) +
+           sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) + payload_size);
+  dvb->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
+  dvb->total_hops = htons (total_hops);
+  dvb->num_hops = htons (num_hops + 1);
+  dvb->origin = *origin;
+  dhops = (struct GNUNET_PeerIdentity *) &dvb[1];
+  memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity));
+  dhops[num_hops] = *target;
+  memcpy (&dhops[num_hops + 1], payload, payload_size);
+
+  if (GNUNET_EXTRA_LOGGING > 0)
   {
-    pm = pos->next_neighbour;
-    if (pos->timeout.abs_value_us <= now.abs_value_us)
+    char *path;
+
+    path = GNUNET_strdup (GNUNET_i2s (&dvb->origin));
+    for (unsigned int i = 0; i <= num_hops; i++)
     {
-      GNUNET_STATISTICS_update (GST_stats,
-                                "# messages dropped (timeout before confirmation)",
-                                1,
-                                GNUNET_NO);
-      client_send_response (pm, GNUNET_NO, 0);
-      continue;
+      char *tmp;
+
+      GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[i]));
+      GNUNET_free (path);
+      path = tmp;
     }
-    earliest_timeout =
-      GNUNET_TIME_absolute_min (earliest_timeout, pos->timeout);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Creating DVBox for %u bytes of payload via %s\n",
+                (unsigned int) payload_size,
+                path);
+    GNUNET_free (path);
   }
-  n->earliest_timeout = earliest_timeout;
-  if (NULL != n->pending_msg_head)
-    n->timeout_task =
-      GNUNET_SCHEDULER_add_at (earliest_timeout, &check_queue_timeouts, n);
+
+  return dvb;
 }
 
 
 /**
- * Client asked for transmission to a peer.  Process the request.
+ * Pick @a hops_array_length random DV paths satisfying @a options
  *
- * @param cls the client
- * @param obm the send message that was sent
+ * @param dv data structure to pick paths from
+ * @param options constraints to satisfy
+ * @param hops_array[out] set to the result
+ * @param hops_array_length length of the @a hops_array
+ * @return number of entries set in @a hops_array
  */
-static void
-handle_client_send (void *cls, const struct OutboundMessage *obm)
+static unsigned int
+pick_random_dv_hops (const struct DistanceVector *dv,
+                     enum RouteMessageOptions options,
+                     struct DistanceVectorHop **hops_array,
+                     unsigned int hops_array_length)
 {
-  struct TransportClient *tc = cls;
-  struct PendingMessage *pm;
-  const struct GNUNET_MessageHeader *obmm;
-  struct Neighbour *target;
-  uint32_t bytes_msg;
-  int was_empty;
+  uint64_t choices[hops_array_length];
+  uint64_t num_dv;
+  unsigned int dv_count;
 
-  GNUNET_assert (CT_CORE == tc->type);
-  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
-  bytes_msg = ntohs (obmm->size);
-  target = lookup_neighbour (&obm->peer);
-  if (NULL == target)
+  /* Pick random vectors, but weighted by distance, giving more weight
+     to shorter vectors */
+  num_dv = 0;
+  dv_count = 0;
+  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+       pos = pos->next_dv)
   {
-    /* Failure: don't have this peer as a neighbour (anymore).
-       Might have gone down asynchronously, so this is NOT
-       a protocol violation by CORE. Still count the event,
-       as this should be rare. */
-    struct GNUNET_MQ_Envelope *env;
-    struct SendOkMessage *som;
-
-    env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
-    som->success = htonl (GNUNET_SYSERR);
-    som->bytes_msg = htonl (bytes_msg);
-    som->bytes_physical = htonl (0);
-    som->peer = obm->peer;
-    GNUNET_MQ_send (tc->mq, env);
+    if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
+        (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
+           .rel_value_us == 0))
+      continue; /* pos unconfirmed and confirmed required */
+    num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
+    dv_count++;
+  }
+  if (0 == dv_count)
+    return 0;
+  if (dv_count <= hops_array_length)
+  {
+    dv_count = 0;
+    for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+         pos = pos->next_dv)
+      hops_array[dv_count++] = pos;
+    return dv_count;
+  }
+  for (unsigned int i = 0; i < hops_array_length; i++)
+  {
+    int ok = GNUNET_NO;
+    while (GNUNET_NO == ok)
+    {
+      choices[i] =
+        GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, num_dv);
+      ok = GNUNET_YES;
+      for (unsigned int j = 0; j < i; j++)
+        if (choices[i] == choices[j])
+        {
+          ok = GNUNET_NO;
+          break;
+        }
+    }
+  }
+  dv_count = 0;
+  num_dv = 0;
+  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+       pos = pos->next_dv)
+  {
+    uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
+
+    if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
+        (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
+           .rel_value_us == 0))
+      continue; /* pos unconfirmed and confirmed required */
+    for (unsigned int i = 0; i < hops_array_length; i++)
+      if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
+        hops_array[dv_count++] = pos;
+    num_dv += delta;
+  }
+  return dv_count;
+}
+
+
+/**
+ * Client asked for transmission to a peer.  Process the request.
+ *
+ * @param cls the client
+ * @param obm the send message that was sent
+ */
+static void
+handle_client_send (void *cls, const struct OutboundMessage *obm)
+{
+  struct TransportClient *tc = cls;
+  struct PendingMessage *pm;
+  const struct GNUNET_MessageHeader *obmm;
+  struct Neighbour *target;
+  struct DistanceVector *dv;
+  struct DistanceVectorHop *dvh;
+  uint32_t bytes_msg;
+  int was_empty;
+  const void *payload;
+  size_t payload_size;
+  struct TransportDVBoxMessage *dvb;
+  struct VirtualLink *vl;
+
+  GNUNET_assert (CT_CORE == tc->type);
+  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
+  bytes_msg = ntohs (obmm->size);
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer);
+  if (NULL == vl)
+  {
+    /* Failure: don't have this peer as a neighbour (anymore).
+       Might have gone down asynchronously, so this is NOT
+       a protocol violation by CORE. Still count the event,
+       as this should be rare. */
     GNUNET_SERVICE_client_continue (tc->client);
     GNUNET_STATISTICS_update (GST_stats,
                               "# messages dropped (neighbour unknown)",
@@ -3012,14 +3751,59 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
                               GNUNET_NO);
     return;
   }
+  target = lookup_neighbour (&obm->peer);
+  if (NULL == target)
+    dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer);
+  else
+    dv = NULL;
+  GNUNET_assert ((NULL != target) || (NULL != dv));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending %u bytes to %s using %s\n",
+              bytes_msg,
+              GNUNET_i2s (&obm->peer),
+              (NULL == target) ? "distance vector path" : "direct queue");
+  if (NULL == target)
+  {
+    unsigned int res;
+    struct DistanceVectorHop *dvh;
+
+    res = pick_random_dv_hops (dv, RMO_NONE, &dvh, 1);
+    GNUNET_assert (1 == res);
+    target = dvh->next_hop;
+    dvb = create_dv_box (0,
+                         &GST_my_identity,
+                         &obm->peer,
+                         dvh->distance,
+                         dvh->path,
+                         &obm[1],
+                         bytes_msg);
+    payload = dvb;
+    payload_size = ntohs (dvb->header.size);
+  }
+  else
+  {
+    dvh = NULL;
+    dvb = NULL;
+    payload = &obm[1];
+    payload_size = bytes_msg;
+  }
+
   was_empty = (NULL == target->pending_msg_head);
-  pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
+  pm = GNUNET_malloc (sizeof (struct PendingMessage) + payload_size);
   pm->client = tc;
   pm->target = target;
-  pm->bytes_msg = bytes_msg;
-  pm->timeout =
-    GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
-  memcpy (&pm[1], &obm[1], bytes_msg);
+  pm->bytes_msg = payload_size;
+  memcpy (&pm[1], payload, payload_size);
+  GNUNET_free_non_null (dvb);
+  dvb = NULL;
+  pm->dvh = dvh;
+  if (NULL != dvh)
+  {
+    GNUNET_CONTAINER_MDLL_insert (dvh,
+                                  dvh->pending_msg_head,
+                                  dvh->pending_msg_tail,
+                                  pm);
+  }
   GNUNET_CONTAINER_MDLL_insert (neighbour,
                                 target->pending_msg_head,
                                 target->pending_msg_tail,
@@ -3028,15 +3812,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
                                 tc->details.core.pending_msg_head,
                                 tc->details.core.pending_msg_tail,
                                 pm);
-  if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
-  {
-    target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
-    if (NULL != target->timeout_task)
-      GNUNET_SCHEDULER_cancel (target->timeout_task);
-    target->timeout_task = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
-                                                    &check_queue_timeouts,
-                                                    target);
-  }
   if (! was_empty)
     return; /* all queues must already be busy */
   for (struct Queue *queue = target->queue_head; NULL != queue;
@@ -3044,8 +3819,14 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
   {
     /* try transmission on any queue that is idle */
     if (NULL == queue->transmit_task)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Queue %llu to %s is idle, triggering transmission\n",
+                  (unsigned long long) queue->qid,
+                  GNUNET_i2s (&queue->neighbour->pid));
       queue->transmit_task =
         GNUNET_SCHEDULER_add_now (&transmit_on_queue, queue);
+    }
   }
 }
 
@@ -3078,6 +3859,77 @@ check_communicator_available (
 }
 
 
+/**
+ * Send ACK to communicator (if requested) and free @a cmc.
+ *
+ * @param cmc context for which we are done handling the message
+ */
+static void
+finish_cmc_handling (struct CommunicatorMessageContext *cmc)
+{
+  if (0 != ntohl (cmc->im.fc_on))
+  {
+    /* send ACK when done to communicator for flow control! */
+    struct GNUNET_MQ_Envelope *env;
+    struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
+
+    env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
+    ack->reserved = htonl (0);
+    ack->fc_id = cmc->im.fc_id;
+    ack->sender = cmc->im.sender;
+    GNUNET_MQ_send (cmc->tc->mq, env);
+  }
+  GNUNET_SERVICE_client_continue (cmc->tc->client);
+  GNUNET_free (cmc);
+}
+
+
+/**
+ * Client confirms that it is done handling message(s) to a particular
+ * peer. We may now provide more messages to CORE for this peer.
+ *
+ * Notifies the respective queues that more messages can now be received.
+ *
+ * @param cls the client
+ * @param rom the message that was sent
+ */
+static void
+handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
+{
+  struct TransportClient *tc = cls;
+  struct VirtualLink *vl;
+  uint32_t delta;
+  struct CommunicatorMessageContext *cmc;
+
+  if (CT_CORE != tc->type)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (tc->client);
+    return;
+  }
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer);
+  if (NULL == vl)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# RECV_OK dropped: virtual link unknown",
+                              1,
+                              GNUNET_NO);
+    GNUNET_SERVICE_client_continue (tc->client);
+    return;
+  }
+  delta = ntohl (rom->increase_window_delta);
+  vl->core_recv_window += delta;
+  if (vl->core_recv_window <= 0)
+    return;
+  /* resume communicators */
+  while (NULL != (cmc = vl->cmc_tail))
+  {
+    GNUNET_CONTAINER_DLL_remove (vl->cmc_head, vl->cmc_tail, cmc);
+    finish_cmc_handling (cmc);
+  }
+}
+
+
 /**
  * Communicator started.  Process the request.
  *
@@ -3094,11 +3946,18 @@ handle_communicator_available (
 
   size = ntohs (cam->header.size) - sizeof (*cam);
   if (0 == size)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Receive-only communicator connected\n");
     return; /* receive-only communicator */
+  }
   tc->details.communicator.address_prefix =
     GNUNET_strdup ((const char *) &cam[1]);
   tc->details.communicator.cc =
     (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Communicator with prefix `%s' connected\n",
+              tc->details.communicator.address_prefix);
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
@@ -3122,9 +3981,9 @@ check_communicator_backchannel (
 
   (void) cls;
   msize = ntohs (cb->header.size) - sizeof (*cb);
-  if (UINT16_MAX - msize >
+  if (((size_t) (UINT16_MAX - msize)) >
       sizeof (struct TransportBackchannelEncapsulationMessage) +
-        sizeof (struct TransportBackchannelRequestPayload))
+        sizeof (struct TransportBackchannelRequestPayloadP))
   {
     GNUNET_break (0);
     return GNUNET_SYSERR;
@@ -3178,24 +4037,24 @@ expire_ephemerals (void *cls)
 
 
 /**
- * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
- * one, cache it and return it.
+ * Lookup ephemeral key in our #ephemeral_map. If no valid one exists,
+ * generate one, cache it and return it.
  *
  * @param pid peer to look up ephemeral for
  * @param private_key[out] set to the private key
  * @param ephemeral_key[out] set to the key
  * @param ephemeral_sender_sig[out] set to the signature
- * @param ephemeral_validity[out] set to the validity expiration time
+ * @param monotime[out] set to the monotime used for the signature
  */
 static void
 lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
                   struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
                   struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
                   struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
-                  struct GNUNET_TIME_Absolute *ephemeral_validity)
+                  struct GNUNET_TIME_Absolute *monotime)
 {
   struct EphemeralCacheEntry *ece;
-  struct EphemeralConfirmation ec;
+  struct EphemeralConfirmationPS ec;
 
   ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map, pid);
   if ((NULL != ece) &&
@@ -3209,9 +4068,9 @@ lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
   {
     ece = GNUNET_new (struct EphemeralCacheEntry);
     ece->target = *pid;
+    ece->monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
     ece->ephemeral_validity =
-      GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
-                                EPHEMERAL_VALIDITY);
+      GNUNET_TIME_absolute_add (ece->monotime, EPHEMERAL_VALIDITY);
     GNUNET_assert (GNUNET_OK ==
                    GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
     GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key, &ece->ephemeral_key);
@@ -3240,7 +4099,7 @@ lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
   *private_key = ece->private_key;
   *ephemeral_key = ece->ephemeral_key;
   *ephemeral_sender_sig = ece->sender_sig;
-  *ephemeral_validity = ece->ephemeral_validity;
+  *monotime = ece->monotime;
 }
 
 
@@ -3263,6 +4122,11 @@ queue_send_msg (struct Queue *queue,
   struct GNUNET_TRANSPORT_SendMessageTo *smt;
   struct GNUNET_MQ_Envelope *env;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Queueing %u bytes of payload for transmission on queue %llu to %s\n",
+              (unsigned int) payload_size,
+              (unsigned long long) queue->qid,
+              GNUNET_i2s (&queue->neighbour->pid));
   env = GNUNET_MQ_msg_extra (smt,
                              payload_size,
                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
@@ -3292,40 +4156,7 @@ queue_send_msg (struct Queue *queue,
 }
 
 
-/**
- * Which transmission options are allowable for transmission?
- * Interpreted bit-wise!
- */
-enum RouteMessageOptions
-{
-  /**
-   * Only confirmed, non-DV direct neighbours.
-   */
-  RMO_NONE = 0,
-
-  /**
-   * We are allowed to use DV routing for this @a hdr
-   */
-  RMO_DV_ALLOWED = 1,
-
-  /**
-   * We are allowed to use unconfirmed queues or DV routes for this message
-   */
-  RMO_UNCONFIRMED_ALLOWED = 2,
-
-  /**
-   * Reliable and unreliable, DV and non-DV are all acceptable.
-   */
-  RMO_ANYTHING_GOES = (RMO_DV_ALLOWED | RMO_UNCONFIRMED_ALLOWED),
-
-  /**
-   * If we have multiple choices, it is OK to send this message
-   * over multiple channels at the same time to improve loss tolerance.
-   * (We do at most 2 transmissions.)
-   */
-  RMO_REDUNDANT = 4
-};
-
+// FIXME: improve logging after this point!
 
 /**
  * Pick a queue of @a n under constraints @a options and schedule
@@ -3356,20 +4187,18 @@ route_via_neighbour (const struct Neighbour *n,
   for (struct Queue *pos = n->queue_head; NULL != pos;
        pos = pos->next_neighbour)
   {
-    /* Count the queue with the visibility task in all cases, as
-       otherwise we may end up with no queues just because the
-       time for the visibility task just expired but the scheduler
-       just ran this task first */
     if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
-        (pos->validated_until.abs_value_us > now.abs_value_us) ||
-        (NULL != pos->visibility_task))
+        (pos->validated_until.abs_value_us > now.abs_value_us))
       candidates++;
   }
   if (0 == candidates)
   {
-    /* Given that we above check for pos->visibility task,
-       this should be strictly impossible. */
-    GNUNET_break (0);
+    /* This can happen rarely if the last confirmed queue timed
+       out just as we were beginning to process this message. */
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# route selection failed (all no valid queue)",
+                              1,
+                              GNUNET_NO);
     return;
   }
   sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates);
@@ -3381,12 +4210,8 @@ route_via_neighbour (const struct Neighbour *n,
   for (struct Queue *pos = n->queue_head; NULL != pos;
        pos = pos->next_neighbour)
   {
-    /* Count the queue with the visibility task in all cases, as
-       otherwise we may end up with no queues just because the
-       time for the visibility task just expired but the scheduler
-       just ran this task first */
-    if ((pos->validated_until.abs_value_us > now.abs_value_us) ||
-        (NULL != pos->visibility_task))
+    if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
+        (pos->validated_until.abs_value_us > now.abs_value_us))
     {
       if ((sel1 == candidates) || (sel2 == candidates))
         queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
@@ -3410,22 +4235,17 @@ forward_via_dvh (const struct DistanceVectorHop *dvh,
                  const struct GNUNET_MessageHeader *payload,
                  enum RouteMessageOptions options)
 {
-  uint16_t mlen = ntohs (payload->size);
-  char boxram[sizeof (struct TransportDVBox) +
-              (dvh->distance + 1) * sizeof (struct GNUNET_PeerIdentity) +
-              mlen] GNUNET_ALIGN;
-  struct TransportDVBox *box = (struct TransportDVBox *) boxram;
-  struct GNUNET_PeerIdentity *path = (struct GNUNET_PeerIdentity *) &box[1];
-
-  box->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
-  box->header.size = htons (sizeof (boxram));
-  box->total_hops = htons (0);
-  box->num_hops = htons (dvh->distance + 1);
-  box->origin = GST_my_identity;
-  memcpy (path, dvh->path, dvh->distance * sizeof (struct GNUNET_PeerIdentity));
-  path[dvh->distance] = dvh->dv->target;
-  memcpy (&path[dvh->distance + 1], payload, mlen);
-  route_via_neighbour (dvh->next_hop, &box->header, options);
+  struct TransportDVBoxMessage *dvb;
+
+  dvb = create_dv_box (0,
+                       &GST_my_identity,
+                       &dvh->dv->target,
+                       dvh->distance,
+                       dvh->path,
+                       payload,
+                       ntohs (payload->size));
+  route_via_neighbour (dvh->next_hop, &dvb->header, options);
+  GNUNET_free (dvb);
 }
 
 
@@ -3443,52 +4263,15 @@ route_via_dv (const struct DistanceVector *dv,
               const struct GNUNET_MessageHeader *hdr,
               enum RouteMessageOptions options)
 {
-  struct DistanceVectorHop *h1;
-  struct DistanceVectorHop *h2;
-  uint64_t num_dv;
-  uint64_t choice1;
-  uint64_t choice2;
-
-  /* Pick random vectors, but weighted by distance, giving more weight
-     to shorter vectors */
-  num_dv = 0;
-  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
-       pos = pos->next_dv)
-  {
-    if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
-        (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
-           .rel_value_us == 0))
-      continue; /* pos unconfirmed and confirmed required */
-    num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
-  }
-  if (0 == num_dv)
-  {
-    GNUNET_break (0);
-    return;
-  }
-  choice1 = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, num_dv);
-  choice2 = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, num_dv);
-  num_dv = 0;
-  h1 = NULL;
-  h2 = NULL;
-  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
-       pos = pos->next_dv)
-  {
-    uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
+  struct DistanceVectorHop *hops[2];
+  unsigned int res;
 
-    if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
-        (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
-           .rel_value_us == 0))
-      continue; /* pos unconfirmed and confirmed required */
-    if ((num_dv <= choice1) && (num_dv + delta > choice1))
-      h1 = pos;
-    if ((num_dv <= choice2) && (num_dv + delta > choice2))
-      h2 = pos;
-    num_dv += delta;
-  }
-  forward_via_dvh (h1, hdr, options & (~RMO_REDUNDANT));
-  if (0 == (options & RMO_REDUNDANT))
-    forward_via_dvh (h2, hdr, options & (~RMO_REDUNDANT));
+  res = pick_random_dv_hops (dv,
+                             options,
+                             hops,
+                             (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
+  for (unsigned int i = 0; i < res; i++)
+    forward_via_dvh (hops[i], hdr, options & (~RMO_REDUNDANT));
 }
 
 
@@ -3505,21 +4288,21 @@ route_message (const struct GNUNET_PeerIdentity *target,
                struct GNUNET_MessageHeader *hdr,
                enum RouteMessageOptions options)
 {
+  struct VirtualLink *vl;
   struct Neighbour *n;
   struct DistanceVector *dv;
 
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours, target);
-  dv = (0 != (options & RMO_DV_ALLOWED))
-         ? GNUNET_CONTAINER_multipeermap_get (dv_routes, target)
-         : NULL;
+  vl = GNUNET_CONTAINER_multipeermap_get (links, target);
+  n = vl->n;
+  dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
   if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
   {
     /* if confirmed is required, and we do not have anything
        confirmed, drop respective options */
-    if ((NULL != n) && (GNUNET_NO == n->core_visible))
-      n = NULL;
-    if ((NULL != dv) && (GNUNET_NO == dv->core_visible))
-      dv = NULL;
+    if (NULL == n)
+      n = lookup_neighbour (target);
+    if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
+      dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target);
   }
   if ((NULL == n) && (NULL == dv))
   {
@@ -3589,6 +4372,14 @@ struct BackchannelKeyState
 };
 
 
+/**
+ * Given the key material in @a km and the initialization vector
+ * @a iv, setup the key material for the backchannel in @a key.
+ *
+ * @param km raw master secret
+ * @param iv initialization vector
+ * @param key[out] symmetric cipher and HMAC state to generate
+ */
 static void
 bc_setup_key_state_from_km (const struct GNUNET_HashCode *km,
                             const struct GNUNET_ShortHashCode *iv,
@@ -3750,16 +4541,16 @@ handle_communicator_backchannel (
 {
   struct TransportClient *tc = cls;
   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
-  struct GNUNET_TIME_Absolute ephemeral_validity;
+  struct GNUNET_TIME_Absolute monotime;
   struct TransportBackchannelEncapsulationMessage *enc;
-  struct TransportBackchannelRequestPayload ppay;
+  struct TransportBackchannelRequestPayloadP ppay;
   struct BackchannelKeyState key;
   char *mpos;
   uint16_t msize;
 
   /* encapsulate and encrypt message */
   msize = ntohs (cb->header.size) - sizeof (*cb) +
-          sizeof (struct TransportBackchannelRequestPayload);
+          sizeof (struct TransportBackchannelRequestPayloadP);
   enc = GNUNET_malloc (sizeof (*enc) + msize);
   enc->header.type =
     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
@@ -3769,14 +4560,12 @@ handle_communicator_backchannel (
                     &private_key,
                     &enc->ephemeral_key,
                     &ppay.sender_sig,
-                    &ephemeral_validity);
+                    &monotime);
   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
                               &enc->iv,
                               sizeof (enc->iv));
   dh_key_derive_eph_pid (&private_key, &cb->pid, &enc->iv, &key);
-  ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
-  ppay.monotonic_time =
-    GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
+  ppay.monotonic_time = GNUNET_TIME_absolute_hton (monotime);
   mpos = (char *) &enc[1];
   bc_encrypt (&key, &ppay, mpos, sizeof (ppay));
   bc_encrypt (&key,
@@ -3957,30 +4746,6 @@ handle_del_address (void *cls,
 }
 
 
-/**
- * Context from #handle_incoming_msg().  Closure for many
- * message handlers below.
- */
-struct CommunicatorMessageContext
-{
-  /**
-   * Which communicator provided us with the message.
-   */
-  struct TransportClient *tc;
-
-  /**
-   * Additional information for flow control and about the sender.
-   */
-  struct GNUNET_TRANSPORT_IncomingMessage im;
-
-  /**
-   * Number of hops the message has travelled (if DV-routed).
-   * FIXME: make use of this in ACK handling!
-   */
-  uint16_t total_hops;
-};
-
-
 /**
  * Given an inbound message @a msg from a communicator @a cmc,
  * demultiplex it based on the type calling the right handler.
@@ -3994,52 +4759,47 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
 
 
 /**
- * Send ACK to communicator (if requested) and free @a cmc.
+ * Communicator gave us an unencapsulated message to pass as-is to
+ * CORE.  Process the request.
  *
- * @param cmc context for which we are done handling the message
+ * @param cls a `struct CommunicatorMessageContext` (must call
+ * #finish_cmc_handling() when done)
+ * @param mh the message that was received
  */
 static void
-finish_cmc_handling (struct CommunicatorMessageContext *cmc)
+handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
 {
-  if (0 != ntohl (cmc->im.fc_on))
+  struct CommunicatorMessageContext *cmc = cls;
+  struct VirtualLink *vl;
+  uint16_t size = ntohs (mh->size);
+
+  if ((size > UINT16_MAX - sizeof (struct InboundMessage)) ||
+      (size < sizeof (struct GNUNET_MessageHeader)))
   {
-    /* send ACK when done to communicator for flow control! */
-    struct GNUNET_MQ_Envelope *env;
-    struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
+    struct GNUNET_SERVICE_Client *client = cmc->tc->client;
 
-    env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
-    ack->reserved = htonl (0);
-    ack->fc_id = cmc->im.fc_id;
-    ack->sender = cmc->im.sender;
-    GNUNET_MQ_send (cmc->tc->mq, env);
+    GNUNET_break (0);
+    finish_cmc_handling (cmc);
+    GNUNET_SERVICE_client_drop (client);
+    return;
   }
-  GNUNET_SERVICE_client_continue (cmc->tc->client);
-  GNUNET_free (cmc);
-}
-
-
-/**
- * Communicator gave us an unencapsulated message to pass as-is to
- * CORE.  Process the request.
- *
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
- * @param mh the message that was received
- */
-static void
-handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
-{
-  struct CommunicatorMessageContext *cmc = cls;
-  uint16_t size = ntohs (mh->size);
-
-  if ((size > UINT16_MAX - sizeof (struct InboundMessage)) ||
-      (size < sizeof (struct GNUNET_MessageHeader)))
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &cmc->im.sender);
+  if (NULL == vl)
   {
-    struct GNUNET_SERVICE_Client *client = cmc->tc->client;
+    /* FIXME: sender is giving us messages for CORE but we don't have
+       the link up yet! I *suspect* this can happen right now (i.e.
+       sender has verified us, but we didn't verify sender), but if
+       we pass this on, CORE would be confused (link down, messages
+       arrive).  We should investigate more if this happens often,
+       or in a persistent manner, and possibly do "something" about
+       it. Thus logging as error for now. */
+    GNUNET_break_op (0);
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# CORE messages droped (virtual link still down)",
+                              1,
+                              GNUNET_NO);
 
-    GNUNET_break (0);
     finish_cmc_handling (cmc);
-    GNUNET_SERVICE_client_drop (client);
     return;
   }
   /* Forward to all CORE clients */
@@ -4055,10 +4815,15 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
     memcpy (&im[1], mh, size);
     GNUNET_MQ_send (tc->mq, env);
   }
-  /* FIXME: consider doing this _only_ once the message
-     was drained from the CORE MQs to extend flow control to CORE!
-     (basically, increment counter in cmc, decrement on MQ send continuation! */
-  finish_cmc_handling (cmc);
+  vl->core_recv_window--;
+  if (vl->core_recv_window > 0)
+  {
+    finish_cmc_handling (cmc);
+    return;
+  }
+  /* Wait with calling #finish_cmc_handling(cmc) until the message
+     was processed by CORE MQs (for CORE flow control)! */
+  GNUNET_CONTAINER_DLL_insert (vl->cmc_head, vl->cmc_tail, cmc);
 }
 
 
@@ -4070,11 +4835,12 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
  * @return #GNUNET_YES if message is well-formed
  */
 static int
-check_fragment_box (void *cls, const struct TransportFragmentBox *fb)
+check_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
 {
   uint16_t size = ntohs (fb->header.size);
   uint16_t bsize = size - sizeof (*fb);
 
+  (void) cls;
   if (0 == bsize)
   {
     GNUNET_break_op (0);
@@ -4095,32 +4861,150 @@ check_fragment_box (void *cls, const struct TransportFragmentBox *fb)
 
 
 /**
- * Generate a fragment acknowledgement for an @a rc.
+ * Clean up an idle cummulative acknowledgement data structure.
  *
- * @param rc context to generate ACK for, @a rc ACK state is reset
+ * @param cls a `struct AcknowledgementCummulator *`
  */
 static void
-send_fragment_ack (struct ReassemblyContext *rc)
+destroy_ack_cummulator (void *cls)
 {
-  struct TransportFragmentAckMessage *ack;
+  struct AcknowledgementCummulator *ac = cls;
 
-  ack = GNUNET_new (struct TransportFragmentAckMessage);
-  ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
-  ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
-  ack->frag_uuid = htonl (rc->frag_uuid);
-  ack->extra_acks = GNUNET_htonll (rc->extra_acks);
-  ack->msg_uuid = rc->msg_uuid;
-  ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
-  if (0 == rc->msg_missing)
-    ack->reassembly_timeout = GNUNET_TIME_relative_hton (
-      GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
+  ac->task = NULL;
+  GNUNET_assert (0 == ac->num_acks);
+  GNUNET_assert (
+    GNUNET_YES ==
+    GNUNET_CONTAINER_multipeermap_remove (ack_cummulators, &ac->target, ac));
+  GNUNET_free (ac);
+}
+
+
+/**
+ * Do the transmission of a cummulative acknowledgement now.
+ *
+ * @param cls a `struct AcknowledgementCummulator *`
+ */
+static void
+transmit_cummulative_ack_cb (void *cls)
+{
+  struct AcknowledgementCummulator *ac = cls;
+  struct TransportReliabilityAckMessage *ack;
+  struct TransportCummulativeAckPayloadP *ap;
+
+  ac->task = NULL;
+  GNUNET_assert (0 < ac->ack_counter);
+  ack = GNUNET_malloc (sizeof (*ack) +
+                       ac->ack_counter *
+                         sizeof (struct TransportCummulativeAckPayloadP));
+  ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
+  ack->header.size =
+    htons (sizeof (*ack) +
+           ac->ack_counter * sizeof (struct TransportCummulativeAckPayloadP));
+  ack->ack_counter = htonl (ac->ack_counter++);
+  ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
+  for (unsigned int i = 0; i < ac->ack_counter; i++)
+  {
+    ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
+    ap[i].ack_delay = GNUNET_TIME_relative_hton (
+      GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
+  }
+  route_message (&ac->target, &ack->header, RMO_DV_ALLOWED);
+  ac->num_acks = 0;
+  ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT,
+                                           &destroy_ack_cummulator,
+                                           ac);
+}
+
+
+/**
+ * Transmit an acknowledgement for @a ack_uuid to @a pid delaying
+ * transmission by at most @a ack_delay.
+ *
+ * @param pid target peer
+ * @param ack_uuid UUID to ack
+ * @param max_delay how long can the ACK wait
+ */
+static void
+cummulative_ack (const struct GNUNET_PeerIdentity *pid,
+                 const struct AcknowledgementUUIDP *ack_uuid,
+                 struct GNUNET_TIME_Absolute max_delay)
+{
+  struct AcknowledgementCummulator *ac;
+
+  ac = GNUNET_CONTAINER_multipeermap_get (ack_cummulators, pid);
+  if (NULL == ac)
+  {
+    ac = GNUNET_new (struct AcknowledgementCummulator);
+    ac->target = *pid;
+    ac->min_transmission_time = max_delay;
+    GNUNET_assert (GNUNET_YES ==
+                   GNUNET_CONTAINER_multipeermap_put (
+                     ack_cummulators,
+                     &ac->target,
+                     ac,
+                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  }
   else
-    ack->reassembly_timeout = GNUNET_TIME_relative_hton (
-      GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
-  route_message (&rc->neighbour->pid, &ack->header, RMO_DV_ALLOWED);
-  rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
-  rc->num_acks = 0;
-  rc->extra_acks = 0LLU;
+  {
+    if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
+    {
+      /* must run immediately, ack buffer full! */
+      GNUNET_SCHEDULER_cancel (ac->task);
+      transmit_cummulative_ack_cb (ac);
+    }
+    GNUNET_SCHEDULER_cancel (ac->task);
+    ac->min_transmission_time =
+      GNUNET_TIME_absolute_min (ac->min_transmission_time, max_delay);
+  }
+  GNUNET_assert (ac->num_acks < MAX_CUMMULATIVE_ACKS);
+  ac->ack_uuids[ac->num_acks].receive_time = GNUNET_TIME_absolute_get ();
+  ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
+  ac->num_acks++;
+  ac->task = GNUNET_SCHEDULER_add_at (ac->min_transmission_time,
+                                      &transmit_cummulative_ack_cb,
+                                      ac);
+}
+
+
+/**
+ * Closure for #find_by_message_uuid.
+ */
+struct FindByMessageUuidContext
+{
+  /**
+   * UUID to look for.
+   */
+  struct MessageUUIDP message_uuid;
+
+  /**
+   * Set to the reassembly context if found.
+   */
+  struct ReassemblyContext *rc;
+};
+
+
+/**
+ * Iterator called to find a reassembly context by the message UUID in the
+ * multihashmap32.
+ *
+ * @param cls a `struct FindByMessageUuidContext`
+ * @param key a key (unused)
+ * @param value a `struct ReassemblyContext`
+ * @return #GNUNET_YES if not found, #GNUNET_NO if found
+ */
+static int
+find_by_message_uuid (void *cls, uint32_t key, void *value)
+{
+  struct FindByMessageUuidContext *fc = cls;
+  struct ReassemblyContext *rc = value;
+
+  (void) key;
+  if (0 == GNUNET_memcmp (&fc->message_uuid, &rc->msg_uuid))
+  {
+    fc->rc = rc;
+    return GNUNET_NO;
+  }
+  return GNUNET_YES;
 }
 
 
@@ -4132,7 +5016,7 @@ send_fragment_ack (struct ReassemblyContext *rc)
  * @param fb the message that was received
  */
 static void
-handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
+handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
 {
   struct CommunicatorMessageContext *cmc = cls;
   struct Neighbour *n;
@@ -4141,12 +5025,11 @@ handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
   uint16_t msize;
   uint16_t fsize;
   uint16_t frag_off;
-  uint32_t frag_uuid;
   char *target;
   struct GNUNET_TIME_Relative cdelay;
-  int ack_now;
+  struct FindByMessageUuidContext fc;
 
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
+  n = lookup_neighbour (&cmc->im.sender);
   if (NULL == n)
   {
     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
@@ -4158,7 +5041,7 @@ handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
   }
   if (NULL == n->reassembly_map)
   {
-    n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8, GNUNET_YES);
+    n->reassembly_map = GNUNET_CONTAINER_multihashmap32_create (8);
     n->reassembly_heap =
       GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
     n->reassembly_timeout_task =
@@ -4167,8 +5050,13 @@ handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
                                     n);
   }
   msize = ntohs (fb->msg_size);
-  rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map, &fb->msg_uuid);
-  if (NULL == rc)
+  fc.message_uuid = fb->msg_uuid;
+  fc.rc = NULL;
+  GNUNET_CONTAINER_multihashmap32_get_multiple (n->reassembly_map,
+                                                fb->msg_uuid.uuid,
+                                                &find_by_message_uuid,
+                                                &fc);
+  if (NULL == (rc = fc.rc))
   {
     rc = GNUNET_malloc (sizeof (*rc) + msize + /* reassembly payload buffer */
                         (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
@@ -4182,11 +5070,11 @@ handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
                                            rc,
                                            rc->reassembly_timeout.abs_value_us);
     GNUNET_assert (GNUNET_OK ==
-                   GNUNET_CONTAINER_multishortmap_put (
+                   GNUNET_CONTAINER_multihashmap32_put (
                      n->reassembly_map,
-                     &rc->msg_uuid,
+                     rc->msg_uuid.uuid,
                      rc,
-                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
     target = (char *) &rc[1];
     rc->bitfield = (uint8_t *) (target + rc->msg_size);
     rc->msg_missing = rc->msg_size;
@@ -4204,6 +5092,12 @@ handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
 
   /* reassemble */
   fsize = ntohs (fb->header.size) - sizeof (*fb);
+  if (0 == fsize)
+  {
+    GNUNET_break (0);
+    finish_cmc_handling (cmc);
+    return;
+  }
   frag_off = ntohs (fb->frag_off);
   memcpy (&target[frag_off], &fb[1], fsize);
   /* update bitfield and msg_missing */
@@ -4217,60 +5111,17 @@ handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
   }
 
   /* Compute cummulative ACK */
-  frag_uuid = ntohl (fb->frag_uuid);
   cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
-  cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->num_acks);
+  cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
+  if (0 == rc->msg_missing)
+    cdelay = GNUNET_TIME_UNIT_ZERO;
+  cummulative_ack (&cmc->im.sender,
+                   &fb->ack_uuid,
+                   GNUNET_TIME_relative_to_absolute (cdelay));
   rc->last_frag = GNUNET_TIME_absolute_get ();
-  rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay, cdelay);
-  ack_now = GNUNET_NO;
-  if (0 == rc->num_acks)
-  {
-    /* case one: first ack */
-    rc->frag_uuid = frag_uuid;
-    rc->extra_acks = 0LLU;
-    rc->num_acks = 1;
-  }
-  else if ((frag_uuid >= rc->frag_uuid) && (frag_uuid <= rc->frag_uuid + 64))
-  {
-    /* case two: ack fits after existing min UUID */
-    if ((frag_uuid == rc->frag_uuid) ||
-        (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))))
-    {
-      /* duplicate fragment, ack now! */
-      ack_now = GNUNET_YES;
-    }
-    else
-    {
-      rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
-      rc->num_acks++;
-    }
-  }
-  else if ((rc->frag_uuid > frag_uuid) &&
-           (((rc->frag_uuid == frag_uuid + 64) && (0 == rc->extra_acks)) ||
-            ((rc->frag_uuid < frag_uuid + 64) &&
-             (rc->extra_acks ==
-              (rc->extra_acks &
-               ~((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))))))
-  {
-    /* can fit ack by shifting extra acks and starting at
-       frag_uid, test above esured that the bits we will
-       shift 'extra_acks' by are all zero. */
-    rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
-    rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
-    rc->frag_uuid = frag_uuid;
-    rc->num_acks++;
-  }
-  if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very
-                             aggressive. */
-    ack_now = GNUNET_YES; /* maximum acks received */
-  // FIXME: possibly also ACK based on RTT (but for that we'd need to
-  // determine the queue used for the ACK first!)
-
   /* is reassembly complete? */
   if (0 != rc->msg_missing)
   {
-    if (ack_now)
-      send_fragment_ack (rc);
     finish_cmc_handling (cmc);
     return;
   }
@@ -4284,9 +5135,8 @@ handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
     return;
   }
   /* successful reassembly */
-  send_fragment_ack (rc);
   demultiplex_with_cmc (cmc, msg);
-  /* FIXME: really free here? Might be bad if fragments are still
+  /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
      en-route and we forget that we finished this reassembly immediately!
      -> keep around until timeout?
      -> shorten timeout based on ACK? */
@@ -4295,171 +5145,204 @@ handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
 
 
 /**
- * Check the @a fa against the fragments associated with @a pm.
- * If it matches, remove the matching fragments from the transmission
- * list.
+ * Communicator gave us a reliability box.  Check the message.
  *
- * @param pm pending message to check against the ack
- * @param fa the ack that was received
- * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param rb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
  */
 static int
-check_ack_against_pm (struct PendingMessage *pm,
-                      const struct TransportFragmentAckMessage *fa)
-{
-  int match;
-  struct PendingMessage *nxt;
-  uint32_t fs = ntohl (fa->frag_uuid);
-  uint64_t xtra = GNUNET_ntohll (fa->extra_acks);
-
-  match = GNUNET_NO;
-  for (struct PendingMessage *frag = pm->head_frag; NULL != frag; frag = nxt)
-  {
-    const struct TransportFragmentBox *tfb =
-      (const struct TransportFragmentBox *) &pm[1];
-    uint32_t fu = ntohl (tfb->frag_uuid);
-
-    GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt);
-    nxt = frag->next_frag;
-    /* Check for exact match or match in the 'xtra' bitmask */
-    if ((fu == fs) ||
-        ((fu > fs) && (fu <= fs + 64) && (0 != (1LLU << (fu - fs - 1) & xtra))))
-    {
-      match = GNUNET_YES;
-      free_fragment_tree (frag);
-    }
-  }
-  return match;
+check_reliability_box (void *cls,
+                       const struct TransportReliabilityBoxMessage *rb)
+{
+  (void) cls;
+  GNUNET_MQ_check_boxed_message (rb);
+  return GNUNET_YES;
 }
 
 
 /**
- * Communicator gave us a fragment acknowledgement.  Process the request.
+ * Communicator gave us a reliability box.  Process the request.
  *
  * @param cls a `struct CommunicatorMessageContext` (must call
  * #finish_cmc_handling() when done)
- * @param fa the message that was received
+ * @param rb the message that was received
  */
 static void
-handle_fragment_ack (void *cls, const struct TransportFragmentAckMessage *fa)
+handle_reliability_box (void *cls,
+                        const struct TransportReliabilityBoxMessage *rb)
 {
   struct CommunicatorMessageContext *cmc = cls;
-  struct Neighbour *n;
-  int matched;
+  const struct GNUNET_MessageHeader *inbox =
+    (const struct GNUNET_MessageHeader *) &rb[1];
 
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
-  if (NULL == n)
-  {
-    struct GNUNET_SERVICE_Client *client = cmc->tc->client;
+  // FIXME: call cummulative_ack(), have ack_countdown influence max_delay!
+  (void) (0 == ntohl (rb->ack_countdown));
+  /* continue with inner message */
+  demultiplex_with_cmc (cmc, inbox);
+}
 
-    GNUNET_break (0);
-    finish_cmc_handling (cmc);
-    GNUNET_SERVICE_client_drop (client);
-    return;
-  }
-  /* FIXME-OPTIMIZE: maybe use another hash map here? */
-  matched = GNUNET_NO;
-  for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm;
-       pm = pm->prev_neighbour)
-  {
-    if (0 != GNUNET_memcmp (&fa->msg_uuid, &pm->msg_uuid))
-      continue;
-    matched = GNUNET_YES;
-    if (GNUNET_YES == check_ack_against_pm (pm, fa))
-    {
-      struct GNUNET_TIME_Relative avg_ack_delay =
-        GNUNET_TIME_relative_ntoh (fa->avg_ack_delay);
-      // FIXME: update RTT and other reliability data!
-      // ISSUE: we don't know which of n's queues the message(s)
-      // took (and in fact the different messages might have gone
-      // over different queues and possibly over multiple).
-      // => track queues with PendingMessages, and update RTT only if
-      //    the queue used is unique?
-      //    -> how can we get loss rates?
-      //    -> or, add extra state to Box and ACK to identify queue?
-      // IDEA: generate MULTIPLE frag-uuids per fragment and track
-      //    the queue with the fragment! (-> this logic must
-      //    be moved into check_ack_against_pm!)
-      (void) avg_ack_delay;
-    }
-    else
-    {
-      GNUNET_STATISTICS_update (GST_stats,
-                                "# FRAGMENT_ACKS dropped, no matching fragment",
-                                1,
-                                GNUNET_NO);
-    }
-    if (NULL == pm->head_frag)
-    {
-      // if entire message is ACKed, handle that as well.
-      // => clean up PM, any post actions?
-      free_pending_message (pm);
-    }
-    else
-    {
-      struct GNUNET_TIME_Relative reassembly_timeout =
-        GNUNET_TIME_relative_ntoh (fa->reassembly_timeout);
-      // OPTIMIZE-FIXME: adjust retransmission strategy based on
-      // reassembly_timeout!
-      (void) reassembly_timeout;
-    }
-    break;
-  }
-  if (GNUNET_NO == matched)
+
+/**
+ * Check if we have advanced to another age since the last time.  If
+ * so, purge ancient statistics (more than GOODPUT_AGING_SLOTS before
+ * the current age)
+ *
+ * @param pd[in,out] data to update
+ * @param age current age
+ */
+static void
+update_pd_age (struct PerformanceData *pd, unsigned int age)
+{
+  unsigned int sage;
+
+  if (age == pd->last_age)
+    return; /* nothing to do */
+  sage = GNUNET_MAX (pd->last_age, age - 2 * GOODPUT_AGING_SLOTS);
+  for (unsigned int i = sage; i <= age - GOODPUT_AGING_SLOTS; i++)
   {
-    GNUNET_STATISTICS_update (GST_stats,
-                              "# FRAGMENT_ACKS dropped, no matching pending message",
-                              1,
-                              GNUNET_NO);
+    struct TransmissionHistoryEntry *the = &pd->the[i % GOODPUT_AGING_SLOTS];
+
+    the->bytes_sent = 0;
+    the->bytes_received = 0;
   }
-  finish_cmc_handling (cmc);
+  pd->last_age = age;
 }
 
 
 /**
- * Communicator gave us a reliability box.  Check the message.
+ * Update @a pd based on the latest @a rtt and the number of bytes
+ * that were confirmed to be successfully transmitted.
  *
- * @param cls a `struct CommunicatorMessageContext`
- * @param rb the send message that was sent
- * @return #GNUNET_YES if message is well-formed
+ * @param pd[in,out] data to update
+ * @param rtt latest round-trip time
+ * @param bytes_transmitted_ok number of bytes receiver confirmed as received
  */
-static int
-check_reliability_box (void *cls, const struct TransportReliabilityBox *rb)
+static void
+update_performance_data (struct PerformanceData *pd,
+                         struct GNUNET_TIME_Relative rtt,
+                         uint16_t bytes_transmitted_ok)
 {
-  GNUNET_MQ_check_boxed_message (rb);
-  return GNUNET_YES;
+  uint64_t nval = rtt.rel_value_us;
+  uint64_t oval = pd->aged_rtt.rel_value_us;
+  unsigned int age = get_age ();
+  struct TransmissionHistoryEntry *the = &pd->the[age % GOODPUT_AGING_SLOTS];
+
+  if (oval == GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
+    pd->aged_rtt = rtt;
+  else
+    pd->aged_rtt.rel_value_us = (nval + 7 * oval) / 8;
+  update_pd_age (pd, age);
+  the->bytes_received += bytes_transmitted_ok;
 }
 
 
 /**
- * Communicator gave us a reliability box.  Process the request.
+ * We have successfully transmitted data via @a q, update metrics.
  *
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
- * @param rb the message that was received
+ * @param q queue to update
+ * @param rtt round trip time observed
+ * @param bytes_transmitted_ok number of bytes successfully transmitted
  */
 static void
-handle_reliability_box (void *cls, const struct TransportReliabilityBox *rb)
+update_queue_performance (struct Queue *q,
+                          struct GNUNET_TIME_Relative rtt,
+                          uint16_t bytes_transmitted_ok)
 {
-  struct CommunicatorMessageContext *cmc = cls;
-  const struct GNUNET_MessageHeader *inbox =
-    (const struct GNUNET_MessageHeader *) &rb[1];
+  update_performance_data (&q->pd, rtt, bytes_transmitted_ok);
+}
 
-  if (0 == ntohl (rb->ack_countdown))
-  {
-    struct TransportReliabilityAckMessage *ack;
 
-    /* FIXME: implement cummulative ACKs and ack_countdown,
-       then setting the avg_ack_delay field below: */
-    ack = GNUNET_malloc (sizeof (*ack) + sizeof (struct GNUNET_ShortHashCode));
-    ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
-    ack->header.size =
-      htons (sizeof (*ack) + sizeof (struct GNUNET_ShortHashCode));
-    memcpy (&ack[1], &rb->msg_uuid, sizeof (struct GNUNET_ShortHashCode));
-    route_message (&cmc->im.sender, &ack->header, RMO_DV_ALLOWED);
+/**
+ * We have successfully transmitted data via @a dvh, update metrics.
+ *
+ * @param dvh distance vector path data to update
+ * @param rtt round trip time observed
+ * @param bytes_transmitted_ok number of bytes successfully transmitted
+ */
+static void
+update_dvh_performance (struct DistanceVectorHop *dvh,
+                        struct GNUNET_TIME_Relative rtt,
+                        uint16_t bytes_transmitted_ok)
+{
+  update_performance_data (&dvh->pd, rtt, bytes_transmitted_ok);
+}
+
+
+/**
+ * The @a pa was acknowledged, process the acknowledgement.
+ *
+ * @param pa the pending acknowledgement that was satisfied
+ * @param ack_delay artificial delay from cummulative acks created by the
+ * other peer
+ */
+static void
+handle_acknowledged (struct PendingAcknowledgement *pa,
+                     struct GNUNET_TIME_Relative ack_delay)
+{
+  struct PendingMessage *pm = pa->pm;
+  struct GNUNET_TIME_Relative delay;
+
+  delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time);
+  if (delay.rel_value_us > ack_delay.rel_value_us)
+    delay = GNUNET_TIME_UNIT_ZERO;
+  else
+    delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
+  if (NULL != pa->queue)
+    update_queue_performance (pa->queue, delay, pa->message_size);
+  if (NULL != pa->dvh)
+    update_dvh_performance (pa->dvh, delay, pa->message_size);
+  if (NULL != pm)
+  {
+    if (NULL != pm->frag_parent)
+    {
+      pm = pm->frag_parent;
+      free_fragment_tree (pa->pm);
+    }
+    while ((NULL != pm->frag_parent) && (NULL == pm->head_frag))
+    {
+      struct PendingMessage *parent = pm->frag_parent;
+
+      free_fragment_tree (pm);
+      pm = parent;
+    }
+    if (NULL != pm->head_frag)
+      pm = NULL; /* we are done, otherwise free 'pm' below */
   }
-  /* continue with inner message */
-  demultiplex_with_cmc (cmc, inbox);
+  if (NULL != pm)
+    free_pending_message (pm);
+  free_pending_acknowledgement (pa);
+}
+
+
+/**
+ * Communicator gave us a reliability ack.  Check it is well-formed.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (unused)
+ * @param ra the message that was received
+ * @return #GNUNET_Ok if @a ra is well-formed
+ */
+static int
+check_reliability_ack (void *cls,
+                       const struct TransportReliabilityAckMessage *ra)
+{
+  unsigned int n_acks;
+
+  (void) cls;
+  n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
+           sizeof (struct TransportCummulativeAckPayloadP);
+  if (0 == n_acks)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if ((ntohs (ra->header.size) - sizeof (*ra)) !=
+      n_acks * sizeof (struct TransportCummulativeAckPayloadP))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
 }
 
 
@@ -4475,70 +5358,34 @@ handle_reliability_ack (void *cls,
                         const struct TransportReliabilityAckMessage *ra)
 {
   struct CommunicatorMessageContext *cmc = cls;
-  struct Neighbour *n;
+  const struct TransportCummulativeAckPayloadP *ack;
+  struct PendingAcknowledgement *pa;
   unsigned int n_acks;
-  const struct GNUNET_ShortHashCode *msg_uuids;
-  struct PendingMessage *nxt;
-  int matched;
+  uint32_t ack_counter;
 
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
-  if (NULL == n)
-  {
-    struct GNUNET_SERVICE_Client *client = cmc->tc->client;
-
-    GNUNET_break (0);
-    finish_cmc_handling (cmc);
-    GNUNET_SERVICE_client_drop (client);
-    return;
-  }
   n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
-           sizeof (struct GNUNET_ShortHashCode);
-  msg_uuids = (const struct GNUNET_ShortHashCode *) &ra[1];
-
-  /* FIXME-OPTIMIZE: maybe use another hash map here? */
-  matched = GNUNET_NO;
-  for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm; pm = nxt)
+           sizeof (struct TransportCummulativeAckPayloadP);
+  ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
+  for (unsigned int i = 0; i < n_acks; i++)
   {
-    int in_list;
-
-    nxt = pm->next_neighbour;
-    in_list = GNUNET_NO;
-    for (unsigned int i = 0; i < n_acks; i++)
+    pa =
+      GNUNET_CONTAINER_multishortmap_get (pending_acks, &ack[i].ack_uuid.value);
+    if (NULL == pa)
     {
-      if (0 != GNUNET_memcmp (&msg_uuids[i], &pm->msg_uuid))
-        continue;
-      in_list = GNUNET_YES;
-      break;
-    }
-    if (GNUNET_NO == in_list)
+      GNUNET_STATISTICS_update (
+        GST_stats,
+        "# FRAGMENT_ACKS dropped, no matching pending message",
+        1,
+        GNUNET_NO);
       continue;
-
-    /* this pm was acked! */
-    matched = GNUNET_YES;
-    free_pending_message (pm);
-
-    {
-      struct GNUNET_TIME_Relative avg_ack_delay =
-        GNUNET_TIME_relative_ntoh (ra->avg_ack_delay);
-      // FIXME: update RTT and other reliability data!
-      // ISSUE: we don't know which of n's queues the message(s)
-      // took (and in fact the different messages might have gone
-      // over different queues and possibly over multiple).
-      // => track queues with PendingMessages, and update RTT only if
-      //    the queue used is unique?
-      //    -> how can we get loss rates?
-      //    -> or, add extra state to MSG and ACKs to identify queue?
-      //    -> if we do this, might just do the same for the avg_ack_delay!
-      (void) avg_ack_delay;
     }
+    handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
   }
-  if (GNUNET_NO == matched)
-  {
-    GNUNET_STATISTICS_update (GST_stats,
-                              "# FRAGMENT_ACKS dropped, no matching pending message",
-                              1,
-                              GNUNET_NO);
-  }
+
+  ack_counter = htonl (ra->ack_counter);
+  (void) ack_counter; /* silence compiler warning for now */
+  // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
+  // (DV and/or Neighbour?)
   finish_cmc_handling (cmc);
 }
 
@@ -4559,7 +5406,7 @@ check_backchannel_encapsulation (
 
   (void) cls;
   if ((size - sizeof (*be)) <
-      (sizeof (struct TransportBackchannelRequestPayload) +
+      (sizeof (struct TransportBackchannelRequestPayloadP) +
        sizeof (struct GNUNET_MessageHeader)))
   {
     GNUNET_break_op (0);
@@ -4569,6 +5416,256 @@ check_backchannel_encapsulation (
 }
 
 
+/**
+ * We received the plaintext @a msg from backtalker @a b. Forward
+ * it to the respective communicator.
+ *
+ * @param b a backtalker
+ * @param msg a message, consisting of a `struct GNUNET_MessageHeader`
+ *        followed by the target name of the communicator
+ * @param msg_size number of bytes in @a msg
+ */
+static void
+forward_backchannel_payload (struct Backtalker *b,
+                             const void *msg,
+                             size_t msg_size)
+{
+  struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
+  struct GNUNET_MQ_Envelope *env;
+  struct TransportClient *tc;
+  const struct GNUNET_MessageHeader *mh;
+  const char *target_communicator;
+  uint16_t mhs;
+
+  /* Determine target_communicator and check @a msg is well-formed */
+  mh = msg;
+  mhs = ntohs (mh->size);
+  if (mhs <= msg_size)
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  target_communicator = &((const char *) msg)[ntohs (mh->size)];
+  if ('\0' != target_communicator[msg_size - mhs - 1])
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  /* Find client providing this communicator */
+  for (tc = clients_head; NULL != tc; tc = tc->next)
+    if ((CT_COMMUNICATOR == tc->type) &&
+        (0 ==
+         strcmp (tc->details.communicator.address_prefix, target_communicator)))
+      break;
+  if (NULL == tc)
+  {
+    char *stastr;
+
+    GNUNET_asprintf (
+      &stastr,
+      "# Backchannel message dropped: target communicator `%s' unknown",
+      target_communicator);
+    GNUNET_STATISTICS_update (GST_stats, stastr, 1, GNUNET_NO);
+    GNUNET_free (stastr);
+    return;
+  }
+  /* Finally, deliver backchannel message to communicator */
+  env = GNUNET_MQ_msg_extra (
+    cbi,
+    msg_size,
+    GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING);
+  cbi->pid = b->pid;
+  memcpy (&cbi[1], msg, msg_size);
+  GNUNET_MQ_send (tc->mq, env);
+}
+
+
+/**
+ * Free data structures associated with @a b.
+ *
+ * @param b data structure to release
+ */
+static void
+free_backtalker (struct Backtalker *b)
+{
+  if (NULL != b->get)
+  {
+    GNUNET_PEERSTORE_iterate_cancel (b->get);
+    b->get = NULL;
+    GNUNET_assert (NULL != b->cmc);
+    finish_cmc_handling (b->cmc);
+    b->cmc = NULL;
+  }
+  if (NULL != b->task)
+  {
+    GNUNET_SCHEDULER_cancel (b->task);
+    b->task = NULL;
+  }
+  if (NULL != b->sc)
+  {
+    GNUNET_PEERSTORE_store_cancel (b->sc);
+    b->sc = NULL;
+  }
+  GNUNET_assert (
+    GNUNET_YES ==
+    GNUNET_CONTAINER_multipeermap_remove (backtalkers, &b->pid, b));
+  GNUNET_free (b);
+}
+
+
+/**
+ * Callback to free backtalker records.
+ *
+ * @param cls NULL
+ * @param pid unused
+ * @param value a `struct Backtalker`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_backtalker_cb (void *cls,
+                    const struct GNUNET_PeerIdentity *pid,
+                    void *value)
+{
+  struct Backtalker *b = value;
+
+  (void) cls;
+  (void) pid;
+  free_backtalker (b);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Function called when it is time to clean up a backtalker.
+ *
+ * @param cls a `struct Backtalker`
+ */
+static void
+backtalker_timeout_cb (void *cls)
+{
+  struct Backtalker *b = cls;
+
+  b->task = NULL;
+  if (0 != GNUNET_TIME_absolute_get_remaining (b->timeout).rel_value_us)
+  {
+    b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+    return;
+  }
+  GNUNET_assert (NULL == b->sc);
+  free_backtalker (b);
+}
+
+
+/**
+ * Function called with the monotonic time of a backtalker
+ * by PEERSTORE. Updates the time and continues processing.
+ *
+ * @param cls a `struct Backtalker`
+ * @param record the information found, NULL for the last call
+ * @param emsg error message
+ */
+static void
+backtalker_monotime_cb (void *cls,
+                        const struct GNUNET_PEERSTORE_Record *record,
+                        const char *emsg)
+{
+  struct Backtalker *b = cls;
+  struct GNUNET_TIME_AbsoluteNBO *mtbe;
+  struct GNUNET_TIME_Absolute mt;
+
+  (void) emsg;
+  if (NULL == record)
+  {
+    /* we're done with #backtalker_monotime_cb() invocations,
+       continue normal processing */
+    b->get = NULL;
+    GNUNET_assert (NULL != b->cmc);
+    finish_cmc_handling (b->cmc);
+    b->cmc = NULL;
+    if (0 != b->body_size)
+      forward_backchannel_payload (b, &b[1], b->body_size);
+    return;
+  }
+  if (sizeof (*mtbe) != record->value_size)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  mtbe = record->value;
+  mt = GNUNET_TIME_absolute_ntoh (*mtbe);
+  if (mt.abs_value_us > b->monotonic_time.abs_value_us)
+  {
+    GNUNET_STATISTICS_update (
+      GST_stats,
+      "# Backchannel messages dropped: monotonic time not increasing",
+      1,
+      GNUNET_NO);
+    b->monotonic_time = mt;
+    /* Setting body_size to 0 prevents call to #forward_backchannel_payload()
+     */
+    b->body_size = 0;
+    return;
+  }
+}
+
+
+/**
+ * Function called by PEERSTORE when the store operation of
+ * a backtalker's monotonic time is complete.
+ *
+ * @param cls the `struct Backtalker`
+ * @param success #GNUNET_OK on success
+ */
+static void
+backtalker_monotime_store_cb (void *cls, int success)
+{
+  struct Backtalker *b = cls;
+
+  if (GNUNET_OK != success)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to store backtalker's monotonic time in PEERSTORE!\n");
+  }
+  b->sc = NULL;
+  b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+}
+
+
+/**
+ * The backtalker @a b monotonic time changed. Update PEERSTORE.
+ *
+ * @param b a backtalker with updated monotonic time
+ */
+static void
+update_backtalker_monotime (struct Backtalker *b)
+{
+  struct GNUNET_TIME_AbsoluteNBO mtbe;
+
+  if (NULL != b->sc)
+  {
+    GNUNET_PEERSTORE_store_cancel (b->sc);
+    b->sc = NULL;
+  }
+  else
+  {
+    GNUNET_SCHEDULER_cancel (b->task);
+    b->task = NULL;
+  }
+  mtbe = GNUNET_TIME_absolute_hton (b->monotonic_time);
+  b->sc =
+    GNUNET_PEERSTORE_store (peerstore,
+                            "transport",
+                            &b->pid,
+                            GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
+                            &mtbe,
+                            sizeof (mtbe),
+                            GNUNET_TIME_UNIT_FOREVER_ABS,
+                            GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+                            &backtalker_monotime_store_cb,
+                            b);
+}
+
+
 /**
  * Communicator gave us a backchannel encapsulation.  Process the request.
  * (We are not the origin of the backchannel here, the communicator simply
@@ -4592,9 +5689,6 @@ handle_backchannel_encapsulation (
   if (0 != GNUNET_memcmp (&be->target, &GST_my_identity))
   {
     /* not for me, try to route to target */
-    /* FIXME: someone needs to update be->distance! */
-    /* FIXME: BE routing can be special, should we put all of this
-       on 'route_message'? Maybe at least pass some more arguments? */
     route_message (&be->target,
                    GNUNET_copy_message (&be->header),
                    RMO_DV_ALLOWED);
@@ -4614,7 +5708,9 @@ handle_backchannel_encapsulation (
   }
   /* begin actual decryption */
   {
-    struct TransportBackchannelRequestPayload ppay;
+    struct Backtalker *b;
+    struct GNUNET_TIME_Absolute monotime;
+    struct TransportBackchannelRequestPayloadP ppay;
     char body[hdr_len - sizeof (ppay)];
 
     GNUNET_assert (hdr_len >=
@@ -4622,15 +5718,78 @@ handle_backchannel_encapsulation (
     bc_decrypt (&key, &ppay, hdr, sizeof (ppay));
     bc_decrypt (&key, &body, &hdr[sizeof (ppay)], hdr_len - sizeof (ppay));
     bc_key_clean (&key);
-    // FIXME: verify signatures in ppay!
-    // => check if ephemeral key is known & valid, if not
-    // => verify sig, cache ephemeral key
-    // => update monotonic_time of sender for replay detection
-
-    // FIXME: forward to specified communicator!
-    // (using GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING)
+    monotime = GNUNET_TIME_absolute_ntoh (ppay.monotonic_time);
+    b = GNUNET_CONTAINER_multipeermap_get (backtalkers, &ppay.sender);
+    if ((NULL != b) && (monotime.abs_value_us < b->monotonic_time.abs_value_us))
+    {
+      GNUNET_STATISTICS_update (
+        GST_stats,
+        "# Backchannel messages dropped: monotonic time not increasing",
+        1,
+        GNUNET_NO);
+      finish_cmc_handling (cmc);
+      return;
+    }
+    if ((NULL == b) ||
+        (0 != GNUNET_memcmp (&b->last_ephemeral, &be->ephemeral_key)))
+    {
+      /* Check signature */
+      struct EphemeralConfirmationPS ec;
+
+      ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
+      ec.purpose.size = htonl (sizeof (ec));
+      ec.target = GST_my_identity;
+      ec.ephemeral_key = be->ephemeral_key;
+      if (
+        GNUNET_OK !=
+        GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL,
+                                    &ec.purpose,
+                                    &ppay.sender_sig,
+                                    &ppay.sender.public_key))
+      {
+        /* Signature invalid, disard! */
+        GNUNET_break_op (0);
+        finish_cmc_handling (cmc);
+        return;
+      }
+    }
+    if (NULL != b)
+    {
+      /* update key cache and mono time */
+      b->last_ephemeral = be->ephemeral_key;
+      b->monotonic_time = monotime;
+      update_backtalker_monotime (b);
+      forward_backchannel_payload (b, body, sizeof (body));
+      b->timeout =
+        GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
+      finish_cmc_handling (cmc);
+      return;
+    }
+    /* setup data structure to cache signature AND check
+       monotonic time with PEERSTORE before forwarding backchannel payload */
+    b = GNUNET_malloc (sizeof (struct Backtalker) + sizeof (body));
+    b->pid = ppay.sender;
+    b->body_size = sizeof (body);
+    memcpy (&b[1], body, sizeof (body));
+    GNUNET_assert (GNUNET_YES ==
+                   GNUNET_CONTAINER_multipeermap_put (
+                     backtalkers,
+                     &b->pid,
+                     b,
+                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    b->monotonic_time = monotime; /* NOTE: to be checked still! */
+    b->cmc = cmc;
+    b->timeout =
+      GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
+    b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+    b->get =
+      GNUNET_PEERSTORE_iterate (peerstore,
+                                "transport",
+                                &b->pid,
+                                GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
+                                &backtalker_monotime_cb,
+                                b);
   }
-  finish_cmc_handling (cmc);
 }
 
 
@@ -4666,40 +5825,6 @@ path_cleanup_cb (void *cls)
     GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv);
 }
 
-/**
- * Task run to check whether the hops of the @a cls still
- * are validated, or if we need to core about disconnection.
- *
- * @param cls a `struct DistanceVector` (with core_visible set!)
- */
-static void
-check_dv_path_down (void *cls)
-{
-  struct DistanceVector *dv = cls;
-  struct Neighbour *n;
-
-  dv->visibility_task = NULL;
-  GNUNET_assert (GNUNET_YES == dv->core_visible);
-  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
-       pos = pos->next_dv)
-  {
-    if (0 <
-        GNUNET_TIME_absolute_get_remaining (pos->path_valid_until).rel_value_us)
-    {
-      dv->visibility_task = GNUNET_SCHEDULER_add_at (pos->path_valid_until,
-                                                     &check_dv_path_down,
-                                                     dv);
-      return;
-    }
-  }
-  /* all paths invalid, make dv core-invisible */
-  dv->core_visible = GNUNET_NO;
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours, &dv->target);
-  if ((NULL != n) && (GNUNET_YES == n->core_visible))
-    return; /* no need to tell core, connection still up! */
-  cores_send_disconnect_info (&dv->target);
-}
-
 
 /**
  * The @a hop is a validated path to the respective target
@@ -4712,22 +5837,30 @@ static void
 activate_core_visible_dv_path (struct DistanceVectorHop *hop)
 {
   struct DistanceVector *dv = hop->dv;
-  struct Neighbour *n;
+  struct VirtualLink *vl;
 
-  GNUNET_assert (GNUNET_NO == dv->core_visible);
-  GNUNET_assert (NULL == dv->visibility_task);
-
-  dv->core_visible = GNUNET_YES;
-  dv->visibility_task =
-    GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_dv_path_down, dv);
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours, &dv->target);
-  if ((NULL != n) && (GNUNET_YES == n->core_visible))
-    return; /* no need to tell core, connection already up! */
-  cores_send_connect_info (&dv->target,
-                           (NULL != n)
-                             ? GNUNET_BANDWIDTH_value_sum (n->quota_out,
-                                                           dv->quota_out)
-                             : dv->quota_out);
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target);
+  if (NULL != vl)
+  {
+    /* Link was already up, remember dv is also now available and we are done */
+    vl->dv = dv;
+    return;
+  }
+  vl = GNUNET_new (struct VirtualLink);
+  vl->target = dv->target;
+  vl->dv = dv;
+  vl->core_recv_window = RECV_WINDOW_SIZE;
+  vl->visibility_task =
+    GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
+  GNUNET_break (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_put (
+                  links,
+                  &vl->target,
+                  vl,
+                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  /* We lacked a confirmed connection to the target
+     before, so tell CORE about it (finally!) */
+  cores_send_connect_info (&dv->target);
 }
 
 
@@ -4743,13 +5876,13 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop)
  * non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
  *
  * @param path the path we learned, path[0] should be us,
- *             and then path contains a valid path from us to `path[path_len-1]`
- *             path[1] should be a direct neighbour (we should check!)
+ *             and then path contains a valid path from us to
+ * `path[path_len-1]` path[1] should be a direct neighbour (we should check!)
  * @param path_len number of entries on the @a path, at least three!
  * @param network_latency how long does the message take from us to
  * `path[path_len-1]`? set to "forever" if unknown
- * @param path_valid_until how long is this path considered validated? Maybe be
- * zero.
+ * @param path_valid_until how long is this path considered validated? Maybe
+ * be zero.
  * @return #GNUNET_YES on success,
  *         #GNUNET_NO if we have better path(s) to the target
  *         #GNUNET_SYSERR if the path is useless and/or invalid
@@ -4774,7 +5907,7 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
     return GNUNET_SYSERR;
   }
   GNUNET_assert (0 == GNUNET_memcmp (&GST_my_identity, &path[0]));
-  next_hop = GNUNET_CONTAINER_multipeermap_get (neighbours, &path[1]);
+  next_hop = lookup_neighbour (&path[1]);
   if (NULL == next_hop)
   {
     /* next hop must be a neighbour, otherwise this whole thing is useless! */
@@ -4782,7 +5915,7 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
     return GNUNET_SYSERR;
   }
   for (unsigned int i = 2; i < path_len; i++)
-    if (NULL != GNUNET_CONTAINER_multipeermap_get (neighbours, &path[i]))
+    if (NULL != lookup_neighbour (&path[i]))
     {
       /* Useless path, we have a direct connection to some hop
          in the middle of the path, so this one doesn't even
@@ -4842,9 +5975,8 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
           GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
         GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
         GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
-        if ((GNUNET_NO == dv->core_visible) &&
-            (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until)
-                   .rel_value_us))
+        if (0 <
+            GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
           activate_core_visible_dv_path (pos);
         if (last_timeout.rel_value_us <
             GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
@@ -4878,13 +6010,13 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
   hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
   hop->path_valid_until = path_valid_until;
   hop->distance = path_len - 2;
+  hop->pd.aged_rtt = network_latency;
   GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop);
   GNUNET_CONTAINER_MDLL_insert (neighbour,
                                 next_hop->dv_head,
                                 next_hop->dv_tail,
                                 hop);
-  if ((GNUNET_NO == dv->core_visible) &&
-      (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us))
+  if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
     activate_core_visible_dv_path (hop);
   return GNUNET_YES;
 }
@@ -4898,7 +6030,7 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
  * @return #GNUNET_YES if message is well-formed
  */
 static int
-check_dv_learn (void *cls, const struct TransportDVLearn *dvl)
+check_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
 {
   uint16_t size = ntohs (dvl->header.size);
   uint16_t num_hops = ntohs (dvl->num_hops);
@@ -4945,22 +6077,22 @@ check_dv_learn (void *cls, const struct TransportDVLearn *dvl)
  */
 static void
 forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
-                  const struct TransportDVLearn *msg,
+                  const struct TransportDVLearnMessage *msg,
                   uint16_t bi_history,
                   uint16_t nhops,
                   const struct DVPathEntryP *hops,
                   struct GNUNET_TIME_Absolute in_time)
 {
   struct DVPathEntryP *dhops;
-  struct TransportDVLearn *fwd;
+  struct TransportDVLearnMessage *fwd;
   struct GNUNET_TIME_Relative nnd;
 
   /* compute message for forwarding */
   GNUNET_assert (nhops < MAX_DV_HOPS_ALLOWED);
-  fwd = GNUNET_malloc (sizeof (struct TransportDVLearn) +
+  fwd = GNUNET_malloc (sizeof (struct TransportDVLearnMessage) +
                        (nhops + 1) * sizeof (struct DVPathEntryP));
   fwd->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
-  fwd->header.size = htons (sizeof (struct TransportDVLearn) +
+  fwd->header.size = htons (sizeof (struct TransportDVLearnMessage) +
                             (nhops + 1) * sizeof (struct DVPathEntryP));
   fwd->num_hops = htons (nhops + 1);
   fwd->bidirectional = htons (bi_history);
@@ -4982,45 +6114,255 @@ forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
                           .succ = *next_hop,
                           .challenge = msg->challenge};
 
-    GNUNET_assert (GNUNET_OK ==
-                   GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
-                                             &dhp.purpose,
-                                             &dhops[nhops].hop_sig));
-  }
-  route_message (next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED);
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+                                             &dhp.purpose,
+                                             &dhops[nhops].hop_sig));
+  }
+  route_message (next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED);
+}
+
+
+/**
+ * Check signature of type #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
+ *
+ * @param sender_monotonic_time monotonic time of the initiator
+ * @param init the signer
+ * @param challenge the challenge that was signed
+ * @param init_sig signature presumably by @a init
+ * @return #GNUNET_OK if the signature is valid
+ */
+static int
+validate_dv_initiator_signature (
+  struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time,
+  const struct GNUNET_PeerIdentity *init,
+  const struct ChallengeNonceP *challenge,
+  const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
+{
+  struct DvInitPS ip = {.purpose.purpose = htonl (
+                          GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
+                        .purpose.size = htonl (sizeof (ip)),
+                        .monotonic_time = sender_monotonic_time,
+                        .challenge = *challenge};
+
+  if (
+    GNUNET_OK !=
+    GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
+                                &ip.purpose,
+                                init_sig,
+                                &init->public_key))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Closure for #dv_neighbour_selection and #dv_neighbour_transmission.
+ */
+struct NeighbourSelectionContext
+{
+  /**
+   * Original message we received.
+   */
+  const struct TransportDVLearnMessage *dvl;
+
+  /**
+   * The hops taken.
+   */
+  const struct DVPathEntryP *hops;
+
+  /**
+   * Time we received the message.
+   */
+  struct GNUNET_TIME_Absolute in_time;
+
+  /**
+   * Offsets of the selected peers.
+   */
+  uint32_t selections[MAX_DV_DISCOVERY_SELECTION];
+
+  /**
+   * Number of peers eligible for selection.
+   */
+  unsigned int num_eligible;
+
+  /**
+   * Number of peers that were selected for forwarding.
+   */
+  unsigned int num_selections;
+
+  /**
+   * Number of hops in @e hops
+   */
+  uint16_t nhops;
+
+  /**
+   * Bitmap of bidirectional connections encountered.
+   */
+  uint16_t bi_history;
+};
+
+
+/**
+ * Function called for each neighbour during #handle_dv_learn.
+ *
+ * @param cls a `struct NeighbourSelectionContext *`
+ * @param pid identity of the peer
+ * @param value a `struct Neighbour`
+ * @return #GNUNET_YES (always)
+ */
+static int
+dv_neighbour_selection (void *cls,
+                        const struct GNUNET_PeerIdentity *pid,
+                        void *value)
+{
+  struct NeighbourSelectionContext *nsc = cls;
+
+  (void) value;
+  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
+    return GNUNET_YES; /* skip initiator */
+  for (unsigned int i = 0; i < nsc->nhops; i++)
+    if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
+      return GNUNET_YES; /* skip peers on path */
+  nsc->num_eligible++;
+  return GNUNET_YES;
 }
 
 
 /**
- * Check signature of type #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
+ * Function called for each neighbour during #handle_dv_learn.
+ * We call #forward_dv_learn() on the neighbour(s) selected
+ * during #dv_neighbour_selection().
  *
- * @param init the signer
- * @param challenge the challenge that was signed
- * @param init_sig signature presumably by @a init
- * @return #GNUNET_OK if the signature is valid
+ * @param cls a `struct NeighbourSelectionContext *`
+ * @param pid identity of the peer
+ * @param value a `struct Neighbour`
+ * @return #GNUNET_YES (always)
  */
 static int
-validate_dv_initiator_signature (
-  const struct GNUNET_PeerIdentity *init,
-  const struct GNUNET_ShortHashCode *challenge,
-  const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
+dv_neighbour_transmission (void *cls,
+                           const struct GNUNET_PeerIdentity *pid,
+                           void *value)
 {
-  struct DvInitPS ip = {.purpose.purpose = htonl (
-                          GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
-                        .purpose.size = htonl (sizeof (ip)),
-                        .challenge = *challenge};
+  struct NeighbourSelectionContext *nsc = cls;
 
-  if (
-    GNUNET_OK !=
-    GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
-                                &ip.purpose,
-                                init_sig,
-                                &init->public_key))
+  (void) value;
+  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
+    return GNUNET_YES; /* skip initiator */
+  for (unsigned int i = 0; i < nsc->nhops; i++)
+    if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
+      return GNUNET_YES; /* skip peers on path */
+  for (unsigned int i = 0; i < nsc->num_selections; i++)
   {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+    if (nsc->selections[i] == nsc->num_eligible)
+    {
+      forward_dv_learn (pid,
+                        nsc->dvl,
+                        nsc->bi_history,
+                        nsc->nhops,
+                        nsc->hops,
+                        nsc->in_time);
+      break;
+    }
   }
-  return GNUNET_OK;
+  nsc->num_eligible++;
+  return GNUNET_YES;
+}
+
+
+/**
+ * Computes the number of neighbours we should forward a DVInit
+ * message to given that it has so far taken @a hops_taken hops
+ * though the network and that the number of neighbours we have
+ * in total is @a neighbour_count, out of which @a eligible_count
+ * are not yet on the path.
+ *
+ * NOTE: technically we might want to include NSE in the formula to
+ * get a better grip on the overall network size. However, for now
+ * using NSE here would create a dependency issue in the build system.
+ * => Left for later, hardcoded to 50 for now.
+ *
+ * The goal of the fomula is that we want to reach a total of LOG(NSE)
+ * peers via DV (`target_total`).  We want the reach to be spread out
+ * over various distances to the origin, with a bias towards shorter
+ * distances.
+ *
+ * We make the strong assumption that the network topology looks
+ * "similar" at other hops, in particular the @a neighbour_count
+ * should be comparable at other hops.
+ *
+ * If the local neighbourhood is densely connected, we expect that @a
+ * eligible_count is close to @a neighbour_count minus @a hops_taken
+ * as a lot of the path is already known. In that case, we should
+ * forward to few(er) peers to try to find a path out of the
+ * neighbourhood. OTOH, if @a eligible_count is close to @a
+ * neighbour_count, we should forward to many peers as we are either
+ * still close to the origin (i.e.  @a hops_taken is small) or because
+ * we managed to get beyond a local cluster.  We express this as
+ * the `boost_factor` using the square of the fraction of eligible
+ * neighbours (so if only 50% are eligible, we boost by 1/4, but if
+ * 99% are eligible, the 'boost' will be almost 1).
+ *
+ * Second, the more hops we have taken, the larger the problem of an
+ * exponential traffic explosion gets.  So we take the `target_total`,
+ * and compute our degree such that at each distance d 2^{-d} peers
+ * are selected (corrected by the `boost_factor`).
+ *
+ * @param hops_taken number of hops DVInit has travelled so far
+ * @param neighbour_count number of neighbours we have in total
+ * @param eligible_count number of neighbours we could in
+ *        theory forward to
+ */
+static unsigned int
+calculate_fork_degree (unsigned int hops_taken,
+                       unsigned int neighbour_count,
+                       unsigned int eligible_count)
+{
+  double target_total = 50.0; /* FIXME: use LOG(NSE)? */
+  double eligible_ratio =
+    ((double) eligible_count) / ((double) neighbour_count);
+  double boost_factor = eligible_ratio * eligible_ratio;
+  unsigned int rnd;
+  double left;
+
+  if (hops_taken >= 64)
+    return 0; /* precaution given bitshift below */
+  for (unsigned int i = 1; i < hops_taken; i++)
+  {
+    /* For each hop, subtract the expected number of targets
+       reached at distance d (so what remains divided by 2^d) */
+    target_total -= (target_total * boost_factor / (1LLU << i));
+  }
+  rnd =
+    (unsigned int) floor (target_total * boost_factor / (1LLU << hops_taken));
+  /* round up or down probabilistically depending on how close we were
+     when floor()ing to rnd */
+  left = target_total - (double) rnd;
+  if (UINT32_MAX * left >
+      GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX))
+    rnd++; /* round up */
+  return rnd;
+}
+
+
+/**
+ * Function called when peerstore is done storing a DV monotonic time.
+ *
+ * @param cls a `struct Neighbour`
+ * @param success #GNUNET_YES if peerstore was successful
+ */
+static void
+neighbour_store_dvmono_cb (void *cls, int success)
+{
+  struct Neighbour *n = cls;
+
+  n->sc = NULL;
+  if (GNUNET_YES != success)
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to store other peer's monotonic time in peerstore!\n");
 }
 
 
@@ -5032,7 +6374,7 @@ validate_dv_initiator_signature (
  * @param dvl the message that was received
  */
 static void
-handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
+handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
 {
   struct CommunicatorMessageContext *cmc = cls;
   enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
@@ -5043,6 +6385,7 @@ handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
   int do_fwd;
   int did_initiator;
   struct GNUNET_TIME_Absolute in_time;
+  struct Neighbour *n;
 
   nhops = ntohs (dvl->bidirectional); /* 0 = sender is initiator */
   bi_history = ntohs (dvl->bidirectional);
@@ -5077,18 +6420,67 @@ handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
   /* continue communicator here, everything else can happen asynchronous! */
   finish_cmc_handling (cmc);
 
-  /* OPTIMIZE-FIXME: Technically, we only need to bother checking
-     the initiator signature if we send the message back to the initiator... */
-  if (GNUNET_OK != validate_dv_initiator_signature (&dvl->initiator,
-                                                    &dvl->challenge,
-                                                    &dvl->init_sig))
+  n = lookup_neighbour (&dvl->initiator);
+  if (NULL != n)
   {
-    GNUNET_break_op (0);
-    return;
+    if ((n->dv_monotime_available == GNUNET_YES) &&
+        (GNUNET_TIME_absolute_ntoh (dvl->monotonic_time).abs_value_us <
+         n->last_dv_learn_monotime.abs_value_us))
+    {
+      GNUNET_STATISTICS_update (GST_stats,
+                                "# DV learn discarded due to time travel",
+                                1,
+                                GNUNET_NO);
+      return;
+    }
+    if (GNUNET_OK != validate_dv_initiator_signature (dvl->monotonic_time,
+                                                      &dvl->initiator,
+                                                      &dvl->challenge,
+                                                      &dvl->init_sig))
+    {
+      GNUNET_break_op (0);
+      return;
+    }
+    n->last_dv_learn_monotime = GNUNET_TIME_absolute_ntoh (dvl->monotonic_time);
+    if (GNUNET_YES == n->dv_monotime_available)
+    {
+      if (NULL != n->sc)
+        GNUNET_PEERSTORE_store_cancel (n->sc);
+      n->sc =
+        GNUNET_PEERSTORE_store (peerstore,
+                                "transport",
+                                &dvl->initiator,
+                                GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
+                                &dvl->monotonic_time,
+                                sizeof (dvl->monotonic_time),
+                                GNUNET_TIME_UNIT_FOREVER_ABS,
+                                GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+                                &neighbour_store_dvmono_cb,
+                                n);
+    }
+  }
+  /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
+     If signature verification load too high, implement random drop strategy */
+  for (unsigned int i = 0; i < nhops; i++)
+  {
+    struct DvHopPS dhp = {.purpose.purpose =
+                            htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
+                          .purpose.size = htonl (sizeof (dhp)),
+                          .pred = (0 == i) ? dvl->initiator : hops[i - 1].hop,
+                          .succ = (nhops - 1 == i) ? GST_my_identity
+                                                   : hops[i + 1].hop,
+                          .challenge = dvl->challenge};
+
+    if (GNUNET_OK !=
+        GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP,
+                                    &dhp.purpose,
+                                    &hops[i].hop_sig,
+                                    &hops[i].hop.public_key))
+    {
+      GNUNET_break_op (0);
+      return;
+    }
   }
-  // FIXME: asynchronously (!) verify hop-by-hop signatures!
-  // => if signature verification load too high, implement random drop
-  // strategy!?
 
   do_fwd = GNUNET_YES;
   if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
@@ -5207,17 +6599,34 @@ handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
   if ((do_fwd) || ((nhops < MIN_DV_PATH_LENGTH_FOR_INITIATOR) &&
                    (GNUNET_NO == did_initiator)))
   {
-    /* FIXME: loop over all neighbours, pick those with low
-       queues AND that are not yet on the path; possibly
-       adapt threshold to nhops! */
-#if FIXME
-    forward_dv_learn (NULL, // fill in peer from iterator here!
-                      dvl,
-                      bi_history,
-                      nhops,
-                      hops,
-                      in_time);
-#endif
+    /* Pick random neighbours that are not yet on the path */
+    struct NeighbourSelectionContext nsc;
+    unsigned int n_cnt;
+
+    n_cnt = GNUNET_CONTAINER_multipeermap_size (neighbours);
+    nsc.nhops = nhops;
+    nsc.dvl = dvl;
+    nsc.bi_history = bi_history;
+    nsc.hops = hops;
+    nsc.in_time = in_time;
+    nsc.num_eligible = 0;
+    GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+                                           &dv_neighbour_selection,
+                                           &nsc);
+    if (0 == nsc.num_eligible)
+      return; /* done here, cannot forward to anyone else */
+    nsc.num_selections = calculate_fork_degree (nhops, n_cnt, nsc.num_eligible);
+    nsc.num_selections =
+      GNUNET_MIN (MAX_DV_DISCOVERY_SELECTION, nsc.num_selections);
+    for (unsigned int i = 0; i < nsc.num_selections; i++)
+      nsc.selections[i] =
+        (nsc.num_selections == n_cnt)
+          ? i /* all were selected, avoid collisions by chance */
+          : GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, n_cnt);
+    nsc.num_eligible = 0;
+    GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+                                           &dv_neighbour_transmission,
+                                           &nsc);
   }
 }
 
@@ -5230,7 +6639,7 @@ handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
  * @return #GNUNET_YES if message is well-formed
  */
 static int
-check_dv_box (void *cls, const struct TransportDVBox *dvb)
+check_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
 {
   uint16_t size = ntohs (dvb->header.size);
   uint16_t num_hops = ntohs (dvb->num_hops);
@@ -5292,26 +6701,17 @@ forward_dv_box (struct Neighbour *next_hop,
                 const void *payload,
                 uint16_t payload_size)
 {
-  struct TransportDVBox *dvb;
-  struct GNUNET_PeerIdentity *dhops;
+  struct TransportDVBoxMessage *dvb;
 
-  GNUNET_assert (UINT16_MAX < sizeof (struct TransportDVBox) +
-                                sizeof (struct GNUNET_PeerIdentity) * num_hops +
-                                payload_size);
-  dvb = GNUNET_malloc (sizeof (struct TransportDVBox) +
-                       sizeof (struct GNUNET_PeerIdentity) * num_hops +
+  dvb = create_dv_box (total_hops,
+                       origin,
+                       &hops[num_hops - 1] /* == target */,
+                       num_hops - 1 /* do not count target twice */,
+                       hops,
+                       payload,
                        payload_size);
-  dvb->header.size =
-    htons (sizeof (struct TransportDVBox) +
-           sizeof (struct GNUNET_PeerIdentity) * num_hops + payload_size);
-  dvb->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
-  dvb->total_hops = htons (total_hops);
-  dvb->num_hops = htons (num_hops);
-  dvb->origin = *origin;
-  dhops = (struct GNUNET_PeerIdentity *) &dvb[1];
-  memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity));
-  memcpy (&dhops[num_hops], payload, payload_size);
   route_message (&next_hop->pid, &dvb->header, RMO_NONE);
+  GNUNET_free (dvb);
 }
 
 
@@ -5323,7 +6723,7 @@ forward_dv_box (struct Neighbour *next_hop,
  * @param dvb the message that was received
  */
 static void
-handle_dv_box (void *cls, const struct TransportDVBox *dvb)
+handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
 {
   struct CommunicatorMessageContext *cmc = cls;
   uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
@@ -5347,7 +6747,7 @@ handle_dv_box (void *cls, const struct TransportDVBox *dvb)
         finish_cmc_handling (cmc);
         return;
       }
-      n = GNUNET_CONTAINER_multipeermap_get (neighbours, &hops[i]);
+      n = lookup_neighbour (&hops[i]);
       if (NULL == n)
         continue;
       forward_dv_box (n,
@@ -5407,11 +6807,12 @@ check_incoming_msg (void *cls,
  * @param tvc the message that was received
  */
 static void
-handle_validation_challenge (void *cls,
-                             const struct TransportValidationChallenge *tvc)
+handle_validation_challenge (
+  void *cls,
+  const struct TransportValidationChallengeMessage *tvc)
 {
   struct CommunicatorMessageContext *cmc = cls;
-  struct TransportValidationResponse *tvr;
+  struct TransportValidationResponseMessage *tvr;
 
   if (cmc->total_hops > 0)
   {
@@ -5420,7 +6821,7 @@ handle_validation_challenge (void *cls,
     finish_cmc_handling (cmc);
     return;
   }
-  tvr = GNUNET_new (struct TransportValidationResponse);
+  tvr = GNUNET_new (struct TransportValidationResponseMessage);
   tvr->header.type =
     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE);
   tvr->header.size = htons (sizeof (*tvr));
@@ -5454,7 +6855,7 @@ struct CheckKnownChallengeContext
   /**
    * Set to the challenge we are looking for.
    */
-  const struct GNUNET_ShortHashCode *challenge;
+  const struct ChallengeNonceP *challenge;
 
   /**
    * Set to a matching validation state, if one was found.
@@ -5567,7 +6968,7 @@ find_queue (const struct GNUNET_PeerIdentity *pid, const char *address)
 {
   struct Neighbour *n;
 
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours, pid);
+  n = lookup_neighbour (pid);
   if (NULL == n)
     return NULL;
   for (struct Queue *pos = n->queue_head; NULL != pos;
@@ -5580,75 +6981,6 @@ find_queue (const struct GNUNET_PeerIdentity *pid, const char *address)
 }
 
 
-/**
- * Task run periodically to check whether the validity of the given queue has
- * run its course. If so, finds either another queue to take over, or clears
- * the neighbour's `core_visible` flag. In the latter case, gives DV routes a
- * chance to take over, and if that fails, notifies CORE about the disconnect.
- *
- * @param cls a `struct Queue`
- */
-static void
-core_queue_visibility_check (void *cls)
-{
-  struct Queue *q = cls;
-
-  q->visibility_task = NULL;
-  if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
-  {
-    q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until,
-                                                  &core_queue_visibility_check,
-                                                  q);
-    return;
-  }
-  update_neighbour_core_visibility (q->neighbour);
-}
-
-
-/**
- * Check whether the CORE visibility of @a n should change.  Finds either a
- * queue to preserve the visibility, or clears the neighbour's `core_visible`
- * flag. In the latter case, gives DV routes a chance to take over, and if
- * that fails, notifies CORE about the disconnect.  If so, check whether we
- * need to notify CORE.
- *
- * @param n neighbour to perform the check for
- */
-static void
-update_neighbour_core_visibility (struct Neighbour *n)
-{
-  struct DistanceVector *dv;
-
-  GNUNET_assert (GNUNET_YES == n->core_visible);
-  /* Check if _any_ queue of this neighbour is still valid, if so, schedule
-     the #core_queue_visibility_check() task for that queue */
-  for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
-  {
-    if (0 !=
-        GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
-    {
-      /* found a valid queue, use this one */
-      q->visibility_task =
-        GNUNET_SCHEDULER_add_at (q->validated_until,
-                                 &core_queue_visibility_check,
-                                 q);
-      return;
-    }
-  }
-  n->core_visible = GNUNET_NO;
-
-  /* Check if _any_ DV route to this neighbour is currently
-     valid, if so, do NOT tell core about the loss of direct
-     connectivity (DV still counts!) */
-  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid);
-  if (GNUNET_YES == dv->core_visible)
-    return;
-  /* Nothing works anymore, need to tell CORE about the loss of
-     connectivity! */
-  cores_send_disconnect_info (&n->pid);
-}
-
-
 /**
  * Communicator gave us a transport address validation response.  Process the
  * request.
@@ -5658,8 +6990,9 @@ update_neighbour_core_visibility (struct Neighbour *n)
  * @param tvr the message that was received
  */
 static void
-handle_validation_response (void *cls,
-                            const struct TransportValidationResponse *tvr)
+handle_validation_response (
+  void *cls,
+  const struct TransportValidationResponseMessage *tvr)
 {
   struct CommunicatorMessageContext *cmc = cls;
   struct ValidationState *vs;
@@ -5667,8 +7000,8 @@ handle_validation_response (void *cls,
                                             .vs = NULL};
   struct GNUNET_TIME_Absolute origin_time;
   struct Queue *q;
-  struct DistanceVector *dv;
   struct Neighbour *n;
+  struct VirtualLink *vl;
 
   /* check this is one of our challenges */
   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
@@ -5764,26 +7097,30 @@ handle_validation_response (void *cls,
     return;
   }
   q->validated_until = vs->validated_until;
-  q->rtt = vs->validation_rtt;
+  q->pd.aged_rtt = vs->validation_rtt;
   n = q->neighbour;
-  if (GNUNET_NO != n->core_visible)
-    return; /* nothing changed, we are done here */
-  n->core_visible = GNUNET_YES;
-  q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until,
-                                                &core_queue_visibility_check,
-                                                q);
-  /* Check if _any_ DV route to this neighbour is
-     currently valid, if so, do NOT tell core anything! */
-  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid);
-  if ((NULL != dv) && (GNUNET_YES == dv->core_visible))
-    return; /* nothing changed, done */
-  /* We lacked a confirmed connection to the neighbour
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid);
+  if (NULL != vl)
+  {
+    /* Link was already up, remember n is also now available and we are done */
+    vl->n = n;
+    return;
+  }
+  vl = GNUNET_new (struct VirtualLink);
+  vl->target = n->pid;
+  vl->n = n;
+  vl->core_recv_window = RECV_WINDOW_SIZE;
+  vl->visibility_task =
+    GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
+  GNUNET_break (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_put (
+                  links,
+                  &vl->target,
+                  vl,
+                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  /* We lacked a confirmed connection to the target
      before, so tell CORE about it (finally!) */
-  cores_send_connect_info (&n->pid,
-                           (NULL != dv)
-                             ? GNUNET_BANDWIDTH_value_sum (dv->quota_out,
-                                                           n->quota_out)
-                             : n->quota_out);
+  cores_send_connect_info (&n->pid);
 }
 
 
@@ -5820,41 +7157,37 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
   struct GNUNET_MQ_MessageHandler handlers[] =
     {GNUNET_MQ_hd_var_size (fragment_box,
                             GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
-                            struct TransportFragmentBox,
+                            struct TransportFragmentBoxMessage,
                             &cmc),
-     GNUNET_MQ_hd_fixed_size (fragment_ack,
-                              GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
-                              struct TransportFragmentAckMessage,
-                              &cmc),
      GNUNET_MQ_hd_var_size (reliability_box,
                             GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
-                            struct TransportReliabilityBox,
+                            struct TransportReliabilityBoxMessage,
+                            &cmc),
+     GNUNET_MQ_hd_var_size (reliability_ack,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
+                            struct TransportReliabilityAckMessage,
                             &cmc),
-     GNUNET_MQ_hd_fixed_size (reliability_ack,
-                              GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
-                              struct TransportReliabilityAckMessage,
-                              &cmc),
      GNUNET_MQ_hd_var_size (backchannel_encapsulation,
                             GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
                             struct TransportBackchannelEncapsulationMessage,
                             &cmc),
      GNUNET_MQ_hd_var_size (dv_learn,
                             GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
-                            struct TransportDVLearn,
+                            struct TransportDVLearnMessage,
                             &cmc),
      GNUNET_MQ_hd_var_size (dv_box,
                             GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
-                            struct TransportDVBox,
+                            struct TransportDVBoxMessage,
                             &cmc),
      GNUNET_MQ_hd_fixed_size (
        validation_challenge,
        GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
-       struct TransportValidationChallenge,
+       struct TransportValidationChallengeMessage,
        &cmc),
      GNUNET_MQ_hd_fixed_size (
        validation_response,
        GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,
-       struct TransportValidationResponse,
+       struct TransportValidationResponseMessage,
        &cmc),
      GNUNET_MQ_handler_end ()};
   int ret;
@@ -5897,25 +7230,6 @@ check_add_queue_message (void *cls,
 }
 
 
-/**
- * Bandwidth tracker informs us that the delay until we should receive
- * more has changed.
- *
- * @param cls a `struct Queue` for which the delay changed
- */
-static void
-tracker_update_in_cb (void *cls)
-{
-  struct Queue *queue = cls;
-  struct GNUNET_TIME_Relative in_delay;
-  unsigned int rsize;
-
-  rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
-  in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in, rsize);
-  // FIXME: how exactly do we do inbound flow control?
-}
-
-
 /**
  * If necessary, generates the UUID for a @a pm
  *
@@ -5933,28 +7247,77 @@ set_pending_message_uuid (struct PendingMessage *pm)
 }
 
 
+/**
+ * Setup data structure waiting for acknowledgements.
+ *
+ * @param queue queue the @a pm will be sent over
+ * @param dvh path the message will take, may be NULL
+ * @param pm the pending message for transmission
+ * @return corresponding fresh pending acknowledgement
+ */
+static struct PendingAcknowledgement *
+prepare_pending_acknowledgement (struct Queue *queue,
+                                 struct DistanceVectorHop *dvh,
+                                 struct PendingMessage *pm)
+{
+  struct PendingAcknowledgement *pa;
+
+  pa = GNUNET_new (struct PendingAcknowledgement);
+  pa->queue = queue;
+  pa->dvh = dvh;
+  pa->pm = pm;
+  do
+  {
+    GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+                                &pa->ack_uuid,
+                                sizeof (pa->ack_uuid));
+  } while (GNUNET_YES != GNUNET_CONTAINER_multishortmap_put (
+                           pending_acks,
+                           &pa->ack_uuid.value,
+                           pa,
+                           GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  GNUNET_CONTAINER_MDLL_insert (queue, queue->pa_head, queue->pa_tail, pa);
+  GNUNET_CONTAINER_MDLL_insert (pm, pm->pa_head, pm->pa_tail, pa);
+  if (NULL != dvh)
+    GNUNET_CONTAINER_MDLL_insert (dvh, dvh->pa_head, dvh->pa_tail, pa);
+  pa->transmission_time = GNUNET_TIME_absolute_get ();
+  pa->message_size = pm->bytes_msg;
+  return pa;
+}
+
+
 /**
  * Fragment the given @a pm to the given @a mtu.  Adds
  * additional fragments to the neighbour as well. If the
  * @a mtu is too small, generates and error for the @a pm
  * and returns NULL.
  *
+ * @param queue which queue to fragment for
+ * @param dvh path the message will take, or NULL
  * @param pm pending message to fragment for transmission
- * @param mtu MTU to apply
  * @return new message to transmit
  */
 static struct PendingMessage *
-fragment_message (struct PendingMessage *pm, uint16_t mtu)
+fragment_message (struct Queue *queue,
+                  struct DistanceVectorHop *dvh,
+                  struct PendingMessage *pm)
 {
+  struct PendingAcknowledgement *pa;
   struct PendingMessage *ff;
+  uint16_t mtu;
 
+  pa = prepare_pending_acknowledgement (queue, dvh, pm);
+  mtu = (0 == queue->mtu)
+          ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
+          : queue->mtu;
   set_pending_message_uuid (pm);
 
   /* This invariant is established in #handle_add_queue_message() */
-  GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
+  GNUNET_assert (mtu > sizeof (struct TransportFragmentBoxMessage));
 
   /* select fragment for transmission, descending the tree if it has
-     been expanded until we are at a leaf or at a fragment that is small enough
+     been expanded until we are at a leaf or at a fragment that is small
+     enough
    */
   ff = pm;
   while (((ff->bytes_msg > mtu) || (pm == ff)) &&
@@ -5967,7 +7330,7 @@ fragment_message (struct PendingMessage *pm, uint16_t mtu)
   {
     /* Did not yet calculate all fragments, calculate next fragment */
     struct PendingMessage *frag;
-    struct TransportFragmentBox tfb;
+    struct TransportFragmentBoxMessage tfb;
     const char *orig;
     char *msg;
     uint16_t fragmax;
@@ -5979,26 +7342,28 @@ fragment_message (struct PendingMessage *pm, uint16_t mtu)
     msize = ff->bytes_msg;
     if (pm != ff)
     {
-      const struct TransportFragmentBox *tfbo;
+      const struct TransportFragmentBoxMessage *tfbo;
 
-      tfbo = (const struct TransportFragmentBox *) orig;
-      orig += sizeof (struct TransportFragmentBox);
-      msize -= sizeof (struct TransportFragmentBox);
+      tfbo = (const struct TransportFragmentBoxMessage *) orig;
+      orig += sizeof (struct TransportFragmentBoxMessage);
+      msize -= sizeof (struct TransportFragmentBoxMessage);
       xoff = ntohs (tfbo->frag_off);
     }
-    fragmax = mtu - sizeof (struct TransportFragmentBox);
+    fragmax = mtu - sizeof (struct TransportFragmentBoxMessage);
     fragsize = GNUNET_MIN (msize - ff->frag_off, fragmax);
-    frag = GNUNET_malloc (sizeof (struct PendingMessage) +
-                          sizeof (struct TransportFragmentBox) + fragsize);
+    frag =
+      GNUNET_malloc (sizeof (struct PendingMessage) +
+                     sizeof (struct TransportFragmentBoxMessage) + fragsize);
     frag->target = pm->target;
     frag->frag_parent = ff;
     frag->timeout = pm->timeout;
-    frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
+    frag->bytes_msg = sizeof (struct TransportFragmentBoxMessage) + fragsize;
     frag->pmt = PMT_FRAGMENT_BOX;
     msg = (char *) &frag[1];
     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
-    tfb.header.size = htons (sizeof (struct TransportFragmentBox) + fragsize);
-    tfb.frag_uuid = htonl (pm->frag_uuidgen++);
+    tfb.header.size =
+      htons (sizeof (struct TransportFragmentBoxMessage) + fragsize);
+    tfb.ack_uuid = pa->ack_uuid;
     tfb.msg_uuid = pm->msg_uuid;
     tfb.frag_off = htons (ff->frag_off + xoff);
     tfb.msg_size = htons (pm->bytes_msg);
@@ -6029,13 +7394,18 @@ fragment_message (struct PendingMessage *pm, uint16_t mtu)
  * @a pm).  If the @a pm is already fragmented or reliability boxed,
  * or itself an ACK, this function simply returns @a pm.
  *
+ * @param queue which queue to prepare transmission for
+ * @param dvh path the message will take, or NULL
  * @param pm pending message to box for transmission over unreliabile queue
  * @return new message to transmit
  */
 static struct PendingMessage *
-reliability_box_message (struct PendingMessage *pm)
+reliability_box_message (struct Queue *queue,
+                         struct DistanceVectorHop *dvh,
+                         struct PendingMessage *pm)
 {
-  struct TransportReliabilityBox rbox;
+  struct TransportReliabilityBoxMessage rbox;
+  struct PendingAcknowledgement *pa;
   struct PendingMessage *bpm;
   char *msg;
 
@@ -6049,9 +7419,11 @@ reliability_box_message (struct PendingMessage *pm)
   {
     /* failed hard */
     GNUNET_break (0);
-    client_send_response (pm, GNUNET_NO, 0);
+    client_send_response (pm);
     return NULL;
   }
+  pa = prepare_pending_acknowledgement (queue, dvh, pm);
+
   bpm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (rbox) +
                        pm->bytes_msg);
   bpm->target = pm->target;
@@ -6064,7 +7436,8 @@ reliability_box_message (struct PendingMessage *pm)
   rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
   rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
   rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
-  rbox.msg_uuid = pm->msg_uuid;
+
+  rbox.ack_uuid = pa->ack_uuid;
   msg = (char *) &bpm[1];
   memcpy (msg, &rbox, sizeof (rbox));
   memcpy (&msg[sizeof (rbox)], &pm[1], pm->bytes_msg);
@@ -6128,8 +7501,8 @@ update_pm_next_attempt (struct PendingMessage *pm,
 
 
 /**
- * We believe we are ready to transmit a message on a queue. Double-checks
- * with the queue's "tracker_out" and then gives the message to the
+ * We believe we are ready to transmit a message on a queue.
+ * Gives the message to the
  * communicator for transmission (updating the tracker, and re-scheduling
  * itself if applicable).
  *
@@ -6161,7 +7534,7 @@ transmit_on_queue (void *cls)
     return; /* do it later */
   overhead = 0;
   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
-    overhead += sizeof (struct TransportReliabilityBox);
+    overhead += sizeof (struct TransportReliabilityBoxMessage);
   s = pm;
   if ( ( (0 != queue->mtu) &&
         (pm->bytes_msg + overhead > queue->mtu) ) ||
@@ -6169,11 +7542,7 @@ transmit_on_queue (void *cls)
        (NULL != pm->head_frag /* fragments already exist, should
                                 respect that even if MTU is 0 for
                                 this queue */) )
-    s = fragment_message (s,
-                          (0 == queue->mtu)
-                            ? UINT16_MAX -
-                                sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
-                            : queue->mtu);
+    s = fragment_message (queue, pm->dvh, s);
   if (NULL == s)
   {
     /* Fragmentation failed, try next message... */
@@ -6181,7 +7550,8 @@ transmit_on_queue (void *cls)
     return;
   }
   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
-    s = reliability_box_message (s);
+    // FIXME-OPTIMIZE: and if reliability was requested for 's' by core!
+    s = reliability_box_message (queue, pm->dvh, s);
   if (NULL == s)
   {
     /* Reliability boxing failed, try next message... */
@@ -6200,7 +7570,7 @@ transmit_on_queue (void *cls)
       (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
   {
     /* Full message sent, and over reliabile channel */
-    client_send_response (pm, GNUNET_YES, pm->bytes_msg);
+    client_send_response (pm);
   }
   else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
             queue->tc->details.communicator.cc) &&
@@ -6225,10 +7595,7 @@ transmit_on_queue (void *cls)
 
     /* Was this the last applicable fragmment? */
     if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
-      client_send_response (
-        pm,
-        GNUNET_YES,
-        pm->bytes_msg /* FIXME: calculate and add overheads! */);
+      client_send_response (pm);
   }
   else if (PMT_CORE != pm->pmt)
   {
@@ -6247,7 +7614,8 @@ transmit_on_queue (void *cls)
        message urgency and size when delaying ACKs, etc.) */
     update_pm_next_attempt (s,
                             GNUNET_TIME_relative_to_absolute (
-                              GNUNET_TIME_relative_multiply (queue->rtt, 4)));
+                              GNUNET_TIME_relative_multiply (queue->pd.aged_rtt,
+                                                             4)));
   }
 
   /* finally, re-schedule queue transmission task itself */
@@ -6255,73 +7623,6 @@ transmit_on_queue (void *cls)
 }
 
 
-/**
- * Bandwidth tracker informs us that the delay until we
- * can transmit again changed.
- *
- * @param cls a `struct Queue` for which the delay changed
- */
-static void
-tracker_update_out_cb (void *cls)
-{
-  struct Queue *queue = cls;
-  struct Neighbour *n = queue->neighbour;
-
-  if (NULL == n->pending_msg_head)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Bandwidth allocation updated for empty transmission queue `%s'\n",
-                queue->address);
-    return; /* no message pending, nothing to do here! */
-  }
-  GNUNET_SCHEDULER_cancel (queue->transmit_task);
-  queue->transmit_task = NULL;
-  schedule_transmit_on_queue (queue, GNUNET_NO);
-}
-
-
-/**
- * Bandwidth tracker informs us that excessive outbound bandwidth was
- * allocated which is not being used.
- *
- * @param cls a `struct Queue` for which the excess was noted
- */
-static void
-tracker_excess_out_cb (void *cls)
-{
-  (void) cls;
-
-  /* FIXME: trigger excess bandwidth report to core? Right now,
-     this is done internally within transport_api2_core already,
-     but we probably want to change the logic and trigger it
-     from here via a message instead! */
-  /* TODO: maybe inform someone at this point? */
-  GNUNET_STATISTICS_update (GST_stats,
-                            "# Excess outbound bandwidth reported",
-                            1,
-                            GNUNET_NO);
-}
-
-
-/**
- * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
- * which is not being used.
- *
- * @param cls a `struct Queue` for which the excess was noted
- */
-static void
-tracker_excess_in_cb (void *cls)
-{
-  (void) cls;
-
-  /* TODO: maybe inform somone at this point? */
-  GNUNET_STATISTICS_update (GST_stats,
-                            "# Excess inbound bandwidth reported",
-                            1,
-                            GNUNET_NO);
-}
-
-
 /**
  * Queue to a peer went down.  Process the request.
  *
@@ -6484,7 +7785,7 @@ notify_client_queues (void *cls,
   for (struct Queue *q = neighbour->queue_head; NULL != q;
        q = q->next_neighbour)
   {
-    struct MonitorEvent me = {.rtt = q->rtt,
+    struct MonitorEvent me = {.rtt = q->pd.aged_rtt,
                               .cs = q->cs,
                               .num_msg_pending = q->num_msg_pending,
                               .num_bytes_pending = q->num_bytes_pending};
@@ -6605,7 +7906,7 @@ suggest_to_connect (const struct GNUNET_PeerIdentity *pid, const char *address)
 static void
 validation_transmit_on_queue (struct Queue *q, struct ValidationState *vs)
 {
-  struct TransportValidationChallenge tvc;
+  struct TransportValidationChallengeMessage tvc;
 
   vs->last_challenge_use = GNUNET_TIME_absolute_get ();
   tvc.header.type =
@@ -6718,7 +8019,7 @@ check_connection_quality (void *cls,
       ctx->q = q;
     /* OPTIMIZE-FIXME: in the future, add reliability / goodput
        statistics and consider those as well here? */
-    if (q->rtt.rel_value_us < DV_QUALITY_RTT_THRESHOLD.rel_value_us)
+    if (q->pd.aged_rtt.rel_value_us < DV_QUALITY_RTT_THRESHOLD.rel_value_us)
       do_inc = GNUNET_YES;
   }
   if (GNUNET_YES == do_inc)
@@ -6743,7 +8044,7 @@ start_dv_learn (void *cls)
 {
   struct LearnLaunchEntry *lle;
   struct QueueQualityContext qqc;
-  struct TransportDVLearn dvl;
+  struct TransportDVLearnMessage dvl;
 
   (void) cls;
   dvlearn_task = NULL;
@@ -6772,7 +8073,7 @@ start_dv_learn (void *cls)
     lle = lle_tail;
     GNUNET_assert (GNUNET_YES ==
                    GNUNET_CONTAINER_multishortmap_remove (dvlearn_map,
-                                                          &lle->challenge,
+                                                          &lle->challenge.value,
                                                           lle));
     GNUNET_CONTAINER_DLL_remove (lle_head, lle_tail, lle);
     GNUNET_free (lle);
@@ -6786,7 +8087,7 @@ start_dv_learn (void *cls)
   GNUNET_break (GNUNET_YES ==
                 GNUNET_CONTAINER_multishortmap_put (
                   dvlearn_map,
-                  &lle->challenge,
+                  &lle->challenge.value,
                   lle,
                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   dvl.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
@@ -6794,10 +8095,13 @@ start_dv_learn (void *cls)
   dvl.num_hops = htons (0);
   dvl.bidirectional = htons (0);
   dvl.non_network_delay = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
+  dvl.monotonic_time =
+    GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
   {
     struct DvInitPS dvip = {.purpose.purpose = htonl (
                               GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
                             .purpose.size = htonl (sizeof (dvip)),
+                            .monotonic_time = dvl.monotonic_time,
                             .challenge = lle->challenge};
 
     GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
@@ -6858,6 +8162,43 @@ check_validation_request_pending (void *cls,
 }
 
 
+/**
+ * Function called with the monotonic time of a DV initiator
+ * by PEERSTORE. Updates the time.
+ *
+ * @param cls a `struct Neighbour`
+ * @param record the information found, NULL for the last call
+ * @param emsg error message
+ */
+static void
+neighbour_dv_monotime_cb (void *cls,
+                          const struct GNUNET_PEERSTORE_Record *record,
+                          const char *emsg)
+{
+  struct Neighbour *n = cls;
+  struct GNUNET_TIME_AbsoluteNBO *mtbe;
+
+  (void) emsg;
+  if (NULL == record)
+  {
+    /* we're done with #neighbour_dv_monotime_cb() invocations,
+       continue normal processing */
+    n->get = NULL;
+    n->dv_monotime_available = GNUNET_YES;
+    return;
+  }
+  if (sizeof (*mtbe) != record->value_size)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  mtbe = record->value;
+  n->last_dv_learn_monotime =
+    GNUNET_TIME_absolute_max (n->last_dv_learn_monotime,
+                              GNUNET_TIME_absolute_ntoh (*mtbe));
+}
+
+
 /**
  * New queue became available.  Process the request.
  *
@@ -6874,7 +8215,7 @@ handle_add_queue_message (void *cls,
   const char *addr;
   uint16_t addr_len;
 
-  if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
+  if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBoxMessage))
   {
     /* MTU so small as to be useless for transmissions,
        required for #fragment_message()! */
@@ -6886,7 +8227,6 @@ handle_add_queue_message (void *cls,
   if (NULL == neighbour)
   {
     neighbour = GNUNET_new (struct Neighbour);
-    neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
     neighbour->pid = aqm->receiver;
     GNUNET_assert (GNUNET_OK ==
                    GNUNET_CONTAINER_multipeermap_put (
@@ -6894,6 +8234,13 @@ handle_add_queue_message (void *cls,
                      &neighbour->pid,
                      neighbour,
                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    neighbour->get =
+      GNUNET_PEERSTORE_iterate (peerstore,
+                                "transport",
+                                &neighbour->pid,
+                                GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
+                                &neighbour_dv_monotime_cb,
+                                neighbour);
   }
   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
   addr = (const char *) &aqm[1];
@@ -6901,30 +8248,16 @@ handle_add_queue_message (void *cls,
   queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
   queue->tc = tc;
   queue->address = (const char *) &queue[1];
-  queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
+  queue->pd.aged_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
   queue->qid = aqm->qid;
   queue->mtu = ntohl (aqm->mtu);
   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
   queue->neighbour = neighbour;
-  GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
-                                  &tracker_update_in_cb,
-                                  queue,
-                                  GNUNET_BANDWIDTH_ZERO,
-                                  GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
-                                  &tracker_excess_in_cb,
-                                  queue);
-  GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
-                                  &tracker_update_out_cb,
-                                  queue,
-                                  GNUNET_BANDWIDTH_ZERO,
-                                  GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
-                                  &tracker_excess_out_cb,
-                                  queue);
   memcpy (&queue[1], addr, addr_len);
   /* notify monitors about new queue */
   {
-    struct MonitorEvent me = {.rtt = queue->rtt, .cs = queue->cs};
+    struct MonitorEvent me = {.rtt = queue->pd.aged_rtt, .cs = queue->cs};
 
     notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
   }
@@ -6980,8 +8313,8 @@ handle_queue_create_ok (void *cls,
 
 
 /**
- * Communicator tells us that our request to create a queue failed. This usually
- * indicates that the provided address is simply invalid or that the
+ * Communicator tells us that our request to create a queue failed. This
+ * usually indicates that the provided address is simply invalid or that the
  * communicator's resources are exhausted.
  *
  * @param cls the `struct TransportClient`
@@ -7274,7 +8607,8 @@ handle_address_consider_verify (
   (void) cls;
   // OPTIMIZE-FIXME: checking that we know this address already should
   //        be done BEFORE checking the signature => HELLO API change!
-  // OPTIMIZE-FIXME: pre-check: rate-limit signature verification / validation?!
+  // OPTIMIZE-FIXME: pre-check: rate-limit signature verification /
+  // validation?!
   address =
     GNUNET_HELLO_extract_address (&hdr[1],
                                   ntohs (hdr->header.size) - sizeof (*hdr),
@@ -7421,6 +8755,50 @@ free_validation_state_cb (void *cls,
 }
 
 
+/**
+ * Free pending acknowledgement.
+ *
+ * @param cls NULL
+ * @param key unused
+ * @param value a `struct PendingAcknowledgement`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_pending_ack_cb (void *cls,
+                     const struct GNUNET_ShortHashCode *key,
+                     void *value)
+{
+  struct PendingAcknowledgement *pa = value;
+
+  (void) cls;
+  (void) key;
+  free_pending_acknowledgement (pa);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Free acknowledgement cummulator.
+ *
+ * @param cls NULL
+ * @param pid unused
+ * @param value a `struct AcknowledgementCummulator`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_ack_cummulator_cb (void *cls,
+                        const struct GNUNET_PeerIdentity *pid,
+                        void *value)
+{
+  struct AcknowledgementCummulator *ac = value;
+
+  (void) cls;
+  (void) pid;
+  GNUNET_free (ac);
+  return GNUNET_OK;
+}
+
+
 /**
  * Function called when the service shuts down.  Unloads our plugins
  * and cancels pending validations.
@@ -7454,8 +8832,27 @@ do_shutdown (void *cls)
     GNUNET_free (GST_my_private_key);
     GST_my_private_key = NULL;
   }
+  GNUNET_CONTAINER_multipeermap_iterate (ack_cummulators,
+                                         &free_ack_cummulator_cb,
+                                         NULL);
+  GNUNET_CONTAINER_multipeermap_destroy (ack_cummulators);
+  ack_cummulators = NULL;
+  GNUNET_CONTAINER_multishortmap_iterate (pending_acks,
+                                          &free_pending_ack_cb,
+                                          NULL);
+  GNUNET_CONTAINER_multishortmap_destroy (pending_acks);
+  pending_acks = NULL;
+  GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (neighbours));
   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
   neighbours = NULL;
+  GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (links));
+  GNUNET_CONTAINER_multipeermap_destroy (links);
+  links = NULL;
+  GNUNET_CONTAINER_multipeermap_iterate (backtalkers,
+                                         &free_backtalker_cb,
+                                         NULL);
+  GNUNET_CONTAINER_multipeermap_destroy (backtalkers);
+  backtalkers = NULL;
   GNUNET_CONTAINER_multipeermap_iterate (validation_map,
                                          &free_validation_state_cb,
                                          NULL);
@@ -7499,7 +8896,11 @@ run (void *cls,
   (void) service;
   /* setup globals */
   GST_cfg = c;
+  backtalkers = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
+  pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES);
+  ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
   neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
+  links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES);
   dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES);
   ephemeral_heap =
@@ -7569,6 +8970,10 @@ GNUNET_SERVICE_MAIN (
                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
                          struct OutboundMessage,
                          NULL),
+  GNUNET_MQ_hd_fixed_size (client_recv_ok,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK,
+                           struct RecvOkMessage,
+                           NULL),
   /* communication with communicators */
   GNUNET_MQ_hd_var_size (communicator_available,
                          GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,