complete CORE flow control loop
authorChristian Grothoff <christian@grothoff.org>
Tue, 30 Apr 2019 08:58:56 +0000 (10:58 +0200)
committerChristian Grothoff <christian@grothoff.org>
Tue, 30 Apr 2019 08:59:05 +0000 (10:59 +0200)
src/transport/gnunet-service-tng.c

index 825d45522ce5dd0de92dc6f0579b44e7582a6450..a8f70986b3277d46f4be100c5ea10256d42487f1 100644 (file)
  *
  * TODO:
  * Implement next:
- * - complete flow control push back from CORE via TRANSPORT to communicators:
- *   + resume communicators in handle_client_recv_ok (see FIXME)
- *   + count transmissions to CORE and suspend communicators if window is full
- * - check flow control push back from TRANSPROT to CORE:
- *   + check when to send ACKs
- * - change transport-core API to provide proper flow control in both
- *   directions, allow multiple messages per peer simultaneously (tag
- *   confirmations with unique message ID), and replace quota-out with
- *   proper flow control; specify transmission preferences (latency,
+ * - add (more) logging
+ * - change transport-core API to specify transmission preferences (latency,
  *   reliability, etc.) per message!
- * - add logging
- *
- * Later:
  * - review retransmission logic, right now there is no smartness there!
- *   => congestion control, flow control, etc
+ *   => congestion control, flow control, etc [PERFORMANCE-BASICS]
  *
  * Optimizations:
  * - AcknowledgementUUIDPs are overkill with 256 bits (128 would do)
- *    => Need 128 bit hash map though!
+ *   => Need 128 bit hash map though! [BANDWIDTH, MEMORY]
  * - queue_send_msg and route_message both by API design have to make copies
  *   of the payload, and route_message on top of that requires a malloc/free.
- *   Change design to approximate "zero" copy better...
+ *   Change design to approximate "zero" copy better... [CPU]
  * - could avoid copying body of message into each fragment and keep
  *   fragments as just pointers into the original message and only
  *   fully build fragments just before transmission (optimization, should
- *   reduce CPU and memory use)
+ *   reduce CPU and memory use) [CPU, MEMORY]
  * - if messages are below MTU, consider adding ACKs and other stuff
- *   (requires planning at receiver, and additional MST-style demultiplex
- *    at receiver!)
+ *   to the same transmission to avoid tiny messages (requires planning at
+ *   receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT]
  * - When we passively learned DV (with unconfirmed freshness), we
  *   right now add the path to our list but with a zero path_valid_until
  *   time and only use it for unconfirmed routes.  However, we could consider
  *   triggering an explicit validation mechansim ourselves, specifically routing
- *   a challenge-response message over the path (OPTIMIZATION-FIXME).
+ *   a challenge-response message over the path [ROUTING]
+ * - Track ACK losses based on ACK-counter [ROUTING]
  *
  * Design realizations / discussion:
  * - communicators do flow control by calling MQ "notify sent"
@@ -1115,6 +1106,44 @@ struct PendingMessage;
  */
 struct DistanceVectorHop;
 
+
+/**
+ * Context from #handle_incoming_msg().  Closure for many
+ * message handlers below.
+ */
+struct CommunicatorMessageContext
+{
+
+  /**
+   * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+   * flow control to unchoke.
+   */
+  struct CommunicatorMessageContext *next;
+
+  /**
+   * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+   * flow control to unchoke.
+   */
+  struct CommunicatorMessageContext *prev;
+
+  /**
+   * Which communicator provided us with the message.
+   */
+  struct TransportClient *tc;
+
+  /**
+   * Additional information for flow control and about the sender.
+   */
+  struct GNUNET_TRANSPORT_IncomingMessage im;
+
+  /**
+   * Number of hops the message has travelled (if DV-routed).
+   * FIXME: make use of this in ACK handling!
+   */
+  uint16_t total_hops;
+};
+
+
 /**
  * A virtual link is another reachable peer that is known to CORE.  It
  * can be either a `struct Neighbour` with at least one confirmed
@@ -1130,6 +1159,18 @@ struct VirtualLink
    */
   struct GNUNET_PeerIdentity target;
 
