misc work on TNG
authorChristian Grothoff <christian@grothoff.org>
Mon, 15 Apr 2019 09:32:51 +0000 (11:32 +0200)
committerChristian Grothoff <christian@grothoff.org>
Mon, 15 Apr 2019 09:34:42 +0000 (11:34 +0200)
src/include/gnunet_time_lib.h
src/transport/gnunet-service-tng.c
src/util/scheduler.c
src/util/time.c

index 482ae52d881f88da750c1c7be084e014bc2fe586..d0da267a9504763b5e7a67dd235087cebb86d71c 100644 (file)
@@ -189,6 +189,16 @@ struct GNUNET_TIME_Relative
 GNUNET_TIME_randomized_backoff(struct GNUNET_TIME_Relative rt, struct GNUNET_TIME_Relative threshold);
 
 
+/**
+ * Return a random time value between 0.5*r and 1.5*r.
+ * 
+ * @param r input time for scaling
+ * @return randomized time
+ */ 
+struct GNUNET_TIME_Relative
+GNUNET_TIME_randomize(struct GNUNET_TIME_Relative r);
+
+
 /**
  * Return relative time of 0ms.
  */
index 2dd68bcc80a6c9b9db57a9a57fc688f8f1787591..568e5b1d7449429116d0fcafee265b9a6dab6485 100644 (file)
  */
 #define MAX_DV_HOPS_ALLOWED 16
 
+/**
+ * Maximum number of DV learning activities we may
+ * have pending at the same time.
+ */
+#define MAX_DV_LEARN_PENDING 64
+
 /**
  * Maximum number of DV paths we keep simultaneously to the same target.
  */
  */
 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
+/**
+ * We only consider queues as "quality" connections when      
+ * suppressing the generation of DV initiation messages if 
+ * the latency of the queue is below this threshold.
+ */
+#define DV_QUALITY_RTT_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
+
 /**
  * How long do we consider a DV path valid if we see no
  * further updates on it? Note: the value chosen here might be too low!
  */
 #define MAX_VALIDATION_CHALLENGE_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
 
+/**
+ * What is the non-randomized base frequency at which we
+ * would initiate DV learn messages?
+ */
+#define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
+
+/**
+ * How many good connections (confirmed, bi-directional, not DV)
+ * do we need to have to suppress initiating DV learn messages?
+ */
+#define DV_LEARN_QUALITY_THRESHOLD 100
+  
 /**
  * When do we forget an invalid address for sure?
  */
@@ -791,6 +816,36 @@ enum ClientType
 };
 
 
+/**
+ * When did we launch this DV learning activity? 
+ */
+struct LearnLaunchEntry
+{
+
+  /**
+   * Kept (also) in a DLL sorted by launch time.
+   */
+  struct LearnLaunchEntry *prev;
+
+  /**
+   * Kept (also) in a DLL sorted by launch time.
+   */
+  struct LearnLaunchEntry *next;
+
+  /**
+   * Challenge that uniquely identifies this activity.
+   */
+  struct GNUNET_ShortHashCode challenge;
+
+  /**
+   * When did we transmit the DV learn message (used to
+   * calculate RTT).
+   */
+  struct GNUNET_TIME_Absolute launch_time;
+
+};
+
+
 /**
  * Entry in our cache of ephemeral keys we currently use.
  * This way, we only sign an ephemeral once per @e target,
@@ -1060,6 +1115,7 @@ struct Queue
 
   /**
    * Distance to the target of this queue.
+   * FIXME: needed? DV is done differently these days...
    */
   uint32_t distance;
 
@@ -1813,6 +1869,21 @@ static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
  */
 static struct GNUNET_CONTAINER_MultiPeerMap *validation_map;
 
+/**
+ * Map from challenges to `struct LearnLaunchEntry` values.
+ */
+static struct GNUNET_CONTAINER_MultiShortmap *dvlearn_map;
+
+/**
+ * Head of a DLL sorted by launch time.
+ */
+static struct LearnLaunchEntry *lle_head;
+
+/**
+ * Tail of a DLL sorted by launch time.
+ */
+static struct LearnLaunchEntry *lle_tail;
+
 /**
  * MIN Heap sorted by "next_challenge" to `struct ValidationState` entries
  * sorting addresses we are aware of by when we should next try to (re)validate
@@ -1845,6 +1916,11 @@ static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
  */
 static struct GNUNET_SCHEDULER_Task *ephemeral_task;
 
