only notify core about validated queues
authorChristian Grothoff <christian@grothoff.org>
Wed, 17 Apr 2019 20:15:43 +0000 (22:15 +0200)
committerChristian Grothoff <christian@grothoff.org>
Wed, 17 Apr 2019 20:15:53 +0000 (22:15 +0200)
src/transport/gnunet-service-tng.c

index 29bf3bf95a815c9b0e9e294d07a6fa5672adb8ef..f3874724ad8ba71a3e8a899f663add94e6bdc3d1 100644 (file)
  * Implement next:
  * - route_message() implementation, including using DV data structures
  *   (but not when routing certain message types, like DV learn,
- *    MUST pay attention to content here -- or pass extra flags?)
+ *    looks like now like we need two flags (DV/no-DV, confirmed-only,
+ *    unconfirmed OK)
+ *   + NOTE: do NOT use PendingMessage for route_message(), as that is
+ *     for fragmentation/reliability and ultimately core flow control!
+ *     => route_message() should pick the queue
+ *     => in case of DV routing, route_message should BOX the message, too.
+ * - We currently do NEVER tell CORE also about DV-connections (core_visible
+ *    of `struct DistanceVector` is simply never set!)
+ *     + When? Easy if we initiated the DV and got the challenge; do that NOW
+ *        BUT what we passively learned DV (unconfirmed freshness)
+ *        => Do we trigger Challenge->Response there as well, or 'wait' for
+ *           our own DV initiations to discover?
+ *        => What about DV routes that expire? Do we also only count on
+ *           our own DV initiations for maintenance here, or do we
+ *           try to specifically re-confirm the existence of a particular path?
+ *        => OPITMIZATION-FIXME!
+ *   + Where do we track what we told core? Careful: need to check
+ *     the "core_visible' flag in both neighbours and DV before
+ *     sending out notifications to CORE!
  * - retransmission logic
  * - track RTT, distance, loss, etc. => requires extra data structures!
  *
  * FIXME (without marks in the code!):
  * - proper use/initialization of timestamps in messages exchanged
  *   during DV learning
+ * - persistence of monotonic time obtained from other peers
+ *   in PEERSTORE (by message type)
  *
  * Optimizations:
  * - use shorthashmap on msg_uuid's when matching reliability/fragment ACKs
  *   against our pending message queue (requires additional per neighbour
  *   hash map to be maintained, avoids possible linear scan on pending msgs)
+ * - queue_send_msg and route_message both by API design have to make copies
+ *   of the payload, and route_message on top of that requires a malloc/free.
+ *   Change design to approximate "zero" copy better...
  *
  * Design realizations / discussion:
  * - communicators do flow control by calling MQ "notify sent"
@@ -1060,6 +1083,13 @@ struct DistanceVector
    * Task scheduled to purge expired paths from @e dv_head MDLL.
    */
   struct GNUNET_SCHEDULER_Task *timeout_task;
+
+  /**
+   * Is one of the DV paths in this struct 'confirmed' and thus
+   * the cause for CORE to see this peer as connected? (Note that
+   * the same may apply to a `struct Neighbour` at the same time.)
+   */
+  int core_visible;
 };
 
 
@@ -1161,11 +1191,26 @@ struct Queue
    */
   struct GNUNET_SCHEDULER_Task *transmit_task;
 
+  /**
+   * Task scheduled to possibly notfiy core that this queue is no longer
+   * counting as confirmed.  Runs the #core_queue_visibility_check().
+   */
+  struct GNUNET_SCHEDULER_Task *visibility_task;
+
   /**
    * Our current RTT estimate for this queue.
    */
   struct GNUNET_TIME_Relative rtt;
 
+  /**
+   * How long do *we* consider this @e address to be valid?  In the past or
+   * zero if we have not yet validated it.  Can be updated based on
+   * challenge-response validations (via address validation logic), or when we
+   * receive ACKs that we can definitively map to transmissions via this
+   * queue.
+   */
+  struct GNUNET_TIME_Absolute validated_until;
+
   /**
    * Message ID generator for transmissions on this queue.
    */
@@ -1397,6 +1442,11 @@ struct Neighbour
    */
   struct GNUNET_TIME_Absolute earliest_timeout;
 
+  /**
+   * Do we have a confirmed working queue and are thus visible to
+   * CORE?
+   */
+  int core_visible;
 };
 
 
@@ -2513,6 +2563,16 @@ schedule_transmit_on_queue (struct Queue *queue)
 }
 
 
