work on DV logic and FC
authorChristian Grothoff <christian@grothoff.org>
Sun, 2 Jun 2019 12:14:50 +0000 (14:14 +0200)
committerChristian Grothoff <christian@grothoff.org>
Sun, 2 Jun 2019 12:14:50 +0000 (14:14 +0200)
src/transport/gnunet-service-tng.c

index 83c057795a1226834ea0235e2434dc8b629dee91..471ded6444616405b4b7aeac9e3e799c99bfc3f3 100644 (file)
@@ -28,7 +28,6 @@
  *   communicators do not offer flow control).
  *   We do transmit FC window sizes now.  Left:
  *   for SENDING)
- *   - Increment "outbound_fc_window_size_used" on transmission
  *   - Throttle sending if "outbound_fc_window_size_used" reaches limit
  *   - Send *new* challenge when we get close to the limit (including
  *     at the beginning when the limit is zero!)
 #define DELAY_WARN_THRESHOLD \
   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
+/**
+ * If a DVBox could not be forwarded after this number of
+ * seconds we drop it.
+ */
+#define DV_FORWARD_TIMEOUT \
+  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
+
 /**
  * We only consider queues as "quality" connections when
  * suppressing the generation of DV initiation messages if
@@ -939,9 +945,9 @@ struct TransportFlowControlMessage
    * Used to detect one-sided connection drops. On wrap-around, the
    * flow control counters will be reset as if the connection had
    * dropped.
-   */ 
+   */
   uint32_t seq GNUNET_PACKED;
-  
+
   /**
    * Flow control window size in bytes, in NBO.
    * The receiver can send this many bytes at most.
@@ -974,7 +980,6 @@ struct TransportFlowControlMessage
    * reset the counters for the number of bytes sent!
    */
   struct GNUNET_TIME_AbsoluteNBO sender_time;
-  
 };
 
 
@@ -1300,7 +1305,7 @@ struct VirtualLink
    * Distance vector used by this virtual link, NULL if @e n is used.
    */
   struct DistanceVector *dv;
-  
+
   /**
    * Last challenge we received from @a n.
    * FIXME: where do we need this?
@@ -1308,7 +1313,7 @@ struct VirtualLink
   struct ChallengeNonceP n_challenge;
 
   /**
-   * Last challenge we used with @a n for flow control. 
+   * Last challenge we used with @a n for flow control.
    * FIXME: where do we need this?
    */
   struct ChallengeNonceP my_challenge;
@@ -1373,7 +1378,7 @@ struct VirtualLink
    * Based on the difference between how much the sender sent according
    * to the last #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message
    * (@e outbound_sent field) and how much we actually received at that
-   * time (@e incoming_fc_window_size_used).  This delta is then 
+   * time (@e incoming_fc_window_size_used).  This delta is then
    * added onto the @e incoming_fc_window_size when determining the
    * @e outbound_window_size we send to the other peer.  Initially zero.
    * May be negative if we (due to out-of-order delivery) actually received
@@ -1407,7 +1412,7 @@ struct VirtualLink
    * received.
    */
   uint32_t last_fc_seq;
-  
+
   /**
    * How many more messages can we send to CORE before we exhaust
    * the receive window of CORE for this peer? If this hits zero,
@@ -2057,7 +2062,12 @@ enum PendingMessageType
   /**
    * Reliability box.
    */
-  PMT_RELIABILITY_BOX = 2
+  PMT_RELIABILITY_BOX = 2,
+
+  /**
+   * Pending message created during #forward_dv_box().
+   */
+  PMT_DV_BOX = 3
 
 };
 
@@ -2133,8 +2143,8 @@ struct PendingMessage
   struct PendingAcknowledgement *pa_tail;
 
   /**
-   * This message, reliability boxed. Only possibly available if @e pmt is
-   * #PMT_CORE.
+   * This message, reliability *or* DV-boxed. Only possibly available
+   * if @e pmt is #PMT_CORE.
    */
   struct PendingMessage *bpm;
 
@@ -2949,7 +2959,11 @@ free_pending_message (struct PendingMessage *pm)
     GNUNET_assert (pm == pm->qe->pm);
     pm->qe->pm = NULL;
   }
-  GNUNET_free_non_null (pm->bpm);
+  if (NULL != pm->bpm)
+  {
+    free_fragment_tree (pm->bpm);
+    GNUNET_free (pm->bpm);
+  }
   GNUNET_free (pm);
 }
 
