trying to make KX logic slightly more readable
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_core.c
index a24f7a3ce8d1fea2b81991b3e2a6629cefc662d9..086337c9ab25af579fa1d776744470e503fa459d 100644 (file)
  * @author Christian Grothoff
  *
  * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
+ *
+ * TODO:
+ * - Optimization: given BROKEN messages, destroy paths (?)
  */
 #include "platform.h"
 #include "gnunet-service-cadet-new_core.h"
+#include "gnunet-service-cadet-new_paths.h"
 #include "gnunet-service-cadet-new_peer.h"
 #include "gnunet-service-cadet-new_connection.h"
+#include "gnunet-service-cadet-new_tunnels.h"
 #include "gnunet_core_service.h"
+#include "cadet_protocol.h"
+
+
+#define LOG(level, ...) GNUNET_log_from(level,"cadet-cor",__VA_ARGS__)
+
+
+/**
+ * Number of messages we are willing to buffer per route.
+ */
+#define ROUTE_BUFFER_SIZE 8
+
+
+/**
+ * Information we keep per direction for a route.
+ */
+struct RouteDirection
+{
+  /**
+   * Target peer.
+   */
+  struct CadetPeer *hop;
+
+  /**
+   * Route this direction is part of.
+   */
+  struct CadetRoute *my_route;
+
+  /**
+   * Message queue manager for @e hop.
+   */
+  struct GCP_MessageQueueManager *mqm;
+
+  /**
+   * Cyclic message buffer to @e hop.
+   */
+  struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE];
+
+  /**
+   * Next write offset to use to append messages to @e out_buffer.
+   */
+  unsigned int out_wpos;
+
+  /**
+   * Next read offset to use to retrieve messages from @e out_buffer.
+   */
+  unsigned int out_rpos;
+
+  /**
+   * Is @e mqm currently ready for transmission?
+   */
+  int is_ready;
+
+};
+
+
+/**
+ * Description of a segment of a `struct CadetConnection` at the
+ * intermediate peers.  Routes are basically entries in a peer's
+ * routing table for forwarding traffic.  At both endpoints, the
+ * routes are terminated by a `struct CadetConnection`, which knows
+ * the complete `struct CadetPath` that is formed by the individual
+ * routes.
+ */
+struct CadetRoute
+{
+
+  /**
+   * Information about the next hop on this route.
+   */
+  struct RouteDirection next;
+
+  /**
+   * Information about the previous hop on this route.
+   */
+  struct RouteDirection prev;
+
+  /**
+   * Unique identifier for the connection that uses this route.
+   */
+  struct GNUNET_CADET_ConnectionTunnelIdentifier cid;
+
+  /**
+   * When was this route last in use?
+   */
+  struct GNUNET_TIME_Absolute last_use;
+
+};
+
 
 /**
  * Handle to the CORE service.
  */
 static struct GNUNET_CORE_Handle *core;
 