+/**
+ * Check whether the CORE visibility of @a n changed. If so,
+ * check whether we need to notify CORE.
+ *
+ * @param n neighbour to perform the check for
+ */
+static void
+update_neighbour_core_visibility (struct Neighbour *n);
+
+
 /**
  * Free @a queue.
  *
@@ -2535,6 +2595,11 @@ free_queue (struct Queue *queue)
     GNUNET_SCHEDULER_cancel (queue->transmit_task);
     queue->transmit_task = NULL;
   }
+  if (NULL != queue->visibility_task)
+  {
+    GNUNET_SCHEDULER_cancel (queue->visibility_task);
+    queue->visibility_task = NULL;
+  }
   GNUNET_CONTAINER_MDLL_remove (neighbour,
                                 neighbour->queue_head,
                                 neighbour->queue_tail,
@@ -2574,9 +2639,12 @@ free_queue (struct Queue *queue)
   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
   GNUNET_free (queue);
+
+  update_neighbour_core_visibility (neighbour);
+    cores_send_disconnect_info (&neighbour->pid);
+
   if (NULL == neighbour->queue_head)
   {
-    cores_send_disconnect_info (&neighbour->pid);
     free_neighbour (neighbour);
   }
 }
@@ -3208,17 +3276,90 @@ lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
 }
 
 
+/**
+ * Send the control message @a payload on @a queue.
+ *
+ * @param queue the queue to use for transmission
+ * @param pm pending message to update once transmission is done, may be NULL!
+ * @param payload the payload to send (encapsulated in a
+ *        #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG).
+ * @param payload_size number of bytes in @a payload
+ */
+static void
+queue_send_msg (struct Queue *queue,
+                struct PendingMessage *pm,
+                const void *payload,
+                size_t payload_size)
+{
+  struct Neighbour *n = queue->neighbour;
+  struct GNUNET_TRANSPORT_SendMessageTo *smt;
+  struct GNUNET_MQ_Envelope *env;
+
+  env = GNUNET_MQ_msg_extra (smt,
+                             payload_size,
+                             GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
+  smt->qid = queue->qid;
+  smt->mid = queue->mid_gen;
+  smt->receiver = n->pid;
+  memcpy (&smt[1],
+          payload,
+          payload_size);
+  {
+    /* Pass the env to the communicator of queue for transmission. */
+    struct QueueEntry *qe;
+
+    qe = GNUNET_new (struct QueueEntry);
+    qe->mid = queue->mid_gen++;
+    qe->queue = queue;
+    // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'!
+    // (also, note that pm may be NULL!)
+    GNUNET_CONTAINER_DLL_insert (queue->queue_head,
+                                 queue->queue_tail,
+                                 qe);
+    GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
+    queue->queue_length++;
+    queue->tc->details.communicator.total_queue_length++;
+    GNUNET_MQ_send (queue->tc->mq,
+                    env);
+  }
+}
+
+
 /**
  * We need to transmit @a hdr to @a target.  If necessary, this may
  * involve DV routing or even broadcasting and fragmentation.
  *
  * @param target peer to receive @a hdr
- * @param hdr header of the message to route
+ * @param hdr header of the message to route and #GNUNET_free()
  */
 static void
 route_message (const struct GNUNET_PeerIdentity *target,
                struct GNUNET_MessageHeader *hdr)
 {
+  // Cases:
+  // 1: called to transmit backchannel message we initiated
+  // 2: called to transmit fragment ack
+  // 3: called to transmit reliability box
+  // 4: called to forward backchannel message
+  // 5: called to forward DV learn message (caller already picked random neighbour(s))!
+  // 6: called to forward DV Box message
+  // 7: called to forward valdiation response
+
+  // Choices:
+  // a) Send ONLY to a *confirmed* direct neighbour
+  // b) Send allowed to *unconfirmed* direct neighbour
+  // c) Route also via *confirmed* DV to target
+  // c) Route allowed via *unconfirmed  DV to target
+  // => One BIT "dv allowed or not", plus one BIT "confirmed/unconfirmed" might do!
+
+  // Case analysis:
+  //         1       2        3        4       5       6      7
+  // a       X       X        X        X       X       X      X
+  // b                                         X              X
+  // c       X       X        X        X                      X
+  // d                                                        X
+  //
+
   // FIXME: this one is tricky:
   // - we could try a direct, reliable channel
   // - if that is unavailable / for load balancing, we may try:
@@ -4305,6 +4446,8 @@ check_backchannel_encapsulation (void *cls,
 
 /**
  * Communicator gave us a backchannel encapsulation.  Process the request.
+ * (We are not the origin of the backchannel here, the communicator simply
+ * received a backchannel message and we are expected to forward it.)
  *
  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
  * @param be the message that was received
@@ -4811,6 +4954,10 @@ handle_dv_learn (void *cls,
       ilat = GNUNET_TIME_relative_multiply (network_latency,
                                             i);
       path[i] = hops[i-1].hop;
+      // FIXME: mark ALL of these as *confirmed* (with what timeout?)
+      // -- and schedule a job for the confirmation to time out! --
+      // and possibly do #cores_send_connect_info() if
+      // the respective neighbour is NOT confirmed yet!
       learn_dv_path (path,
                      i,
                      ilat);
@@ -5262,6 +5409,107 @@ update_next_challenge_time (struct ValidationState *vs,
 }
 
 
+/**
+ * Find the queue matching @a pid and @a address.
+ *
+ * @param pid peer the queue must go to
+ * @param address address the queue must use
+ * @return NULL if no such queue exists
+ */
+static struct Queue *
+find_queue (const struct GNUNET_PeerIdentity *pid,
+            const char *address)
+{
+  struct Neighbour *n;
+
+  n = GNUNET_CONTAINER_multipeermap_get (neighbours,
+                                         pid);
+  if (NULL == n)
+    return NULL;
+  for (struct Queue *pos = n->queue_head;
+       NULL != pos;
+       pos = pos->next_neighbour)
+  {
+    if (0 == strcmp (pos->address,
+                     address))
+      return pos;
+  }
+  return NULL;
+}
+
+
+/**
+ * Task run periodically to check whether the validity of the given queue has
+ * run its course. If so, finds either another queue to take over, or clears
+ * the neighbour's `core_visible` flag. In the latter case, gives DV routes a
+ * chance to take over, and if that fails, notifies CORE about the disconnect.
+ *
+ * @param cls a `struct Queue`
+ */
+static void
+core_queue_visibility_check (void *cls)
+{
+  struct Queue *q = cls;
+
+  q->visibility_task = NULL;
+  if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
+  {
+    q->visibility_task
+      = GNUNET_SCHEDULER_add_at (q->validated_until,
+                                 &core_queue_visibility_check,
+                                 q);
+    return;
+  }
+  update_neighbour_core_visibility (q->neighbour);
+}
+
+
+/**
+ * Check whether the CORE visibility of @a n should change.  Finds either a
+ * queue to preserve the visibility, or clears the neighbour's `core_visible`
+ * flag. In the latter case, gives DV routes a chance to take over, and if
+ * that fails, notifies CORE about the disconnect.  If so, check whether we
+ * need to notify CORE.
+ *
+ * @param n neighbour to perform the check for
+ */
+static void
+update_neighbour_core_visibility (struct Neighbour *n)
+{
+  struct DistanceVector *dv;
+
+  GNUNET_assert (GNUNET_YES == n->core_visible);
+  /* Check if _any_ queue of this neighbour is still valid, if so, schedule
+     the #core_queue_visibility_check() task for that queue */
+  for (struct Queue *q = n->queue_head;
+       NULL != q;
+       q = q->next_neighbour)
+  {
+    if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
+    {
+      /* found a valid queue, use this one */
+      q->visibility_task
+        = GNUNET_SCHEDULER_add_at (q->validated_until,
+                                   &core_queue_visibility_check,
+                                   q);
+      return;
+    }
+  }
+  n->core_visible = GNUNET_NO;
+
+  /* Check if _any_ DV route to this neighbour is currently
+     valid, if so, do NOT tell core about the loss of direct
+     connectivity (DV still counts!) */
+  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes,
+                                          &n->pid);
+  if (GNUNET_YES == dv->core_visible)
+    return;
+  /* Nothing works anymore, need to tell CORE about the loss of
+     connectivity! */
+  cores_send_disconnect_info (&n->pid);
+}
+
+
 /**
  * Communicator gave us a transport address validation response.  Process the request.
  *
@@ -5279,6 +5527,8 @@ handle_validation_response (void *cls,
     .vs = NULL
   };
   struct GNUNET_TIME_Absolute origin_time;
+  struct Queue *q;
+  struct DistanceVector *dv;
 
   /* check this is one of our challenges */
   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
@@ -5357,8 +5607,39 @@ handle_validation_response (void *cls,
                                    GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
                                    &peerstore_store_validation_cb,
                                    vs);
-  // FIXME: should we find the matching queue and update the RTT?
   finish_cmc_handling (cmc);
+
+  /* Finally, we now possibly have a confirmed (!) working queue,
+     update queue status (if queue still is around) */
+  q = find_queue (&vs->pid,
+                  vs->address);
+  if (NULL == q)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# Queues lost at time of successful validation",
+                              1,
+                              GNUNET_NO);
+    return;
+  }
+  q->validated_until = vs->validated_until;
+  q->rtt = vs->validation_rtt;
+  if (GNUNET_NO != q->neighbour->core_visible)
+    return; /* nothing changed, we are done here */
+  q->neighbour->core_visible = GNUNET_YES;
+  q->visibility_task
+    = GNUNET_SCHEDULER_add_at (q->validated_until,
+                               &core_queue_visibility_check,
+                               q);
+  /* Check if _any_ DV route to this neighbour is
+     currently valid, if so, do NOT tell core anything! */
+  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes,
+                                          &q->neighbour->pid);
+  if (GNUNET_YES == dv->core_visible)
+    return; /* nothing changed, done */
+  /* We lacked a confirmed connection to the neighbour
+     before, so tell CORE about it (finally!) */
+  cores_send_connect_info (&q->neighbour->pid,
+                           GNUNET_BANDWIDTH_ZERO);
 }
 
 
