REST/NAMESTORE: rework API
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
index c2922dd7e9bde3a7498075f8cd6fd925b9095213..a8f70986b3277d46f4be100c5ea10256d42487f1 100644 (file)
  *
  * TODO:
  * Implement next:
- * - change transport-core API to provide proper flow control in both
- *   directions, allow multiple messages per peer simultaneously (tag
- *   confirmations with unique message ID), and replace quota-out with
- *   proper flow control; specify transmission preferences (latency,
+ * - add (more) logging
+ * - change transport-core API to specify transmission preferences (latency,
  *   reliability, etc.) per message!
- * - add logging
- *
- * Later:
  * - review retransmission logic, right now there is no smartness there!
- *   => congestion control, flow control, etc
+ *   => congestion control, flow control, etc [PERFORMANCE-BASICS]
  *
  * Optimizations:
  * - AcknowledgementUUIDPs are overkill with 256 bits (128 would do)
- *    => Need 128 bit hash map though!
+ *   => Need 128 bit hash map though! [BANDWIDTH, MEMORY]
  * - queue_send_msg and route_message both by API design have to make copies
  *   of the payload, and route_message on top of that requires a malloc/free.
- *   Change design to approximate "zero" copy better...
+ *   Change design to approximate "zero" copy better... [CPU]
  * - could avoid copying body of message into each fragment and keep
  *   fragments as just pointers into the original message and only
  *   fully build fragments just before transmission (optimization, should
- *   reduce CPU and memory use)
+ *   reduce CPU and memory use) [CPU, MEMORY]
  * - if messages are below MTU, consider adding ACKs and other stuff
- *   (requires planning at receiver, and additional MST-style demultiplex
- *    at receiver!)
+ *   to the same transmission to avoid tiny messages (requires planning at
+ *   receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT]
  * - When we passively learned DV (with unconfirmed freshness), we
  *   right now add the path to our list but with a zero path_valid_until
  *   time and only use it for unconfirmed routes.  However, we could consider
  *   triggering an explicit validation mechansim ourselves, specifically routing
- *   a challenge-response message over the path (OPTIMIZATION-FIXME).
+ *   a challenge-response message over the path [ROUTING]
+ * - Track ACK losses based on ACK-counter [ROUTING]
  *
  * Design realizations / discussion:
  * - communicators do flow control by calling MQ "notify sent"
  */
 #define MAX_DV_DISCOVERY_SELECTION 16
 
+/**
+ * Window size. How many messages to the same target do we pass
+ * to CORE without a RECV_OK in between? Small values limit
+ * thoughput, large values will increase latency.
+ *
+ * FIXME-OPTIMIZE: find out what good values are experimentally,
+ * maybe set adaptively (i.e. to observed available bandwidth).
+ */
+#define RECV_WINDOW_SIZE 4
+
 /**
  * Minimum number of hops we should forward DV learn messages
  * even if they are NOT useful for us in hope of looping
@@ -1101,6 +1107,100 @@ struct PendingMessage;
 struct DistanceVectorHop;
 
 
+/**
+ * Context from #handle_incoming_msg().  Closure for many
+ * message handlers below.
+ */
+struct CommunicatorMessageContext
+{
+
+  /**
+   * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+   * flow control to unchoke.
+   */
+  struct CommunicatorMessageContext *next;
+
+  /**
+   * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+   * flow control to unchoke.
+   */
+  struct CommunicatorMessageContext *prev;
+
+  /**
+   * Which communicator provided us with the message.
+   */
+  struct TransportClient *tc;
+
+  /**
+   * Additional information for flow control and about the sender.
+   */
+  struct GNUNET_TRANSPORT_IncomingMessage im;
+
+  /**
+   * Number of hops the message has travelled (if DV-routed).
+   * FIXME: make use of this in ACK handling!
+   */
+  uint16_t total_hops;
+};
+
+
+/**
+ * A virtual link is another reachable peer that is known to CORE.  It
+ * can be either a `struct Neighbour` with at least one confirmed
+ * `struct Queue`, or a `struct DistanceVector` with at least one
+ * confirmed `struct DistanceVectorHop`.  With a virtual link we track
+ * data that is per neighbour that is not specific to how the
+ * connectivity is established.
+ */
+struct VirtualLink
+{
+  /**
+   * Identity of the peer at the other end of the link.
+   */
+  struct GNUNET_PeerIdentity target;
+
+  /**
+   * Communicators blocked for receiving on @e target as we are waiting
+   * on the @e core_recv_window to increase.
+   */
+  struct CommunicatorMessageContext *cmc_head;
+
+  /**
+   * Communicators blocked for receiving on @e target as we are waiting
+   * on the @e core_recv_window to increase.
+   */
+  struct CommunicatorMessageContext *cmc_tail;
+
+  /**
+   * Task scheduled to possibly notfiy core that this peer is no
+   * longer counting as confirmed.  Runs the #core_visibility_check(),
+   * which checks that some DV-path or a queue exists that is still
+   * considered confirmed.
+   */
+  struct GNUNET_SCHEDULER_Task *visibility_task;
+
+  /**
+   * Neighbour used by this virtual link, NULL if @e dv is used.
+   */
+  struct Neighbour *n;
+
+  /**
+   * Distance vector used by this virtual link, NULL if @e n is used.
+   */
+  struct DistanceVector *dv;
+
+  /**
+   * How many more messages can we send to core before we exhaust
+   * the receive window of CORE for this peer? If this hits zero,
+   * we must tell communicators to stop providing us more messages
+   * for this peer.  In fact, the window can go negative as we
+   * have multiple communicators, so per communicator we can go
+   * down by one into the negative range.
+   */
+  int core_recv_window;
+};
+
+
 /**
  * Data structure kept when we are waiting for an acknowledgement.
  */
