X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fcadet%2Fgnunet-service-cadet-new_connection.c;h=6976e66e4af6406b88c705d48165f3215136b9fc;hb=0af32e03677ab1c8a819b376c8fa026d0ffa9144;hp=a098674f51643affb4c5053a6c23f4c5ef59703a;hpb=263e1a188460cc89c1341422a7338aaf0645b190;p=oweals%2Fgnunet.git diff --git a/src/cadet/gnunet-service-cadet-new_connection.c b/src/cadet/gnunet-service-cadet-new_connection.c index a098674f5..6976e66e4 100644 --- a/src/cadet/gnunet-service-cadet-new_connection.c +++ b/src/cadet/gnunet-service-cadet-new_connection.c @@ -1,4 +1,3 @@ - /* This file is part of GNUnet. Copyright (C) 2001-2017 GNUnet e.V. @@ -25,9 +24,6 @@ * end-to-end routes and transmits messages along the route * @author Bartlomiej Polot * @author Christian Grothoff - * - * TODO: - * - Optimization: keep per-connection performance metrics (?) */ #include "platform.h" #include "gnunet-service-cadet-new.h" @@ -139,11 +135,26 @@ struct CadetConnection */ struct GNUNET_TIME_Relative retry_delay; + /** + * Performance metrics for this connection. + */ + struct CadetConnectionMetrics metrics; + /** * State of the connection. */ enum CadetConnectionState state; + /** + * Options for the route, control buffering. + */ + enum GNUNET_CADET_ChannelOption options; + + /** + * How many latency observations did we make for this connection? + */ + unsigned int latency_datapoints; + /** * Offset of our @e destination in @e path. */ @@ -157,6 +168,51 @@ struct CadetConnection }; +/** + * Lookup a connection by its identifier. + * + * @param cid identifier to resolve + * @return NULL if connection was not found + */ +struct CadetConnection * +GCC_lookup (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) +{ + return GNUNET_CONTAINER_multishortmap_get (connections, + &cid->connection_of_tunnel); +} + + +/** + * Update the connection state. Also triggers the necessary + * MQM notifications. + * + * @param cc connection to update the state for + * @param new_state new state for @a cc + * @param new_mqm_ready new `mqm_ready` state for @a cc + */ +static void +update_state (struct CadetConnection *cc, + enum CadetConnectionState new_state, + int new_mqm_ready) +{ + int old_ready; + int new_ready; + + if ( (new_state == cc->state) && + (new_mqm_ready == cc->mqm_ready) ) + return; /* no change, nothing to do */ + old_ready = ( (CADET_CONNECTION_READY == cc->state) && + (GNUNET_YES == cc->mqm_ready) ); + new_ready = ( (CADET_CONNECTION_READY == new_state) && + (GNUNET_YES == new_mqm_ready) ); + cc->state = new_state; + cc->mqm_ready = new_mqm_ready; + if (old_ready != new_ready) + cc->ready_cb (cc->ready_cb_cls, + new_ready); +} + + /** * Destroy a connection, part of the internal implementation. Called * only from #GCC_destroy_from_core() or #GCC_destroy_from_tunnel(). @@ -188,6 +244,10 @@ GCC_destroy (struct CadetConnection *cc) GCPP_del_connection (cc->path, cc->off, cc); + for (unsigned int i=0;ioff;i++) + GCP_remove_connection (GCPP_get_peer_at_offset (cc->path, + i), + cc); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multishortmap_remove (connections, &GCC_get_id (cc)->connection_of_tunnel, @@ -259,6 +319,19 @@ GCC_get_ct (struct CadetConnection *cc) } +/** + * Obtain performance @a metrics from @a cc. + * + * @param cc connection to query + * @return the metrics + */ +const struct CadetConnectionMetrics * +GCC_get_metrics (struct CadetConnection *cc) +{ + return &cc->metrics; +} + + /** * Send a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_KEEPALIVE through the * tunnel to prevent it from timing out. @@ -274,9 +347,12 @@ send_keepalive (void *cls); * schedule the next one. * * @param cls the `struct CadetConnection` to keep alive. + * @param cid identifier of the connection within the tunnel, NULL + * if transmission failed */ static void -keepalive_done (void *cls) +keepalive_done (void *cls, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) { struct CadetConnection *cc = cls; @@ -302,6 +378,14 @@ send_keepalive (void *cls) struct GNUNET_MessageHeader msg; cc->task = NULL; + if (CADET_TUNNEL_KEY_OK != GCT_get_estate (cc->ct->t)) + { + /* Tunnel not yet ready, wait with keepalives... */ + cc->task = GNUNET_SCHEDULER_add_delayed (keepalive_period, + &send_keepalive, + cc); + return; + } GNUNET_assert (NULL != cc->ct); GNUNET_assert (GNUNET_YES == cc->mqm_ready); GNUNET_assert (NULL == cc->keepalive_qe); @@ -324,6 +408,81 @@ send_keepalive (void *cls) } +/** + * We sent a message for which we expect to receive an ACK via + * the connection identified by @a cti. + * + * @param cid connection identifier where we expect an ACK + */ +void +GCC_ack_expected (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) +{ + struct CadetConnection *cc; + + cc = GCC_lookup (cid); + if (NULL == cc) + return; /* whopise, connection alredy down? */ + cc->metrics.num_acked_transmissions++; +} + + +/** + * We observed an ACK for a message that was originally sent via + * the connection identified by @a cti. + * + * @param cti connection identifier where we got an ACK for a message + * that was originally sent via this connection (the ACK + * may have gotten back to us via a different connection). + */ +void +GCC_ack_observed (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) +{ + struct CadetConnection *cc; + + cc = GCC_lookup (cid); + if (NULL == cc) + return; /* whopise, connection alredy down? */ + cc->metrics.num_successes++; +} + + +/** + * We observed some the given @a latency on the connection + * identified by @a cti. (The same connection was taken + * in both directions.) + * + * @param cid connection identifier where we measured latency + * @param latency the observed latency + */ +void +GCC_latency_observed (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, + struct GNUNET_TIME_Relative latency) +{ + struct CadetConnection *cc; + double weight; + double result; + + cc = GCC_lookup (cid); + if (NULL == cc) + return; /* whopise, connection alredy down? */ + GNUNET_STATISTICS_update (stats, + "# latencies observed", + 1, + GNUNET_NO); + cc->latency_datapoints++; + if (cc->latency_datapoints >= 7) + weight = 7.0; + else + weight = cc->latency_datapoints; + /* Compute weighted average, giving at MOST weight 7 to the + existing values, or less if that value is based on fewer than 7 + measurements. */ + result = (weight * cc->metrics.aged_latency.rel_value_us) + 1.0 * latency.rel_value_us; + result /= (weight + 1.0); + cc->metrics.aged_latency.rel_value_us = (uint64_t) result; +} + + /** * A #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK was received for this connection, implying * that the end-to-end connection is up. Process it. @@ -338,23 +497,23 @@ GCC_handle_connection_create_ack (struct CadetConnection *cc) GCC_2s (cc), cc->state, (GNUNET_YES == cc->mqm_ready) ? "MQM ready" : "MQM busy"); + if (CADET_CONNECTION_READY == cc->state) + return; /* Duplicate ACK, ignore */ if (NULL != cc->task) { GNUNET_SCHEDULER_cancel (cc->task); cc->task = NULL; } - cc->state = CADET_CONNECTION_READY; - if (GNUNET_YES == cc->mqm_ready) - { - cc->ready_cb (cc->ready_cb_cls, - GNUNET_YES); - if ( (NULL == cc->keepalive_qe) && - (GNUNET_YES == cc->mqm_ready) && - (NULL == cc->task) ) - cc->task = GNUNET_SCHEDULER_add_delayed (keepalive_period, - &send_keepalive, - cc); - } + cc->metrics.age = GNUNET_TIME_absolute_get (); + update_state (cc, + CADET_CONNECTION_READY, + cc->mqm_ready); + if ( (NULL == cc->keepalive_qe) && + (GNUNET_YES == cc->mqm_ready) && + (NULL == cc->task) ) + cc->task = GNUNET_SCHEDULER_add_delayed (keepalive_period, + &send_keepalive, + cc); } @@ -382,6 +541,30 @@ GCC_handle_kx (struct CadetConnection *cc, } +/** + * Handle KX_AUTH message. + * + * @param cc connection that received encrypted message + * @param msg the key exchange message + */ +void +GCC_handle_kx_auth (struct CadetConnection *cc, + const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg) +{ + if (CADET_CONNECTION_SENT == cc->state) + { + /* We didn't get the CADET_CONNECTION_CREATE_ACK, but instead got payload. That's fine, + clearly something is working, so pretend we got an ACK. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Faking connection CADET_CONNECTION_CREATE_ACK for %s due to KX\n", + GCC_2s (cc)); + GCC_handle_connection_create_ack (cc); + } + GCT_handle_kx_auth (cc->ct, + msg); +} + + /** * Handle encrypted message. * @@ -401,6 +584,7 @@ GCC_handle_encrypted (struct CadetConnection *cc, GCC_2s (cc)); GCC_handle_connection_create_ack (cc); } + cc->metrics.last_use = GNUNET_TIME_absolute_get (); GCT_handle_encrypted (cc->ct, msg); } @@ -427,6 +611,7 @@ send_create (void *cls) env = GNUNET_MQ_msg_extra (create_msg, (1 + path_length) * sizeof (struct GNUNET_PeerIdentity), GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE); + create_msg->options = htonl ((uint32_t) cc->options); create_msg->cid = cc->cid; pids = (struct GNUNET_PeerIdentity *) &create_msg[1]; pids[0] = my_full_id; @@ -437,8 +622,9 @@ send_create (void *cls) "Sending CADET_CONNECTION_CREATE message for %s\n", GCC_2s (cc)); cc->env = env; - cc->mqm_ready = GNUNET_NO; - cc->state = CADET_CONNECTION_SENT; + update_state (cc, + CADET_CONNECTION_SENT, + GNUNET_NO); GCP_send (cc->mq_man, env); } @@ -466,8 +652,9 @@ send_create_ack (void *cls) GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK); ack_msg->cid = cc->cid; cc->env = env; - cc->mqm_ready = GNUNET_NO; - cc->state = CADET_CONNECTION_READY; + update_state (cc, + CADET_CONNECTION_READY, + GNUNET_NO); GCP_send (cc->mq_man, env); } @@ -489,13 +676,11 @@ GCC_handle_duplicate_create (struct CadetConnection *cc) "Got duplicate CREATE for %s, scheduling another ACK (%s)\n", GCC_2s (cc), (GNUNET_YES == cc->mqm_ready) ? "MQM ready" : "MQM busy"); - /* Tell tunnel that we are not ready for transmission anymore - (until CREATE_ACK is done) */ - cc->ready_cb (cc->ready_cb_cls, - GNUNET_NO); /* Revert back to the state of having only received the 'CREATE', and immediately proceed to send the CREATE_ACK. */ - cc->state = CADET_CONNECTION_CREATE_RECEIVED; + update_state (cc, + CADET_CONNECTION_CREATE_RECEIVED, + cc->mqm_ready); if (NULL != cc->task) GNUNET_SCHEDULER_cancel (cc->task); cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack, @@ -535,20 +720,21 @@ manage_first_hop_mq (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Core MQ for %s went down\n", GCC_2s (cc)); - cc->mqm_ready = GNUNET_NO; - cc->state = CADET_CONNECTION_NEW; + update_state (cc, + CADET_CONNECTION_NEW, + GNUNET_NO); cc->retry_delay = GNUNET_TIME_UNIT_ZERO; if (NULL != cc->task) { GNUNET_SCHEDULER_cancel (cc->task); cc->task = NULL; } - cc->ready_cb (cc->ready_cb_cls, - GNUNET_NO); return; } - cc->mqm_ready = GNUNET_YES; + update_state (cc, + cc->state, + GNUNET_YES); LOG (GNUNET_ERROR_TYPE_DEBUG, "Core MQ for %s became available in state %d\n", GCC_2s (cc), @@ -573,12 +759,11 @@ manage_first_hop_mq (void *cls, break; case CADET_CONNECTION_CREATE_RECEIVED: /* We got the 'CREATE' (incoming connection), should send the CREATE_ACK */ + cc->metrics.age = GNUNET_TIME_absolute_get (); cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack, cc); break; case CADET_CONNECTION_READY: - cc->ready_cb (cc->ready_cb_cls, - GNUNET_YES); if ( (NULL == cc->keepalive_qe) && (GNUNET_YES == cc->mqm_ready) && (NULL == cc->task) ) @@ -604,6 +789,8 @@ manage_first_hop_mq (void *cls, * * @param destination where to go * @param path which path to take (may not be the full path) + * @param off offset of @a destination on @a path + * @param options options for the connection * @param ct which tunnel uses this connection * @param init_state initial state for the connection * @param ready_cb function to call when ready to transmit @@ -613,6 +800,8 @@ manage_first_hop_mq (void *cls, static struct CadetConnection * connection_create (struct CadetPeer *destination, struct CadetPeerPath *path, + unsigned int off, + enum GNUNET_CADET_ChannelOption options, struct CadetTConnection *ct, const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, enum CadetConnectionState init_state, @@ -621,12 +810,9 @@ connection_create (struct CadetPeer *destination, { struct CadetConnection *cc; struct CadetPeer *first_hop; - unsigned int off; - off = GCPP_find_peer (path, - destination); - GNUNET_assert (UINT_MAX > off); cc = GNUNET_new (struct CadetConnection); + cc->options = options; cc->state = init_state; cc->ct = ct; cc->cid = *cid; @@ -648,7 +834,7 @@ connection_create (struct CadetPeer *destination, cc); for (unsigned int i=0;imqm_ready); GNUNET_assert (CADET_CONNECTION_READY == cc->state); + cc->metrics.last_use = GNUNET_TIME_absolute_get (); cc->mqm_ready = GNUNET_NO; if (NULL != cc->task) {