+/**
+ * Routes on which this peer is an intermediate.
+ */
+static struct GNUNET_CONTAINER_MultiShortmap *routes;
+
+
+/**
+ * Get the route corresponding to a hash.
+ *
+ * @param cid hash generated from the connection identifier
+ */
+static struct CadetRoute *
+get_route (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
+{
+  return GNUNET_CONTAINER_multishortmap_get (routes,
+                                             &cid->connection_of_tunnel);
+}
+
+
+/**
+ * We message @a msg from @a prev.  Find its route by @a cid and
+ * forward to the next hop.  Drop and signal broken route if we do not
+ * have a route.
+ *
+ * @param prev previous hop (sender)
+ * @param cid connection identifier, tells us which route to use
+ * @param msg the message to forward
+ */
+static void
+route_message (struct CadetPeer *prev,
+               const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
+               const struct GNUNET_MessageHeader *msg)
+{
+  struct CadetRoute *route;
+  struct RouteDirection *dir;
+  struct GNUNET_MQ_Envelope *env;
+
+  route = get_route (cid);
+  if (NULL == route)
+  {
+    struct GNUNET_MQ_Envelope *env;
+    struct GNUNET_CADET_ConnectionBrokenMessage *bm;
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Failed to route message of type %u from %s on connection %s: no route\n",
+         ntohs (msg->type),
+         GCP_2s (prev),
+         GNUNET_sh2s (&cid->connection_of_tunnel));
+    env = GNUNET_MQ_msg (bm,
+                         GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+    bm->cid = *cid;
+    bm->peer1 = my_full_id;
+    GCP_send_ooo (prev,
+                  env);
+    return;
+  }
+  dir = (prev == route->prev.hop) ? &route->next : &route->prev;
+  if (GNUNET_YES == dir->is_ready)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Routing message of type %u from %s to %s on connection %s\n",
+         ntohs (msg->type),
+         GCP_2s (prev),
+         GNUNET_i2s (GCP_get_id (dir->hop)),
+         GNUNET_sh2s (&cid->connection_of_tunnel));
+    dir->is_ready = GNUNET_NO;
+    GCP_send (dir->mqm,
+              GNUNET_MQ_msg_copy (msg));
+    return;
+  }
+  env = dir->out_buffer[dir->out_wpos];
+  if (NULL != env)
+  {
+    /* Queue full, drop earliest message in queue */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Queue full due to new message of type %u from %s to %s on connection %s, dropping old message\n",
+         ntohs (msg->type),
+         GCP_2s (prev),
+         GNUNET_i2s (GCP_get_id (dir->hop)),
+         GNUNET_sh2s (&cid->connection_of_tunnel));
+    GNUNET_assert (dir->out_rpos == dir->out_wpos);
+    GNUNET_MQ_discard (env);
+    dir->out_rpos++;
+    if (ROUTE_BUFFER_SIZE == dir->out_rpos)
+      dir->out_rpos = 0;
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Queueing new message of type %u from %s to %s on connection %s\n",
+       ntohs (msg->type),
+       GCP_2s (prev),
+       GNUNET_i2s (GCP_get_id (dir->hop)),
+       GNUNET_sh2s (&cid->connection_of_tunnel));
+  env = GNUNET_MQ_msg_copy (msg);
+  dir->out_buffer[dir->out_wpos] = env;
+  dir->out_wpos++;
+  if (ROUTE_BUFFER_SIZE == dir->out_wpos)
+    dir->out_wpos = 0;
+}
+
+
+/**
+ * Check if the create_connection message has the appropriate size.
+ *
+ * @param cls Closure (unused).
+ * @param msg Message to check.
+ *
+ * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
+ */
+static int
+check_connection_create (void *cls,
+                         const struct GNUNET_CADET_ConnectionCreateMessage *msg)
+{
+  uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
+
+  if (0 != (size % sizeof (struct GNUNET_PeerIdentity)))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_NO;
+  }
+  return GNUNET_YES;
+}
+
+
+/**
+ * Free internal data of a route direction.
+ *
+ * @param dir direction to destroy (do NOT free memory of 'dir' itself)
+ */
+static void
+destroy_direction (struct RouteDirection *dir)
+{
+  for (unsigned int i=0;i<ROUTE_BUFFER_SIZE;i++)
+    if (NULL != dir->out_buffer[i])
+    {
+      GNUNET_MQ_discard (dir->out_buffer[i]);
+      dir->out_buffer[i] = NULL;
+    }
+  if (NULL != dir->mqm)
+  {
+    GCP_request_mq_cancel (dir->mqm,
+                           NULL);
+    dir->mqm = NULL;
+  }
+}
+
+
+/**
+ * Destroy our state for @a route.
+ *
+ * @param route route to destroy
+ */
+static void
+destroy_route (struct CadetRoute *route)
+{
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Destroying route from %s to %s of connection %s\n",
+       GNUNET_i2s  (GCP_get_id (route->prev.hop)),
+       GNUNET_i2s2 (GCP_get_id (route->next.hop)),
+       GNUNET_sh2s (&route->cid.connection_of_tunnel));
+  destroy_direction (&route->prev);
+  destroy_direction (&route->next);
+  GNUNET_free (route);
+}
+
+
+/**
+ * Send message that a route is broken between @a peer1 and @a peer2.
+ *
+ * @param target where to send the message
+ * @param cid connection identifier to use
+ * @param peer1 one of the peers where a link is broken
+ * @param peer2 another one of the peers where a link is broken
+ */
+static void
+send_broken (struct RouteDirection *target,
+             const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
+             const struct GNUNET_PeerIdentity *peer1,
+             const struct GNUNET_PeerIdentity *peer2)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_CADET_ConnectionBrokenMessage *bm;
+
+  if (NULL == target->mqm)
+    return; /* Can't send notification, connection is down! */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Notifying %s about BROKEN route at %s-%s of connection %s\n",
+       GCP_2s (target->hop),
+       GNUNET_i2s (peer1),
+       GNUNET_i2s2 (peer2),
+       GNUNET_sh2s (&cid->connection_of_tunnel));
+
+  env = GNUNET_MQ_msg (bm,
+                       GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+  bm->cid = *cid;
+  if (NULL != peer1)
+    bm->peer1 = *peer1;
+  if (NULL != peer2)
+    bm->peer2 = *peer2;
+
+  GCP_request_mq_cancel (target->mqm,
+                         env);
+  target->mqm = NULL;
+}
+
+
+/**
+ * Function called when the message queue to the previous hop
+ * becomes available/unavailable.  We expect this function to
+ * be called immediately when we register, and then again
+ * later if the connection ever goes down.
+ *
+ * @param cls the `struct RouteDirection`
+ * @param available #GNUNET_YES if sending is now possible,
+ *                  #GNUNET_NO if sending is no longer possible
+ *                  #GNUNET_SYSERR if sending is no longer possible
+ *                                 and the last envelope was discarded
+ */
+static void
+dir_ready_cb (void *cls,
+              int ready)
+{
+  struct RouteDirection *dir = cls;
+  struct CadetRoute *route = dir->my_route;
+  struct RouteDirection *odir;
+
+  if (GNUNET_YES == ready)
+  {
+    struct GNUNET_MQ_Envelope *env;
+
+    dir->is_ready = GNUNET_YES;
+    if (NULL != (env = dir->out_buffer[dir->out_rpos]))
+    {
+      dir->out_buffer[dir->out_rpos] = NULL;
+      dir->out_rpos++;
+      if (ROUTE_BUFFER_SIZE == dir->out_rpos)
+        dir->out_rpos = 0;
+      dir->is_ready = GNUNET_NO;
+      GCP_send (dir->mqm,
+                env);
+    }
+    return;
+  }
+  odir = (dir == &route->next) ? &route->prev : &route->next;
+  send_broken (&route->next,
+               &route->cid,
+               GCP_get_id (odir->hop),
+               &my_full_id);
+  destroy_route (route);
+}
+
+
+/**
+ * Initialize one of the directions of a route.
+ *
+ * @param route route the direction belongs to
+ * @param dir direction to initialize
+ * @param hop next hop on in the @a dir
+ */
+static void
+dir_init (struct RouteDirection *dir,
+          struct CadetRoute *route,
+          struct CadetPeer *hop)
+{
+  dir->hop = hop;
+  dir->my_route = route;
+  dir->mqm = GCP_request_mq (hop,
+                             &dir_ready_cb,
+                             dir);
+  GNUNET_assert (GNUNET_YES == dir->is_ready);
+}
+
+
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_connection_create (void *cls,
+                          const struct GNUNET_CADET_ConnectionCreateMessage *msg)
+{
+  struct CadetPeer *sender = cls;
+  struct CadetPeer *next;
+  const struct GNUNET_PeerIdentity *pids = (const struct GNUNET_PeerIdentity *) &msg[1];
+  struct CadetRoute *route;
+  uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
+  unsigned int path_length;
+  unsigned int off;
+
+  path_length = size / sizeof (struct GNUNET_PeerIdentity);
+  /* Initiator is at offset 0. */
+  for (off=1;off<path_length;off++)
+    if (0 == memcmp (&my_full_id,
+                     &pids[off],
+                     sizeof (struct GNUNET_PeerIdentity)))
+      break;
+  if (off == path_length)
+  {
+    /* We are not on the path, bogus request */
+    GNUNET_break_op (0);
+    return;
+  }
+  /* Check previous hop */
+  if (sender != GCP_get (&pids[off - 1],
+                         GNUNET_NO))
+  {
+    /* sender is not on the path, not allowed */
+    GNUNET_break_op (0);
+    return;
+  }
+  if (NULL !=
+      get_route (&msg->cid))
+  {
+    /* Duplicate CREATE, pass it on, previous one might have been lost! */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Passing on duplicate CADET_CONNECTION_CREATE message on connection %s\n",
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+    route_message (sender,
+                   &msg->cid,
+                   &msg->header);
+    return;
+  }
+  if (off == path_length - 1)
+  {
+    /* We are the destination, create connection */
+    struct CadetConnection *cc;
+    struct CadetPeerPath *path;
+    struct CadetPeer *origin;
+
+    cc = GNUNET_CONTAINER_multishortmap_get (connections,
+                                             &msg->cid.connection_of_tunnel);
+    if (NULL != cc)
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Received duplicate CADET_CONNECTION_CREATE message on connection %s\n",
+           GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+      GCC_handle_duplicate_create (cc);
+      return;
+    }
+
+    origin = GCP_get (&pids[0],
+                      GNUNET_YES);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n",
+         GCP_2s (origin),
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+    path = GCPP_get_path_from_route (path_length - 1,
+                                     pids);
+    if (GNUNET_OK !=
+        GCT_add_inbound_connection (GCP_get_tunnel (origin,
+                                                    GNUNET_YES),
+                                    &msg->cid,
+                                    path))
+    {
+      /* Send back BROKEN: duplicate connection on the same path,
+         we will use the other one. */
+      struct GNUNET_MQ_Envelope *env;
+      struct GNUNET_CADET_ConnectionBrokenMessage *bm;
+
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Received CADET_CONNECTION_CREATE from %s for %s, but %s already has a connection. Sending BROKEN\n",
+           GCP_2s (sender),
+           GNUNET_sh2s (&msg->cid.connection_of_tunnel),
+           GCPP_2s (path));
+      env = GNUNET_MQ_msg (bm,
+                           GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+      bm->cid = msg->cid;
+      bm->peer1 = my_full_id;
+      GCP_send_ooo (sender,
+                    env);
+      return;
+    }
+    return;
+  }
+  /* We are merely a hop on the way, check if we can support the route */
+  next = GCP_get (&pids[off + 1],
+                  GNUNET_NO);
+  if ( (NULL == next) ||
+       (GNUNET_NO == GCP_has_core_connection (next)) )
+  {
+    /* unworkable, send back BROKEN notification */
+    struct GNUNET_MQ_Envelope *env;
+    struct GNUNET_CADET_ConnectionBrokenMessage *bm;
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is down. Sending BROKEN\n",
+         GCP_2s (sender),
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel),
+         GNUNET_i2s (&pids[off + 1]),
+         off + 1);
+    env = GNUNET_MQ_msg (bm,
+                         GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+    bm->cid = msg->cid;
+    bm->peer1 = pids[off + 1];
+    bm->peer2 = my_full_id;
+    GCP_send_ooo (sender,
+                  env);
+    return;
+  }
+
+  /* Workable route, create routing entry */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is up. Creating route\n",
+       GCP_2s (sender),
+       GNUNET_sh2s (&msg->cid.connection_of_tunnel),
+       GNUNET_i2s (&pids[off + 1]),
+       off + 1);
+  route = GNUNET_new (struct CadetRoute);
+  route->cid = msg->cid;
+  dir_init (&route->prev,
+            route,
+            sender);
+  dir_init (&route->next,
+            route,
+            next);
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CONTAINER_multishortmap_put (routes,
+                                                     &route->cid.connection_of_tunnel,
+                                                     route,
+                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+}
+
+
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_connection_create_ack (void *cls,
+                              const struct GNUNET_CADET_ConnectionCreateAckMessage *msg)
+{
+  struct CadetPeer *peer = cls;
+  struct CadetConnection *cc;
+
+  /* First, check if ACK belongs to a connection that ends here. */
+  cc = GNUNET_CONTAINER_multishortmap_get (connections,
+                                           &msg->cid.connection_of_tunnel);
+  if (NULL != cc)
+  {
+    /* verify ACK came from the right direction */
+    struct CadetPeerPath *path = GCC_get_path (cc);
+
+    if (peer !=
+        GCPP_get_peer_at_offset (path,
+                                 0))
+    {
+      /* received ACK from unexpected direction, ignore! */
+      GNUNET_break_op (0);
+      return;
+    }
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received CONNECTION_CREATE_ACK for connection %s.\n",
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+    GCC_handle_connection_create_ack (cc);
+    return;
+  }
+
+  /* We're just an intermediary peer, route the message along its path */
+  route_message (peer,
+                 &msg->cid,
+                 &msg->header);
+}
+
+
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ * @deprecated duplicate logic with #handle_destroy(); dedup!
+ */
+static void
+handle_connection_broken (void *cls,
+                          const struct GNUNET_CADET_ConnectionBrokenMessage *msg)
+{
+  struct CadetPeer *peer = cls;
+  struct CadetConnection *cc;
+  struct CadetRoute *route;
+
+  /* First, check if message belongs to a connection that ends here. */
+  cc = GNUNET_CONTAINER_multishortmap_get (connections,
+                                           &msg->cid.connection_of_tunnel);
+  if (NULL != cc)
+  {
+    /* verify message came from the right direction */
+    struct CadetPeerPath *path = GCC_get_path (cc);
+
+    if (peer !=
+        GCPP_get_peer_at_offset (path,
+                                 0))
+    {
+      /* received message from unexpected direction, ignore! */
+      GNUNET_break_op (0);
+      return;
+    }
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received CONNECTION_BROKEN for connection %s. Destroying it.\n",
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+    GCC_destroy_without_core (cc);
+
+    /* FIXME: also destroy the path up to the specified link! */
+    return;
+  }
+
+  /* We're just an intermediary peer, route the message along its path */
+  route = get_route (&msg->cid);
+  route_message (peer,
+                 &msg->cid,
+                 &msg->header);
+  destroy_route (route);
+  /* FIXME: also destroy paths we MAY have up to the specified link! */
+}
+
+
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_connection_destroy (void *cls,
+                           const struct GNUNET_CADET_ConnectionDestroyMessage *msg)
+{
+  struct CadetPeer *peer = cls;
+  struct CadetConnection *cc;
+  struct CadetRoute *route;
+
+  /* First, check if message belongs to a connection that ends here. */
+  cc = GNUNET_CONTAINER_multishortmap_get (connections,
+                                           &msg->cid.connection_of_tunnel);
+  if (NULL != cc)
+  {
+    /* verify message came from the right direction */
+    struct CadetPeerPath *path = GCC_get_path (cc);
+
+    if (peer !=
+        GCPP_get_peer_at_offset (path,
+                                 0))
+    {
+      /* received message from unexpected direction, ignore! */
+      GNUNET_break_op (0);
+      return;
+    }
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received CONNECTION_DESTROY for connection %s. Destroying connection.\n",
+         GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+
+    GCC_destroy_without_core (cc);
+    return;
+  }
+
+  /* We're just an intermediary peer, route the message along its path */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received CONNECTION_DESTROY for connection %s. Destroying route.\n",
+       GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+  route = get_route (&msg->cid);
+  route_message (peer,
+                 &msg->cid,
+                 &msg->header);
+  destroy_route (route);
+}
+
+
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_tunnel_kx (void *cls,
+                  const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
+{
+  struct CadetPeer *peer = cls;
+  struct CadetConnection *cc;
+
+  /* First, check if message belongs to a connection that ends here. */
+  cc = GNUNET_CONTAINER_multishortmap_get (connections,
+                                           &msg->cid.connection_of_tunnel);
+  if (NULL != cc)
+  {
+    /* verify message came from the right direction */
+    struct CadetPeerPath *path = GCC_get_path (cc);
+
+    if (peer !=
+        GCPP_get_peer_at_offset (path,
+                                 0))
+    {
+      /* received message from unexpected direction, ignore! */
+      GNUNET_break_op (0);
+      return;
+    }
+    GCC_handle_kx (cc,
+                   msg);
+    return;
+  }
+
+  /* We're just an intermediary peer, route the message along its path */
+  route_message (peer,
+                 &msg->cid,
+                 &msg->header);
+}
+
+
+/**
+ * Check if the encrypted message has the appropriate size.
+ *
+ * @param cls Closure (unused).
+ * @param msg Message to check.
+ *
+ * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
+ */
+static int
+check_tunnel_encrypted (void *cls,
+                        const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
+{
+  return GNUNET_YES;
+}
+
+
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED.
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_tunnel_encrypted (void *cls,
+                         const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
+{
+  struct CadetPeer *peer = cls;
+  struct CadetConnection *cc;
+
+  /* First, check if message belongs to a connection that ends here. */
+  cc = GNUNET_CONTAINER_multishortmap_get (connections,
+                                           &msg->cid.connection_of_tunnel);
+  if (NULL != cc)
+  {
+    /* verify message came from the right direction */
+    struct CadetPeerPath *path = GCC_get_path (cc);
+
+    if (peer !=
+        GCPP_get_peer_at_offset (path,
+                                 0))
+    {
+      /* received message from unexpected direction, ignore! */
+      GNUNET_break_op (0);
+      return;
+    }
+    GCC_handle_encrypted (cc,
+                          msg);
+    return;
+  }
+  /* We're just an intermediary peer, route the message along its path */
+  route_message (peer,
+                 &msg->cid,
+                 &msg->header);
+}
+
 
 /**
  * Function called after #GNUNET_CORE_connect has succeeded (or failed
@@ -79,6 +835,9 @@ core_connect_cb (void *cls,
 {
   struct CadetPeer *cp;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "CORE connection to peer %s was established.\n",
+       GNUNET_i2s (peer));
   cp = GCP_get (peer,
                 GNUNET_YES);
   GCP_set_mq (cp,
@@ -100,6 +859,9 @@ core_disconnect_cb (void *cls,
 {
   struct CadetPeer *cp = peer_cls;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "CORE connection to peer %s went down.\n",
+       GNUNET_i2s (peer));
   GCP_set_mq (cp,
               NULL);
 }
@@ -114,8 +876,35 @@ void
 GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
 {
   struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (connection_create,
+                           GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
+                           struct GNUNET_CADET_ConnectionCreateMessage,
+                           NULL),
+    GNUNET_MQ_hd_fixed_size (connection_create_ack,
+                             GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
+                             struct GNUNET_CADET_ConnectionCreateAckMessage,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (connection_broken,
+                             GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
+                             struct GNUNET_CADET_ConnectionBrokenMessage,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (connection_destroy,
+                             GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
+                             struct GNUNET_CADET_ConnectionDestroyMessage,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (tunnel_kx,
+                             GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX,
+                             struct GNUNET_CADET_TunnelKeyExchangeMessage,
+                             NULL),
+    GNUNET_MQ_hd_var_size (tunnel_encrypted,
+                           GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED,
+                           struct GNUNET_CADET_TunnelEncryptedMessage,
+                           NULL),
     GNUNET_MQ_handler_end ()
   };
+
+  routes = GNUNET_CONTAINER_multishortmap_create (1024,
+                                                  GNUNET_NO);
   core = GNUNET_CORE_connect (c,
                               NULL,
                               &core_init_cb,
@@ -136,6 +925,8 @@ GCO_shutdown ()
     GNUNET_CORE_disconnect (core);
     core = NULL;
   }
+  GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (routes));
+  GNUNET_CONTAINER_multishortmap_destroy (routes);
 }
 
 /* end of gnunet-cadet-service_core.c */