@@ -1316,31 +1416,10 @@ struct DistanceVector
   struct GNUNET_SCHEDULER_Task *timeout_task;
 
   /**
-   * Task scheduled to possibly notfiy core that this queue is no longer
-   * counting as confirmed.  Runs the #core_queue_visibility_check().
-   */
-  struct GNUNET_SCHEDULER_Task *visibility_task;
-
-  /**
-   * Quota at which CORE is allowed to transmit to this peer
-   * (note that the value CORE should actually be told is this
-   *  value plus the respective value in `struct Neighbour`).
-   * Should match the sum of the quotas of all of the paths.
-   *
-   * FIXME: not yet set, tricky to get right given multiple paths,
-   *        many of which may be inactive! (=> Idea: measure???)
-   * FIXME: how do we set this value initially when we tell CORE?
-   *    Options: start at a minimum value or at literally zero?
-   *         (=> Current thought: clean would be zero!)
-   */
-  struct GNUNET_BANDWIDTH_Value32NBO quota_out;
-
-  /**
-   * Is one of the DV paths in this struct 'confirmed' and thus
-   * the cause for CORE to see this peer as connected? (Note that
-   * the same may apply to a `struct Neighbour` at the same time.)
+   * Do we have a confirmed working queue and are thus visible to
+   * CORE?  If so, this is the virtual link, otherwise NULL.
    */
-  int core_visible;
+  struct VirtualLink *link;
 };
 
 
@@ -1450,12 +1529,6 @@ struct Queue
    */
   struct GNUNET_SCHEDULER_Task *transmit_task;
 
-  /**
-   * Task scheduled to possibly notfiy core that this queue is no longer
-   * counting as confirmed.  Runs the #core_queue_visibility_check().
-   */
-  struct GNUNET_SCHEDULER_Task *visibility_task;
-
   /**
    * How long do *we* consider this @e address to be valid?  In the past or
    * zero if we have not yet validated it.  Can be updated based on
@@ -1642,11 +1715,6 @@ struct Neighbour
    */
   struct Queue *queue_tail;
 
-  /**
-   * Task run to cleanup pending messages that have exceeded their timeout.
-   */
-  struct GNUNET_SCHEDULER_Task *timeout_task;
-
   /**
    * Handle for an operation to fetch @e last_dv_learn_monotime information from
    * the PEERSTORE, or NULL.
@@ -1660,18 +1728,10 @@ struct Neighbour
   struct GNUNET_PEERSTORE_StoreContext *sc;
 
   /**
-   * Quota at which CORE is allowed to transmit to this peer
-   * (note that the value CORE should actually be told is this
-   *  value plus the respective value in `struct DistanceVector`).
-   * Should match the sum of the quotas of all of the queues.
-   *
-   * FIXME: not yet set, tricky to get right given multiple queues!
-   *        (=> Idea: measure???)
-   * FIXME: how do we set this value initially when we tell CORE?
-   *    Options: start at a minimum value or at literally zero?
-   *         (=> Current thought: clean would be zero!)
+   * Do we have a confirmed working queue and are thus visible to
+   * CORE?  If so, this is the virtual link, otherwise NULL.
    */
-  struct GNUNET_BANDWIDTH_Value32NBO quota_out;
+  struct VirtualLink *link;
 
   /**
    * Latest DVLearn monotonic time seen from this peer.  Initialized only
@@ -1679,17 +1739,6 @@ struct Neighbour
    */
   struct GNUNET_TIME_Absolute last_dv_learn_monotime;
 
-  /**
-   * What is the earliest timeout of any message in @e pending_msg_tail?
-   */
-  struct GNUNET_TIME_Absolute earliest_timeout;
-
-  /**
-   * Do we have a confirmed working queue and are thus visible to
-   * CORE?
-   */
-  int core_visible;
-
   /**
    * Do we have the lastest value for @e last_dv_learn_monotime from
    * PEERSTORE yet, or are we still waiting for a reply of PEERSTORE?
@@ -2416,6 +2465,12 @@ static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
  */
 static struct GNUNET_CONTAINER_MultiPeerMap *validation_map;
 
+/**
+ * Map from PIDs to `struct VirtualLink` entries describing
+ * links CORE knows to exist.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *links;
+
 /**
  * Map from challenges to `struct LearnLaunchEntry` values.
  */
@@ -2563,6 +2618,26 @@ free_ephemeral (struct EphemeralCacheEntry *ece)
 }
 
 
