pass only CadetTunnelAxolotl if it suffices, preparation for having ambiguous KX...
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_connection.c
index 5123f9d458223612242d3833849408be03a70c7a..a098674f51643affb4c5053a6c23f4c5ef59703a 100644 (file)
  * @author Christian Grothoff
  *
  * TODO:
- * - congestion control
- * - GCC_debug()
- * - keepalive messages
- * - performance metrics
- * - back-off reset
+ * - Optimization: keep per-connection performance metrics (?)
  */
 #include "platform.h"
+#include "gnunet-service-cadet-new.h"
 #include "gnunet-service-cadet-new_channel.h"
 #include "gnunet-service-cadet-new_connection.h"
 #include "gnunet-service-cadet-new_paths.h"
 #include "gnunet-service-cadet-new_peer.h"
 #include "gnunet-service-cadet-new_tunnels.h"
 #include "gnunet_cadet_service.h"
+#include "gnunet_statistics_service.h"
 #include "cadet_protocol.h"
 
 
+#define LOG(level, ...) GNUNET_log_from(level,"cadet-con",__VA_ARGS__)
+
+
 /**
  * All the states a connection can be in.
  */
@@ -64,19 +65,16 @@ enum CadetConnectionState
   CADET_CONNECTION_SENT,
 
   /**
-   * Connection confirmed, ready to carry traffic.
+   * We are an inbound connection, and received a CREATE. Need to
+   * send an CREATE_ACK back.
    */
-  CADET_CONNECTION_READY,
+  CADET_CONNECTION_CREATE_RECEIVED,
 
   /**
-   * Connection to be destroyed, just waiting to empty queues.
+   * Connection confirmed, ready to carry traffic.
    */
-  CADET_CONNECTION_DESTROYED,
+  CADET_CONNECTION_READY
 
-  /**
-   * Connection to be destroyed because of a distant peer, same as DESTROYED.
-   */
-  CADET_CONNECTION_BROKEN
 };
 
 
@@ -111,11 +109,6 @@ struct CadetConnection
    */
   struct GNUNET_MQ_Envelope *env;
 
-  /**
-   * Message queue to the first hop, or NULL if we have no connection yet.
-   */
-  struct GNUNET_MQ_Handle *mq;
-
   /**
    * Handle for calling #GCP_request_mq_cancel() once we are finished.
    */
@@ -126,10 +119,15 @@ struct CadetConnection
    */
   struct GNUNET_SCHEDULER_Task *task;
 
+  /**
+   * Queue entry for keepalive messages.
+   */
+  struct CadetTunnelQueueEntry *keepalive_qe;
+
   /**
    * Function to call once we are ready to transmit.
    */
-  GNUNET_SCHEDULER_TaskCallback ready_cb;
+  GCC_ReadyCallback ready_cb;
 
   /**
    * Closure for @e ready_cb.
@@ -151,64 +149,100 @@ struct CadetConnection
    */
   unsigned int off;
 
+  /**
+   * Are we ready to transmit via @e mq_man right now?
+   */
+  int mqm_ready;
+
 };
 
 
 /**
- * Is the given connection currently ready for transmission?
+ * Destroy a connection, part of the internal implementation.  Called
+ * only from #GCC_destroy_from_core() or #GCC_destroy_from_tunnel().
  *
- * @param cc connection to transmit on
- * @return #GNUNET_YES if we could transmit
+ * @param cc connection to destroy
  */
-int
-GCC_is_ready (struct CadetConnection *cc)
+static void
+GCC_destroy (struct CadetConnection *cc)
 {
-  return ( (NULL != cc->mq) &&
-           (CADET_CONNECTION_READY == cc->state) &&
-           (NULL == cc->env) ) ? GNUNET_YES : GNUNET_NO;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Destroying %s\n",
+       GCC_2s (cc));
+  if (NULL != cc->mq_man)
+  {
+    GCP_request_mq_cancel (cc->mq_man,
+                           NULL);
+    cc->mq_man = NULL;
+  }
+  if (NULL != cc->task)
+  {
+    GNUNET_SCHEDULER_cancel (cc->task);
+    cc->task = NULL;
+  }
+  if (NULL != cc->keepalive_qe)
+  {
+    GCT_send_cancel (cc->keepalive_qe);
+    cc->keepalive_qe = NULL;
+  }
+  GCPP_del_connection (cc->path,
+                       cc->off,
+                       cc);
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multishortmap_remove (connections,
+                                                        &GCC_get_id (cc)->connection_of_tunnel,
+                                                        cc));
+  GNUNET_free (cc);
 }
 
 
