implement route timeouts at 3x keepalive frequency
authorChristian Grothoff <christian@grothoff.org>
Sun, 29 Jan 2017 15:24:14 +0000 (16:24 +0100)
committerChristian Grothoff <christian@grothoff.org>
Sun, 29 Jan 2017 15:24:14 +0000 (16:24 +0100)
src/cadet/cadet.conf.in
src/cadet/gnunet-service-cadet-new_core.c
src/cadet/gnunet-service-cadet-new_peer.c

index a6d7627862ed9e453af98f750a10b972aa47c1be..86ba2e535d0a2b6e839c2ccd52a8a4b027822930 100644 (file)
@@ -11,13 +11,43 @@ UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-cadet.sock
 UNIX_MATCH_UID = YES
 UNIX_MATCH_GID = YES
 
+
+# How often do we send KEEPALIVE messages on connections to keep them
+# from timing out?
 REFRESH_CONNECTION_TIME = 5 min
+
+# Percentage of packets CADET is artificially dropping. Used for testing only!
+# DROP_PERCENT =
+
+# How frequently do we usually anounce our presence in the DHT?
 ID_ANNOUNCE_TIME = 1 h
+
+# FIXME: document
 CONNECT_TIMEOUT = 30 s
+
+# What is the replication level we give to the DHT when announcing our
+# existence?  Usually there is no need to change this.
 DHT_REPLICATION_LEVEL = 3
-MAX_TUNNELS = 1000
+
+# FIXME: not implemented
+# MAX_TUNNELS = 1000
+
+# FIXME: not implemented, replaced by MAX_ROUTES in NEW CADET!
 MAX_CONNECTIONS = 1000
+
+# How many routes do we participate in at most?  Should be smaller
+# than MAX_MSGS_QUEUE
+MAX_ROUTES = 5000
+
+# FIXME: not implemented
 MAX_MSGS_QUEUE = 10000
+
+# FIXME: not implemented
 MAX_PEERS = 1000
+
+# How often do we advance the ratchet even if there is not
+# any traffic?
 RATCHET_TIME = 1 h
+
+# How often do we advance the ratched if there is traffic?
 RATCHET_MESSAGES = 64
index 956a1207d9e69f9322d30362733962c1001b04b4..25ffcb3cee88ef92b726098e65fb6e40d9a160d6 100644 (file)
@@ -27,6 +27,7 @@
  * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
  *
  * TODO:
+ * - properly implement GLOBAL message buffer, instead of per-route buffers
  * - Optimization: given BROKEN messages, destroy paths (?)
  */
 #include "platform.h"
@@ -123,6 +124,10 @@ struct CadetRoute
    */
   struct GNUNET_TIME_Absolute last_use;
 
+  /**
+   * Position of this route in the #route_heap.
+   */
+  struct GNUNET_CONTAINER_HeapNode *hn;
 };
 
 
@@ -136,6 +141,21 @@ static struct GNUNET_CORE_Handle *core;
  */
 static struct GNUNET_CONTAINER_MultiShortmap *routes;
 
+/**
+ * Heap of routes, MIN-sorted by last activity.
+ */
+static struct GNUNET_CONTAINER_Heap *route_heap;
+
+/**
+ * Maximum number of concurrent routes this peer will support.
+ */
+static unsigned long long max_routes;
+
+/**
+ * Task to timeout routes.
+ */
+static struct GNUNET_SCHEDULER_Task *timeout_task;
+
 
 /**
  * Get the route corresponding to a hash.
@@ -187,6 +207,9 @@ route_message (struct CadetPeer *prev,
                   env);
     return;
   }
+  route->last_use = GNUNET_TIME_absolute_get ();
+  GNUNET_CONTAINER_heap_update_cost (route->hn,
+                                     route->last_use.abs_value_us);
   dir = (prev == route->prev.hop) ? &route->next : &route->prev;
   if (GNUNET_YES == dir->is_ready)
   {
@@ -294,6 +317,8 @@ destroy_route (struct CadetRoute *route)
        GNUNET_i2s  (GCP_get_id (route->prev.hop)),
        GNUNET_i2s2 (GCP_get_id (route->next.hop)),
        GNUNET_sh2s (&route->cid.connection_of_tunnel));
+  GNUNET_assert (route ==
+                 GNUNET_CONTAINER_heap_remove_node (route->hn));
   destroy_direction (&route->prev);
   destroy_direction (&route->next);
   GNUNET_free (route);
@@ -340,6 +365,49 @@ send_broken (struct RouteDirection *target,
 }
 
 
+/**
+ * Function called to check if any routes have timed out, and if
+ * so, to clean them up.  Finally, schedules itself again at the
+ * earliest time where there might be more work.
+ *
+ * @param cls NULL
+ */
+static void
+timeout_cb (void *cls)
+{
+  struct CadetRoute *r;
+  struct GNUNET_TIME_Relative linger;
+  struct GNUNET_TIME_Absolute exp;
+
+  timeout_task = NULL;
+  linger = GNUNET_TIME_relative_multiply (keepalive_period,
+                                          3);
+  while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap)))
+  {
+    exp = GNUNET_TIME_absolute_add (r->last_use,
+                                    linger);
+    if (0 != GNUNET_TIME_absolute_get_duration (exp).rel_value_us)
+    {
+      /* Route not yet timed out, wait until it does. */
+      timeout_task = GNUNET_SCHEDULER_add_at (exp,
+                                              &timeout_cb,
+                                              NULL);
+      return;
+    }
+    send_broken (&r->prev,
+                 &r->cid,
+                 NULL,
+                 NULL);
+    send_broken (&r->next,
+                 &r->cid,
+                 NULL,
+                 NULL);
+    destroy_route (r);
+  }
+  /* No more routes left, so no need for a #timeout_task */
+}
+
+
 /**
  * Function called when the message queue to the previous hop
  * becomes available/unavailable.  We expect this function to
@@ -407,6 +475,35 @@ dir_init (struct RouteDirection *dir,
 }
 
 
+/**
+ * We could not create the desired route.  Send a
+ * #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
+ * message to @a target.
+ *
+ * @param target who should receive the message
+ * @param cid identifier of the connection/route that failed
+ * @param failure_at neighbour with which we failed to route,
+ *        or NULL.
+ */
+static void
+send_broken_without_mqm (struct CadetPeer *target,
+                         const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
+                         const struct GNUNET_PeerIdentity *failure_at)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_CADET_ConnectionBrokenMessage *bm;
+
+  env = GNUNET_MQ_msg (bm,
+                       GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+  bm->cid = *cid;
+  bm->peer1 = my_full_id;
+  if (NULL != failure_at)
+    bm->peer2 = *failure_at;
+  GCP_send_ooo (target,
+                env);
+}
+
+
 /**
  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
  *
@@ -492,20 +589,14 @@ handle_connection_create (void *cls,
     {
       /* Send back BROKEN: duplicate connection on the same path,
          we will use the other one. */