+/**
+ * Free virtual link.
+ *
+ * @param vl link data to free
+ */
+static void
+free_virtual_link (struct VirtualLink *vl)
+{
+  GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl);
+  if (NULL != vl->visibility_task)
+  {
+    GNUNET_SCHEDULER_cancel (vl->visibility_task);
+    vl->visibility_task = NULL;
+  }
+  GNUNET_break (NULL == vl->n);
+  GNUNET_break (NULL == vl->dv);
+  GNUNET_free (vl);
+}
+
+
 /**
  * Free validation state.
  *
@@ -2684,8 +2759,6 @@ free_dv_route (struct DistanceVector *dv)
     GNUNET_assert (
       GNUNET_YES ==
       GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
-    if (NULL != dv->visibility_task)
-      GNUNET_SCHEDULER_cancel (dv->visibility_task);
     if (NULL != dv->timeout_task)
       GNUNET_SCHEDULER_cancel (dv->timeout_task);
     GNUNET_free (dv);
@@ -2873,8 +2946,6 @@ free_neighbour (struct Neighbour *neighbour)
                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
                                                        &neighbour->pid,
                                                        neighbour));
-  if (NULL != neighbour->timeout_task)
-    GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
   if (NULL != neighbour->reassembly_map)
   {
     GNUNET_CONTAINER_multihashmap32_iterate (neighbour->reassembly_map,
@@ -2917,19 +2988,16 @@ free_neighbour (struct Neighbour *neighbour)
  *
  * @param tc client to inform (must be CORE client)
  * @param pid peer the connection is for
- * @param quota_out current quota for the peer
  */
 static void
 core_send_connect_info (struct TransportClient *tc,
-                        const struct GNUNET_PeerIdentity *pid,
-                        struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+                        const struct GNUNET_PeerIdentity *pid)
 {
   struct GNUNET_MQ_Envelope *env;
   struct ConnectInfoMessage *cim;
 
   GNUNET_assert (CT_CORE == tc->type);
   env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
-  cim->quota_out = quota_out;
   cim->id = *pid;
   GNUNET_MQ_send (tc->mq, env);
 }
@@ -2939,11 +3007,9 @@ core_send_connect_info (struct TransportClient *tc,
  * Send message to CORE clients that we gained a connection
  *
  * @param pid peer the queue was for
- * @param quota_out current quota for the peer
  */
 static void
-cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
-                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+cores_send_connect_info (const struct GNUNET_PeerIdentity *pid)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Informing CORE clients about connection to %s\n",
@@ -2952,7 +3018,7 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
   {
     if (CT_CORE != tc->type)
       continue;
-    core_send_connect_info (tc, pid, quota_out);
+    core_send_connect_info (tc, pid);
   }
 }
 
@@ -3059,13 +3125,43 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
 
 
 /**
- * Check whether the CORE visibility of @a n changed. If so,
- * check whether we need to notify CORE.
+ * Task run to check whether the hops of the @a cls still
+ * are validated, or if we need to core about disconnection.
  *
- * @param n neighbour to perform the check for
+ * @param cls a `struct VirtualLink`
  */
 static void
-update_neighbour_core_visibility (struct Neighbour *n);
+check_link_down (void *cls)
+{
+  struct VirtualLink *vl = cls;
+  struct DistanceVector *dv = vl->dv;
+  struct Neighbour *n = vl->n;
+  struct GNUNET_TIME_Absolute dvh_timeout;
+  struct GNUNET_TIME_Absolute q_timeout;
+
+  vl->visibility_task = NULL;
+  dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+       pos = pos->next_dv)
+    dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
+  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+    vl->dv = NULL;
+  q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+  for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
+    q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
+  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+    vl->n = NULL;
+  if ((NULL == vl->n) && (NULL == vl->dv))
+  {
+    cores_send_disconnect_info (&dv->target);
+    free_virtual_link (vl);
+    return;
+  }
+  vl->visibility_task =
+    GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
+                             &check_link_down,
+                             vl);
+}
 
 
 /**
@@ -3083,17 +3179,13 @@ free_queue (struct Queue *queue)
   struct QueueEntry *qe;
   int maxxed;
   struct PendingAcknowledgement *pa;
+  struct VirtualLink *vl;
 
   if (NULL != queue->transmit_task)
   {
     GNUNET_SCHEDULER_cancel (queue->transmit_task);
     queue->transmit_task = NULL;
   }
-  if (NULL != queue->visibility_task)
-  {
-    GNUNET_SCHEDULER_cancel (queue->visibility_task);
-    queue->visibility_task = NULL;
-  }
   while (NULL != (pa = queue->pa_head))
   {
     GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
@@ -3139,9 +3231,12 @@ free_queue (struct Queue *queue)
   notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
   GNUNET_free (queue);
 
-  update_neighbour_core_visibility (neighbour);
-  cores_send_disconnect_info (&neighbour->pid);
-
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid);
+  if ((NULL != vl) && (neighbour == vl->n))
+  {
+    GNUNET_SCHEDULER_cancel (vl->visibility_task);
+    check_link_down (vl);
+  }
   if (NULL == neighbour->queue_head)
   {
     free_neighbour (neighbour);
@@ -3281,12 +3376,12 @@ notify_client_connect_info (void *cls,
                             void *value)
 {
   struct TransportClient *tc = cls;
-  struct Neighbour *neighbour = value;
 
+  (void) value;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Telling new CORE client about existing connection to %s\n",
               GNUNET_i2s (pid));
-  core_send_connect_info (tc, pid, neighbour->quota_out);
+  core_send_connect_info (tc, pid);
   return GNUNET_OK;
 }
 
@@ -3445,21 +3540,14 @@ free_pending_message (struct PendingMessage *pm)
 
 
 /**
- * Send a response to the @a pm that we have processed a
- * "send" request with status @a success. We
- * transmitted @a bytes_physical on the actual wire.
- * Sends a confirmation to the "core" client responsible
- * for the original request and free's @a pm.
+ * Send a response to the @a pm that we have processed a "send"
+ * request.  Sends a confirmation to the "core" client responsible for
+ * the original request and free's @a pm.
  *
  * @param pm handle to the original pending message
- * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
- *          for transmission failure
- * @param bytes_physical amount of bandwidth consumed
  */
 static void