+
 /**
- * Destroy a connection.
+ * Destroy a connection, called when the CORE layer is already done
+ * (i.e. has received a BROKEN message), but if we still have to
+ * communicate the destruction of the connection to the tunnel (if one
+ * exists).
  *
  * @param cc connection to destroy
  */
 void
-GCC_destroy (struct CadetConnection *cc)
+GCC_destroy_without_core (struct CadetConnection *cc)
 {
-  if (NULL != cc->env)
+  if (NULL != cc->ct)
   {
-    if (NULL != cc->mq)
-      GNUNET_MQ_send_cancel (cc->env);
-    else
-      GNUNET_MQ_discard (cc->env);
-    cc->env = NULL;
+    GCT_connection_lost (cc->ct);
+    cc->ct = NULL;
   }
-  if ( (NULL != cc->mq) &&
-       (CADET_CONNECTION_SENDING_CREATE != cc->state) )
+  GCC_destroy (cc);
+}
+
+
+/**
+ * Destroy a connection, called if the tunnel association with the
+ * connection was already broken, but we still need to notify the CORE
+ * layer about the breakage.
+ *
+ * @param cc connection to destroy
+ */
+void
+GCC_destroy_without_tunnel (struct CadetConnection *cc)
+{
+  cc->ct = NULL;
+  if ( (CADET_CONNECTION_SENDING_CREATE != cc->state) &&
+       (NULL != cc->mq_man) )
   {
-    /* Need to notify next hop that we are down. */
     struct GNUNET_MQ_Envelope *env;
     struct GNUNET_CADET_ConnectionDestroyMessage *destroy_msg;
 
+    /* Need to notify next hop that we are down. */
     env = GNUNET_MQ_msg (destroy_msg,
                          GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY);
     destroy_msg->cid = cc->cid;
-    GNUNET_MQ_send (cc->mq,
-                    env);
+    GCP_request_mq_cancel (cc->mq_man,
+                           env);
+    cc->mq_man = NULL;
   }
-  cc->mq = NULL;
-  GCP_request_mq_cancel (cc->mq_man);
-  cc->mq_man = NULL;
-  GCPP_del_connection (cc->path,
-                       cc->off,
-                       cc);
-  GNUNET_assert (GNUNET_YES ==
-                 GNUNET_CONTAINER_multishortmap_remove (connections,
-                                                        &GCC_get_id (cc)->connection_of_tunnel,
-                                                        cc));
-  GNUNET_free (cc);
+  GCC_destroy (cc);
 }
 
 
@@ -226,22 +260,101 @@ GCC_get_ct (struct CadetConnection *cc)
 
 
 /**
- * A connection ACK was received for this connection, implying
+ * Send a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_KEEPALIVE through the
+ * tunnel to prevent it from timing out.
+ *
+ * @param cls the `struct CadetConnection` to keep alive.
+ */
+static void
+send_keepalive (void *cls);
+
+
+/**
+ * Keepalive was transmitted.  Remember this, and possibly
+ * schedule the next one.
+ *
+ * @param cls the `struct CadetConnection` to keep alive.
+ */
+static void
+keepalive_done (void *cls)
+{
+  struct CadetConnection *cc = cls;
+
+  cc->keepalive_qe = NULL;
+  if ( (GNUNET_YES == cc->mqm_ready) &&
+       (NULL == cc->task) )
+    cc->task = GNUNET_SCHEDULER_add_delayed (keepalive_period,
+                                             &send_keepalive,
+                                             cc);
+}
+
+
+/**
+ * Send a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_KEEPALIVE through the
+ * tunnel to prevent it from timing out.
+ *
+ * @param cls the `struct CadetConnection` to keep alive.
+ */
+static void
+send_keepalive (void *cls)
+{
+  struct CadetConnection *cc = cls;
+  struct GNUNET_MessageHeader msg;
+
+  cc->task = NULL;
+  GNUNET_assert (NULL != cc->ct);
+  GNUNET_assert (GNUNET_YES == cc->mqm_ready);
+  GNUNET_assert (NULL == cc->keepalive_qe);
+  LOG (GNUNET_ERROR_TYPE_INFO,
+       "Sending KEEPALIVE on behalf of %s via %s\n",
+       GCC_2s (cc),
+       GCT_2s (cc->ct->t));
+  GNUNET_STATISTICS_update (stats,
+                            "# keepalives sent",
+                            1,
+                            GNUNET_NO);
+  msg.size = htons (sizeof (msg));
+  msg.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_KEEPALIVE);
+
+  cc->keepalive_qe
+    = GCT_send (cc->ct->t,
+                &msg,
+                &keepalive_done,
+                cc);
+}
+
+
+/**
+ * A #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK was received for this connection, implying
  * that the end-to-end connection is up.  Process it.
  *
  * @param cc the connection that got the ACK.
  */
 void