@@ -5640,19 +5921,19 @@ reliability_box_message (struct PendingMessage *pm)
     /* failed hard */
     GNUNET_break (0);
     client_send_response (pm,
-                         GNUNET_NO,
-                         0);
+                          GNUNET_NO,
+                          0);
     return NULL;
   }
   bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
-                      sizeof (rbox) +
-                      pm->bytes_msg);
+                       sizeof (rbox) +
+                       pm->bytes_msg);
   bpm->target = pm->target;
   bpm->frag_parent = pm;
   GNUNET_CONTAINER_MDLL_insert (frag,
-                               pm->head_frag,
-                               pm->tail_frag,
-                               bpm);
+                                pm->head_frag,
+                                pm->tail_frag,
+                                bpm);
   bpm->timeout = pm->timeout;
   bpm->pmt = PMT_RELIABILITY_BOX;
   bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
@@ -5663,65 +5944,16 @@ reliability_box_message (struct PendingMessage *pm)
   rbox.msg_uuid = pm->msg_uuid;
   msg = (char *) &bpm[1];
   memcpy (msg,
-         &rbox,
-         sizeof (rbox));
+          &rbox,
+          sizeof (rbox));
   memcpy (&msg[sizeof (rbox)],
-         &pm[1],
-         pm->bytes_msg);
+          &pm[1],
+          pm->bytes_msg);
   pm->bpm = bpm;
   return bpm;
 }
 
 