-client_send_response (struct PendingMessage *pm,
-                      int success,
-                      uint32_t bytes_physical)
+client_send_response (struct PendingMessage *pm)
 {
   struct TransportClient *tc = pm->client;
   struct Neighbour *target = pm->target;
@@ -3469,15 +3557,9 @@ client_send_response (struct PendingMessage *pm,
   if (NULL != tc)
   {
     env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
-    som->success = htonl ((uint32_t) success);
-    som->bytes_msg = htons (pm->bytes_msg);
-    som->bytes_physical = htonl (bytes_physical);
     som->peer = target->pid;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Confirming %s transmission of %u/%u bytes to %s\n",
-                (GNUNET_OK == success) ? "successful" : "failed",
-                (unsigned int) pm->bytes_msg,
-                (unsigned int) bytes_physical,
+                "Confirming transmission to %s\n",
                 GNUNET_i2s (&pm->target->pid));
     GNUNET_MQ_send (tc->mq, env);
   }
@@ -3485,45 +3567,6 @@ client_send_response (struct PendingMessage *pm,
 }
 
 
-/**
- * Checks the message queue for a neighbour for messages that have timed
- * out and purges them.
- *
- * @param cls a `struct Neighbour`
- */
-static void
-check_queue_timeouts (void *cls)
-{
-  struct Neighbour *n = cls;
-  struct PendingMessage *pm;
-  struct GNUNET_TIME_Absolute now;
-  struct GNUNET_TIME_Absolute earliest_timeout;
-
-  n->timeout_task = NULL;
-  earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
-  now = GNUNET_TIME_absolute_get ();
-  for (struct PendingMessage *pos = n->pending_msg_head; NULL != pos; pos = pm)
-  {
-    pm = pos->next_neighbour;
-    if (pos->timeout.abs_value_us <= now.abs_value_us)
-    {
-      GNUNET_STATISTICS_update (GST_stats,
-                                "# messages dropped (timeout before confirmation)",
-                                1,
-                                GNUNET_NO);
-      client_send_response (pm, GNUNET_NO, 0);
-      continue;
-    }
-    earliest_timeout =
-      GNUNET_TIME_absolute_min (earliest_timeout, pos->timeout);
-  }
-  n->earliest_timeout = earliest_timeout;
-  if (NULL != n->pending_msg_head)
-    n->timeout_task =
-      GNUNET_SCHEDULER_add_at (earliest_timeout, &check_queue_timeouts, n);
-}
-
-
 /**
  * Create a DV Box message.
  *
@@ -3689,30 +3732,18 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
   const void *payload;
   size_t payload_size;
   struct TransportDVBoxMessage *dvb;
+  struct VirtualLink *vl;
 
   GNUNET_assert (CT_CORE == tc->type);
   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
   bytes_msg = ntohs (obmm->size);
-  target = lookup_neighbour (&obm->peer);
-  if (NULL == target)
-    dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer);
-  else
-    dv = NULL;
-  if ((NULL == target) && ((NULL == dv) || (GNUNET_NO == dv->core_visible)))
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer);
+  if (NULL == vl)
   {
     /* Failure: don't have this peer as a neighbour (anymore).
        Might have gone down asynchronously, so this is NOT
        a protocol violation by CORE. Still count the event,
        as this should be rare. */
-    struct GNUNET_MQ_Envelope *env;
-    struct SendOkMessage *som;
-
-    env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
-    som->success = htonl (GNUNET_SYSERR);
-    som->bytes_msg = htonl (bytes_msg);
-    som->bytes_physical = htonl (0);
-    som->peer = obm->peer;
-    GNUNET_MQ_send (tc->mq, env);
     GNUNET_SERVICE_client_continue (tc->client);
     GNUNET_STATISTICS_update (GST_stats,
                               "# messages dropped (neighbour unknown)",
@@ -3720,6 +3751,12 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
                               GNUNET_NO);
     return;
   }