@@ -4751,16 +4765,18 @@ send_dv_to_neighbour (void *cls,
 
 /**
  * We need to transmit @a hdr to @a target.  If necessary, this may
- * involve DV routing.
+ * involve DV routing.  This function routes without applying flow
+ * control or congestion control and should only be used for control
+ * traffic.
  *
  * @param target peer to receive @a hdr
  * @param hdr header of the message to route and #GNUNET_free()
  * @param options which transmission channels are allowed
  */
 static void
-route_message (const struct GNUNET_PeerIdentity *target,
-               const struct GNUNET_MessageHeader *hdr,
-               enum RouteMessageOptions options)
+route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
+                                  const struct GNUNET_MessageHeader *hdr,
+                                  enum RouteMessageOptions options)
 {
   struct VirtualLink *vl;
   struct Neighbour *n;
@@ -4878,7 +4894,7 @@ handle_communicator_backchannel (
                 isize],
           is,
           strlen (is) + 1);
-  route_message (&cb->pid, &be->header, RMO_DV_ALLOWED);
+  route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
@@ -5303,7 +5319,7 @@ transmit_cummulative_ack_cb (void *cls)
     ap[i].ack_delay = GNUNET_TIME_relative_hton (
       GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
   }
-  route_message (&ac->target, &ack->header, RMO_DV_ALLOWED);
+  route_control_message_without_fc (&ac->target, &ack->header, RMO_DV_ALLOWED);
   ac->num_acks = 0;
   ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT,
                                            &destroy_ack_cummulator,
@@ -5742,6 +5758,12 @@ completed_pending_message (struct PendingMessage *pm)
         (pos->frag_off == pos->bytes_msg))
       client_send_response (pos);
     return;
+  case PMT_DV_BOX:
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Completed transmission of message %llu (DV Box)\n",
+                pm->logging_uuid);
+    free_pending_message (pm);
+    return;
   }
 }
 
@@ -6309,7 +6331,9 @@ forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
                                              &dhp.purpose,
                                              &dhops[nhops].hop_sig));
   }
-  route_message (next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED);
+  route_control_message_without_fc (next_hop,
+                                    &fwd->header,
+                                    RMO_UNCONFIRMED_ALLOWED);
 }
 
 
@@ -6924,17 +6948,39 @@ forward_dv_box (struct Neighbour *next_hop,
                 const void *enc_payload,
                 uint16_t enc_payload_size)
 {
-  char buf[sizeof (struct TransportDVBoxMessage) +
-           num_hops * sizeof (struct GNUNET_PeerIdentity) + enc_payload_size];
-  struct GNUNET_PeerIdentity *dhops =
-    (struct GNUNET_PeerIdentity *) &buf[sizeof (struct TransportDVBoxMessage)];
-
+  struct VirtualLink *vl = next_hop->vl;
+  struct PendingMessage *pm;
+  size_t msg_size;
+  char *buf;
+  struct GNUNET_PeerIdentity *dhops;
+
+  GNUNET_assert (NULL != vl);
+  msg_size = sizeof (struct TransportDVBoxMessage) +
+             num_hops * sizeof (struct GNUNET_PeerIdentity) + enc_payload_size;
+  pm = GNUNET_malloc (sizeof (struct PendingMessage) + msg_size);
+  pm->pmt = PMT_DV_BOX;
+  pm->vl = vl;
+  pm->timeout = GNUNET_TIME_relative_to_absolute (DV_FORWARD_TIMEOUT);
+  pm->logging_uuid = logging_uuid_gen++;
+  pm->prefs = GNUNET_MQ_PRIO_BACKGROUND;
+  pm->bytes_msg = msg_size;
+  buf = (char *) &pm[1];
   memcpy (buf, hdr, sizeof (*hdr));
+  dhops =
+    (struct GNUNET_PeerIdentity *) &buf[sizeof (struct TransportDVBoxMessage)];
   memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity));
   memcpy (&dhops[num_hops], enc_payload, enc_payload_size);
-  route_message (&next_hop->pid,
-                 (const struct GNUNET_MessageHeader *) buf,
-                 RMO_NONE);
+  GNUNET_CONTAINER_MDLL_insert (vl,
+                                vl->pending_msg_head,
+                                vl->pending_msg_tail,
+                                pm);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Created pending message %llu for DV Box with next hop %s (%u/%u)\n",
+              pm->logging_uuid,
+              GNUNET_i2s (&next_hop->pid),
+              (unsigned int) num_hops,
+              (unsigned int) total_hops);
+  check_vl_transmission (vl);
 }
 
 
@@ -7696,9 +7742,9 @@ handle_validation_challenge (
                                                           &tvp.purpose,
                                                           &tvr.signature));
   }
-  route_message (&cmc->im.sender,
-                 &tvr.header,
-                 RMO_ANYTHING_GOES | RMO_REDUNDANT);
+  route_control_message_without_fc (&cmc->im.sender,
+                                    &tvr.header,
+                                    RMO_ANYTHING_GOES | RMO_REDUNDANT);
   finish_cmc_handling (cmc);
 
   vl = lookup_virtual_link (&cmc->im.sender);
@@ -8459,6 +8505,8 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
     int frag;
     int relb;
 
+    if ((NULL != dvh) && (PMT_DV_BOX == pos->pmt))
+      continue; /* DV messages must not be DV-routed to next hop! */
     if (pos->next_attempt.abs_value_us > now.abs_value_us)
       break; /* too early for all messages, they are sorted by next_attempt */
     if (NULL != pos->qe)