+  /**
+   * Communicators blocked for receiving on @e target as we are waiting
+   * on the @e core_recv_window to increase.
+   */
+  struct CommunicatorMessageContext *cmc_head;
+
+  /**
+   * Communicators blocked for receiving on @e target as we are waiting
+   * on the @e core_recv_window to increase.
+   */
+  struct CommunicatorMessageContext *cmc_tail;
+
   /**
    * Task scheduled to possibly notfiy core that this peer is no
    * longer counting as confirmed.  Runs the #core_visibility_check(),
@@ -1152,9 +1193,11 @@ struct VirtualLink
    * How many more messages can we send to core before we exhaust
    * the receive window of CORE for this peer? If this hits zero,
    * we must tell communicators to stop providing us more messages
-   * for this peer.
+   * for this peer.  In fact, the window can go negative as we
+   * have multiple communicators, so per communicator we can go
+   * down by one into the negative range.
    */
-  unsigned int core_recv_window;
+  int core_recv_window;
 };
 
 
@@ -3497,21 +3540,14 @@ free_pending_message (struct PendingMessage *pm)
 
 
 /**
- * Send a response to the @a pm that we have processed a
- * "send" request with status @a success. We
- * transmitted @a bytes_physical on the actual wire.
- * Sends a confirmation to the "core" client responsible
- * for the original request and free's @a pm.
+ * Send a response to the @a pm that we have processed a "send"
+ * request.  Sends a confirmation to the "core" client responsible for
+ * the original request and free's @a pm.
  *
  * @param pm handle to the original pending message
- * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
- *          for transmission failure
- * @param bytes_physical amount of bandwidth consumed
  */
 static void
-client_send_response (struct PendingMessage *pm,
-                      int success,
-                      uint32_t bytes_physical)
+client_send_response (struct PendingMessage *pm)
 {
   struct TransportClient *tc = pm->client;
   struct Neighbour *target = pm->target;
@@ -3523,10 +3559,7 @@ client_send_response (struct PendingMessage *pm,
     env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
     som->peer = target->pid;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Confirming %s transmission of %u/%u bytes to %s\n",
-                (GNUNET_OK == success) ? "successful" : "failed",
-                (unsigned int) pm->bytes_msg,
-                (unsigned int) bytes_physical,
+                "Confirming transmission to %s\n",
                 GNUNET_i2s (&pm->target->pid));
     GNUNET_MQ_send (tc->mq, env);
   }
@@ -3826,6 +3859,31 @@ check_communicator_available (
 }
 
 
+/**
+ * Send ACK to communicator (if requested) and free @a cmc.
+ *
+ * @param cmc context for which we are done handling the message
+ */
+static void
+finish_cmc_handling (struct CommunicatorMessageContext *cmc)
+{
+  if (0 != ntohl (cmc->im.fc_on))
+  {
+    /* send ACK when done to communicator for flow control! */
+    struct GNUNET_MQ_Envelope *env;
+    struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
+
+    env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
+    ack->reserved = htonl (0);
+    ack->fc_id = cmc->im.fc_id;
+    ack->sender = cmc->im.sender;
+    GNUNET_MQ_send (cmc->tc->mq, env);
+  }
+  GNUNET_SERVICE_client_continue (cmc->tc->client);
+  GNUNET_free (cmc);
+}
+
+
 /**
  * Client confirms that it is done handling message(s) to a particular
  * peer. We may now provide more messages to CORE for this peer.
@@ -3841,6 +3899,7 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
   struct TransportClient *tc = cls;
   struct VirtualLink *vl;
   uint32_t delta;
+  struct CommunicatorMessageContext *cmc;
 
   if (CT_CORE != tc->type)
   {
@@ -3860,9 +3919,13 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
   }
   delta = ntohl (rom->increase_window_delta);
   vl->core_recv_window += delta;
-  if (delta == vl->core_recv_window)
+  if (vl->core_recv_window <= 0)
+    return;
+  /* resume communicators */
+  while (NULL != (cmc = vl->cmc_tail))
   {
-    // FIXME: resume communicators!
+    GNUNET_CONTAINER_DLL_remove (vl->cmc_head, vl->cmc_tail, cmc);
+    finish_cmc_handling (cmc);
   }
 }
 
@@ -4683,30 +4746,6 @@ handle_del_address (void *cls,
 }
 
 