+  target = lookup_neighbour (&obm->peer);
+  if (NULL == target)
+    dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer);
+  else
+    dv = NULL;
+  GNUNET_assert ((NULL != target) || (NULL != dv));
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Sending %u bytes to %s using %s\n",
               bytes_msg,
@@ -3756,8 +3793,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
   pm->client = tc;
   pm->target = target;
   pm->bytes_msg = payload_size;
-  pm->timeout =
-    GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
   memcpy (&pm[1], payload, payload_size);
   GNUNET_free_non_null (dvb);
   dvb = NULL;
@@ -3777,15 +3812,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
                                 tc->details.core.pending_msg_head,
                                 tc->details.core.pending_msg_tail,
                                 pm);
-  if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
-  {
-    target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
-    if (NULL != target->timeout_task)
-      GNUNET_SCHEDULER_cancel (target->timeout_task);
-    target->timeout_task = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
-                                                    &check_queue_timeouts,
-                                                    target);
-  }
   if (! was_empty)
     return; /* all queues must already be busy */
   for (struct Queue *queue = target->queue_head; NULL != queue;
@@ -3833,6 +3859,77 @@ check_communicator_available (
 }
 
 
+/**
+ * Send ACK to communicator (if requested) and free @a cmc.
+ *
+ * @param cmc context for which we are done handling the message
+ */
+static void
+finish_cmc_handling (struct CommunicatorMessageContext *cmc)
+{
+  if (0 != ntohl (cmc->im.fc_on))
+  {
+    /* send ACK when done to communicator for flow control! */
+    struct GNUNET_MQ_Envelope *env;
+    struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
+
+    env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
+    ack->reserved = htonl (0);
+    ack->fc_id = cmc->im.fc_id;
+    ack->sender = cmc->im.sender;
+    GNUNET_MQ_send (cmc->tc->mq, env);
+  }
+  GNUNET_SERVICE_client_continue (cmc->tc->client);
+  GNUNET_free (cmc);
+}
+
+
+/**
+ * Client confirms that it is done handling message(s) to a particular
+ * peer. We may now provide more messages to CORE for this peer.
+ *
+ * Notifies the respective queues that more messages can now be received.
+ *
+ * @param cls the client
+ * @param rom the message that was sent
+ */
+static void
+handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
+{
+  struct TransportClient *tc = cls;
+  struct VirtualLink *vl;
+  uint32_t delta;
+  struct CommunicatorMessageContext *cmc;
+
+  if (CT_CORE != tc->type)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (tc->client);
+    return;
+  }
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer);
+  if (NULL == vl)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# RECV_OK dropped: virtual link unknown",
+                              1,
+                              GNUNET_NO);
+    GNUNET_SERVICE_client_continue (tc->client);
+    return;
+  }
+  delta = ntohl (rom->increase_window_delta);
+  vl->core_recv_window += delta;
+  if (vl->core_recv_window <= 0)
+    return;
+  /* resume communicators */
+  while (NULL != (cmc = vl->cmc_tail))
+  {
+    GNUNET_CONTAINER_DLL_remove (vl->cmc_head, vl->cmc_tail, cmc);
+    finish_cmc_handling (cmc);
+  }
+}
+
+
 /**
  * Communicator started.  Process the request.
  *
@@ -4090,20 +4187,18 @@ route_via_neighbour (const struct Neighbour *n,
   for (struct Queue *pos = n->queue_head; NULL != pos;
        pos = pos->next_neighbour)
   {
-    /* Count the queue with the visibility task in all cases, as
-       otherwise we may end up with no queues just because the
-       time for the visibility task just expired but the scheduler
-       just ran this task first */
     if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
-        (pos->validated_until.abs_value_us > now.abs_value_us) ||
-        (NULL != pos->visibility_task))
+        (pos->validated_until.abs_value_us > now.abs_value_us))
       candidates++;
   }
   if (0 == candidates)
   {
-    /* Given that we above check for pos->visibility task,
-       this should be strictly impossible. */
-    GNUNET_break (0);
+    /* This can happen rarely if the last confirmed queue timed
+       out just as we were beginning to process this message. */
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# route selection failed (all no valid queue)",
+                              1,
+                              GNUNET_NO);
     return;
   }
   sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates);