-GCC_handle_connection_ack (struct CadetConnection *cc)
+GCC_handle_connection_create_ack (struct CadetConnection *cc)
 {
-  GNUNET_SCHEDULER_cancel (cc->task);
-#if FIXME
-  cc->task = GNUNET_SCHEDULER_add_delayed (cc->keepalive_period,
-                                           &send_keepalive,
-                                           cc);
-#endif
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received CADET_CONNECTION_CREATE_ACK for %s in state %d (%s)\n",
+       GCC_2s (cc),
+       cc->state,
+       (GNUNET_YES == cc->mqm_ready) ? "MQM ready" : "MQM busy");
+  if (NULL != cc->task)
+  {
+    GNUNET_SCHEDULER_cancel (cc->task);
+    cc->task = NULL;
+  }
   cc->state = CADET_CONNECTION_READY;
-  cc->ready_cb (cc->ready_cb_cls);
+  if (GNUNET_YES == cc->mqm_ready)
+  {
+    cc->ready_cb (cc->ready_cb_cls,
+                  GNUNET_YES);
+    if ( (NULL == cc->keepalive_qe) &&
+         (GNUNET_YES == cc->mqm_ready) &&
+         (NULL == cc->task) )
+      cc->task = GNUNET_SCHEDULER_add_delayed (keepalive_period,
+                                               &send_keepalive,
+                                               cc);
+  }
 }
 
 
@@ -255,6 +368,15 @@ void
 GCC_handle_kx (struct CadetConnection *cc,
                const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
 {
+  if (CADET_CONNECTION_SENT == cc->state)
+  {
+    /* We didn't get the CADET_CONNECTION_CREATE_ACK, but instead got payload. That's fine,
+       clearly something is working, so pretend we got an ACK. */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Faking connection CADET_CONNECTION_CREATE_ACK for %s due to KX\n",
+         GCC_2s (cc));
+    GCC_handle_connection_create_ack (cc);
+  }
   GCT_handle_kx (cc->ct,
                  msg);
 }
@@ -270,72 +392,124 @@ void
 GCC_handle_encrypted (struct CadetConnection *cc,
                       const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
 {
+  if (CADET_CONNECTION_SENT == cc->state)
+  {
+    /* We didn't get the CREATE_ACK, but instead got payload. That's fine,
+       clearly something is working, so pretend we got an ACK. */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Faking connection ACK for %s due to ENCRYPTED payload\n",
+         GCC_2s (cc));
+    GCC_handle_connection_create_ack (cc);
+  }
   GCT_handle_encrypted (cc->ct,
                         msg);
 }
 
 
 /**
- * Send a CREATE message to the first hop.
+ * Send a #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE message to the
+ * first hop.
  *
  * @param cls the `struct CadetConnection` to initiate
  */
 static void
