From f73216a9c9515d444213b6aa7701c5155f071c81 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 29 Jan 2017 16:24:14 +0100 Subject: [PATCH] implement route timeouts at 3x keepalive frequency --- src/cadet/cadet.conf.in | 32 ++++- src/cadet/gnunet-service-cadet-new_core.c | 157 +++++++++++++++++++--- src/cadet/gnunet-service-cadet-new_peer.c | 1 - 3 files changed, 169 insertions(+), 21 deletions(-) diff --git a/src/cadet/cadet.conf.in b/src/cadet/cadet.conf.in index a6d762786..86ba2e535 100644 --- a/src/cadet/cadet.conf.in +++ b/src/cadet/cadet.conf.in @@ -11,13 +11,43 @@ UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-cadet.sock UNIX_MATCH_UID = YES UNIX_MATCH_GID = YES + +# How often do we send KEEPALIVE messages on connections to keep them +# from timing out? REFRESH_CONNECTION_TIME = 5 min + +# Percentage of packets CADET is artificially dropping. Used for testing only! +# DROP_PERCENT = + +# How frequently do we usually anounce our presence in the DHT? ID_ANNOUNCE_TIME = 1 h + +# FIXME: document CONNECT_TIMEOUT = 30 s + +# What is the replication level we give to the DHT when announcing our +# existence? Usually there is no need to change this. DHT_REPLICATION_LEVEL = 3 -MAX_TUNNELS = 1000 + +# FIXME: not implemented +# MAX_TUNNELS = 1000 + +# FIXME: not implemented, replaced by MAX_ROUTES in NEW CADET! MAX_CONNECTIONS = 1000 + +# How many routes do we participate in at most? Should be smaller +# than MAX_MSGS_QUEUE +MAX_ROUTES = 5000 + +# FIXME: not implemented MAX_MSGS_QUEUE = 10000 + +# FIXME: not implemented MAX_PEERS = 1000 + +# How often do we advance the ratchet even if there is not +# any traffic? RATCHET_TIME = 1 h + +# How often do we advance the ratched if there is traffic? RATCHET_MESSAGES = 64 diff --git a/src/cadet/gnunet-service-cadet-new_core.c b/src/cadet/gnunet-service-cadet-new_core.c index 956a1207d..25ffcb3ce 100644 --- a/src/cadet/gnunet-service-cadet-new_core.c +++ b/src/cadet/gnunet-service-cadet-new_core.c @@ -27,6 +27,7 @@ * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom)) * * TODO: + * - properly implement GLOBAL message buffer, instead of per-route buffers * - Optimization: given BROKEN messages, destroy paths (?) */ #include "platform.h" @@ -123,6 +124,10 @@ struct CadetRoute */ struct GNUNET_TIME_Absolute last_use; + /** + * Position of this route in the #route_heap. + */ + struct GNUNET_CONTAINER_HeapNode *hn; }; @@ -136,6 +141,21 @@ static struct GNUNET_CORE_Handle *core; */ static struct GNUNET_CONTAINER_MultiShortmap *routes; +/** + * Heap of routes, MIN-sorted by last activity. + */ +static struct GNUNET_CONTAINER_Heap *route_heap; + +/** + * Maximum number of concurrent routes this peer will support. + */ +static unsigned long long max_routes; + +/** + * Task to timeout routes. + */ +static struct GNUNET_SCHEDULER_Task *timeout_task; + /** * Get the route corresponding to a hash. @@ -187,6 +207,9 @@ route_message (struct CadetPeer *prev, env); return; } + route->last_use = GNUNET_TIME_absolute_get (); + GNUNET_CONTAINER_heap_update_cost (route->hn, + route->last_use.abs_value_us); dir = (prev == route->prev.hop) ? &route->next : &route->prev; if (GNUNET_YES == dir->is_ready) { @@ -294,6 +317,8 @@ destroy_route (struct CadetRoute *route) GNUNET_i2s (GCP_get_id (route->prev.hop)), GNUNET_i2s2 (GCP_get_id (route->next.hop)), GNUNET_sh2s (&route->cid.connection_of_tunnel)); + GNUNET_assert (route == + GNUNET_CONTAINER_heap_remove_node (route->hn)); destroy_direction (&route->prev); destroy_direction (&route->next); GNUNET_free (route); @@ -340,6 +365,49 @@ send_broken (struct RouteDirection *target, } +/** + * Function called to check if any routes have timed out, and if + * so, to clean them up. Finally, schedules itself again at the + * earliest time where there might be more work. + * + * @param cls NULL + */ +static void +timeout_cb (void *cls) +{ + struct CadetRoute *r; + struct GNUNET_TIME_Relative linger; + struct GNUNET_TIME_Absolute exp; + + timeout_task = NULL; + linger = GNUNET_TIME_relative_multiply (keepalive_period, + 3); + while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap))) + { + exp = GNUNET_TIME_absolute_add (r->last_use, + linger); + if (0 != GNUNET_TIME_absolute_get_duration (exp).rel_value_us) + { + /* Route not yet timed out, wait until it does. */ + timeout_task = GNUNET_SCHEDULER_add_at (exp, + &timeout_cb, + NULL); + return; + } + send_broken (&r->prev, + &r->cid, + NULL, + NULL); + send_broken (&r->next, + &r->cid, + NULL, + NULL); + destroy_route (r); + } + /* No more routes left, so no need for a #timeout_task */ +} + + /** * Function called when the message queue to the previous hop * becomes available/unavailable. We expect this function to @@ -407,6 +475,35 @@ dir_init (struct RouteDirection *dir, } +/** + * We could not create the desired route. Send a + * #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN + * message to @a target. + * + * @param target who should receive the message + * @param cid identifier of the connection/route that failed + * @param failure_at neighbour with which we failed to route, + * or NULL. + */ +static void +send_broken_without_mqm (struct CadetPeer *target, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, + const struct GNUNET_PeerIdentity *failure_at) +{ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_CADET_ConnectionBrokenMessage *bm; + + env = GNUNET_MQ_msg (bm, + GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); + bm->cid = *cid; + bm->peer1 = my_full_id; + if (NULL != failure_at) + bm->peer2 = *failure_at; + GCP_send_ooo (target, + env); +} + + /** * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE * @@ -492,20 +589,14 @@ handle_connection_create (void *cls, { /* Send back BROKEN: duplicate connection on the same path, we will use the other one. */ - struct GNUNET_MQ_Envelope *env; - struct GNUNET_CADET_ConnectionBrokenMessage *bm; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received CADET_CONNECTION_CREATE from %s for %s, but %s already has a connection. Sending BROKEN\n", GCP_2s (sender), GNUNET_sh2s (&msg->cid.connection_of_tunnel), GCPP_2s (path)); - env = GNUNET_MQ_msg (bm, - GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); - bm->cid = msg->cid; - bm->peer1 = my_full_id; - GCP_send_ooo (sender, - env); + send_broken_without_mqm (sender, + &msg->cid, + NULL); return; } return; @@ -517,22 +608,26 @@ handle_connection_create (void *cls, (GNUNET_NO == GCP_has_core_connection (next)) ) { /* unworkable, send back BROKEN notification */ - struct GNUNET_MQ_Envelope *env; - struct GNUNET_CADET_ConnectionBrokenMessage *bm; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is down. Sending BROKEN\n", GCP_2s (sender), GNUNET_sh2s (&msg->cid.connection_of_tunnel), GNUNET_i2s (&pids[off + 1]), off + 1); - env = GNUNET_MQ_msg (bm, - GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); - bm->cid = msg->cid; - bm->peer1 = pids[off + 1]; - bm->peer2 = my_full_id; - GCP_send_ooo (sender, - env); + send_broken_without_mqm (sender, + &msg->cid, + &pids[off + 1]); + return; + } + if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n", + GCP_2s (sender), + GNUNET_sh2s (&msg->cid.connection_of_tunnel)); + send_broken_without_mqm (sender, + &msg->cid, + &pids[off - 1]); return; } @@ -545,6 +640,7 @@ handle_connection_create (void *cls, off + 1); route = GNUNET_new (struct CadetRoute); route->cid = msg->cid; + route->last_use = GNUNET_TIME_absolute_get (); dir_init (&route->prev, route, sender); @@ -556,6 +652,14 @@ handle_connection_create (void *cls, &route->cid.connection_of_tunnel, route, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + route->hn = GNUNET_CONTAINER_heap_insert (route_heap, + route, + route->last_use.abs_value_us); + if (NULL == timeout_task) + timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period, + 3), + &timeout_cb, + NULL); } @@ -953,8 +1057,15 @@ GCO_init (const struct GNUNET_CONFIGURATION_Handle *c) GNUNET_MQ_handler_end () }; + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_number (c, + "CADET", + "MAX_ROUTES", + &max_routes)) + max_routes = 10000; routes = GNUNET_CONTAINER_multishortmap_create (1024, GNUNET_NO); + route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); core = GNUNET_CORE_connect (c, NULL, &core_init_cb, @@ -977,6 +1088,14 @@ GCO_shutdown () } GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (routes)); GNUNET_CONTAINER_multishortmap_destroy (routes); + routes = NULL; + GNUNET_CONTAINER_heap_destroy (route_heap); + route_heap = NULL; + if (NULL != timeout_task) + { + GNUNET_SCHEDULER_cancel (timeout_task); + timeout_task = NULL; + } } /* end of gnunet-cadet-service_core.c */ diff --git a/src/cadet/gnunet-service-cadet-new_peer.c b/src/cadet/gnunet-service-cadet-new_peer.c index 180fdab54..97bb1378e 100644 --- a/src/cadet/gnunet-service-cadet-new_peer.c +++ b/src/cadet/gnunet-service-cadet-new_peer.c @@ -25,7 +25,6 @@ * @author Christian Grothoff * * TODO: - * - timeout for routes * - optimize stopping/restarting DHT search to situations * where we actually need it (i.e. not if we have a direct connection, * or if we already have plenty of good short ones, or maybe even -- 2.25.1