fair, global message buffer implemented
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_core.c
index 3d8406dc9f8b5a47ec7f4bc6f8c9abbea4507f1c..2c050af6ddee92fc09739a3ac85ceac5fcb7270e 100644 (file)
  * @author Christian Grothoff
  *
  * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
+ *
+ * TODO:
+ * - do NOT use buffering if the route options say no buffer!
+ * - Optimization: given BROKEN messages, destroy paths (?)
  */
 #include "platform.h"
 #include "gnunet-service-cadet-new_core.h"
 #include "gnunet-service-cadet-new_connection.h"
 #include "gnunet-service-cadet-new_tunnels.h"
 #include "gnunet_core_service.h"
+#include "gnunet_statistics_service.h"
 #include "cadet_protocol.h"
 
 
+#define LOG(level, ...) GNUNET_log_from(level,"cadet-cor",__VA_ARGS__)
+
+/**
+ * Information we keep per direction for a route.
+ */
+struct RouteDirection;
+
+
+/**
+ * Set of CadetRoutes that have exactly the same number of messages
+ * in their buffer.  Used so we can efficiently find all of those
+ * routes that have the current maximum of messages in the buffer (in
+ * case we have to purge).
+ */
+struct Rung
+{
+
+  /**
+   * Rung of RouteDirections with one more buffer entry each.
+   */
+  struct Rung *next;
+
+  /**
+   * Rung of RouteDirections with one less buffer entry each.
+   */
+  struct Rung *prev;
+
+  /**
+   * DLL of route directions with a number of buffer entries matching this rung.
+   */
+  struct RouteDirection *rd_head;
+
+  /**
+   * DLL of route directions with a number of buffer entries matching this rung.
+   */
+  struct RouteDirection *rd_tail;
+
+  /**
+   * Total number of route directions in this rung.
+   */
+  unsigned int num_routes;
+
+  /**
+   * Number of messages route directions at this rung have
+   * in their buffer.
+   */
+  unsigned int rung_off;
+};
+
+
+/**
+ * Information we keep per direction for a route.
+ */
+struct RouteDirection
+{
+
+  /**
+   * DLL of other route directions within the same `struct Rung`.
+   */
+  struct RouteDirection *prev;
+
+  /**
+   * DLL of other route directions within the same `struct Rung`.
+   */
+  struct RouteDirection *next;
+
+  /**
+   * Rung of this route direction (matches length of the buffer DLL).
+   */
+  struct Rung *rung;
+
+  /**
+   * Head of DLL of envelopes we have in the buffer for this direction.
+   */
+  struct GNUNET_MQ_Envelope *env_head;
+
+  /**
+   * Tail of DLL of envelopes we have in the buffer for this direction.
+   */
+  struct GNUNET_MQ_Envelope *env_tail;
+
+  /**
+   * Target peer.
+   */
+  struct CadetPeer *hop;
+
+  /**
+   * Route this direction is part of.
+   */
+  struct CadetRoute *my_route;
+
+  /**
+   * Message queue manager for @e hop.
+   */
+  struct GCP_MessageQueueManager *mqm;
+
+  /**
+   * Is @e mqm currently ready for transmission?
+   */
+  int is_ready;
+
+};
+
+
 /**
  * Description of a segment of a `struct CadetConnection` at the
  * intermediate peers.  Routes are basically entries in a peer's
@@ -48,24 +157,14 @@ struct CadetRoute
 {
 
   /**
-   * Previous hop on this route.
-   */
-  struct CadetPeer *prev_hop;
-
-  /**
-   * Next hop on this route.
+   * Information about the next hop on this route.
    */
-  struct CadetPeer *next_hop;
+  struct RouteDirection next;
 
   /**
-   * Message queue notifications for @e prev_hop.
+   * Information about the previous hop on this route.
    */
-  struct GCP_MessageQueueManager *prev_mqm;
-
-  /**
-   * Message queue notifications for @e next_hop.
-   */
-  struct GCP_MessageQueueManager *next_mqm;
+  struct RouteDirection prev;
 
   /**
    * Unique identifier for the connection that uses this route.
@@ -78,11 +177,14 @@ struct CadetRoute
   struct GNUNET_TIME_Absolute last_use;
 
   /**
-   * Counter, used to verify that both MQs are up when the route is
-   * initialized.
+   * Position of this route in the #route_heap.
    */
