FC work
authorChristian Grothoff <christian@grothoff.org>
Sun, 19 May 2019 11:56:22 +0000 (13:56 +0200)
committerChristian Grothoff <christian@grothoff.org>
Sun, 19 May 2019 11:56:22 +0000 (13:56 +0200)
src/include/gnunet_protocols.h
src/transport/gnunet-service-tng.c

index 9a1ef32eed27f94cf82bddb0ba36e95e6c7d4fcf..73da40038addf9e0d52b4c1de5be6e4b88339a1e 100644 (file)
@@ -3195,16 +3195,10 @@ extern "C" {
  */
 #define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE 1303
 
-/**
- * P2P message: transport proves that an address worked and provides
- * a new challenge for the other direction.
- */
-#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE_RESPONSE 1304
-
 /**
  * P2P message: transport proves that an address worked.
  */
-#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE 1305
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE 1304
 
 
 /* ************** NEW (NG) ATS Messages ************* */
index e00c7d5738ab5e0355cf6e3c823cea528fb6e977..f009a491b508e102be325d5421a5bb8ad37b37af 100644 (file)
  * TODO:
  * Implement next:
  * - FIXME-FC: realize transport-to-transport flow control (needed in case
- *   communicators do not offer flow control).  Note that we may not
- *   want to simply delay the ACKs as that may cause unnecessary
- *   re-transmissions. => Introduce proper flow and congestion window(s)!
+ *   communicators do not offer flow control).
+ *   We do transmit FC window sizes now.  Left:
+ *   for SENDING)
+ *   - Increment "outbound_fc_window_size_used_kb" on transmission
+ *   - Throttle sending if "outbound_fc_window_size_used_kb" reaches limit
+ *   - Send *new* challenge when we get close to the limit (including
+ *     at the beginning when the limit is zero!)
+ *   - Retransmit challenge if it goes unanswered!
+ *
+ *   for RECEIVING)
+ *   - increment incoming_fc_window_size_ram_kb when receiving
+ *     incoming packets!
+ *   - OR drop if incoming_fc_window_size_ram goes
+ *     (significantly?) beyond available_fc_window_size_kb
+ *   - decrement incoming_fc_window_size_ram_kb when CORE is done
+ *     with incoming packets!
+ *   - increment incoming_fc_window_size_used_kb when CORE is done
+ *     with incoming packets!
+ *
  * - review retransmission logic, right now there is no smartness there!
- *   => congestion control, flow control, etc [PERFORMANCE-BASICS]
+ *   => congestion control, etc [PERFORMANCE-BASICS]
  *
  * Optimizations:
  * - When forwarding DV learn messages, if a peer is reached that
  *   AcknowledgementUUIDP altogether, as they won't be acked! [BANDWIDTH]
  *   (-> have 2nd type of acknowledgment message; low priority, as we
  *       do not have an MTU-limited *reliable* communicator)
+ * - Adapt available_fc_window_size_kb, using larger values for high-bandwidth
+ *   and high-latency links *if* we have the RAM [GOODPUT / utilization / stalls]
+ * - Set last_window_consum_limit_kb promise properly based on
+ *   latency and bandwidth of the respective connection [GOODPUT / utilization / stalls]
+ * - re-sending challenge response without a challenge when we have
+ *   significantly increased the FC window (upon CORE being done with messages)
+ *   so as to avoid the sender having to give us a fresh challenge [BANDWIDTH]
+ *   Also can re-use signature in this case [CPU].
  *
  * Design realizations / discussion:
  * - communicators do flow control by calling MQ "notify sent"
  */
 #define GOODPUT_AGING_SLOTS 4
 