-/**
- * Send the control message @a payload on @a queue.
- *
- * @param queue the queue to use for transmission
- * @param pm pending message to update once transmission is done, may be NULL!
- * @param payload the payload to send (encapsulated in a
- *        #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG).
- * @param payload_size number of bytes in @a payload
- */
-static void
-queue_send_msg (struct Queue *queue,
-                struct PendingMessage *pm,
-                const void *payload,
-                size_t payload_size)
-{
-  struct Neighbour *n = queue->neighbour;
-  struct GNUNET_TRANSPORT_SendMessageTo *smt;
-  struct GNUNET_MQ_Envelope *env;
-
-  env = GNUNET_MQ_msg_extra (smt,
-                             payload_size,
-                             GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
-  smt->qid = queue->qid;
-  smt->mid = queue->mid_gen;
-  smt->receiver = n->pid;
-  memcpy (&smt[1],
-          payload,
-          payload_size);
-  {
-    /* Pass the env to the communicator of queue for transmission. */
-    struct QueueEntry *qe;
-
-    qe = GNUNET_new (struct QueueEntry);
-    qe->mid = queue->mid_gen++;
-    qe->queue = queue;
-    // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'!
-    // (also, note that pm may be NULL!)
-    GNUNET_CONTAINER_DLL_insert (queue->queue_head,
-                                 queue->queue_tail,
-                                 qe);
-    GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
-    queue->queue_length++;
-    queue->tc->details.communicator.total_queue_length++;
-    GNUNET_MQ_send (queue->tc->mq,
-                    env);
-  }
-}
-
-
 /**
  * We believe we are ready to transmit a message on a queue. Double-checks
  * with the queue's "tracker_out" and then gives the message to the
@@ -6268,7 +6500,6 @@ static void
 validation_start_cb (void *cls)
 {
   struct ValidationState *vs;
-  struct Neighbour *n;
   struct Queue *q;
 
   (void) cls;
@@ -6284,23 +6515,8 @@ validation_start_cb (void *cls)
   if (NULL == vs)
     return; /* woopsie, no more addresses known, should only
                happen if we're really a lonely peer */
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours,
-                                         &vs->pid);
-  q = NULL;
-  if (NULL != n)
-  {
-    for (struct Queue *pos = n->queue_head;
-         NULL != pos;
-         pos = pos->next_neighbour)
-    {
-      if (0 == strcmp (pos->address,
-                       vs->address))
-      {
-        q = pos;
-        break;
-      }
-    }
-  }
+  q = find_queue (&vs->pid,
+                  vs->address);
   if (NULL == q)
   {
     vs->awaiting_queue = GNUNET_YES;
@@ -6570,8 +6786,6 @@ handle_add_queue_message (void *cls,
                                                       &neighbour->pid,
                                                       neighbour,
                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-    cores_send_connect_info (&neighbour->pid,
-                             GNUNET_BANDWIDTH_ZERO);
   }
   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
   addr = (const char *) &aqm[1];