-send_create (void *cls);
-
-
-/**
- * We finished transmission of the create message, now wait for
- * ACK or retransmit.
- *
- * @param cls the `struct CadetConnection` that sent the create message
- */
-static void
-transmit_create_done_cb (void *cls)
+send_create (void *cls)
 {
   struct CadetConnection *cc = cls;
+  struct GNUNET_CADET_ConnectionCreateMessage *create_msg;
+  struct GNUNET_PeerIdentity *pids;
+  struct GNUNET_MQ_Envelope *env;
+  unsigned int path_length;
 
+  cc->task = NULL;
+  GNUNET_assert (GNUNET_YES == cc->mqm_ready);
+  path_length = GCPP_get_length (cc->path);
+  env = GNUNET_MQ_msg_extra (create_msg,
+                             (1 + path_length) * sizeof (struct GNUNET_PeerIdentity),
+                             GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE);
+  create_msg->cid = cc->cid;
+  pids = (struct GNUNET_PeerIdentity *) &create_msg[1];
+  pids[0] = my_full_id;
+  for (unsigned int i=0;i<path_length;i++)
+    pids[i + 1] = *GCP_get_id (GCPP_get_peer_at_offset (cc->path,
+                                                        i));
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Sending CADET_CONNECTION_CREATE message for %s\n",
+       GCC_2s (cc));
+  cc->env = env;
+  cc->mqm_ready = GNUNET_NO;
   cc->state = CADET_CONNECTION_SENT;
-  cc->env = NULL;
-  /* FIXME: at some point, we need to reset the delay back to 0! */
-  cc->retry_delay = GNUNET_TIME_STD_BACKOFF (cc->retry_delay);
-  cc->task = GNUNET_SCHEDULER_add_delayed (cc->retry_delay,
-                                           &send_create,
-                                           cc);
+  GCP_send (cc->mq_man,
+            env);
 }
 
 
 /**
- * Send a CREATE message to the first hop.
+ * Send a CREATE_ACK message towards the origin.
  *
  * @param cls the `struct CadetConnection` to initiate
  */
 static void
-send_create (void *cls)
+send_create_ack (void *cls)
 {
   struct CadetConnection *cc = cls;
-  struct GNUNET_CADET_ConnectionCreateMessage *create_msg;
-  struct GNUNET_PeerIdentity *pids;
+  struct GNUNET_CADET_ConnectionCreateAckMessage *ack_msg;
   struct GNUNET_MQ_Envelope *env;
-  unsigned int path_length;
 
   cc->task = NULL;
-  GNUNET_assert (NULL != cc->mq);
-  path_length = GCPP_get_length (cc->path);
-  env = GNUNET_MQ_msg_extra (create_msg,
-                             path_length * sizeof (struct GNUNET_PeerIdentity),
-                             GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE);
-  create_msg->cid = cc->cid;
-  pids = (struct GNUNET_PeerIdentity *) &create_msg[1];
-  for (unsigned int i=0;i<path_length;i++)
-    pids[i] = *GCP_get_id (GCPP_get_peer_at_offset (cc->path,
-                                                    i));
+  GNUNET_assert (CADET_CONNECTION_CREATE_RECEIVED == cc->state);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Sending CONNECTION_CREATE_ACK message for %s\n",
+       GCC_2s (cc));
+  GNUNET_assert (GNUNET_YES == cc->mqm_ready);
+  env = GNUNET_MQ_msg (ack_msg,
+                       GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK);
+  ack_msg->cid = cc->cid;
   cc->env = env;
-  GNUNET_MQ_notify_sent (env,
-                         &transmit_create_done_cb,
-                         cc);
-  GNUNET_MQ_send (cc->mq,
-                  env);
+  cc->mqm_ready = GNUNET_NO;
+  cc->state = CADET_CONNECTION_READY;
+  GCP_send (cc->mq_man,
+            env);
+}
+
+
+/**
+ * We got a #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE for a
+ * connection that we already have.  Either our ACK got lost
+ * or something is fishy.  Consider retransmitting the ACK.
+ *
+ * @param cc connection that got the duplicate CREATE
+ */
+void
+GCC_handle_duplicate_create (struct CadetConnection *cc)
+{
+  if (GNUNET_YES == cc->mqm_ready)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Got duplicate CREATE for %s, scheduling another ACK (%s)\n",
+         GCC_2s (cc),
+         (GNUNET_YES == cc->mqm_ready) ? "MQM ready" : "MQM busy");
+    /* Tell tunnel that we are not ready for transmission anymore
+       (until CREATE_ACK is done) */
+    cc->ready_cb (cc->ready_cb_cls,
+                  GNUNET_NO);
+    /* Revert back to the state of having only received the 'CREATE',
+       and immediately proceed to send the CREATE_ACK. */
+    cc->state = CADET_CONNECTION_CREATE_RECEIVED;
+    if (NULL != cc->task)
+      GNUNET_SCHEDULER_cancel (cc->task);
+    cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack,
+                                         cc);
+  }
+  else
+  {
+    /* We are currently sending something else back, which
+       can only be an ACK or payload, either of which would
+       do. So actually no need to do anything. */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Got duplicate CREATE for %s. MQ is busy, not queueing another ACK\n",
+         GCC_2s (cc));
+  }
 }
 
 