-  unsigned int up;
+  struct GNUNET_CONTAINER_HeapNode *hn;
 
+  /**
+   * Options for the route, control buffering.
+   */
+  enum GNUNET_CADET_ChannelOption options;
 };
 
 
@@ -96,6 +198,47 @@ static struct GNUNET_CORE_Handle *core;
  */
 static struct GNUNET_CONTAINER_MultiShortmap *routes;
 
+/**
+ * Heap of routes, MIN-sorted by last activity.
+ */
+static struct GNUNET_CONTAINER_Heap *route_heap;
+
+/**
+ * Rung zero (always pointed to by #rung_head).
+ */
+static struct Rung rung_zero;
+
+/**
+ * DLL of rungs, with the head always point to a rung of
+ * route directions with no messages in the queue.
+ */
+static struct Rung *rung_head = &rung_zero;
+
+/**
+ * Tail of the #rung_head DLL.
+ */
+static struct Rung *rung_tail = &rung_zero;
+
+/**
+ * Maximum number of concurrent routes this peer will support.
+ */
+static unsigned long long max_routes;
+
+/**
+ * Maximum number of envelopes we will buffer at this peer.
+ */
+static unsigned long long max_buffers;
+
+/**
+ * Current number of envelopes we have buffered at this peer.
+ */
+static unsigned long long cur_buffers;
+
+/**
+ * Task to timeout routes.
+ */
+static struct GNUNET_SCHEDULER_Task *timeout_task;
+
 
 /**
  * Get the route corresponding to a hash.
@@ -110,6 +253,91 @@ get_route (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
 }
 
 
+/**
+ * Lower the rung in which @a dir is by 1.
+ *
+ * @param dir direction to lower in rung.
+ */
+static void
+lower_rung (struct RouteDirection *dir)
+{
+  struct Rung *rung = dir->rung;
+  struct Rung *prev;
+
+  GNUNET_CONTAINER_DLL_remove (rung->rd_head,
+                               rung->rd_tail,
+                               dir);
+  prev = rung->prev;
+  GNUNET_assert (NULL != prev);
+  if (prev->rung_off != rung->rung_off - 1)
+  {
+    prev = GNUNET_new (struct Rung);
+    prev->rung_off = rung->rung_off - 1;
+    GNUNET_CONTAINER_DLL_insert_after (rung_head,
+                                       rung_tail,
+                                       prev,
+                                       rung);
+  }
+  else
+  {
+    rung = prev;
+  }
+  GNUNET_assert (NULL != rung);
+  GNUNET_CONTAINER_DLL_insert (rung->rd_head,
+                               rung->rd_tail,
+                               dir);
+
+}
+
+
+/**
+ * Discard the buffer @a env from the route direction @a dir and
+ * move @a dir down a rung.
+ *
+ * @param dir direction that contains the @a env in the buffer
+ * @param env envelope to discard
+ */
+static void
+discard_buffer (struct RouteDirection *dir,
+                struct GNUNET_MQ_Envelope *env)
+{
+  GNUNET_MQ_dll_remove (&dir->env_head,
+                        &dir->env_tail,
+                        env);
+  cur_buffers--;
+  GNUNET_MQ_discard (env);
+  lower_rung (dir);
+}
+
+
+/**
+ * Discard all messages from the highest rung, to make space.
+ */
+static void
+discard_all_from_rung_tail ()
+{
+  struct Rung *tail = rung_tail;
+  struct RouteDirection *dir;
+
+  while (NULL != (dir = tail->rd_head))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Queue full due new message %s on connection %s, dropping old message\n",
+         GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
+    GNUNET_STATISTICS_update (stats,
+                              "# messages dropped due to full buffer",
+                              1,
+                              GNUNET_NO);
+    discard_buffer (dir,
+                    dir->env_head);
+  }
+  GNUNET_CONTAINER_DLL_remove (rung_head,
+                               rung_tail,
+                               tail);
+  GNUNET_free (tail);
+}
+
+
 /**
  * We message @a msg from @a prev.  Find its route by @a cid and
  * forward to the next hop.  Drop and signal broken route if we do not
@@ -125,6 +353,10 @@ route_message (struct CadetPeer *prev,
                const struct GNUNET_MessageHeader *msg)
 {
   struct CadetRoute *route;
+  struct RouteDirection *dir;
+  struct Rung *rung;
+  struct Rung *nxt;
+  struct GNUNET_MQ_Envelope *env;
 
   route = get_route (cid);
   if (NULL == route)
@@ -132,17 +364,101 @@ route_message (struct CadetPeer *prev,
     struct GNUNET_MQ_Envelope *env;
     struct GNUNET_CADET_ConnectionBrokenMessage *bm;
 
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Failed to route message of type %u from %s on connection %s: no route\n",
+         ntohs (msg->type),
+         GCP_2s (prev),
+         GNUNET_sh2s (&cid->connection_of_tunnel));
     env = GNUNET_MQ_msg (bm,
                          GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
     bm->cid = *cid;
     bm->peer1 = my_full_id;
-    GCP_send (prev,
-              env);
+    GCP_send_ooo (prev,
+                  env);
+    return;
+  }
+  route->last_use = GNUNET_TIME_absolute_get ();
+  GNUNET_CONTAINER_heap_update_cost (route->hn,
+                                     route->last_use.abs_value_us);
+  dir = (prev == route->prev.hop) ? &route->next : &route->prev;
+  if (GNUNET_YES == dir->is_ready)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Routing message of type %u from %s to %s on connection %s\n",
+         ntohs (msg->type),
+         GCP_2s (prev),
+         GNUNET_i2s (GCP_get_id (dir->hop)),
+         GNUNET_sh2s (&cid->connection_of_tunnel));
+    dir->is_ready = GNUNET_NO;
+    GCP_send (dir->mqm,
+              GNUNET_MQ_msg_copy (msg));
     return;
   }
-  /* FIXME: support round-robin queue management here somewhere! */
-  GCP_send ((prev == route->prev_hop) ? route->next_hop : route->prev_hop,
-            GNUNET_MQ_msg_copy (msg));
+  rung = dir->rung;
+  if (cur_buffers == max_buffers)
+  {
+    /* Need to make room. */
+    if (NULL != rung->next)
+    {
+      /* Easy case, drop messages from route directions in highest rung */
+      discard_all_from_rung_tail ();
+    }
+    else
+    {
+      /* We are in the highest rung, drop our own! */
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Queue full due new message %s on connection %s, dropping old message\n",
+           GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
+      GNUNET_STATISTICS_update (stats,
+                                "# messages dropped due to full buffer",
+                                1,
+                                GNUNET_NO);
+      discard_buffer (dir,
+                      dir->env_head);
+      rung = dir->rung;
+    }
+  }
+  /* remove 'dir' from current rung */
+  GNUNET_CONTAINER_DLL_remove (rung->rd_head,
+                               rung->rd_tail,
+                               dir);
+  /* make 'nxt' point to the next higher rung, creat if necessary */
+  nxt = rung->next;
+  if ( (NULL == nxt) ||
+       (rung->rung_off + 1 != nxt->rung_off) )
+  {
+    nxt = GNUNET_new (struct Rung);
+    nxt->rung_off = rung->rung_off + 1;
+    GNUNET_CONTAINER_DLL_insert_after (rung_head,
+                                       rung_tail,
+                                       rung,
+                                       nxt);
+  }
+  /* insert 'dir' into next higher rung */
+  GNUNET_CONTAINER_DLL_insert (nxt->rd_head,
+                               nxt->rd_tail,
+                               dir);
+  /* add message into 'dir' buffer */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Queueing new message of type %u from %s to %s on connection %s\n",
+       ntohs (msg->type),
+       GCP_2s (prev),
+       GNUNET_i2s (GCP_get_id (dir->hop)),
+       GNUNET_sh2s (&cid->connection_of_tunnel));
+  env = GNUNET_MQ_msg_copy (msg);
+  GNUNET_MQ_dll_insert_tail (&dir->env_head,
+                             &dir->env_tail,
+                             env);
+  cur_buffers++;
+  /* Clean up 'rung' if now empty (and not head) */
+  if ( (NULL == rung->rd_head) &&
+       (rung != rung_head) )
+  {
+    GNUNET_CONTAINER_DLL_remove (rung_head,
+                                 rung_tail,
+                                 rung);
+    GNUNET_free (rung);
+  }
 }
 
 
