loglevel MESSAGE for the incoming connection message + type
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_connection.c
index a098674f51643affb4c5053a6c23f4c5ef59703a..6976e66e4af6406b88c705d48165f3215136b9fc 100644 (file)
@@ -1,4 +1,3 @@
-
 /*
      This file is part of GNUnet.
      Copyright (C) 2001-2017 GNUnet e.V.
@@ -25,9 +24,6 @@
  *        end-to-end routes and transmits messages along the route
  * @author Bartlomiej Polot
  * @author Christian Grothoff
- *
- * TODO:
- * - Optimization: keep per-connection performance metrics (?)
  */
 #include "platform.h"
 #include "gnunet-service-cadet-new.h"
@@ -139,11 +135,26 @@ struct CadetConnection
    */
   struct GNUNET_TIME_Relative retry_delay;
 
+  /**
+   * Performance metrics for this connection.
+   */
+  struct CadetConnectionMetrics metrics;
+
   /**
    * State of the connection.
    */
   enum CadetConnectionState state;
 
+  /**
+   * Options for the route, control buffering.
+   */
+  enum GNUNET_CADET_ChannelOption options;
+
+  /**
+   * How many latency observations did we make for this connection?
+   */
+  unsigned int latency_datapoints;
+
   /**
    * Offset of our @e destination in @e path.
    */
@@ -157,6 +168,51 @@ struct CadetConnection
 };
 
 
