* @author Christian Grothoff
*
* FIXME:
- * - connection management
- * + properly (evaluate, kill old ones, search for new ones)
- * + when managing connections, distinguish those that
- * have (recently) had traffic from those that were
- * never ready (or not recently)
+ * - proper connection evaluation during connection management:
+ * + consider quality (or quality spread?) of current connection set
+ * when deciding how often to do maintenance
+ * + interact with PEER to drive DHT GET/PUT operations based
+ * on how much we like our connections
*/
#include "platform.h"
#include "gnunet_util_lib.h"
/**
* Continuation to call once sent (on the channel layer).
*/
- GNUNET_SCHEDULER_TaskCallback cont;
+ GCT_SendContinuation cont;
/**
* Closure for @c cont.
*/
struct CadetTunnelQueueEntry *tq_tail;
+ /**
+ * Identification of the connection from which we are currently processing
+ * a message. Only valid (non-NULL) during #handle_decrypted() and the
+ * handle-*()-functions called from our @e mq during that function.
+ */
+ struct CadetTConnection *current_ct;
+
/**
* How long do we wait until we retry the KX?
*/
sizeof (kx_auth)))
{
/* This KX_AUTH is not using the latest KX/KX_AUTH data
- we transmitted to the sender, refuse it! */
+ we transmitted to the sender, refuse it, try KX again. */
GNUNET_break_op (0);
+ send_kx (t,
+ NULL,
+ &t->ax);
return;
}
/* Yep, we're good. */
}
+/**
+ * Clean up connection @a ct of a tunnel.
+ *
+ * @param cls the `struct CadetTunnel`
+ * @param ct connection to clean up
+ */
+static void
+destroy_t_connection (void *cls,
+ struct CadetTConnection *ct)
+{
+ struct CadetTunnel *t = cls;
+ struct CadetConnection *cc = ct->cc;
+
+ GNUNET_assert (ct->t == t);
+ GCT_connection_lost (ct);
+ GCC_destroy_without_tunnel (cc);
+}
+
+
/**
* This tunnel is no longer used, destroy it.
*
destroy_tunnel (void *cls)
{
struct CadetTunnel *t = cls;
- struct CadetTConnection *ct;
struct CadetTunnelQueueEntry *tq;
t->destroy_task = NULL;
"Destroying idle %s\n",
GCT_2s (t));
GNUNET_assert (0 == GCT_count_channels (t));
- while (NULL != (ct = t->connection_ready_head))
- {
- struct CadetConnection *cc;
-
- GNUNET_assert (ct->t == t);
- cc = ct->cc;
- GCT_connection_lost (ct);
- GCC_destroy_without_tunnel (cc);
- }
- while (NULL != (ct = t->connection_busy_head))
- {
- struct CadetConnection *cc;
-
- GNUNET_assert (ct->t == t);
- cc = ct->cc;
- GCT_connection_lost (ct);
- GCC_destroy_without_tunnel (cc);
- }
+ GCT_iterate_connections (t,
+ &destroy_t_connection,
+ t);
+ GNUNET_assert (NULL == t->connection_ready_head);
+ GNUNET_assert (NULL == t->connection_busy_head);
while (NULL != (tq = t->tq_head))
{
if (NULL != tq->cont)
- tq->cont (tq->cont_cls);
+ tq->cont (tq->cont_cls,
+ NULL);
GCT_send_cancel (tq);
}
GCP_drop_tunnel (t->destination,
GNUNET_free (t->unverified_ax);
}
cleanup_ax (&t->ax);
+ GNUNET_assert (NULL == t->destroy_task);
GNUNET_free (t);
}
GNUNET_CONTAINER_multihashmap32_remove (t->channels,
ntohl (ctn.cn),
ch));
- if (0 ==
- GCT_count_channels (t))
+ if ( (0 ==
+ GCT_count_channels (t)) &&
+ (NULL == t->destroy_task) )
{
- t->destroy_task = GNUNET_SCHEDULER_add_delayed (IDLE_DESTROY_DELAY,
- &destroy_tunnel,
- t);
+ t->destroy_task
+ = GNUNET_SCHEDULER_add_delayed (IDLE_DESTROY_DELAY,
+ &destroy_tunnel,
+ t);
}
}
{
struct CadetChannel *ch = value;
- GCCH_handle_remote_destroy (ch);
+ GCCH_handle_remote_destroy (ch,
+ NULL);
return GNUNET_OK;
}
GCC_transmit (ct->cc,
tq->env);
if (NULL != tq->cont)
- tq->cont (tq->cont_cls);
+ tq->cont (tq->cont_cls,
+ GCC_get_id (ct->cc));
GNUNET_free (tq);
}
GNUNET_CONTAINER_HeapCostType max_desire;
/**
- * Path we are comparing against.
+ * Path we are comparing against for #evaluate_connection, can be NULL.
*/
struct CadetPeerPath *path;
+ /**
+ * Connection deemed the "worst" so far encountered by #evaluate_connection,
+ * NULL if we did not yet encounter any connections.
+ */
+ struct CadetTConnection *worst;
+
+ /**
+ * Numeric score of @e worst, only set if @e worst is non-NULL.
+ */
+ double worst_score;
+
/**
* Set to #GNUNET_YES if we have a connection over @e path already.
*/
* what kinds of connections we have.
*
* @param cls the `struct EvaluationSummary *` to update
- * @param cc a connection to include in the summary
+ * @param ct a connection to include in the summary
*/
static void
evaluate_connection (void *cls,
- struct CadetConnection *cc)
+ struct CadetTConnection *ct)
{
struct EvaluationSummary *es = cls;
+ struct CadetConnection *cc = ct->cc;
struct CadetPeerPath *ps = GCC_get_path (cc);
+ const struct CadetConnectionMetrics *metrics;
+ GNUNET_CONTAINER_HeapCostType ct_desirability;
+ struct GNUNET_TIME_Relative uptime;
+ struct GNUNET_TIME_Relative last_use;
+ uint32_t ct_length;
+ double score;
+ double success_rate;
if (ps == es->path)
{
es->duplicate = GNUNET_YES;
return;
}
+ ct_desirability = GCPP_get_desirability (ps);
+ ct_length = GCPP_get_length (ps);
+ metrics = GCC_get_metrics (cc);
+ uptime = GNUNET_TIME_absolute_get_duration (metrics->age);
+ last_use = GNUNET_TIME_absolute_get_duration (metrics->last_use);
+ /* We add 1.0 here to avoid division by zero. */
+ success_rate = (metrics->num_acked_transmissions + 1.0) / (metrics->num_successes + 1.0);
+ score
+ = ct_desirability
+ + 100.0 / (1.0 + ct_length) /* longer paths = better */
+ + sqrt (uptime.rel_value_us / 60000000LL) /* larger uptime = better */
+ - last_use.rel_value_us / 1000L; /* longer idle = worse */
+ score *= success_rate; /* weigh overall by success rate */
+
+ if ( (NULL == es->worst) ||
+ (score < es->worst_score) )
+ {
+ es->worst = ct;
+ es->worst_score = score;
+ }
es->min_length = GNUNET_MIN (es->min_length,
- GCPP_get_length (ps));
+ ct_length);
es->max_length = GNUNET_MAX (es->max_length,
- GCPP_get_length (ps));
+ ct_length);
es->min_desire = GNUNET_MIN (es->min_desire,
- GCPP_get_desirability (ps));
+ ct_desirability);
es->max_desire = GNUNET_MAX (es->max_desire,
- GCPP_get_desirability (ps));
+ ct_desirability);
}
this one is more than twice as long than what we are currently
using, then ignore all of these super-long ones! */
if ( (GCT_count_any_connections (t) > DESIRED_CONNECTIONS_PER_TUNNEL) &&
- (es.min_length * 2 < off) )
+ (es.min_length * 2 < off) &&
+ (es.max_length < off) )
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Ignoring paths of length %u, they are way too long.\n",
/* If we have enough paths and this one looks no better, ignore it. */
if ( (GCT_count_any_connections (t) >= DESIRED_CONNECTIONS_PER_TUNNEL) &&
(es.min_length < GCPP_get_length (path)) &&
- (es.max_desire > GCPP_get_desirability (path)) )
+ (es.min_desire > GCPP_get_desirability (path)) &&
+ (es.max_length < off) )
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Ignoring path (%u/%llu) to %s, got something better already.\n",
ct->t = t;
ct->cc = GCC_create (t->destination,
path,
+ GNUNET_CADET_OPTION_DEFAULT, /* FIXME: set based on what channels want/need! */
ct,
&connection_ready_cb,
ct);
+
/* FIXME: schedule job to kill connection (and path?) if it takes
too long to get ready! (And track performance data on how long
other connections took with the tunnel!)
maintain_connections_cb (void *cls)
{
struct CadetTunnel *t = cls;
+ struct GNUNET_TIME_Relative delay;
+ struct EvaluationSummary es;
t->maintain_connections_task = NULL;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Performing connection maintenance for %s.\n",
GCT_2s (t));
+ es.min_length = UINT_MAX;
+ es.max_length = 0;
+ es.max_desire = 0;
+ es.min_desire = UINT64_MAX;
+ es.path = NULL;
+ es.worst = NULL;
+ es.duplicate = GNUNET_NO;
+ GCT_iterate_connections (t,
+ &evaluate_connection,
+ &es);
+ if ( (NULL != es.worst) &&
+ (GCT_count_any_connections (t) > DESIRED_CONNECTIONS_PER_TUNNEL) )
+ {
+ /* Clear out worst-performing connection 'es.worst'. */
+ destroy_t_connection (t,
+ es.worst);
+ }
+
+ /* Consider additional paths */
(void) GCP_iterate_paths (t->destination,
&consider_path_cb,
t);
- GNUNET_break (0); // FIXME: implement!
+ /* FIXME: calculate when to try again based on how well we are doing;
+ in particular, if we have to few connections, we might be able
+ to do without this (as PATHS should tell us whenever a new path
+ is available instantly; however, need to make sure this job is
+ restarted after that happens).
+ Furthermore, if the paths we do know are in a reasonably narrow
+ quality band and are plentyful, we might also consider us stabilized
+ and then reduce the frequency accordingly. */
+ delay = GNUNET_TIME_UNIT_MINUTES;
+ t->maintain_connections_task
+ = GNUNET_SCHEDULER_add_delayed (delay,
+ &maintain_connections_cb,
+ t);
}
return;
}
GCCH_handle_channel_plaintext_data (ch,
+ GCC_get_id (t->current_ct->cc),
msg);
}
return;
}
GCCH_handle_channel_plaintext_data_ack (ch,
+ GCC_get_id (t->current_ct->cc),
ack);
}
GNUNET_h2s (&copen->port),
GCT_2s (t),
GCCH_2s (ch));
- GCCH_handle_duplicate_open (ch);
+ GCCH_handle_duplicate_open (ch,
+ GCC_get_id (t->current_ct->cc));
return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received channel OPEN_ACK on channel %s from %s\n",
GCCH_2s (ch),
GCT_2s (t));
- GCCH_handle_channel_open_ack (ch);
+ GCCH_handle_channel_open_ack (ch,
+ GCC_get_id (t->current_ct->cc));
}
"Receicved channel DESTROY on %s from %s\n",
GCCH_2s (ch),
GCT_2s (t));
- GCCH_handle_remote_destroy (ch);
+ GCCH_handle_remote_destroy (ch,
+ GCC_get_id (t->current_ct->cc));
}
{
struct CadetTunnel *t = cls;
+ GNUNET_assert (NULL != t->current_ct);
GNUNET_MQ_inject_message (t->mq,
msg);
return GNUNET_OK;
*
* @param t a tunnel
* @param cid connection identifer to use for the connection
+ * @param options options for the connection
* @param path path to use for the connection
* @return #GNUNET_OK on success,
* #GNUNET_SYSERR on failure (duplicate connection)
int
GCT_add_inbound_connection (struct CadetTunnel *t,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
+ enum GNUNET_CADET_ChannelOption options,
struct CadetPeerPath *path)
{
struct CadetTConnection *ct;
ct->t = t;
ct->cc = GCC_create_inbound (t->destination,
path,
+ options,
ct,
cid,
&connection_ready_cb,
}
/* The MST will ultimately call #handle_decrypted() on each message. */
+ t->current_ct = ct;
GNUNET_break_op (GNUNET_OK ==
GNUNET_MST_from_buffer (t->mst,
cbuf,
decrypted_size,
GNUNET_YES,
GNUNET_NO));
+ t->current_ct = NULL;
}
struct CadetTunnelQueueEntry *
GCT_send (struct CadetTunnel *t,
const struct GNUNET_MessageHeader *message,
- GNUNET_SCHEDULER_TaskCallback cont,
+ GCT_SendContinuation cont,
void *cont_cls)
{
struct CadetTunnelQueueEntry *tq;
{
n = ct->next;
iter (iter_cls,
- ct->cc);
+ ct);
}
for (struct CadetTConnection *ct = t->connection_busy_head;
NULL != ct;
{
n = ct->next;
iter (iter_cls,
- ct->cc);
+ ct);
}
}