+/**
+ * How big is the flow control window size by default;
+ * limits per-neighbour RAM utilization (in kilobytes).
+ */
+#define DEFAULT_WINDOW_SIZE_KB 128
+
+/**
+ * For how many incoming connections do we try to create a
+ * virtual link for (at the same time!).  This does NOT
+ * limit the number of incoming connections, just the number
+ * for which we are actively trying to find working addresses
+ * in the absence (!) of our own applications wanting the
+ * link to go up.
+ */
+#define MAX_INCOMING_REQUEST 16
+
 /**
  * Maximum number of peers we select for forwarding DVInit
  * messages at the same time (excluding initiator).
@@ -818,64 +858,8 @@ struct TransportValidationChallengeMessage
   struct ChallengeNonceP challenge;
 
   /**
-   * Timestamp of the sender, to be copied into the reply
-   * to allow sender to calculate RTT.
-   */
-  struct GNUNET_TIME_AbsoluteNBO sender_time;
-};
-
-
-/**
- * Message send to another peer to answer to a validation challenge
- * and at the same time issue a challenge in the other direction.
- */
-struct TransportValidationChallengeResponseMessage
-{
-
-  /**
-   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE_RESPONSE
-   */
-  struct GNUNET_MessageHeader header;
-
-  /**
-   * Flow control window size in kilobytes (1024 b), in NBO.
-   * The receiver can now send this many kilobytes as per
-   * the @e received_challenge "account".
-   */
-  uint32_t fc_window_size_kb GNUNET_PACKED;
-
-  /**
-   * Challenge returned to the origin by the receiving peer.
-   */
-  struct ChallengeNonceP received_challenge;
-
-  /**
-   * The peer's signature matching the
-   * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE purpose.
-   */
-  struct GNUNET_CRYPTO_EddsaSignature signature;
-
-  /**
-   * Fresh challenge created by the sender to be returned
-   * by the receiving peer.
-   */
-  struct ChallengeNonceP sender_challenge;
-
-  /**
-   * How long does the sender believe the address on
-   * which the challenge was received to remain valid?
-   */
-  struct GNUNET_TIME_RelativeNBO validity_duration;
-
-  /**
-   * Timestamp of the sender, to be copied into the reply
-   * to allow sender to calculate RTT.
-   */
-  struct GNUNET_TIME_AbsoluteNBO origin_time;
-
-  /**
-   * Timestamp of the sender, to be copied into the reply
-   * to allow sender to calculate RTT.
+   * Timestamp of the sender, to be copied into the reply to allow
+   * sender to calculate RTT.  Must be monotonically increasing!
    */
   struct GNUNET_TIME_AbsoluteNBO sender_time;
 };
@@ -1224,7 +1208,84 @@ struct VirtualLink
   uint64_t message_uuid_ctr;
 
   /**
-   * How many more messages can we send to core before we exhaust
+   * Sender timestamp of @e n_challenge, used to generate out-of-order
+   * challenges (as sender's timestamps must be monotonically
+   * increasing).  Note that we do not persist this monotonic time
+   * as we do not really have to worry about ancient flow control
+   * window sizes after restarts.
+   */
+  struct GNUNET_TIME_Absolute n_challenge_time;
+
+  /**
+   * Last challenge we received from @a n, for which we created the
+   * flow control window given in @e fc_window_size_kb.
+   */
+  struct ChallengeNonceP n_challenge;
+
+  /**
+   * Last challenge we used with @a n for flow control. If we receive
+   * window size increases for a different challenge, they are
+   * out-of-order and must be discarded!
+   */
+  struct ChallengeNonceP my_challenge;
+
+  /**
+   * Memory allocated for this virtual link.  Expresses how much RAM
+   * we are willing to allocate to this virtual link.  OPTIMIZE-ME:
+   * Can be adapted to dedicate more RAM to links that need it, while
+   * sticking to some overall RAM limit.  For now, set to
+   * #DEFAULT_WINDOW_SIZE_KB.
+   */
+  uint32_t available_fc_window_size_kb;
+
+  /**
+   * Memory actually used to buffer packets on this virtual link.
+   * Expresses how much RAM we are currently using for virtual link.
+   * Note that once CORE is done with a packet, we decrement the value
+   * here.
+   */
+  uint32_t incoming_fc_window_size_ram_kb;
+
+  /**
+   * Last flow control window size we provided to the other peer, in
+   * kilobytes (1024 b).  We are allowing the other peer to send this
+   * many kilobytes as per its last @e n_challenge "account".
+   */
+  uint32_t incoming_fc_window_size_kb;
+
+  /**
+   * How many bytes could we still get from the previous flow control
+   * window, in kilobytes (1024 b).  We need to consider this value
+   * when calculating what we allow for the current window due to
+   * the possibility of out-of-order challenges.
+   */
+  uint32_t last_fc_window_size_remaining_kb;
+
+  /**
+   * How much of the window did the other peer successfully use (and
+   * we already passed it on to CORE)? Must be below @e
+   * incoming_fc_window_size_kb.   We should effectively signal the
+   * other peer that the window is this much bigger at the next
+   * opportunity / challenge.
+   */
+  uint32_t incoming_fc_window_size_used_kb;
+
+  /**
+   * Our current flow control window size in kilobytes (1024 b).  We
+   * are allowed to transmit this many kilobytes to @a n as per
+   * our @e my_challenge "account".
+   */
+  uint32_t outbound_fc_window_size_kb;
+
+  /**
+   * How much of our current flow control window size have we
+   * used (in kilobytes (1024 b)).  Must be below
+   * @e outbound_fc_window_size_kb.
+   */
+  uint32_t outbound_fc_window_size_used_kb;
+
+  /**
+   * How many more messages can we send to CORE before we exhaust
    * the receive window of CORE for this peer? If this hits zero,
    * we must tell communicators to stop providing us more messages
    * for this peer.  In fact, the window can go negative as we
@@ -1786,6 +1847,35 @@ struct Neighbour
 };
 
 
+/**
+ * Another peer attempted to talk to us, we should try to establish
+ * a connection in the other direction.
+ */
+struct IncomingRequest
+{
+
+  /**
+   * Kept in a DLL.
+   */
+  struct IncomingRequest *next;
+
+  /**
+   * Kept in a DLL.
+   */
+  struct IncomingRequest *prev;
+
+  /**
+   * Handle for watching the peerstore for HELLOs for this peer.
+   */
+  struct GNUNET_PEERSTORE_WatchContext *wc;
+
+  /**
+   * Which peer is this about?
+   */
+  struct GNUNET_PeerIdentity pid;
+};
+
+
 /**
  * A peer that an application (client) would like us to talk to directly.
  */