@@ -4115,12 +4210,8 @@ route_via_neighbour (const struct Neighbour *n,
   for (struct Queue *pos = n->queue_head; NULL != pos;
        pos = pos->next_neighbour)
   {
-    /* Count the queue with the visibility task in all cases, as
-       otherwise we may end up with no queues just because the
-       time for the visibility task just expired but the scheduler
-       just ran this task first */
-    if ((pos->validated_until.abs_value_us > now.abs_value_us) ||
-        (NULL != pos->visibility_task))
+    if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
+        (pos->validated_until.abs_value_us > now.abs_value_us))
     {
       if ((sel1 == candidates) || (sel2 == candidates))
         queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
@@ -4197,21 +4288,21 @@ route_message (const struct GNUNET_PeerIdentity *target,
                struct GNUNET_MessageHeader *hdr,
                enum RouteMessageOptions options)
 {
+  struct VirtualLink *vl;
   struct Neighbour *n;
   struct DistanceVector *dv;
 
-  n = lookup_neighbour (target);
-  dv = (0 != (options & RMO_DV_ALLOWED))
-         ? GNUNET_CONTAINER_multipeermap_get (dv_routes, target)
-         : NULL;
+  vl = GNUNET_CONTAINER_multipeermap_get (links, target);
+  n = vl->n;
+  dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
   if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
   {
     /* if confirmed is required, and we do not have anything
        confirmed, drop respective options */
-    if ((NULL != n) && (GNUNET_NO == n->core_visible))
-      n = NULL;
-    if ((NULL != dv) && (GNUNET_NO == dv->core_visible))
-      dv = NULL;
+    if (NULL == n)
+      n = lookup_neighbour (target);
+    if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
+      dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target);
   }
   if ((NULL == n) && (NULL == dv))
   {
@@ -4655,30 +4746,6 @@ handle_del_address (void *cls,
 }
 
 
-/**
- * Context from #handle_incoming_msg().  Closure for many
- * message handlers below.
- */
-struct CommunicatorMessageContext
-{
-  /**
-   * Which communicator provided us with the message.
-   */
-  struct TransportClient *tc;
-
-  /**
-   * Additional information for flow control and about the sender.
-   */
-  struct GNUNET_TRANSPORT_IncomingMessage im;
-
-  /**
-   * Number of hops the message has travelled (if DV-routed).
-   * FIXME: make use of this in ACK handling!
-   */
-  uint16_t total_hops;
-};
-
-
 /**
  * Given an inbound message @a msg from a communicator @a cmc,
  * demultiplex it based on the type calling the right handler.
@@ -4691,31 +4758,6 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
                       const struct GNUNET_MessageHeader *msg);
 
 
-/**
- * Send ACK to communicator (if requested) and free @a cmc.
- *
- * @param cmc context for which we are done handling the message
- */
-static void
-finish_cmc_handling (struct CommunicatorMessageContext *cmc)
-{
-  if (0 != ntohl (cmc->im.fc_on))
-  {
-    /* send ACK when done to communicator for flow control! */
-    struct GNUNET_MQ_Envelope *env;
-    struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
-
-    env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
-    ack->reserved = htonl (0);
-    ack->fc_id = cmc->im.fc_id;
-    ack->sender = cmc->im.sender;
-    GNUNET_MQ_send (cmc->tc->mq, env);
-  }
-  GNUNET_SERVICE_client_continue (cmc->tc->client);
-  GNUNET_free (cmc);
-}
-
-
 /**
  * Communicator gave us an unencapsulated message to pass as-is to
  * CORE.  Process the request.
@@ -4728,6 +4770,7 @@ static void
 handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
 {
   struct CommunicatorMessageContext *cmc = cls;
+  struct VirtualLink *vl;
   uint16_t size = ntohs (mh->size);
 
   if ((size > UINT16_MAX - sizeof (struct InboundMessage)) ||
@@ -4740,6 +4783,25 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
     GNUNET_SERVICE_client_drop (client);
     return;
   }
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &cmc->im.sender);
+  if (NULL == vl)
+  {
+    /* FIXME: sender is giving us messages for CORE but we don't have
+       the link up yet! I *suspect* this can happen right now (i.e.
+       sender has verified us, but we didn't verify sender), but if
+       we pass this on, CORE would be confused (link down, messages
+       arrive).  We should investigate more if this happens often,
+       or in a persistent manner, and possibly do "something" about
+       it. Thus logging as error for now. */
+    GNUNET_break_op (0);
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# CORE messages droped (virtual link still down)",
+                              1,
+                              GNUNET_NO);
+
+    finish_cmc_handling (cmc);
+    return;
+  }
   /* Forward to all CORE clients */
   for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
   {
@@ -4753,11 +4815,15 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
     memcpy (&im[1], mh, size);
     GNUNET_MQ_send (tc->mq, env);
   }
-  /* FIXME: consider doing this _only_ once the message
-     was drained from the CORE MQs to extend flow control to CORE!
-     (basically, increment counter in cmc, decrement on MQ send continuation!
-   */
-  finish_cmc_handling (cmc);
+  vl->core_recv_window--;
+  if (vl->core_recv_window > 0)
+  {
+    finish_cmc_handling (cmc);
+    return;
+  }
+  /* Wait with calling #finish_cmc_handling(cmc) until the message
+     was processed by CORE MQs (for CORE flow control)! */
+  GNUNET_CONTAINER_DLL_insert (vl->cmc_head, vl->cmc_tail, cmc);
 }
 
 
@@ -5317,7 +5383,8 @@ handle_reliability_ack (void *cls,
   }
 
   ack_counter = htonl (ra->ack_counter);
-  // FIXME: track ACK losses based on ack_counter somewhere!
+  (void) ack_counter; /* silence compiler warning for now */
+  // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
   // (DV and/or Neighbour?)
   finish_cmc_handling (cmc);
 }
@@ -5758,40 +5825,6 @@ path_cleanup_cb (void *cls)
     GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv);
 }
 