@@ -344,34 +518,82 @@ send_create (void *cls)
  * peer at the first hop.  Adjust accordingly.
  *
  * @param cls the `struct CadetConnection`
- * @param mq NULL if the CORE connection was lost, non-NULL if
- *           it became available
+ * @param available #GNUNET_YES if sending is now possible,
+ *                  #GNUNET_NO if sending is no longer possible
+ *                  #GNUNET_SYSERR if sending is no longer possible
+ *                                 and the last envelope was discarded
  */
 static void
 manage_first_hop_mq (void *cls,
-                     struct GNUNET_MQ_Handle *mq)
+                     int available)
 {
   struct CadetConnection *cc = cls;
 
-  if (NULL == mq)
+  if (GNUNET_YES != available)
   {
     /* Connection is down, for now... */
-    cc->mq = NULL;
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Core MQ for %s went down\n",
+         GCC_2s (cc));
+    cc->mqm_ready = GNUNET_NO;
+    cc->state = CADET_CONNECTION_NEW;
+    cc->retry_delay = GNUNET_TIME_UNIT_ZERO;
     if (NULL != cc->task)
     {
       GNUNET_SCHEDULER_cancel (cc->task);
       cc->task = NULL;
     }
+    cc->ready_cb (cc->ready_cb_cls,
+                  GNUNET_NO);
     return;
   }
 
-  cc->mq = mq;
-  cc->state = CADET_CONNECTION_SENDING_CREATE;
-
-  /* Now repeat sending connection creation messages
-     down the path, until we get an ACK! */
-  cc->task = GNUNET_SCHEDULER_add_now (&send_create,
-                                       cc);
+  cc->mqm_ready = GNUNET_YES;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Core MQ for %s became available in state %d\n",
+       GCC_2s (cc),
+       cc->state);
+  switch (cc->state)
+  {
+  case CADET_CONNECTION_NEW:
+    /* Transmit immediately */
+    cc->task = GNUNET_SCHEDULER_add_now (&send_create,
+                                         cc);
+    break;
+  case CADET_CONNECTION_SENDING_CREATE:
+    /* Should not be possible to be called in this state. */
+    GNUNET_assert (0);
+    break;
+  case CADET_CONNECTION_SENT:
+    /* Retry a bit later... */
+    cc->retry_delay = GNUNET_TIME_STD_BACKOFF (cc->retry_delay);
+    cc->task = GNUNET_SCHEDULER_add_delayed (cc->retry_delay,
+                                             &send_create,
+                                             cc);
+    break;
+  case CADET_CONNECTION_CREATE_RECEIVED:
+    /* We got the 'CREATE' (incoming connection), should send the CREATE_ACK */
+    cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack,
+                                         cc);
+    break;
+  case CADET_CONNECTION_READY:
+    cc->ready_cb (cc->ready_cb_cls,
+                  GNUNET_YES);
+    if ( (NULL == cc->keepalive_qe) &&
+         (GNUNET_YES == cc->mqm_ready) &&
+         (NULL == cc->task) )
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Scheduling keepalive for %s in %s\n",
+           GCC_2s (cc),
+           GNUNET_STRINGS_relative_time_to_string (keepalive_period,
+                                                   GNUNET_YES));
+      cc->task = GNUNET_SCHEDULER_add_delayed (keepalive_period,
+                                               &send_keepalive,
+                                               cc);
+    }
+    break;
+  }
 }
 
 