+/**
+ * Lookup a connection by its identifier.
+ *
+ * @param cid identifier to resolve
+ * @return NULL if connection was not found
+ */
+struct CadetConnection *
+GCC_lookup (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
+{
+  return GNUNET_CONTAINER_multishortmap_get (connections,
+                                             &cid->connection_of_tunnel);
+}
+
+
+/**
+ * Update the connection state. Also triggers the necessary
+ * MQM notifications.
+ *
+ * @param cc connection to update the state for
+ * @param new_state new state for @a cc
+ * @param new_mqm_ready new `mqm_ready` state for @a cc
+ */
+static void
+update_state (struct CadetConnection *cc,
+              enum CadetConnectionState new_state,
+              int new_mqm_ready)
+{
+  int old_ready;
+  int new_ready;
+
+  if ( (new_state == cc->state) &&
+       (new_mqm_ready == cc->mqm_ready) )
+    return; /* no change, nothing to do */
+  old_ready = ( (CADET_CONNECTION_READY == cc->state) &&
+                (GNUNET_YES == cc->mqm_ready) );
+  new_ready = ( (CADET_CONNECTION_READY == new_state) &&
+                (GNUNET_YES == new_mqm_ready) );
+  cc->state = new_state;
+  cc->mqm_ready = new_mqm_ready;
+  if (old_ready != new_ready)
+    cc->ready_cb (cc->ready_cb_cls,
+                  new_ready);
+}
+
+
 /**
  * Destroy a connection, part of the internal implementation.  Called
  * only from #GCC_destroy_from_core() or #GCC_destroy_from_tunnel().
@@ -188,6 +244,10 @@ GCC_destroy (struct CadetConnection *cc)
   GCPP_del_connection (cc->path,
                        cc->off,
                        cc);
+  for (unsigned int i=0;i<cc->off;i++)
+    GCP_remove_connection (GCPP_get_peer_at_offset (cc->path,
+                                                    i),
+                           cc);
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multishortmap_remove (connections,
                                                         &GCC_get_id (cc)->connection_of_tunnel,
@@ -259,6 +319,19 @@ GCC_get_ct (struct CadetConnection *cc)
 }
 
 
+/**
+ * Obtain performance @a metrics from @a cc.
+ *
+ * @param cc connection to query
+ * @return the metrics
+ */
+const struct CadetConnectionMetrics *
+GCC_get_metrics (struct CadetConnection *cc)
+{
+  return &cc->metrics;
+}
+
+
 /**
  * Send a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_KEEPALIVE through the
  * tunnel to prevent it from timing out.
@@ -274,9 +347,12 @@ send_keepalive (void *cls);
  * schedule the next one.
  *
  * @param cls the `struct CadetConnection` to keep alive.
+ * @param cid identifier of the connection within the tunnel, NULL
+ *            if transmission failed
  */
 static void
-keepalive_done (void *cls)
+keepalive_done (void *cls,
+                const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
 {
   struct CadetConnection *cc = cls;
 
@@ -302,6 +378,14 @@ send_keepalive (void *cls)
   struct GNUNET_MessageHeader msg;
 
   cc->task = NULL;
+  if (CADET_TUNNEL_KEY_OK != GCT_get_estate (cc->ct->t))
+  {
+    /* Tunnel not yet ready, wait with keepalives... */
+    cc->task = GNUNET_SCHEDULER_add_delayed (keepalive_period,
+                                             &send_keepalive,
+                                             cc);
+    return;
+  }
   GNUNET_assert (NULL != cc->ct);
   GNUNET_assert (GNUNET_YES == cc->mqm_ready);
   GNUNET_assert (NULL == cc->keepalive_qe);
@@ -324,6 +408,81 @@ send_keepalive (void *cls)
 }
 
 
+/**
+ * We sent a message for which we expect to receive an ACK via
+ * the connection identified by @a cti.
+ *
+ * @param cid connection identifier where we expect an ACK
+ */
+void
+GCC_ack_expected (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
+{
+  struct CadetConnection *cc;
+
+  cc = GCC_lookup (cid);
+  if (NULL == cc)
+    return; /* whopise, connection alredy down? */
+  cc->metrics.num_acked_transmissions++;
+}
+
+
+/**
+ * We observed an ACK for a message that was originally sent via
+ * the connection identified by @a cti.
+ *
+ * @param cti connection identifier where we got an ACK for a message
+ *            that was originally sent via this connection (the ACK
+ *            may have gotten back to us via a different connection).
+ */
+void
+GCC_ack_observed (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
+{
+  struct CadetConnection *cc;
+
+  cc = GCC_lookup (cid);
+  if (NULL == cc)
+    return; /* whopise, connection alredy down? */
+  cc->metrics.num_successes++;
+}
+
+
+/**
+ * We observed some the given @a latency on the connection
+ * identified by @a cti.  (The same connection was taken
+ * in both directions.)
+ *
+ * @param cid connection identifier where we measured latency
+ * @param latency the observed latency
+ */
+void
+GCC_latency_observed (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
+                      struct GNUNET_TIME_Relative latency)
+{
+  struct CadetConnection *cc;
+  double weight;
+  double result;
+
+  cc = GCC_lookup (cid);
+  if (NULL == cc)
+    return; /* whopise, connection alredy down? */
+  GNUNET_STATISTICS_update (stats,
+                            "# latencies observed",
+                            1,
+                            GNUNET_NO);
+  cc->latency_datapoints++;
+  if (cc->latency_datapoints >= 7)
+    weight = 7.0;
+  else
+    weight = cc->latency_datapoints;
+  /* Compute weighted average, giving at MOST weight 7 to the
+     existing values, or less if that value is based on fewer than 7
+     measurements. */
+  result = (weight * cc->metrics.aged_latency.rel_value_us) + 1.0 * latency.rel_value_us;
+  result /= (weight + 1.0);
+  cc->metrics.aged_latency.rel_value_us = (uint64_t) result;
+}
+
+
 /**
  * A #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK was received for this connection, implying
  * that the end-to-end connection is up.  Process it.
@@ -338,23 +497,23 @@ GCC_handle_connection_create_ack (struct CadetConnection *cc)
        GCC_2s (cc),
        cc->state,
        (GNUNET_YES == cc->mqm_ready) ? "MQM ready" : "MQM busy");
+  if (CADET_CONNECTION_READY == cc->state)
+    return; /* Duplicate ACK, ignore */
   if (NULL != cc->task)
   {
     GNUNET_SCHEDULER_cancel (cc->task);
     cc->task = NULL;
   }
-  cc->state = CADET_CONNECTION_READY;
-  if (GNUNET_YES == cc->mqm_ready)
-  {
-    cc->ready_cb (cc->ready_cb_cls,
-                  GNUNET_YES);
-    if ( (NULL == cc->keepalive_qe) &&
-         (GNUNET_YES == cc->mqm_ready) &&
-         (NULL == cc->task) )
-      cc->task = GNUNET_SCHEDULER_add_delayed (keepalive_period,
-                                               &send_keepalive,
-                                               cc);
-  }
+  cc->metrics.age = GNUNET_TIME_absolute_get ();
+  update_state (cc,
+                CADET_CONNECTION_READY,
+                cc->mqm_ready);
+  if ( (NULL == cc->keepalive_qe) &&
+       (GNUNET_YES == cc->mqm_ready) &&
+       (NULL == cc->task) )
+    cc->task = GNUNET_SCHEDULER_add_delayed (keepalive_period,
+                                             &send_keepalive,
+                                             cc);
 }
 
 
@@ -382,6 +541,30 @@ GCC_handle_kx (struct CadetConnection *cc,
 }
 
 
+/**
+ * Handle KX_AUTH message.
+ *
+ * @param cc connection that received encrypted message
+ * @param msg the key exchange message
+ */
+void
+GCC_handle_kx_auth (struct CadetConnection *cc,
+                    const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg)
+{
+  if (CADET_CONNECTION_SENT == cc->state)
+  {
+    /* We didn't get the CADET_CONNECTION_CREATE_ACK, but instead got payload. That's fine,
+       clearly something is working, so pretend we got an ACK. */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Faking connection CADET_CONNECTION_CREATE_ACK for %s due to KX\n",
+         GCC_2s (cc));
+    GCC_handle_connection_create_ack (cc);
+  }
+  GCT_handle_kx_auth (cc->ct,
+                      msg);
+}
+
+
 /**
  * Handle encrypted message.
  *
@@ -401,6 +584,7 @@ GCC_handle_encrypted (struct CadetConnection *cc,
          GCC_2s (cc));
     GCC_handle_connection_create_ack (cc);
   }
+  cc->metrics.last_use = GNUNET_TIME_absolute_get ();
   GCT_handle_encrypted (cc->ct,
                         msg);
 }
@@ -427,6 +611,7 @@ send_create (void *cls)
   env = GNUNET_MQ_msg_extra (create_msg,
                              (1 + path_length) * sizeof (struct GNUNET_PeerIdentity),
                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE);
+  create_msg->options = htonl ((uint32_t) cc->options);
   create_msg->cid = cc->cid;
   pids = (struct GNUNET_PeerIdentity *) &create_msg[1];
   pids[0] = my_full_id;
@@ -437,8 +622,9 @@ send_create (void *cls)
        "Sending CADET_CONNECTION_CREATE message for %s\n",
        GCC_2s (cc));
   cc->env = env;
-  cc->mqm_ready = GNUNET_NO;
-  cc->state = CADET_CONNECTION_SENT;
+  update_state (cc,
+                CADET_CONNECTION_SENT,
+                GNUNET_NO);
   GCP_send (cc->mq_man,
             env);
 }
@@ -466,8 +652,9 @@ send_create_ack (void *cls)
                        GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK);
   ack_msg->cid = cc->cid;
   cc->env = env;
-  cc->mqm_ready = GNUNET_NO;
-  cc->state = CADET_CONNECTION_READY;
+  update_state (cc,
+                CADET_CONNECTION_READY,
+                GNUNET_NO);
   GCP_send (cc->mq_man,
             env);
 }
@@ -489,13 +676,11 @@ GCC_handle_duplicate_create (struct CadetConnection *cc)
          "Got duplicate CREATE for %s, scheduling another ACK (%s)\n",
          GCC_2s (cc),
          (GNUNET_YES == cc->mqm_ready) ? "MQM ready" : "MQM busy");
-    /* Tell tunnel that we are not ready for transmission anymore
-       (until CREATE_ACK is done) */
-    cc->ready_cb (cc->ready_cb_cls,
-                  GNUNET_NO);
     /* Revert back to the state of having only received the 'CREATE',
        and immediately proceed to send the CREATE_ACK. */
-    cc->state = CADET_CONNECTION_CREATE_RECEIVED;
+    update_state (cc,
+                  CADET_CONNECTION_CREATE_RECEIVED,
+                  cc->mqm_ready);
     if (NULL != cc->task)
       GNUNET_SCHEDULER_cancel (cc->task);
     cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack,
@@ -535,20 +720,21 @@ manage_first_hop_mq (void *cls,
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Core MQ for %s went down\n",
          GCC_2s (cc));
-    cc->mqm_ready = GNUNET_NO;
-    cc->state = CADET_CONNECTION_NEW;
+    update_state (cc,
+                  CADET_CONNECTION_NEW,
+                  GNUNET_NO);
     cc->retry_delay = GNUNET_TIME_UNIT_ZERO;
     if (NULL != cc->task)
     {
       GNUNET_SCHEDULER_cancel (cc->task);
       cc->task = NULL;
     }
-    cc->ready_cb (cc->ready_cb_cls,
-                  GNUNET_NO);
     return;
   }
 
-  cc->mqm_ready = GNUNET_YES;
+  update_state (cc,
+                cc->state,
+                GNUNET_YES);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Core MQ for %s became available in state %d\n",
        GCC_2s (cc),
@@ -573,12 +759,11 @@ manage_first_hop_mq (void *cls,
     break;
   case CADET_CONNECTION_CREATE_RECEIVED:
     /* We got the 'CREATE' (incoming connection), should send the CREATE_ACK */
+    cc->metrics.age = GNUNET_TIME_absolute_get ();
     cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack,
                                          cc);
     break;
   case CADET_CONNECTION_READY:
-    cc->ready_cb (cc->ready_cb_cls,
-                  GNUNET_YES);
     if ( (NULL == cc->keepalive_qe) &&
          (GNUNET_YES == cc->mqm_ready) &&
          (NULL == cc->task) )
@@ -604,6 +789,8 @@ manage_first_hop_mq (void *cls,
  *
  * @param destination where to go
  * @param path which path to take (may not be the full path)
+ * @param off offset of @a destination on @a path
+ * @param options options for the connection
  * @param ct which tunnel uses this connection
  * @param init_state initial state for the connection
  * @param ready_cb function to call when ready to transmit
@@ -613,6 +800,8 @@ manage_first_hop_mq (void *cls,
 static struct CadetConnection *
 connection_create (struct CadetPeer *destination,
                    struct CadetPeerPath *path,
+                   unsigned int off,
+                   enum GNUNET_CADET_ChannelOption options,
                    struct CadetTConnection *ct,
                    const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
                    enum CadetConnectionState init_state,
@@ -621,12 +810,9 @@ connection_create (struct CadetPeer *destination,
 {
   struct CadetConnection *cc;
   struct CadetPeer *first_hop;
-  unsigned int off;
 
-  off = GCPP_find_peer (path,
-                        destination);
-  GNUNET_assert (UINT_MAX > off);
   cc = GNUNET_new (struct CadetConnection);
+  cc->options = options;
   cc->state = init_state;
   cc->ct = ct;
   cc->cid = *cid;
@@ -648,7 +834,7 @@ connection_create (struct CadetPeer *destination,
                        cc);
   for (unsigned int i=0;i<off;i++)
     GCP_add_connection (GCPP_get_peer_at_offset (path,
-                                                 off),
+                                                 i),
                         cc);
 
   first_hop = GCPP_get_peer_at_offset (path,
@@ -667,6 +853,7 @@ connection_create (struct CadetPeer *destination,
  *
  * @param destination where to go
  * @param path which path to take (may not be the full path)
+ * @param options options for the connection
  * @param ct which tunnel uses this connection
  * @param ready_cb function to call when ready to transmit
  * @param ready_cb_cls closure for @a cb
@@ -676,6 +863,7 @@ connection_create (struct CadetPeer *destination,
 struct CadetConnection *
 GCC_create_inbound (struct CadetPeer *destination,
                     struct CadetPeerPath *path,
+                    enum GNUNET_CADET_ChannelOption options,
                     struct CadetTConnection *ct,
                     const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
                     GCC_ReadyCallback ready_cb,
@@ -731,6 +919,8 @@ GCC_create_inbound (struct CadetPeer *destination,
 
   return connection_create (destination,
                             path,
+                            off,
+                            options,
                             ct,
                             cid,
                             CADET_CONNECTION_CREATE_RECEIVED,
@@ -745,6 +935,8 @@ GCC_create_inbound (struct CadetPeer *destination,
  *
  * @param destination where to go
  * @param path which path to take (may not be the full path)
+ * @param off offset of @a destination on @a path
+ * @param options options for the connection
  * @param ct tunnel that uses the connection
  * @param ready_cb function to call when ready to transmit
  * @param ready_cb_cls closure for @a cb
@@ -753,6 +945,8 @@ GCC_create_inbound (struct CadetPeer *destination,
 struct CadetConnection *
 GCC_create (struct CadetPeer *destination,
             struct CadetPeerPath *path,
+            unsigned int off,
+            enum GNUNET_CADET_ChannelOption options,
             struct CadetTConnection *ct,
             GCC_ReadyCallback ready_cb,
             void *ready_cb_cls)
@@ -764,6 +958,8 @@ GCC_create (struct CadetPeer *destination,
                               sizeof (cid));
   return connection_create (destination,
                             path,
+                            off,
+                            options,
                             ct,
                             &cid,
                             CADET_CONNECTION_NEW,
@@ -791,6 +987,7 @@ GCC_transmit (struct CadetConnection *cc,
        GCC_2s (cc));
   GNUNET_assert (GNUNET_YES == cc->mqm_ready);
   GNUNET_assert (CADET_CONNECTION_READY == cc->state);
+  cc->metrics.last_use = GNUNET_TIME_absolute_get ();
   cc->mqm_ready = GNUNET_NO;
   if (NULL != cc->task)
   {