@@ -169,6 +485,37 @@ check_connection_create (void *cls,
 }
 
 
+/**
+ * Free internal data of a route direction.
+ *
+ * @param dir direction to destroy (do NOT free memory of 'dir' itself)
+ */
+static void
+destroy_direction (struct RouteDirection *dir)
+{
+  struct GNUNET_MQ_Envelope *env;
+
+  while (NULL != (env = dir->env_head))
+  {
+    GNUNET_STATISTICS_update (stats,
+                              "# messages dropped due to route destruction",
+                              1,
+                              GNUNET_NO);
+    discard_buffer (dir,
+                    env);
+  }
+  if (NULL != dir->mqm)
+  {
+    GCP_request_mq_cancel (dir->mqm,
+                           NULL);
+    dir->mqm = NULL;
+  }
+  GNUNET_CONTAINER_DLL_remove (rung_head->rd_head,
+                               rung_head->rd_tail,
+                               dir);
+}
+
+
 /**
  * Destroy our state for @a route.
  *
@@ -177,8 +524,19 @@ check_connection_create (void *cls,
 static void
 destroy_route (struct CadetRoute *route)
 {
-  GCP_request_mq_cancel (route->next_mqm);
-  GCP_request_mq_cancel (route->prev_mqm);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Destroying route from %s to %s of connection %s\n",
+       GNUNET_i2s  (GCP_get_id (route->prev.hop)),
+       GNUNET_i2s2 (GCP_get_id (route->next.hop)),
+       GNUNET_sh2s (&route->cid.connection_of_tunnel));
+  GNUNET_assert (route ==
+                 GNUNET_CONTAINER_heap_remove_node (route->hn));
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multishortmap_remove (routes,
+                                                        &route->cid.connection_of_tunnel,
+                                                        route));
+  destroy_direction (&route->prev);
+  destroy_direction (&route->next);
   GNUNET_free (route);
 }
 
@@ -192,7 +550,7 @@ destroy_route (struct CadetRoute *route)
  * @param peer2 another one of the peers where a link is broken
  */
 static void