@@ -2356,6 +2446,13 @@ struct ValidationState
    */
   struct GNUNET_PEERSTORE_StoreContext *sc;
 
+  /**
+   * Self-imposed limit on the previous flow control window. (May be zero,
+   * if we never used data from the previous window or are establishing the
+   * connection for the first time).
+   */
+  uint32_t last_window_consum_limit_kb;
+
   /**
    * We are technically ready to send the challenge, but we are waiting for
    * the respective queue to become available for transmission.
@@ -2544,6 +2641,23 @@ static struct PendingAcknowledgement *pa_head;
  */
 static struct PendingAcknowledgement *pa_tail;
 
+/**
+ * List of incomming connections where we are trying
+ * to get a connection back established. Length
+ * kept in #ir_total.
+ */
+static struct IncomingRequest *ir_head;
+
+/**
+ * Tail of DLL starting at #ir_head.
+ */
+static struct IncomingRequest *ir_tail;
+
+/**
+ * Length of the DLL starting at #ir_head.
+ */
+static unsigned int ir_total;
+
 /**
  * Generator of `logging_uuid` in `struct PendingMessage`.
  */
@@ -2586,6 +2700,22 @@ get_age ()
 }
 
 
+/**
+ * Release @a ir data structure.
+ *
+ * @param ir data structure to release
+ */
+static void
+free_incoming_request (struct IncomingRequest *ir)
+{
+  GNUNET_CONTAINER_DLL_remove (ir_head, ir_tail, ir);
+  GNUNET_assert (ir_total > 0);
+  ir_total--;
+  GNUNET_PEERSTORE_watch_cancel (ir->wc);
+  GNUNET_free (ir);
+}
+
+
 /**
  * Release @a pa data structure.
  *
@@ -5688,6 +5818,7 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop)
   vl->dv = dv;
   dv->vl = vl;
   vl->core_recv_window = RECV_WINDOW_SIZE;
+  vl->available_fc_window_size_kb = DEFAULT_WINDOW_SIZE_KB;
   vl->visibility_task =
     GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
   GNUNET_break (GNUNET_YES ==
@@ -7052,87 +7183,98 @@ check_incoming_msg (void *cls,
 
 
 /**
- * Communicator gave us a transport address validation challenge.  Process the
- * request.
+ * We received a @a challenge from another peer, check if we can
+ * increase the flow control window to that peer.
  *
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
- * @param tvc the message that was received
+ * @param vl virtual link
+ * @param challenge the challenge we received
+ * @param sender_time when did the peer send the message?
+ * @param last_window_consum_limit_kb maximum number of kb the sender
+ *        promises to use of the previous window (if any)
  */
 static void