-/**
- * Task run to check whether the hops of the @a cls still
- * are validated, or if we need to core about disconnection.
- *
- * @param cls a `struct DistanceVector` (with core_visible set!)
- */
-static void
-check_dv_path_down (void *cls)
-{
-  struct DistanceVector *dv = cls;
-  struct Neighbour *n;
-
-  dv->visibility_task = NULL;
-  GNUNET_assert (GNUNET_YES == dv->core_visible);
-  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
-       pos = pos->next_dv)
-  {
-    if (0 <
-        GNUNET_TIME_absolute_get_remaining (pos->path_valid_until).rel_value_us)
-    {
-      dv->visibility_task = GNUNET_SCHEDULER_add_at (pos->path_valid_until,
-                                                     &check_dv_path_down,
-                                                     dv);
-      return;
-    }
-  }
-  /* all paths invalid, make dv core-invisible */
-  dv->core_visible = GNUNET_NO;
-  n = lookup_neighbour (&dv->target);
-  if ((NULL != n) && (GNUNET_YES == n->core_visible))
-    return; /* no need to tell core, connection still up! */
-  cores_send_disconnect_info (&dv->target);
-}
-
 
 /**
  * The @a hop is a validated path to the respective target
@@ -5804,22 +5837,30 @@ static void
 activate_core_visible_dv_path (struct DistanceVectorHop *hop)
 {
   struct DistanceVector *dv = hop->dv;
-  struct Neighbour *n;
+  struct VirtualLink *vl;
 
-  GNUNET_assert (GNUNET_NO == dv->core_visible);
-  GNUNET_assert (NULL == dv->visibility_task);
-
-  dv->core_visible = GNUNET_YES;
-  dv->visibility_task =
-    GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_dv_path_down, dv);
-  n = lookup_neighbour (&dv->target);
-  if ((NULL != n) && (GNUNET_YES == n->core_visible))
-    return; /* no need to tell core, connection already up! */
-  cores_send_connect_info (&dv->target,
-                           (NULL != n)
-                             ? GNUNET_BANDWIDTH_value_sum (n->quota_out,
-                                                           dv->quota_out)
-                             : dv->quota_out);
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target);
+  if (NULL != vl)
+  {
+    /* Link was already up, remember dv is also now available and we are done */
+    vl->dv = dv;
+    return;
+  }
+  vl = GNUNET_new (struct VirtualLink);
+  vl->target = dv->target;
+  vl->dv = dv;
+  vl->core_recv_window = RECV_WINDOW_SIZE;
+  vl->visibility_task =
+    GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
+  GNUNET_break (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_put (
+                  links,
+                  &vl->target,
+                  vl,
+                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  /* We lacked a confirmed connection to the target
+     before, so tell CORE about it (finally!) */
+  cores_send_connect_info (&dv->target);
 }
 
 
@@ -5934,9 +5975,8 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
           GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
         GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
         GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
-        if ((GNUNET_NO == dv->core_visible) &&
-            (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until)
-                   .rel_value_us))
+        if (0 <
+            GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
           activate_core_visible_dv_path (pos);
         if (last_timeout.rel_value_us <
             GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
@@ -5976,8 +6016,7 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
                                 next_hop->dv_head,
                                 next_hop->dv_tail,
                                 hop);
-  if ((GNUNET_NO == dv->core_visible) &&
-      (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us))
+  if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
     activate_core_visible_dv_path (hop);
   return GNUNET_YES;
 }
@@ -6942,75 +6981,6 @@ find_queue (const struct GNUNET_PeerIdentity *pid, const char *address)
 }
 
 
-/**
- * Task run periodically to check whether the validity of the given queue has
- * run its course. If so, finds either another queue to take over, or clears
- * the neighbour's `core_visible` flag. In the latter case, gives DV routes a
- * chance to take over, and if that fails, notifies CORE about the disconnect.
- *
- * @param cls a `struct Queue`
- */
-static void
-core_queue_visibility_check (void *cls)
-{
-  struct Queue *q = cls;
-
-  q->visibility_task = NULL;
-  if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
-  {
-    q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until,
-                                                  &core_queue_visibility_check,
-                                                  q);
-    return;
-  }
-  update_neighbour_core_visibility (q->neighbour);
-}
-
-
-/**
- * Check whether the CORE visibility of @a n should change.  Finds either a
- * queue to preserve the visibility, or clears the neighbour's `core_visible`
- * flag. In the latter case, gives DV routes a chance to take over, and if
- * that fails, notifies CORE about the disconnect.  If so, check whether we
- * need to notify CORE.
- *
- * @param n neighbour to perform the check for
- */
-static void
-update_neighbour_core_visibility (struct Neighbour *n)
-{
-  struct DistanceVector *dv;
-
-  GNUNET_assert (GNUNET_YES == n->core_visible);
-  /* Check if _any_ queue of this neighbour is still valid, if so, schedule
-     the #core_queue_visibility_check() task for that queue */
-  for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
-  {
-    if (0 !=
-        GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
-    {
-      /* found a valid queue, use this one */
-      q->visibility_task =
-        GNUNET_SCHEDULER_add_at (q->validated_until,
-                                 &core_queue_visibility_check,
-                                 q);
-      return;
-    }
-  }
-  n->core_visible = GNUNET_NO;
-
-  /* Check if _any_ DV route to this neighbour is currently
-     valid, if so, do NOT tell core about the loss of direct
-     connectivity (DV still counts!) */
-  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid);
-  if (GNUNET_YES == dv->core_visible)
-    return;
-  /* Nothing works anymore, need to tell CORE about the loss of
-     connectivity! */
-  cores_send_disconnect_info (&n->pid);
-}
-
-
 /**
  * Communicator gave us a transport address validation response.  Process the
  * request.
@@ -7030,8 +7000,8 @@ handle_validation_response (
                                             .vs = NULL};
   struct GNUNET_TIME_Absolute origin_time;
   struct Queue *q;
-  struct DistanceVector *dv;
   struct Neighbour *n;
+  struct VirtualLink *vl;
 
   /* check this is one of our challenges */
   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