+/**
+ * Task run to initiate DV learning.
+ */
+static struct GNUNET_SCHEDULER_Task *dvlearn_task;
+
 /**
  * Task to run address validation.
  */
@@ -2088,6 +2164,7 @@ client_connect_cb (void *cls,
 {
   struct TransportClient *tc;
 
+  (void) cls;
   tc = GNUNET_new (struct TransportClient);
   tc->client = client;
   tc->mq = mq;
@@ -2501,6 +2578,7 @@ client_disconnect_cb (void *cls,
 {
   struct TransportClient *tc = app_ctx;
 
+  (void) cls;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Client %p disconnected, cleaning up.\n",
               tc);
@@ -2732,7 +2810,7 @@ client_send_response (struct PendingMessage *pm,
     som->bytes_physical = htonl (bytes_physical);
     som->peer = target->pid;
     GNUNET_MQ_send (tc->mq,
-                   env);
+                    env);
   }
   free_pending_message (pm);
 }
@@ -2935,6 +3013,7 @@ check_communicator_backchannel (void *cls,
   uint16_t msize;
   uint16_t isize;
 
+  (void) cls;
   msize = ntohs (cb->header.size) - sizeof (*cb);
   if (UINT16_MAX - msize >
       sizeof (struct TransportBackchannelEncapsulationMessage) +
@@ -3761,6 +3840,7 @@ check_backchannel_encapsulation (void *cls,
 {
   uint16_t size = ntohs (be->header.size);
 
+  (void) cls;
   if (size - sizeof (*be) < sizeof (struct GNUNET_MessageHeader))
   {
     GNUNET_break_op (0);
@@ -4012,6 +4092,7 @@ check_dv_learn (void *cls,
   uint16_t num_hops = ntohs (dvl->num_hops);
   const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
 
+  (void) cls;
   if (size != sizeof (*dvl) + num_hops * sizeof (struct DVPathEntryP))
   {
     GNUNET_break_op (0);
@@ -4280,7 +4361,7 @@ handle_dv_learn (void *cls,
         break;
       }
       if ( (GNUNET_NO == iret) &&
-           (nhops - 1 == i) )
+           (nhops == i + 1) )
       {
         /* we have better paths, and this is the longest target,
            so there cannot be anything interesting later */
@@ -4359,6 +4440,7 @@ check_dv_box (void *cls,
   uint16_t isize;
   uint16_t itype;
 
+  (void) cls;
   if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + sizeof (struct GNUNET_MessageHeader))
   {
     GNUNET_break_op (0);
@@ -5239,6 +5321,8 @@ tracker_update_out_cb (void *cls)
 static void
 tracker_excess_out_cb (void *cls)
 {
+  (void) cls;
+    
   /* FIXME: trigger excess bandwidth report to core? Right now,
      this is done internally within transport_api2_core already,
      but we probably want to change the logic and trigger it
@@ -5261,6 +5345,8 @@ tracker_excess_out_cb (void *cls)
 static void
 tracker_excess_in_cb (void *cls)
 {
+  (void) cls;
+
   /* TODO: maybe inform somone at this point? */
   GNUNET_STATISTICS_update (GST_stats,
                             "# Excess inbound bandwidth reported",
@@ -5560,6 +5646,8 @@ validation_transmit_on_queue (struct Queue *q,
   tvc->reserved = htonl (0);
   tvc->challenge = vs->challenge;
   tvc->sender_time = GNUNET_TIME_absolute_hton (vs->last_challenge_use);
+  // FIXME: not so easy, need to BOX this message
+  // in a transmission request! (mistake also done elsewhere!)
   GNUNET_MQ_send (q->tc->mq,
                   env);
 }
@@ -5624,6 +5712,193 @@ validation_start_cb (void *cls)
 }
 
 
+/**
+ * Closure for #check_connection_quality.
+ */
+struct QueueQualityContext
+{
+  /**
+   * Set to the @e k'th queue encountered.
+   */ 
+  struct Queue *q;
+
+  /**
+   * Set to the number of quality queues encountered.
+   */
+  unsigned int quality_count;
+
+  /**
+   * Set to the total number of queues encountered.
+   */ 
+  unsigned int num_queues;
+
+  /**
+   * Decremented for each queue, for selection of the
+   * k-th queue in @e q.
+   */
+  unsigned int k;
+
+};
+
+
+/**
+ * Check whether any queue to the given neighbour is
+ * of a good "quality" and if so, increment the counter.
+ * Also counts the total number of queues, and returns
+ * the k-th queue found.
+ *
+ * @param cls a `struct QueueQualityContext *` with counters
+ * @param pid peer this is about
+ * @param value a `struct Neighbour`
+ * @return #GNUNET_OK (continue to iterate)
+ */
+static int
+check_connection_quality (void *cls,
+                          const struct GNUNET_PeerIdentity *pid,
+                          void *value)
+{
+  struct QueueQualityContext *ctx = cls;
+  struct Neighbour *n = value;
+  int do_inc;
+
+  (void) pid;
+  do_inc = GNUNET_NO;
+  for (struct Queue *q = n->queue_head;
+       NULL != q;
+       q = q->next_neighbour)
+  {
+    if (0 != q->distance)
+      continue; /* DV does not count */
+    ctx->num_queues++;
+    if (0 == ctx->k--)
+      ctx->q = q;
+    /* OPTIMIZE-FIXME: in the future, add reliability / goodput
+       statistics and consider those as well here? */
+    if (q->rtt.rel_value_us < DV_QUALITY_RTT_THRESHOLD.rel_value_us)
+      do_inc = GNUNET_YES;
+  }
+  if (GNUNET_YES == do_inc)
+    ctx->quality_count++;
+  return GNUNET_OK;
+}
+
+
+/**
+ * Task run when we CONSIDER initiating a DV learn 
+ * process. We first check that sending out a message is
+ * even possible (queues exist), then that it is desirable
+ * (if not, reschedule the task for later), and finally
+ * we may then begin the job.  If there are too many
+ * entries in the #dvlearn_map, we purge the oldest entry
+ * using #lle_tail.
+ *
+ * @param cls NULL
+ */
+static void
+start_dv_learn (void *cls)
+{
+  struct LearnLaunchEntry *lle;
+  struct QueueQualityContext qqc;
+  struct GNUNET_MQ_Envelope *env;
+  struct TransportDVLearn *dvl;
+
+  (void) cls;
+  dvlearn_task = NULL;
+  if (0 ==
+      GNUNET_CONTAINER_multipeermap_size (neighbours))
+    return; /* lost all connectivity, cannot do learning */
+  qqc.quality_count = 0;
+  qqc.num_queues = 0;
+  GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+                                         &check_connection_quality,
+                                         &qqc);
+  if (qqc.quality_count > DV_LEARN_QUALITY_THRESHOLD)
+  {
+    struct GNUNET_TIME_Relative delay;
+    unsigned int factor;
+
+    /* scale our retries by how far we are above the threshold */
+    factor = qqc.quality_count / DV_LEARN_QUALITY_THRESHOLD;
+    delay = GNUNET_TIME_relative_multiply (DV_LEARN_BASE_FREQUENCY,
+                                           factor);
+    dvlearn_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                                 &start_dv_learn,
+                                                 NULL);
+    return;
+  }
+  /* remove old entries in #dvlearn_map if it has grown too big */
+  while (MAX_DV_LEARN_PENDING >=
+         GNUNET_CONTAINER_multishortmap_size (dvlearn_map))
+  {
+    lle = lle_tail;
+    GNUNET_assert (GNUNET_YES ==
+                   GNUNET_CONTAINER_multishortmap_remove (dvlearn_map,
+                                                          &lle->challenge,
+                                                          lle));
+    GNUNET_CONTAINER_DLL_remove (lle_head,
+                                 lle_tail,
+                                 lle);
+    GNUNET_free (lle);
+  }
+  /* setup data structure for learning */
+  lle = GNUNET_new (struct LearnLaunchEntry);
+  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+                              &lle->challenge,
+                              sizeof (lle->challenge));
+  GNUNET_CONTAINER_DLL_insert (lle_head,
+                               lle_tail,
+                               lle);
+  GNUNET_break (GNUNET_YES ==
+                GNUNET_CONTAINER_multishortmap_put (dvlearn_map,
+                                                    &lle->challenge,
+                                                    lle,
+                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  env = GNUNET_MQ_msg (dvl,
+                       GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
+  dvl->num_hops = htons (0);
+  dvl->bidirectional = htons (0);
+  dvl->non_network_delay = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
+  {
+    struct DvInitPS dvip = {
+      .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
+      .purpose.size = htonl (sizeof (dvip)),
+      .challenge = lle->challenge
+    };
+
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+                                             &dvip.purpose,
+                                             &dvl->init_sig));
+  }
+  dvl->initiator = GST_my_identity;
+  dvl->challenge = lle->challenge;
+
+  qqc.quality_count = 0;
+  qqc.k = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                    qqc.num_queues);
+  qqc.num_queues = 0;
+  qqc.q = NULL;
+  GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+                                         &check_connection_quality,
+                                         &qqc);
+  GNUNET_assert (NULL != qqc.q);
+  
+  /* Do this as close to transmission time as possible! */
+  lle->launch_time = GNUNET_TIME_absolute_get (); 
+  // FIXME: not so easy, need to BOX this message
+  // in a transmission request! (mistake also done elsewhere!)
+  GNUNET_MQ_send (qqc.q->tc->mq,
+                  env);
+
+  /* reschedule this job, randomizing the time it runs (but no 
+     actual backoff!) */
+  dvlearn_task
+    = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_randomize (DV_LEARN_BASE_FREQUENCY),
+                                    &start_dv_learn,
+                                    NULL);
+}
+
+
 /**
  * A new queue has been created, check if any address validation
  * requests have been waiting for it.
@@ -5747,6 +6022,10 @@ handle_add_queue_message (void *cls,
                                                      &aqm->receiver,
                                                      &check_validation_request_pending,
                                                      queue);
+  /* might be our first queue, try launching DV learning */
+  if (NULL == dvlearn_task)
+    dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn,
+                                             NULL);
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
@@ -6109,6 +6388,7 @@ static int
 check_request_hello_validation (void *cls,
                                 const struct RequestHelloValidationMessage *m)
 {
+  (void) cls;
   GNUNET_MQ_check_zero_termination (m);
   return GNUNET_OK;
 }
@@ -6234,6 +6514,7 @@ free_validation_state_cb (void *cls,
 static void
 do_shutdown (void *cls)
 {
+  struct LearnLaunchEntry *lle;
   (void) cls;
 
   if (NULL != ephemeral_task)
@@ -6268,6 +6549,15 @@ do_shutdown (void *cls)
                                          NULL);
   GNUNET_CONTAINER_multipeermap_destroy (validation_map);
   validation_map = NULL;
+  while (NULL != (lle = lle_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (lle_head,
+                                 lle_tail,
+                                 lle);
+    GNUNET_free (lle);
+  }
+  GNUNET_CONTAINER_multishortmap_destroy (dvlearn_map);
+  dvlearn_map = NULL;
   GNUNET_CONTAINER_heap_destroy (validation_heap);
   validation_heap = NULL;
   GNUNET_CONTAINER_multipeermap_iterate (dv_routes,
@@ -6298,6 +6588,7 @@ run (void *cls,
      struct GNUNET_SERVICE_Handle *service)
 {
   (void) cls;
+  (void) service;
   /* setup globals */
   GST_cfg = c;
   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
@@ -6307,6 +6598,8 @@ run (void *cls,
   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
                                                         GNUNET_YES);
   ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+  dvlearn_map = GNUNET_CONTAINER_multishortmap_create (2 * MAX_DV_LEARN_PENDING,
+                                                       GNUNET_YES);
   validation_map = GNUNET_CONTAINER_multipeermap_create (1024,
                                                          GNUNET_YES);
   validation_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
@@ -6326,7 +6619,7 @@ run (void *cls,
   GST_stats = GNUNET_STATISTICS_create ("transport",
                                         GST_cfg);
   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
-                                NULL);
+                                 NULL);
   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
   if (NULL == peerstore)
   {
index 3bd7ccec7e4c61cbb1128396e99e067c72c9cc9a..2ddbb8c60f06daf7b4f8f6f72b25cb4fa30147c9 100644 (file)
@@ -915,6 +915,7 @@ driver_add_multiple (struct GNUNET_SCHEDULER_Task *t)
 static void
 install_parent_control_handler (void *cls)
 {
+  (void) cls;
   install_parent_control_task = NULL;
   GNUNET_OS_install_parent_control_handler (NULL);
 }
@@ -926,6 +927,7 @@ shutdown_pipe_cb (void *cls)
   char c;
   const struct GNUNET_DISK_FileHandle *pr;
 
+  (void) cls;
   shutdown_pipe_task = NULL;
   pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle,
                                 GNUNET_DISK_PIPE_END_READ);
index 799c6cc6323fafc935dfca05e6bfbd2c629027a5..758921718abffedb7af3fb204810cd1b18f67d52 100644 (file)
@@ -769,6 +769,22 @@ GNUNET_TIME_randomized_backoff(struct GNUNET_TIME_Relative rt, struct GNUNET_TIM
 }
 
 
+/**
+ * Return a random time value between 0.5*r and 1.5*r.
+ *
+ * @param r input time for scaling
+ * @return randomized time
+ */
+struct GNUNET_TIME_Relative
+GNUNET_TIME_randomize (struct GNUNET_TIME_Relative r)
+{
+  double d = ((rand() % 1001) - 500) / 1000.0;
+
+  return relative_multiply_double (r,
+                                   d);
+}
+
+
 /**
  * Obtain the current time and make sure it is monotonically
  * increasing.  Guards against systems without an RTC or
@@ -819,53 +835,53 @@ GNUNET_TIME_absolute_get_monotonic (const struct GNUNET_CONFIGURATION_Handle *cf
       struct GNUNET_DISK_FileHandle *fh;
 
       fh = GNUNET_DISK_file_open (filename,
-                                 GNUNET_DISK_OPEN_READWRITE | GNUNET_DISK_OPEN_CREATE,
-                                 GNUNET_DISK_PERM_USER_WRITE | GNUNET_DISK_PERM_GROUP_WRITE |
-                                 GNUNET_DISK_PERM_USER_READ  | GNUNET_DISK_PERM_GROUP_READ);
+                                  GNUNET_DISK_OPEN_READWRITE | GNUNET_DISK_OPEN_CREATE,
+                                  GNUNET_DISK_PERM_USER_WRITE | GNUNET_DISK_PERM_GROUP_WRITE |
+                                  GNUNET_DISK_PERM_USER_READ  | GNUNET_DISK_PERM_GROUP_READ);
       if (NULL == fh)
       {
-       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                   _("Failed to map `%s', cannot assure monotonic time!\n"),
-                   filename);
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                    _("Failed to map `%s', cannot assure monotonic time!\n"),
+                    filename);
       }
       else
       {
-       off_t size;
-
-       size = 0;
-       GNUNET_break (GNUNET_OK ==
-                     GNUNET_DISK_file_handle_size (fh,
-                                                   &size));
-       if (size < sizeof (*map))
-       {
-         struct GNUNET_TIME_AbsoluteNBO o;
-
-         o = GNUNET_TIME_absolute_hton (now);
-         if (sizeof (o) !=
-             GNUNET_DISK_file_write (fh,
-                                     &o,
-                                     sizeof (o)))
-           size = 0;
-         else
-           size = sizeof (o);
-       }
-       if (size == sizeof (*map))
-       {
-         map = GNUNET_DISK_file_map (fh,
-                                     &map_handle,
-                                     GNUNET_DISK_MAP_TYPE_READWRITE,
-                                     sizeof (*map));
-         if (NULL == map)
-           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                       _("Failed to map `%s', cannot assure monotonic time!\n"),
-                       filename);
-       }
-       else
-       {
-         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                     _("Failed to setup monotonic time file `%s', cannot assure monotonic time!\n"),
-                     filename);
-       }
+        off_t size;
+
+        size = 0;
+        GNUNET_break (GNUNET_OK ==
+                      GNUNET_DISK_file_handle_size (fh,
+                                                    &size));
+        if (size < (off_t) sizeof (*map))
+        {
+          struct GNUNET_TIME_AbsoluteNBO o;
+          
+          o = GNUNET_TIME_absolute_hton (now);
+          if (sizeof (o) !=
+              GNUNET_DISK_file_write (fh,
+                                      &o,
+                                      sizeof (o)))
+            size = 0;
+          else
+            size = sizeof (o);
+        }
+        if (size == sizeof (*map))
+        {
+          map = GNUNET_DISK_file_map (fh,
+                                      &map_handle,
+                                      GNUNET_DISK_MAP_TYPE_READWRITE,
+                                      sizeof (*map));
+          if (NULL == map)
+            GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                        _("Failed to map `%s', cannot assure monotonic time!\n"),
+                        filename);
+        }
+        else
+        {
+          GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                      _("Failed to setup monotonic time file `%s', cannot assure monotonic time!\n"),
+                      filename);
+        }
       }
       GNUNET_DISK_file_close (fh);
       GNUNET_free (filename);