-send_broken (struct CadetPeer *target,
+send_broken (struct RouteDirection *target,
              const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
              const struct GNUNET_PeerIdentity *peer1,
              const struct GNUNET_PeerIdentity *peer2)
@@ -200,6 +558,15 @@ send_broken (struct CadetPeer *target,
   struct GNUNET_MQ_Envelope *env;
   struct GNUNET_CADET_ConnectionBrokenMessage *bm;
 
+  if (NULL == target->mqm)
+    return; /* Can't send notification, connection is down! */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Notifying %s about BROKEN route at %s-%s of connection %s\n",
+       GCP_2s (target->hop),
+       GNUNET_i2s (peer1),
+       GNUNET_i2s2 (peer2),
+       GNUNET_sh2s (&cid->connection_of_tunnel));
+
   env = GNUNET_MQ_msg (bm,
                        GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
   bm->cid = *cid;
@@ -207,36 +574,53 @@ send_broken (struct CadetPeer *target,
     bm->peer1 = *peer1;
   if (NULL != peer2)
     bm->peer2 = *peer2;
-  GCP_send (target,
-            env);
+
+  GCP_request_mq_cancel (target->mqm,
+                         env);
+  target->mqm = NULL;
 }
 
 
 /**
- * Function called when the message queue to the previous hop
- * becomes available/unavailable.  We expect this function to
- * be called immediately when we register, and then again
- * later if the connection ever goes down.
+ * Function called to check if any routes have timed out, and if
+ * so, to clean them up.  Finally, schedules itself again at the
+ * earliest time where there might be more work.
  *
- * @param cls the `struct CadetRoute`
- * @param mq the message queue, NULL if connection went down
+ * @param cls NULL
  */
 static void
-mqm_cr_destroy_prev (void *cls,
-                     struct GNUNET_MQ_Handle *mq)
+timeout_cb (void *cls)
 {
-  struct CadetRoute *route = cls;
-
-  if (NULL != mq)
+  struct CadetRoute *r;
+  struct GNUNET_TIME_Relative linger;
+  struct GNUNET_TIME_Absolute exp;
+
+  timeout_task = NULL;
+  linger = GNUNET_TIME_relative_multiply (keepalive_period,
+                                          3);
+  while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap)))
   {
-    route->up |= 1;
-    return;
+    exp = GNUNET_TIME_absolute_add (r->last_use,
+                                    linger);
+    if (0 != GNUNET_TIME_absolute_get_duration (exp).rel_value_us)
+    {
+      /* Route not yet timed out, wait until it does. */
+      timeout_task = GNUNET_SCHEDULER_add_at (exp,
+                                              &timeout_cb,
+                                              NULL);
+      return;
+    }
+    send_broken (&r->prev,
+                 &r->cid,
+                 NULL,
+                 NULL);
+    send_broken (&r->next,
+                 &r->cid,
+                 NULL,
+                 NULL);
+    destroy_route (r);
   }