-/**
- * Context from #handle_incoming_msg().  Closure for many
- * message handlers below.
- */
-struct CommunicatorMessageContext
-{
-  /**
-   * Which communicator provided us with the message.
-   */
-  struct TransportClient *tc;
-
-  /**
-   * Additional information for flow control and about the sender.
-   */
-  struct GNUNET_TRANSPORT_IncomingMessage im;
-
-  /**
-   * Number of hops the message has travelled (if DV-routed).
-   * FIXME: make use of this in ACK handling!
-   */
-  uint16_t total_hops;
-};
-
-
 /**
  * Given an inbound message @a msg from a communicator @a cmc,
  * demultiplex it based on the type calling the right handler.
@@ -4719,31 +4758,6 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
                       const struct GNUNET_MessageHeader *msg);
 
 
-/**
- * Send ACK to communicator (if requested) and free @a cmc.
- *
- * @param cmc context for which we are done handling the message
- */
-static void
-finish_cmc_handling (struct CommunicatorMessageContext *cmc)
-{
-  if (0 != ntohl (cmc->im.fc_on))
-  {
-    /* send ACK when done to communicator for flow control! */
-    struct GNUNET_MQ_Envelope *env;
-    struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
-
-    env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
-    ack->reserved = htonl (0);
-    ack->fc_id = cmc->im.fc_id;
-    ack->sender = cmc->im.sender;
-    GNUNET_MQ_send (cmc->tc->mq, env);
-  }
-  GNUNET_SERVICE_client_continue (cmc->tc->client);
-  GNUNET_free (cmc);
-}
-
-
 /**
  * Communicator gave us an unencapsulated message to pass as-is to
  * CORE.  Process the request.
@@ -4756,6 +4770,7 @@ static void
 handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
 {
   struct CommunicatorMessageContext *cmc = cls;
+  struct VirtualLink *vl;
   uint16_t size = ntohs (mh->size);
 
   if ((size > UINT16_MAX - sizeof (struct InboundMessage)) ||
@@ -4768,6 +4783,25 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
     GNUNET_SERVICE_client_drop (client);
     return;
   }
+  vl = GNUNET_CONTAINER_multipeermap_get (links, &cmc->im.sender);
+  if (NULL == vl)
+  {
+    /* FIXME: sender is giving us messages for CORE but we don't have
+       the link up yet! I *suspect* this can happen right now (i.e.
+       sender has verified us, but we didn't verify sender), but if
+       we pass this on, CORE would be confused (link down, messages
+       arrive).  We should investigate more if this happens often,
+       or in a persistent manner, and possibly do "something" about
+       it. Thus logging as error for now. */
+    GNUNET_break_op (0);
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# CORE messages droped (virtual link still down)",
+                              1,
+                              GNUNET_NO);
+
+    finish_cmc_handling (cmc);
+    return;
+  }
   /* Forward to all CORE clients */
   for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
   {
@@ -4781,11 +4815,15 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
     memcpy (&im[1], mh, size);
     GNUNET_MQ_send (tc->mq, env);
   }
-  /* FIXME: consider doing this _only_ once the message
-     was drained from the CORE MQs to extend flow control to CORE!
-     (basically, increment counter in cmc, decrement on MQ send continuation!
-   */
-  finish_cmc_handling (cmc);
+  vl->core_recv_window--;
+  if (vl->core_recv_window > 0)
+  {
+    finish_cmc_handling (cmc);
+    return;
+  }
+  /* Wait with calling #finish_cmc_handling(cmc) until the message
+     was processed by CORE MQs (for CORE flow control)! */
+  GNUNET_CONTAINER_DLL_insert (vl->cmc_head, vl->cmc_tail, cmc);
 }
 
 
@@ -5345,7 +5383,8 @@ handle_reliability_ack (void *cls,
   }
 
   ack_counter = htonl (ra->ack_counter);
-  // FIXME: track ACK losses based on ack_counter somewhere!
+  (void) ack_counter; /* silence compiler warning for now */
+  // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
   // (DV and/or Neighbour?)
   finish_cmc_handling (cmc);
 }
@@ -7380,7 +7419,7 @@ reliability_box_message (struct Queue *queue,
   {
     /* failed hard */
     GNUNET_break (0);
-    client_send_response (pm, GNUNET_NO, 0);
+    client_send_response (pm);
     return NULL;
   }
   pa = prepare_pending_acknowledgement (queue, dvh, pm);
@@ -7531,7 +7570,7 @@ transmit_on_queue (void *cls)
       (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
   {
     /* Full message sent, and over reliabile channel */
-    client_send_response (pm, GNUNET_YES, pm->bytes_msg);
+    client_send_response (pm);
   }
   else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
             queue->tc->details.communicator.cc) &&
@@ -7556,10 +7595,7 @@ transmit_on_queue (void *cls)
 
     /* Was this the last applicable fragmment? */
     if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
-      client_send_response (
-        pm,
-        GNUNET_YES,
-        pm->bytes_msg /* FIXME: calculate and add overheads! */);
+      client_send_response (pm);
   }
   else if (PMT_CORE != pm->pmt)
   {