-handle_validation_challenge (
-  void *cls,
-  const struct TransportValidationChallengeMessage *tvc)
-{
-  struct CommunicatorMessageContext *cmc = cls;
-  struct TransportValidationResponseMessage *tvr;
-
-  if (cmc->total_hops > 0)
-  {
-    /* DV routing is not allowed for validation challenges! */
-    GNUNET_break_op (0);
-    finish_cmc_handling (cmc);
+update_fc_window (struct VirtualLink *vl,
+                  const struct ChallengeNonceP *challenge,
+                  struct GNUNET_TIME_Absolute sender_time,
+                  uint32_t last_window_consum_limit_kb)
+{
+  if (0 == GNUNET_memcmp (challenge, &vl->n_challenge))
+  {
+    uint32_t avail;
+
+    /* Challenge identical to last one, update
+       @a last_window_consum_limit_kb (to minimum) */
+    vl->last_fc_window_size_remaining_kb =
+      GNUNET_MIN (last_window_consum_limit_kb,
+                  vl->last_fc_window_size_remaining_kb);
+    /* window could have shrunk! */
+    if (vl->available_fc_window_size_kb > vl->last_fc_window_size_remaining_kb)
+      avail =
+        vl->available_fc_window_size_kb - vl->last_fc_window_size_remaining_kb;
+    else
+      avail = 0;
+    /* guard against integer overflow */
+    if (vl->incoming_fc_window_size_used_kb + avail >=
+        vl->incoming_fc_window_size_used_kb)
+      vl->incoming_fc_window_size_kb =
+        vl->incoming_fc_window_size_used_kb + avail;
+    else
+      vl->incoming_fc_window_size_kb = UINT32_MAX;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Updated window to %u/%u kb (%u used) for virtual link to %s!\n",
+                vl->incoming_fc_window_size_kb,
+                vl->available_fc_window_size_kb,
+                vl->incoming_fc_window_size_used_kb,
+                GNUNET_i2s (&vl->target));
     return;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received address validation challenge %s\n",
-              GNUNET_sh2s (&tvc->challenge.value));
-  tvr = GNUNET_new (struct TransportValidationResponseMessage);
-  tvr->header.type =
-    htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE);
-  tvr->header.size = htons (sizeof (*tvr));
-  tvr->challenge = tvc->challenge;
-  tvr->origin_time = tvc->sender_time;
-  tvr->validity_duration = cmc->im.expected_address_validity;
+  if (vl->n_challenge_time.abs_value_us >= sender_time.abs_value_us)
   {
-    /* create signature */
-    struct TransportValidationPS tvp =
-      {.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
-       .purpose.size = htonl (sizeof (tvp)),
-       .validity_duration = tvr->validity_duration,
-       .challenge = tvc->challenge};
-
-    GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
-                                                          &tvp.purpose,
-                                                          &tvr->signature));
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# Challenges ignored: sender time not increasing",
+                              1,
+                              GNUNET_NO);
+    return;
   }
-  route_message (&cmc->im.sender,
-                 &tvr->header,
-                 RMO_ANYTHING_GOES | RMO_REDUNDANT);
-  finish_cmc_handling (cmc);
-}
-
-
-/**
- * Communicator gave us a transport address validation challenge.  Process the
- * request.
- *
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
- * @param tvrc the message that was received
- */
-static void
-handle_validation_challenge_response (
-  void *cls,
-  const struct TransportValidationChallengeResponseMessage *tvrc)
-{
-  struct CommunicatorMessageContext *cmc = cls;
-  struct TransportValidationResponseMessage *tvr;
-
-
-  finish_cmc_handling (cmc);
+  /* new challenge! */
+  if (vl->incoming_fc_window_size_used_kb > last_window_consum_limit_kb)
+  {
+    /* lying peer: it already used more than it promised it would ever use! */
+    GNUNET_break_op (0);
+    last_window_consum_limit_kb = vl->incoming_fc_window_size_used_kb;
+  }
+  /* What remains is at most the difference between what we already processed
+     and what the sender promises to limit itself to. */
+  vl->last_fc_window_size_remaining_kb =
+    last_window_consum_limit_kb - vl->incoming_fc_window_size_used_kb;
+  vl->n_challenge = *challenge;
+  vl->n_challenge_time = sender_time;
+  vl->incoming_fc_window_size_used_kb = 0;
+  /* window could have shrunk! */
+  if (vl->available_fc_window_size_kb > vl->last_fc_window_size_remaining_kb)
+    vl->incoming_fc_window_size_kb =
+      vl->available_fc_window_size_kb - vl->last_fc_window_size_remaining_kb;
+  else
+    vl->incoming_fc_window_size_kb = 0;
+  GNUNET_log (
+    GNUNET_ERROR_TYPE_DEBUG,
+    "New window at %u/%u kb (%u left on previous) for virtual link to %s!\n",
+    vl->incoming_fc_window_size_kb,
+    vl->available_fc_window_size_kb,
+    vl->last_fc_window_size_remaining_kb,
+    GNUNET_i2s (&vl->target));
 }
 
 
 /**
- * Closure for #check_known_challenge.
+ * Closure for #check_known_address.
  */