-  send_broken (route->next_hop,
-               &route->cid,
-               GCP_get_id (route->prev_hop),
-               &my_full_id);
-  destroy_route (route);
+  /* No more routes left, so no need for a #timeout_task */
 }
 
 
@@ -246,28 +630,97 @@ mqm_cr_destroy_prev (void *cls,
  * be called immediately when we register, and then again
  * later if the connection ever goes down.
  *
- * @param cls the `struct CadetRoute`
- * @param mq the message queue, NULL if connection went down
+ * @param cls the `struct RouteDirection`
+ * @param available #GNUNET_YES if sending is now possible,
+ *                  #GNUNET_NO if sending is no longer possible
+ *                  #GNUNET_SYSERR if sending is no longer possible
+ *                                 and the last envelope was discarded
  */
 static void
-mqm_cr_destroy_next (void *cls,
-                     struct GNUNET_MQ_Handle *mq)
+dir_ready_cb (void *cls,
+              int ready)
 {
-  struct CadetRoute *route = cls;
+  struct RouteDirection *dir = cls;
+  struct CadetRoute *route = dir->my_route;
+  struct RouteDirection *odir;
 
-  if (NULL != mq)
+  if (GNUNET_YES == ready)
   {
-    route->up |= 2;
+    struct GNUNET_MQ_Envelope *env;
+
+    dir->is_ready = GNUNET_YES;
+    if (NULL != (env = dir->env_head))
+    {
+      GNUNET_MQ_dll_remove (&dir->env_head,
+                            &dir->env_tail,
+                            env);
+      cur_buffers--;
+      lower_rung (dir);
+      dir->is_ready = GNUNET_NO;
+      GCP_send (dir->mqm,
+                env);
+    }
     return;
   }
-  send_broken (route->prev_hop,
+  odir = (dir == &route->next) ? &route->prev : &route->next;
+  send_broken (&route->next,
                &route->cid,
-               GCP_get_id (route->next_hop),
+               GCP_get_id (odir->hop),
                &my_full_id);
   destroy_route (route);
 }
 
 
+/**
+ * Initialize one of the directions of a route.
+ *
+ * @param route route the direction belongs to
+ * @param dir direction to initialize
+ * @param hop next hop on in the @a dir
+ */
+static void
+dir_init (struct RouteDirection *dir,
+          struct CadetRoute *route,
+          struct CadetPeer *hop)
+{
+  dir->hop = hop;
+  dir->my_route = route;
+  dir->mqm = GCP_request_mq (hop,
+                             &dir_ready_cb,
+                             dir);
+  GNUNET_assert (GNUNET_YES == dir->is_ready);
+}
+
+
+/**
+ * We could not create the desired route.  Send a
+ * #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
+ * message to @a target.
+ *
+ * @param target who should receive the message
+ * @param cid identifier of the connection/route that failed
+ * @param failure_at neighbour with which we failed to route,
+ *        or NULL.
+ */
+static void
+send_broken_without_mqm (struct CadetPeer *target,
+                         const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
+                         const struct GNUNET_PeerIdentity *failure_at)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_CADET_ConnectionBrokenMessage *bm;
+
+  env = GNUNET_MQ_msg (bm,
+                       GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+  bm->cid = *cid;
+  bm->peer1 = my_full_id;
+  if (NULL != failure_at)
+    bm->peer2 = *failure_at;
+  GCP_send_ooo (target,
+                env);
+}
+
+
 /**
  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
  *
@@ -285,7 +738,9 @@ handle_connection_create (void *cls,
   uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
   unsigned int path_length;
   unsigned int off;
+  enum GNUNET_CADET_ChannelOption options;
 
+  options = (enum GNUNET_CADET_ChannelOption) ntohl (msg->options);
   path_length = size / sizeof (struct GNUNET_PeerIdentity);
   /* Initiator is at offset 0. */
   for (off=1;off<path_length;off++)
@@ -310,57 +765,122 @@ handle_connection_create (void *cls,
   if (NULL !=
       get_route (&msg->cid))
   {
-    /* CID not chosen at random, collides */
-    GNUNET_break_op (0);
+    /* Duplicate CREATE, pass it on, previous one might have been lost! */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Passing on duplicate CADET_CONNECTION_CREATE message on connection %s\n",
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+    route_message (sender,
+                   &msg->cid,
+                   &msg->header);
     return;
   }
   if (off == path_length - 1)
   {
     /* We are the destination, create connection */
+    struct CadetConnection *cc;
     struct CadetPeerPath *path;
     struct CadetPeer *origin;
 
-    path = GCPP_get_path_from_route (path_length,
-                                     pids);
+    cc = GNUNET_CONTAINER_multishortmap_get (connections,
+                                             &msg->cid.connection_of_tunnel);
+    if (NULL != cc)
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Received duplicate CADET_CONNECTION_CREATE message on connection %s\n",
+           GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+      GCC_handle_duplicate_create (cc);
+      return;
+    }
+
     origin = GCP_get (&pids[0],
                       GNUNET_YES);
-    GCT_add_inbound_connection (GCT_create_tunnel (origin),
-                                &msg->cid,
-                                path);
-
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n",
+         GCP_2s (origin),
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+    path = GCPP_get_path_from_route (path_length - 1,
+                                     pids);
+    if (GNUNET_OK !=
+        GCT_add_inbound_connection (GCP_get_tunnel (origin,
+                                                    GNUNET_YES),
+                                    &msg->cid,
+                                    (enum GNUNET_CADET_ChannelOption) ntohl (msg->options),
+                                    path))
+    {
+      /* Send back BROKEN: duplicate connection on the same path,
+         we will use the other one. */
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Received CADET_CONNECTION_CREATE from %s for %s, but %s already has a connection. Sending BROKEN\n",
+           GCP_2s (sender),
+           GNUNET_sh2s (&msg->cid.connection_of_tunnel),
+           GCPP_2s (path));
+      send_broken_without_mqm (sender,
+                               &msg->cid,
+                               NULL);
+      return;
+    }
     return;
   }
   /* We are merely a hop on the way, check if we can support the route */
   next = GCP_get (&pids[off + 1],
                   GNUNET_NO);
   if ( (NULL == next) ||
-       (NULL == GCP_get_mq (next)) )
+       (GNUNET_NO == GCP_has_core_connection (next)) )
   {
     /* unworkable, send back BROKEN notification */
-    send_broken (sender,
-                 &msg->cid,
-                 &pids[off + 1],
-                 &my_full_id);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is down. Sending BROKEN\n",
+         GCP_2s (sender),
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel),
+         GNUNET_i2s (&pids[off + 1]),
+         off + 1);
+    send_broken_without_mqm (sender,
+                             &msg->cid,
+                             &pids[off + 1]);
+    return;
+  }
+  if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n",
+         GCP_2s (sender),
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+    send_broken_without_mqm (sender,
+                             &msg->cid,
+                             &pids[off - 1]);
     return;
   }
 
   /* Workable route, create routing entry */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is up. Creating route\n",