@@ -383,6 +605,7 @@ manage_first_hop_mq (void *cls,
  * @param destination where to go
  * @param path which path to take (may not be the full path)
  * @param ct which tunnel uses this connection
+ * @param init_state initial state for the connection
  * @param ready_cb function to call when ready to transmit
  * @param ready_cb_cls closure for @a cb
  * @return handle to the connection
@@ -392,7 +615,8 @@ connection_create (struct CadetPeer *destination,
                    struct CadetPeerPath *path,
                    struct CadetTConnection *ct,
                    const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
-                   GNUNET_SCHEDULER_TaskCallback ready_cb,
+                   enum CadetConnectionState init_state,
+                   GCC_ReadyCallback ready_cb,
                    void *ready_cb_cls)
 {
   struct CadetConnection *cc;
@@ -403,6 +627,7 @@ connection_create (struct CadetPeer *destination,
                         destination);
   GNUNET_assert (UINT_MAX > off);
   cc = GNUNET_new (struct CadetConnection);
+  cc->state = init_state;
   cc->ct = ct;
   cc->cid = *cid;
   GNUNET_assert (GNUNET_OK ==
@@ -414,6 +639,10 @@ connection_create (struct CadetPeer *destination,
   cc->ready_cb_cls = ready_cb_cls;
   cc->path = path;
   cc->off = off;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Creating %s using path %s\n",
+       GCC_2s (cc),
+       GCPP_2s (path));
   GCPP_add_connection (path,
                        off,
                        cc);
@@ -441,26 +670,72 @@ connection_create (struct CadetPeer *destination,
  * @param ct which tunnel uses this connection
  * @param ready_cb function to call when ready to transmit
  * @param ready_cb_cls closure for @a cb
- * @return handle to the connection
+ * @return handle to the connection, NULL if we already have
+ *         a connection that takes precedence on @a path
  */
 struct CadetConnection *
 GCC_create_inbound (struct CadetPeer *destination,
                     struct CadetPeerPath *path,
                     struct CadetTConnection *ct,
                     const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
-                    GNUNET_SCHEDULER_TaskCallback ready_cb,
+                    GCC_ReadyCallback ready_cb,
                     void *ready_cb_cls)
 {
   struct CadetConnection *cc;
+  unsigned int off;
 
-  cc = connection_create (destination,
-                          path,
-                          ct,
-                          cid,
-                          ready_cb,
-                          ready_cb_cls);
-  /* FIXME: send CREATE_ACK? */
-  return cc;
+  off = GCPP_find_peer (path,
+                        destination);
+  GNUNET_assert (UINT_MAX != off);
+  cc = GCPP_get_connection (path,
+                            destination,
+                            off);
+  if (NULL != cc)
+  {
+    int cmp;
+
+    cmp = memcmp (cid,
+                  &cc->cid,
+                  sizeof (*cid));
+    if (0 == cmp)
+    {
+      /* Two peers picked the SAME random connection identifier at the
+         same time for the same path? Must be malicious.  Drop
+         connection (existing and inbound), even if it is the only
+         one. */
+      GNUNET_break_op (0);
+      GCT_connection_lost (cc->ct);
+      GCC_destroy_without_tunnel (cc);
+      return NULL;
+    }
+    if (0 < cmp)
+    {
+      /* drop existing */
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Got two connections on %s, dropping my existing %s\n",
+           GCPP_2s (path),
+           GCC_2s (cc));
+      GCT_connection_lost (cc->ct);
+      GCC_destroy_without_tunnel (cc);
+    }
+    else
+    {
+      /* keep existing */
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Got two connections on %s, keeping my existing %s\n",
+           GCPP_2s (path),
+           GCC_2s (cc));
+      return NULL;
+    }
+  }
+
+  return connection_create (destination,
+                            path,
+                            ct,
+                            cid,
+                            CADET_CONNECTION_CREATE_RECEIVED,
+                            ready_cb,
+                            ready_cb_cls);
 }
 
 
@@ -479,41 +754,21 @@ struct CadetConnection *
 GCC_create (struct CadetPeer *destination,
             struct CadetPeerPath *path,
             struct CadetTConnection *ct,
-            GNUNET_SCHEDULER_TaskCallback ready_cb,
+            GCC_ReadyCallback ready_cb,
             void *ready_cb_cls)
 {
   struct GNUNET_CADET_ConnectionTunnelIdentifier cid;
-  struct CadetConnection *cc;
 
   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
                               &cid,
                               sizeof (cid));
-  cc = connection_create (destination,
-                          path,
-                          ct,
-                          &cid,
-                          ready_cb,
-                          ready_cb_cls);
-  /* FIXME: send CREATE? */
-  return cc;
-}
-
-
-/**
- * We finished transmission of a message, if we are still ready, tell
- * the tunnel!
- *
- * @param cls our `struct CadetConnection`
- */
-static void
-transmit_done_cb (void *cls)
-{
-  struct CadetConnection *cc = cls;
-
-  cc->env = NULL;
-  if ( (NULL != cc->mq) &&
-       (CADET_CONNECTION_READY == cc->state) )
-    cc->ready_cb (cc->ready_cb_cls);
+  return connection_create (destination,
+                            path,
+                            ct,
+                            &cid,
+                            CADET_CONNECTION_NEW,
+                            ready_cb,
+                            ready_cb_cls);
 }
 
 
@@ -524,21 +779,26 @@ transmit_done_cb (void *cls)
  * connection is right now ready for transmission.
  *
  * @param cc connection identification
- * @param env envelope with message to transmit
+ * @param env envelope with message to transmit; must NOT
+ *            yet have a #GNUNET_MQ_notify_sent() callback attached to it
  */
 void
 GCC_transmit (struct CadetConnection *cc,
               struct GNUNET_MQ_Envelope *env)
 {
-  GNUNET_assert (NULL == cc->env);
-  cc->env = env;
-  GNUNET_MQ_notify_sent (env,
-                         &transmit_done_cb,
-                         cc);
-  if ( (NULL != cc->mq) &&
-       (CADET_CONNECTION_READY == cc->state) )
-    GNUNET_MQ_send (cc->mq,
-                    env);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Scheduling message for transmission on %s\n",
+       GCC_2s (cc));
+  GNUNET_assert (GNUNET_YES == cc->mqm_ready);
+  GNUNET_assert (CADET_CONNECTION_READY == cc->state);
+  cc->mqm_ready = GNUNET_NO;
+  if (NULL != cc->task)
+  {
+    GNUNET_SCHEDULER_cancel (cc->task);
+    cc->task = NULL;
+  }
+  GCP_send (cc->mq_man,
+            env);
 }
 
 