-struct CheckKnownChallengeContext
+struct CheckKnownAddressContext
 {
   /**
-   * Set to the challenge we are looking for.
+   * Set to the address we are looking for.
    */
-  const struct ChallengeNonceP *challenge;
+  const char *address;
 
   /**
    * Set to a matching validation state, if one was found.
@@ -7143,51 +7285,29 @@ struct CheckKnownChallengeContext
 
 /**
  * Test if the validation state in @a value matches the
- * challenge from @a cls.
+ * address from @a cls.
  *
- * @param cls a `struct CheckKnownChallengeContext`
+ * @param cls a `struct CheckKnownAddressContext`
  * @param pid unused (must match though)
  * @param value a `struct ValidationState`
  * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
  */
 static int
-check_known_challenge (void *cls,
-                       const struct GNUNET_PeerIdentity *pid,
-                       void *value)
+check_known_address (void *cls,
+                     const struct GNUNET_PeerIdentity *pid,
+                     void *value)
 {
-  struct CheckKnownChallengeContext *ckac = cls;
+  struct CheckKnownAddressContext *ckac = cls;
   struct ValidationState *vs = value;
 
   (void) pid;
-  if (0 != GNUNET_memcmp (&vs->challenge, ckac->challenge))
+  if (0 != strcmp (vs->address, ckac->address))
     return GNUNET_OK;
   ckac->vs = vs;
   return GNUNET_NO;
 }
 
 
-/**
- * Function called when peerstore is done storing a
- * validated address.
- *
- * @param cls a `struct ValidationState`
- * @param success #GNUNET_YES on success
- */
-static void
-peerstore_store_validation_cb (void *cls, int success)
-{
-  struct ValidationState *vs = cls;
-
-  vs->sc = NULL;
-  if (GNUNET_YES == success)
-    return;
-  GNUNET_STATISTICS_update (GST_stats,
-                            "# Peerstore failed to store foreign address",
-                            1,
-                            GNUNET_NO);
-}
-
-
 /**
  * Task run periodically to validate some address based on #validation_heap.
  *
@@ -7234,25 +7354,279 @@ update_next_challenge_time (struct ValidationState *vs,
 
 
 /**
- * Find the queue matching @a pid and @a address.
+ * Start address validation.
  *
- * @param pid peer the queue must go to
- * @param address address the queue must use
- * @return NULL if no such queue exists
+ * @param pid peer the @a address is for
+ * @param address an address to reach @a pid (presumably)
  */
-static struct Queue *
-find_queue (const struct GNUNET_PeerIdentity *pid, const char *address)
+static void
+start_address_validation (const struct GNUNET_PeerIdentity *pid,
+                          const char *address)
 {
-  struct Neighbour *n;
+  struct GNUNET_TIME_Absolute now;
+  struct ValidationState *vs;
+  struct CheckKnownAddressContext ckac = {.address = address, .vs = NULL};
 
-  n = lookup_neighbour (pid);
-  if (NULL == n)
-    return NULL;
-  for (struct Queue *pos = n->queue_head; NULL != pos;
-       pos = pos->next_neighbour)
+  (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
+                                                     pid,
+                                                     &check_known_address,
+                                                     &ckac);
+  if (NULL != (vs = ckac.vs))
   {
-    if (0 == strcmp (pos->address, address))
-      return pos;
+    /* if 'vs' is not currently valid, we need to speed up retrying the
+     * validation */
+    if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
+    {
+      /* reduce backoff as we got a fresh advertisement */
+      vs->challenge_backoff =
+        GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
+                                  GNUNET_TIME_relative_divide (vs->challenge_backoff,
+                                                               2));
+      update_next_challenge_time (vs,
+                                  GNUNET_TIME_relative_to_absolute (
+                                    vs->challenge_backoff));
+    }
+    return;
+  }
+  now = GNUNET_TIME_absolute_get ();
+  vs = GNUNET_new (struct ValidationState);
+  vs->pid = *pid;
+  vs->valid_until =
+    GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME);
+  vs->first_challenge_use = now;
+  vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
+  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+                              &vs->challenge,
+                              sizeof (vs->challenge));
+  vs->address = GNUNET_strdup (address);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Starting address validation `%s' of peer %s using challenge %s\n",
+              address,
+              GNUNET_i2s (pid),
+              GNUNET_sh2s (&vs->challenge.value));
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multipeermap_put (
+                   validation_map,
+                   &vs->pid,
+                   vs,
+                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  update_next_challenge_time (vs, now);
+}
+
+
+/**
+ * Function called by PEERSTORE for each matching record.
+ *
+ * @param cls closure, a `struct IncomingRequest`
+ * @param record peerstore record information
+ * @param emsg error message, or NULL if no errors
+ */
+static void
+handle_hello_for_incoming (void *cls,
+                           const struct GNUNET_PEERSTORE_Record *record,
+                           const char *emsg)
+{
+  struct IncomingRequest *ir = cls;
+  const char *val;
+
+  if (NULL != emsg)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Got failure from PEERSTORE: %s\n",
+                emsg);
+    return;
+  }
+  val = record->value;
+  if ((0 == record->value_size) || ('\0' != val[record->value_size - 1]))
+  {
+    GNUNET_break (0);
+    return;
+  }
+  start_address_validation (&ir->pid, (const char *) record->value);
+}
+
+
+/**
+ * Communicator gave us a transport address validation challenge.  Process the
+ * request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call
+ * #finish_cmc_handling() when done)
+ * @param tvc the message that was received
+ */
+static void
+handle_validation_challenge (
+  void *cls,
+  const struct TransportValidationChallengeMessage *tvc)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  struct TransportValidationResponseMessage tvr;
+  struct VirtualLink *vl;
+  struct GNUNET_TIME_RelativeNBO validity_duration;
+  struct IncomingRequest *ir;
+  struct Neighbour *n;
+
+  /* We use a validity_duration of 0 for DV-routed messages,
+     as we can neither control the validity and need to allow
+     the receiver to tell DV paths from direct connections */
+  if (cmc->total_hops > 0)
+    validity_duration = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
+  else
+    validity_duration = cmc->im.expected_address_validity;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received address validation challenge %s\n",
+              GNUNET_sh2s (&tvc->challenge.value));
+  /* If we have a virtual link, we use this mechanism to signal the
+     size of the flow control window, and to allow the sender
+     to ask for increases. If for us the virtual link is still down,
+     we will always give a window size of zero. */
+  vl = lookup_virtual_link (&cmc->im.sender);
+  if (NULL != vl)
+    update_fc_window (vl,
+                      &tvc->challenge,
+                      GNUNET_TIME_absolute_ntoh (tvc->sender_time),
+                      ntohl (tvc->last_window_consum_limit_kb));
+  tvr.header.type =
+    htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE);
+  tvr.header.size = htons (sizeof (tvr));
+  tvr.fc_window_size_kb =
+    htonl ((NULL == vl) ? 0 : vl->incoming_fc_window_size_kb);
+  tvr.challenge = tvc->challenge;
+  tvr.origin_time = tvc->sender_time;
+  tvr.validity_duration = validity_duration;
+  {
+    /* create signature */
+    struct TransportValidationPS tvp =
+      {.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
+       .purpose.size = htonl (sizeof (tvp)),
+       .validity_duration = validity_duration,
+       .challenge = tvc->challenge};
+
+    GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+                                                          &tvp.purpose,
+                                                          &tvr.signature));
+  }
+  route_message (&cmc->im.sender,
+                 &tvr.header,
+                 RMO_ANYTHING_GOES | RMO_REDUNDANT);
+  finish_cmc_handling (cmc);
+  if (NULL != vl)
+    return;
+
+  /* For us, the link is still down, but we need bi-directional
+     connections (for flow-control and for this to be useful for
+     CORE), so we must try to bring the link up! */
+
+  /* (1) Check existing queues, if any, we may be lucky! */
+  n = lookup_neighbour (&cmc->im.sender);
+  if (NULL != n)
+    for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
+      start_address_validation (&cmc->im.sender, q->address);
+  /* (2) Also try to see if we have addresses in PEERSTORE for this peer
+     we could use */
+  for (ir = ir_head; NULL != ir; ir = ir->next)
+    if (0 == GNUNET_memcmp (&ir->pid, &cmc->im.sender))
+      return; /* we are already trying */
+  ir = GNUNET_new (struct IncomingRequest);
+  ir->pid = cmc->im.sender;
+  GNUNET_CONTAINER_DLL_insert (ir_head, ir_tail, ir);
+  ir->wc = GNUNET_PEERSTORE_watch (peerstore,
+                                   "transport",
+                                   &ir->pid,
+                                   GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
+                                   &handle_hello_for_incoming,
+                                   ir);
+  ir_total++;
+  /* Bound attempts we do in parallel here, might otherwise get excessive */
+  while (ir_total > MAX_INCOMING_REQUEST)
+    free_incoming_request (ir_head);
+}
+
+
+/**
+ * Closure for #check_known_challenge.
+ */
+struct CheckKnownChallengeContext
+{
+  /**
+   * Set to the challenge we are looking for.
+   */
+  const struct ChallengeNonceP *challenge;
+
+  /**
+   * Set to a matching validation state, if one was found.
+   */
+  struct ValidationState *vs;
+};
+
+
+/**
+ * Test if the validation state in @a value matches the
+ * challenge from @a cls.
+ *
+ * @param cls a `struct CheckKnownChallengeContext`
+ * @param pid unused (must match though)
+ * @param value a `struct ValidationState`
+ * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
+ */
+static int
+check_known_challenge (void *cls,
+                       const struct GNUNET_PeerIdentity *pid,
+                       void *value)
+{
+  struct CheckKnownChallengeContext *ckac = cls;
+  struct ValidationState *vs = value;
+
+  (void) pid;
+  if (0 != GNUNET_memcmp (&vs->challenge, ckac->challenge))
+    return GNUNET_OK;
+  ckac->vs = vs;
+  return GNUNET_NO;
+}
+
+
+/**
+ * Function called when peerstore is done storing a
+ * validated address.
+ *
+ * @param cls a `struct ValidationState`
+ * @param success #GNUNET_YES on success
+ */
+static void
+peerstore_store_validation_cb (void *cls, int success)
+{
+  struct ValidationState *vs = cls;
+
+  vs->sc = NULL;
+  if (GNUNET_YES == success)
+    return;
+  GNUNET_STATISTICS_update (GST_stats,
+                            "# Peerstore failed to store foreign address",
+                            1,
+                            GNUNET_NO);
+}
+
+
+/**
+ * Find the queue matching @a pid and @a address.
+ *
+ * @param pid peer the queue must go to
+ * @param address address the queue must use
+ * @return NULL if no such queue exists
+ */
+static struct Queue *
+find_queue (const struct GNUNET_PeerIdentity *pid, const char *address)
+{
+  struct Neighbour *n;
+
+  n = lookup_neighbour (pid);
+  if (NULL == n)
+    return NULL;
+  for (struct Queue *pos = n->queue_head; NULL != pos;
+       pos = pos->next_neighbour)
+  {
+    if (0 == strcmp (pos->address, address))
+      return pos;
   }
   return NULL;
 }
@@ -7387,10 +7761,23 @@ handle_validation_response (
   if (NULL != vl)
   {
     /* Link was already up, remember n is also now available and we are done */
-    vl->n = n;
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Virtual link to %s could now also direct neighbour!\n",
-                GNUNET_i2s (&vs->pid));
+    if (NULL == vl->n)
+    {
+      vl->n = n;
+      n->vl = vl;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Virtual link to %s could now also direct neighbour!\n",
+                  GNUNET_i2s (&vs->pid));
+    }
+    else
+    {
+      GNUNET_assert (n == vl->n);
+    }
+    if (0 == GNUNET_memcmp (&vl->my_challenge, &tvr->challenge))
+    {
+      /* Update window size if the challenge matches */
+      vl->outbound_fc_window_size_kb = ntohl (tvr->fc_window_size_kb);
+    }
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -7401,6 +7788,9 @@ handle_validation_response (
   vl->n = n;
   n->vl = vl;
   vl->core_recv_window = RECV_WINDOW_SIZE;
+  vl->available_fc_window_size_kb = DEFAULT_WINDOW_SIZE_KB;
+  vl->outbound_fc_window_size_kb = ntohl (tvr->fc_window_size_kb);
+  vl->my_challenge = tvr->challenge;
   vl->visibility_task =
     GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
   GNUNET_break (GNUNET_YES ==
@@ -7478,11 +7868,6 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
        GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
        struct TransportValidationChallengeMessage,
        &cmc),
-     GNUNET_MQ_hd_fixed_size (
-       validation_challenge_response,
-       GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE_RESPONSE,
-       struct TransportValidationChallengeResponseMessage,
-       &cmc),
      GNUNET_MQ_hd_fixed_size (
        validation_response,
        GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,
@@ -8398,11 +8783,11 @@ validation_transmit_on_queue (struct Queue *q, struct ValidationState *vs)
 {
   struct TransportValidationChallengeMessage tvc;
 
-  vs->last_challenge_use = GNUNET_TIME_absolute_get ();
+  vs->last_challenge_use = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
   tvc.header.type =
     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE);
   tvc.header.size = htons (sizeof (tvc));
-  tvc.last_window_consum_limit_kb = htonl (0); // FIXME!
+  tvc.last_window_consum_limit_kb = htonl (vs->last_window_consum_limit_kb);
   tvc.challenge = vs->challenge;
   tvc.sender_time = GNUNET_TIME_absolute_hton (vs->last_challenge_use);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -8892,120 +9277,17 @@ handle_suggest_cancel (void *cls, const struct ExpressPreferenceMessage *msg)
 }
 
 
-/**
- * Closure for #check_known_address.
- */
-struct CheckKnownAddressContext
-{
-  /**
-   * Set to the address we are looking for.
-   */
-  const char *address;
-
-  /**
-   * Set to a matching validation state, if one was found.
-   */
-  struct ValidationState *vs;
-};
-
-
-/**
- * Test if the validation state in @a value matches the
- * address from @a cls.
- *
- * @param cls a `struct CheckKnownAddressContext`
- * @param pid unused (must match though)
- * @param value a `struct ValidationState`
- * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
- */
-static int
-check_known_address (void *cls,
-                     const struct GNUNET_PeerIdentity *pid,
-                     void *value)
-{
-  struct CheckKnownAddressContext *ckac = cls;
-  struct ValidationState *vs = value;
-
-  (void) pid;
-  if (0 != strcmp (vs->address, ckac->address))
-    return GNUNET_OK;
-  ckac->vs = vs;
-  return GNUNET_NO;
-}
-
-
-/**
- * Start address validation.
- *
- * @param pid peer the @a address is for
- * @param address an address to reach @a pid (presumably)
- */
-static void
-start_address_validation (const struct GNUNET_PeerIdentity *pid,
-                          const char *address)
-{
-  struct GNUNET_TIME_Absolute now;
-  struct ValidationState *vs;
-  struct CheckKnownAddressContext ckac = {.address = address, .vs = NULL};
-
-  (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
-                                                     pid,
-                                                     &check_known_address,
-                                                     &ckac);
-  if (NULL != (vs = ckac.vs))
-  {
-    /* if 'vs' is not currently valid, we need to speed up retrying the
-     * validation */
-    if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
-    {
-      /* reduce backoff as we got a fresh advertisement */
-      vs->challenge_backoff =
-        GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
-                                  GNUNET_TIME_relative_divide (vs->challenge_backoff,
-                                                               2));
-      update_next_challenge_time (vs,
-                                  GNUNET_TIME_relative_to_absolute (
-                                    vs->challenge_backoff));
-    }
-    return;
-  }
-  now = GNUNET_TIME_absolute_get ();
-  vs = GNUNET_new (struct ValidationState);
-  vs->pid = *pid;
-  vs->valid_until =
-    GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME);
-  vs->first_challenge_use = now;
-  vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
-  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
-                              &vs->challenge,
-                              sizeof (vs->challenge));
-  vs->address = GNUNET_strdup (address);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Starting address validation `%s' of peer %s using challenge %s\n",
-              address,
-              GNUNET_i2s (pid),
-              GNUNET_sh2s (&vs->challenge.value));
-  GNUNET_assert (GNUNET_YES ==
-                 GNUNET_CONTAINER_multipeermap_put (
-                   validation_map,
-                   &vs->pid,
-                   vs,
-                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  update_next_challenge_time (vs, now);
-}
-
-
 /**
  * Function called by PEERSTORE for each matching record.
  *
- * @param cls closure
+ * @param cls closure, a `struct PeerRequest`
  * @param record peerstore record information
  * @param emsg error message, or NULL if no errors
  */
 static void
-handle_hello (void *cls,
-              const struct GNUNET_PEERSTORE_Record *record,
-              const char *emsg)
+handle_hello_for_client (void *cls,
+                         const struct GNUNET_PEERSTORE_Record *record,
+                         const char *emsg)
 {
   struct PeerRequest *pr = cls;
   const char *val;
@@ -9077,7 +9359,7 @@ handle_suggest (void *cls, const struct ExpressPreferenceMessage *msg)
                                    "transport",
                                    &pr->pid,
                                    GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
-                                   &handle_hello,
+                                   &handle_hello_for_client,
                                    pr);
   GNUNET_SERVICE_client_continue (tc->client);
 }
@@ -9286,6 +9568,9 @@ do_shutdown (void *cls)
                                          NULL);
   GNUNET_CONTAINER_multipeermap_destroy (validation_map);
   validation_map = NULL;
+  while (NULL != ir_head)
+    free_incoming_request (ir_head);
+  GNUNET_assert (0 == ir_total);
   while (NULL != (lle = lle_head))
   {
     GNUNET_CONTAINER_DLL_remove (lle_head, lle_tail, lle);