+       GCP_2s (sender),
+       GNUNET_sh2s (&msg->cid.connection_of_tunnel),
+       GNUNET_i2s (&pids[off + 1]),
+       off + 1);
   route = GNUNET_new (struct CadetRoute);
+  route->options = options;
   route->cid = msg->cid;
-  route->prev_mqm = GCP_request_mq (sender,
-                                    &mqm_cr_destroy_prev,
-                                    route);
-  route->next_mqm = GCP_request_mq (next,
-                                    &mqm_cr_destroy_next,
-                                    route);
-  route->prev_hop = sender;
-  route->next_hop = next;
-  GNUNET_assert ((1|2) == route->up);
+  route->last_use = GNUNET_TIME_absolute_get ();
+  dir_init (&route->prev,
+            route,
+            sender);
+  dir_init (&route->next,
+            route,
+            next);
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multishortmap_put (routes,
                                                      &route->cid.connection_of_tunnel,
                                                      route,
                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  route->hn = GNUNET_CONTAINER_heap_insert (route_heap,
+                                            route,
+                                            route->last_use.abs_value_us);
+  if (NULL == timeout_task)
+    timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period,
+                                                                                3),
+                                                 &timeout_cb,
+                                                 NULL);
 }
 
 
@@ -371,8 +891,8 @@ handle_connection_create (void *cls,
  * @param msg Message itself.
  */
 static void
-handle_connection_ack (void *cls,
-                       const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg)
+handle_connection_create_ack (void *cls,
+                              const struct GNUNET_CADET_ConnectionCreateAckMessage *msg)
 {
   struct CadetPeer *peer = cls;
   struct CadetConnection *cc;
@@ -393,7 +913,10 @@ handle_connection_ack (void *cls,
       GNUNET_break_op (0);
       return;
     }
-    GCC_handle_connection_ack (cc);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received CONNECTION_CREATE_ACK for connection %s.\n",
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+    GCC_handle_connection_create_ack (cc);
     return;
   }
 
@@ -435,18 +958,22 @@ handle_connection_broken (void *cls,
       GNUNET_break_op (0);
       return;
     }
-    GCC_destroy (cc);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received CONNECTION_BROKEN for connection %s. Destroying it.\n",
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+    GCC_destroy_without_core (cc);
 
     /* FIXME: also destroy the path up to the specified link! */
     return;
   }
 
   /* We're just an intermediary peer, route the message along its path */