@@ -8546,6 +8594,41 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
 }
 
 
+/**
+ * Function to call to further operate on the now DV encapsulated
+ * message @a hdr, forwarding it via @a next_hop under respect of
+ * @a options.
+ *
+ * @param cls a `struct PendingMessageScoreContext`
+ * @param next_hop next hop of the DV path
+ * @param hdr encapsulated message, technically a `struct TransportDFBoxMessage`
+ * @param options options of the original message
+ */
+static void
+extract_box_cb (void *cls,
+                struct Neighbour *next_hop,
+                const struct GNUNET_MessageHeader *hdr,
+                enum RouteMessageOptions options)
+{
+  struct PendingMessageScoreContext *sc = cls;
+  struct PendingMessage *pm = sc->best;
+  struct PendingMessage *bpm;
+  uint16_t bsize = ntohs (hdr->size);
+
+  GNUNET_assert (NULL == pm->bpm);
+  bpm = GNUNET_malloc (sizeof (struct PendingMessage) + bsize);
+  bpm->logging_uuid = logging_uuid_gen++;
+  bpm->pmt = PMT_DV_BOX;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Creating DV Box %llu for original message %llu (next hop is %s)\n",
+              bpm->logging_uuid,
+              pm->logging_uuid,
+              GNUNET_i2s (&next_hop->pid));
+  memcpy (&bpm[1], hdr, bsize);
+  pm->bpm = bpm;
+}
+
+
 /**
  * We believe we are ready to transmit a `struct PendingMessage` on a
  * queue, the big question is which one!  We need to see if there is
@@ -8610,9 +8693,30 @@ transmit_on_queue (void *cls)
 
   /* Given selection in `sc`, do transmission */
   pm = sc.best;
+  if (NULL != sc.dvh)
+  {
+    GNUNET_assert (PMT_DV_BOX != pm->pmt);
+    if (NULL != sc.best->bpm)
+    {
+      /* We did this boxing before, but possibly for a different path!
+         Discard old DV box!  OPTIMIZE-ME: we might want to check if
+         it is the same and then not re-build the message... */
+      free_pending_message (sc.best->bpm);
+      sc.best->bpm = NULL;
+    }
+    encapsulate_for_dv (sc.dvh->dv,
+                        1,
+                        &sc.dvh,
+                        (const struct GNUNET_MessageHeader *) &sc.best[1],
+                        &extract_box_cb,
+                        &sc,
+                        RMO_NONE);
+    GNUNET_assert (NULL != sc.best->bpm);
+    pm = sc.best->bpm;
+  }
   if (GNUNET_YES == sc.frag)
   {
-    pm = fragment_message (queue, sc.dvh, sc.best);
+    pm = fragment_message (queue, sc.dvh, pm);
     if (NULL == pm)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -8625,7 +8729,7 @@ transmit_on_queue (void *cls)
   }
   else if (GNUNET_YES == sc.relb)
   {
-    pm = reliability_box_message (queue, sc.dvh, sc.best);
+    pm = reliability_box_message (queue, sc.dvh, pm);
     if (NULL == pm)
     {
       /* Reliability boxing failed, try next message... */
@@ -8639,8 +8743,6 @@ transmit_on_queue (void *cls)
       return;
     }
   }
-  else
-    pm = sc.best; /* no boxing required */
 
   /* Pass 'pm' for transission to the communicator */
   GNUNET_log (
@@ -8650,6 +8752,31 @@ transmit_on_queue (void *cls)
     queue->address,
     GNUNET_i2s (&n->pid),
     sc.consideration_counter);
+
+  /* Flow control: increment amount of traffic sent; if we are routing
+     via DV (and thus the ultimate target of the pending message is for
+     a different virtual link than the one of the queue), then we need
+     to use up not only the window of the direct link but also the
+     flow control window for the DV link! */
+  pm->vl->outbound_fc_window_size_used += pm->bytes_msg;
+
+  if (pm->vl != queue->neighbour->vl)
+  {
+    /* If the virtual link of the queue differs, this better be distance
+       vector routing! */
+    GNUNET_assert (NULL != sc.dvh);
+    /* If we do distance vector routing, we better not do this for a
+       message that was itself DV-routed */
+    GNUNET_assert (PMT_DV_BOX != sc.best->pmt);
+    /* We use the size of the unboxed message here, to avoid counting
+       the DV-Box header which is eaten up on the way by intermediaries */
+    queue->neighbour->vl->outbound_fc_window_size_used += sc.best->bytes_msg;
+  }
+  else
+  {
+    GNUNET_assert (NULL == sc.dvh);
+  }
+
   queue_send_msg (queue, pm, &pm[1], pm->bytes_msg);
 
   /* Check if this transmission somehow conclusively finished handing 'pm'