@@ -7129,24 +7099,28 @@ handle_validation_response (
   q->validated_until = vs->validated_until;
   q->pd.aged_rtt = vs->validation_rtt;
   n = q->neighbour;
-  if (GNUNET_NO != n->core_visible)
-    return; /* nothing changed, we are done here */
-  n->core_visible = GNUNET_YES;
-  q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until,
-                                                &core_queue_visibility_check,
-                                                q);
-  /* Check if _any_ DV route to this neighbour is
-     currently valid, if so, do NOT tell core anything! */
-  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid);
-  if ((NULL != dv) && (GNUNET_YES == dv->core_visible))
-    return; /* nothing changed, done */
-  /* We lacked a confirmed connection to the neighbour
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid);
+  if (NULL != vl)
+  {
+    /* Link was already up, remember n is also now available and we are done */
+    vl->n = n;
+    return;
+  }
+  vl = GNUNET_new (struct VirtualLink);
+  vl->target = n->pid;
+  vl->n = n;
+  vl->core_recv_window = RECV_WINDOW_SIZE;
+  vl->visibility_task =
+    GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
+  GNUNET_break (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_put (
+                  links,
+                  &vl->target,
+                  vl,
+                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  /* We lacked a confirmed connection to the target
      before, so tell CORE about it (finally!) */
-  cores_send_connect_info (&n->pid,
-                           (NULL != dv)
-                             ? GNUNET_BANDWIDTH_value_sum (dv->quota_out,
-                                                           n->quota_out)
-                             : n->quota_out);
+  cores_send_connect_info (&n->pid);
 }
 
 
@@ -7445,7 +7419,7 @@ reliability_box_message (struct Queue *queue,
   {
     /* failed hard */
     GNUNET_break (0);
-    client_send_response (pm, GNUNET_NO, 0);
+    client_send_response (pm);
     return NULL;
   }
   pa = prepare_pending_acknowledgement (queue, dvh, pm);
@@ -7596,7 +7570,7 @@ transmit_on_queue (void *cls)
       (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
   {
     /* Full message sent, and over reliabile channel */
-    client_send_response (pm, GNUNET_YES, pm->bytes_msg);
+    client_send_response (pm);
   }
   else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
             queue->tc->details.communicator.cc) &&
@@ -7621,10 +7595,7 @@ transmit_on_queue (void *cls)
 
     /* Was this the last applicable fragmment? */
     if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
-      client_send_response (
-        pm,
-        GNUNET_YES,
-        pm->bytes_msg /* FIXME: calculate and add overheads! */);
+      client_send_response (pm);
   }
   else if (PMT_CORE != pm->pmt)
   {
@@ -8256,7 +8227,6 @@ handle_add_queue_message (void *cls,
   if (NULL == neighbour)
   {
     neighbour = GNUNET_new (struct Neighbour);
-    neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
     neighbour->pid = aqm->receiver;
     GNUNET_assert (GNUNET_OK ==
                    GNUNET_CONTAINER_multipeermap_put (
@@ -8872,8 +8842,12 @@ do_shutdown (void *cls)
                                           NULL);
   GNUNET_CONTAINER_multishortmap_destroy (pending_acks);
   pending_acks = NULL;
+  GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (neighbours));
   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
   neighbours = NULL;
+  GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (links));
+  GNUNET_CONTAINER_multipeermap_destroy (links);
+  links = NULL;
   GNUNET_CONTAINER_multipeermap_iterate (backtalkers,
                                          &free_backtalker_cb,
                                          NULL);
@@ -8926,6 +8900,7 @@ run (void *cls,
   pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES);
   ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
   neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
+  links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES);
   dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES);
   ephemeral_heap =
@@ -8995,6 +8970,10 @@ GNUNET_SERVICE_MAIN (
                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
                          struct OutboundMessage,
                          NULL),
+  GNUNET_MQ_hd_fixed_size (client_recv_ok,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK,
+                           struct RecvOkMessage,
+                           NULL),
   /* communication with communicators */
   GNUNET_MQ_hd_var_size (communicator_available,
                          GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,