-  route = get_route (&msg->cid);
   route_message (peer,
                  &msg->cid,
                  &msg->header);
-  destroy_route (route);
+  route = get_route (&msg->cid);
+  if (NULL != route)
+    destroy_route (route);
   /* FIXME: also destroy paths we MAY have up to the specified link! */
 }
 
@@ -481,28 +1008,36 @@ handle_connection_destroy (void *cls,
       GNUNET_break_op (0);
       return;
     }
-    GCC_destroy (cc);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received CONNECTION_DESTROY for connection %s. Destroying connection.\n",
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+
+    GCC_destroy_without_core (cc);
     return;
   }
 
   /* We're just an intermediary peer, route the message along its path */
-  route = get_route (&msg->cid);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received CONNECTION_DESTROY for connection %s. Destroying route.\n",
+       GNUNET_sh2s (&msg->cid.connection_of_tunnel));
   route_message (peer,
                  &msg->cid,
                  &msg->header);
-  destroy_route (route);
+  route = get_route (&msg->cid);
+  if (NULL != route)
+    destroy_route (route);
 }
 
 
 /**
- * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_HOP_BY_HOP_ENCRYPTED_ACK.
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX
  *
  * @param cls Closure (CadetPeer for neighbor that sent the message).
  * @param msg Message itself.
  */
 static void
-handle_hop_by_hop_encrypted_ack (void *cls,
-                                 const struct GNUNET_CADET_ConnectionEncryptedAckMessage *msg)
+handle_tunnel_kx (void *cls,
+                  const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
 {
   struct CadetPeer *peer = cls;
   struct CadetConnection *cc;
@@ -523,10 +1058,8 @@ handle_hop_by_hop_encrypted_ack (void *cls,
       GNUNET_break_op (0);
       return;
     }
-#if FIXME
-    GCC_handle_ack (peer,
-                    msg);
-#endif
+    GCC_handle_kx (cc,
+                   msg);
     return;
   }
 
@@ -538,40 +1071,21 @@ handle_hop_by_hop_encrypted_ack (void *cls,
 
 
 /**
- * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED_POLL
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH
  *
  * @param cls Closure (CadetPeer for neighbor that sent the message).
  * @param msg Message itself.
  */
 static void
-handle_poll (void *cls,
-             const struct GNUNET_CADET_ConnectionHopByHopPollMessage *msg)
-{
-  struct CadetPeer *peer = cls;
-
-#if FIXME
-  GCC_handle_poll (peer,
-                   msg);
-#endif
-}
-
-
-/**
- * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX
- *
- * @param cls Closure (CadetPeer for neighbor that sent the message).
- * @param msg Message itself.
- */
-static void
-handle_tunnel_kx (void *cls,
-                  const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
+handle_tunnel_kx_auth (void *cls,
+                       const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg)
 {
   struct CadetPeer *peer = cls;
   struct CadetConnection *cc;
 
   /* First, check if message belongs to a connection that ends here. */
   cc = GNUNET_CONTAINER_multishortmap_get (connections,
-                                           &msg->cid.connection_of_tunnel);
+                                           &msg->kx.cid.connection_of_tunnel);
   if (NULL != cc)
   {
     /* verify message came from the right direction */
@@ -585,15 +1099,15 @@ handle_tunnel_kx (void *cls,
       GNUNET_break_op (0);
       return;
     }
-    GCC_handle_kx (cc,
-                   msg);
+    GCC_handle_kx_auth (cc,
+                        msg);
     return;
   }
 
   /* We're just an intermediary peer, route the message along its path */
   route_message (peer,
-                 &msg->cid,
-                 &msg->header);
+                 &msg->kx.cid,
+                 &msg->kx.header);
 }
 
 
@@ -646,7 +1160,6 @@ handle_tunnel_encrypted (void *cls,
                           msg);
     return;
   }