-      struct GNUNET_MQ_Envelope *env;
-      struct GNUNET_CADET_ConnectionBrokenMessage *bm;
-
       LOG (GNUNET_ERROR_TYPE_DEBUG,
            "Received CADET_CONNECTION_CREATE from %s for %s, but %s already has a connection. Sending BROKEN\n",
            GCP_2s (sender),
            GNUNET_sh2s (&msg->cid.connection_of_tunnel),
            GCPP_2s (path));
-      env = GNUNET_MQ_msg (bm,
-                           GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
-      bm->cid = msg->cid;
-      bm->peer1 = my_full_id;
-      GCP_send_ooo (sender,
-                    env);
+      send_broken_without_mqm (sender,
+                               &msg->cid,
+                               NULL);
       return;
     }
     return;
@@ -517,22 +608,26 @@ handle_connection_create (void *cls,
        (GNUNET_NO == GCP_has_core_connection (next)) )
   {
     /* unworkable, send back BROKEN notification */
-    struct GNUNET_MQ_Envelope *env;
-    struct GNUNET_CADET_ConnectionBrokenMessage *bm;
-
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is down. Sending BROKEN\n",
          GCP_2s (sender),
          GNUNET_sh2s (&msg->cid.connection_of_tunnel),
          GNUNET_i2s (&pids[off + 1]),
          off + 1);
-    env = GNUNET_MQ_msg (bm,
-                         GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
-    bm->cid = msg->cid;
-    bm->peer1 = pids[off + 1];
-    bm->peer2 = my_full_id;
-    GCP_send_ooo (sender,
-                  env);
+    send_broken_without_mqm (sender,
+                             &msg->cid,
+                             &pids[off + 1]);
+    return;
+  }
+  if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n",
+         GCP_2s (sender),
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+    send_broken_without_mqm (sender,
+                             &msg->cid,
+                             &pids[off - 1]);
     return;
   }
 
@@ -545,6 +640,7 @@ handle_connection_create (void *cls,
        off + 1);
   route = GNUNET_new (struct CadetRoute);
   route->cid = msg->cid;
+  route->last_use = GNUNET_TIME_absolute_get ();
   dir_init (&route->prev,
             route,
             sender);
@@ -556,6 +652,14 @@ handle_connection_create (void *cls,
                                                      &route->cid.connection_of_tunnel,
                                                      route,
                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  route->hn = GNUNET_CONTAINER_heap_insert (route_heap,
+                                            route,
+                                            route->last_use.abs_value_us);
+  if (NULL == timeout_task)
+    timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period,
+                                                                                3),
+                                                 &timeout_cb,
+                                                 NULL);
 }
 
 
@@ -953,8 +1057,15 @@ GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
     GNUNET_MQ_handler_end ()
   };
 
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_number (c,
+                                             "CADET",
+                                             "MAX_ROUTES",
+                                             &max_routes))
+    max_routes = 10000;
   routes = GNUNET_CONTAINER_multishortmap_create (1024,
                                                   GNUNET_NO);
+  route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
   core = GNUNET_CORE_connect (c,
                               NULL,
                               &core_init_cb,
@@ -977,6 +1088,14 @@ GCO_shutdown ()
   }
   GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (routes));
   GNUNET_CONTAINER_multishortmap_destroy (routes);
+  routes = NULL;
+  GNUNET_CONTAINER_heap_destroy (route_heap);
+  route_heap = NULL;
+  if (NULL != timeout_task)
+  {
+    GNUNET_SCHEDULER_cancel (timeout_task);
+    timeout_task = NULL;
+  }
 }
 
 /* end of gnunet-cadet-service_core.c */
index 180fdab54037de28858a2ef38e4d228f2700fe06..97bb1378ef3c1879f1e0a1f5c4339414fb905926 100644 (file)
@@ -25,7 +25,6 @@
  * @author Christian Grothoff
  *
  * TODO:
- * - timeout for routes
  * - optimize stopping/restarting DHT search to situations
  *   where we actually need it (i.e. not if we have a direct connection,
  *   or if we already have plenty of good short ones, or maybe even