@@ -568,6 +828,39 @@ GCC_get_id (struct CadetConnection *cc)
 }
 
 
+/**
+ * Get a (static) string for a connection.
+ *
+ * @param cc Connection.
+ */
+const char *
+GCC_2s (const struct CadetConnection *cc)
+{
+  static char buf[128];
+
+  if (NULL == cc)
+    return "Connection(NULL)";
+
+  if (NULL != cc->ct)
+  {
+    GNUNET_snprintf (buf,
+                     sizeof (buf),
+                     "Connection %s (%s)",
+                     GNUNET_sh2s (&cc->cid.connection_of_tunnel),
+                     GCT_2s (cc->ct->t));
+    return buf;
+  }
+  GNUNET_snprintf (buf,
+                   sizeof (buf),
+                   "Connection %s",
+                   GNUNET_sh2s (&cc->cid.connection_of_tunnel));
+  return buf;
+}
+
+
+#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-con",__VA_ARGS__)
+
+
 /**
  * Log connection info.
  *
@@ -578,7 +871,26 @@ void
 GCC_debug (struct CadetConnection *cc,
            enum GNUNET_ErrorType level)
 {
-  GNUNET_break (0); // FIXME: implement...
+  int do_log;
+
+  do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
+                                       "cadet-con",
+                                       __FILE__, __FUNCTION__, __LINE__);
+  if (0 == do_log)
+    return;
+  if (NULL == cc)
+  {
+    LOG2 (level,
+          "Connection (NULL)\n");
+    return;
+  }
+  LOG2 (level,
+        "%s to %s via path %s in state %d is %s\n",
+        GCC_2s (cc),
+        GCP_2s (cc->destination),
+        GCPP_2s (cc->path),
+        cc->state,
+        (GNUNET_YES == cc->mqm_ready) ? "ready" : "busy");
 }
 
 /* end of gnunet-service-cadet-new_connection.c */