-
   /* We're just an intermediary peer, route the message along its path */
   route_message (peer,
                  &msg->cid,
@@ -695,6 +1208,9 @@ core_connect_cb (void *cls,
 {
   struct CadetPeer *cp;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "CORE connection to peer %s was established.\n",
+       GNUNET_i2s (peer));
   cp = GCP_get (peer,
                 GNUNET_YES);
   GCP_set_mq (cp,
@@ -716,6 +1232,9 @@ core_disconnect_cb (void *cls,
 {
   struct CadetPeer *cp = peer_cls;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "CORE connection to peer %s went down.\n",
+       GNUNET_i2s (peer));
   GCP_set_mq (cp,
               NULL);
 }
@@ -734,9 +1253,9 @@ GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
                            GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
                            struct GNUNET_CADET_ConnectionCreateMessage,
                            NULL),
-    GNUNET_MQ_hd_fixed_size (connection_ack,
+    GNUNET_MQ_hd_fixed_size (connection_create_ack,
                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
-                             struct GNUNET_CADET_ConnectionCreateMessageAckMessage,
+                             struct GNUNET_CADET_ConnectionCreateAckMessage,
                              NULL),
     GNUNET_MQ_hd_fixed_size (connection_broken,
                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
@@ -746,18 +1265,14 @@ GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
                              struct GNUNET_CADET_ConnectionDestroyMessage,
                              NULL),
-    GNUNET_MQ_hd_fixed_size (hop_by_hop_encrypted_ack,
-                             GNUNET_MESSAGE_TYPE_CADET_CONNECTION_HOP_BY_HOP_ENCRYPTED_ACK,
-                             struct GNUNET_CADET_ConnectionEncryptedAckMessage,
-                             NULL),
-    GNUNET_MQ_hd_fixed_size (poll,
-                             GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED_POLL,
-                             struct GNUNET_CADET_ConnectionHopByHopPollMessage,
-                             NULL),
     GNUNET_MQ_hd_fixed_size (tunnel_kx,
                              GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX,
                              struct GNUNET_CADET_TunnelKeyExchangeMessage,
                              NULL),
+    GNUNET_MQ_hd_fixed_size (tunnel_kx_auth,
+                             GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH,
+                             struct GNUNET_CADET_TunnelKeyExchangeAuthMessage,
+                             NULL),
     GNUNET_MQ_hd_var_size (tunnel_encrypted,
                            GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED,
                            struct GNUNET_CADET_TunnelEncryptedMessage,
@@ -765,8 +1280,21 @@ GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
     GNUNET_MQ_handler_end ()
   };
 
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_number (c,
+                                             "CADET",
+                                             "MAX_ROUTES",
+                                             &max_routes))
+    max_routes = 5000;
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_number (c,
+                                             "CADET",
+                                             "MAX_MSGS_QUEUE",
+                                             &max_buffers))
+    max_buffers = 10000;
   routes = GNUNET_CONTAINER_multishortmap_create (1024,
                                                   GNUNET_NO);
+  route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
   core = GNUNET_CORE_connect (c,
                               NULL,
                               &core_init_cb,
@@ -789,6 +1317,14 @@ GCO_shutdown ()
   }
   GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (routes));
   GNUNET_CONTAINER_multishortmap_destroy (routes);
+  routes = NULL;
+  GNUNET_CONTAINER_heap_destroy (route_heap);
+  route_heap = NULL;
+  if (NULL != timeout_task)
+  {
+    GNUNET_SCHEDULER_cancel (timeout_task);
+    timeout_task = NULL;
+  }
 }
 
 /* end of gnunet-cadet-service_core.c */