- log
[oweals/gnunet.git] / src / mesh / gnunet-service-mesh_connection.c
index 99b1565956bb4e28305e5438ed3d3157c191f849..8d2ef4044b0725954bb210415c1bc9167f64d782 100644 (file)
 #include "gnunet_statistics_service.h"
 
 #include "mesh_path.h"
-#include "mesh_protocol_enc.h"
-#include "mesh_enc.h"
+#include "mesh_protocol.h"
+#include "mesh.h"
 #include "gnunet-service-mesh_connection.h"
 #include "gnunet-service-mesh_peer.h"
 #include "gnunet-service-mesh_tunnel.h"
-#include "gnunet-service-mesh_channel.h"
 
 
 #define LOG(level, ...) GNUNET_log_from (level,"mesh-con",__VA_ARGS__)
@@ -105,6 +104,16 @@ struct MeshFlowControl
    * How frequently to poll for ACKs.
    */
   struct GNUNET_TIME_Relative poll_time;
+
+  /**
+   * Queued poll message, to cancel if not necessary anymore (got ACK).
+   */
+  struct MeshConnectionQueue *poll_msg;
+
+  /**
+   * Queued poll message, to cancel if not necessary anymore (got ACK).
+   */
+  struct MeshConnectionQueue *ack_msg;
 };
 
 /**
@@ -202,6 +211,27 @@ struct MeshConnection
   int destroy;
 };
 
+/**
+ * Handle for messages queued but not yet sent.
+ */
+struct MeshConnectionQueue
+{
+  /**
+   * Peer queue handle, to cancel if necessary.
+   */
+  struct MeshPeerQueue *q;
+
+  /**
+   * Continuation to call once sent.
+   */
+  GMC_sent cont;
+
+  /**
+   * Closure for @c cont.
+   */
+  void *cont_cls;
+};
+
 /******************************************************************************/
 /*******************************   GLOBALS  ***********************************/
 /******************************************************************************/
@@ -243,6 +273,11 @@ static unsigned long long max_msgs_queue;
  */
 static struct GNUNET_TIME_Relative refresh_connection_time;
 
+/**
+ * How often to send path create / ACKs.
+ */
+static struct GNUNET_TIME_Relative create_connection_time;
+
 
 /******************************************************************************/
 /********************************   STATIC  ***********************************/
@@ -299,6 +334,8 @@ GMC_state2s (enum MeshConnectionState s)
       return "MESH_CONNECTION_ACK";
     case MESH_CONNECTION_READY:
       return "MESH_CONNECTION_READY";
+    case MESH_CONNECTION_DESTROYED:
+      return "MESH_CONNECTION_DESTROYED";
     default:
       return "MESH_CONNECTION_STATE_ERROR";
   }
@@ -344,6 +381,11 @@ connection_change_state (struct MeshConnection* c,
   LOG (GNUNET_ERROR_TYPE_DEBUG,
               "Connection %s state was %s\n",
               GMC_2s (c), GMC_state2s (c->state));
+  if (MESH_CONNECTION_DESTROYED == c->state)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "state not changing anymore\n");
+    return;
+  }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
               "Connection %s state is now %s\n",
               GMC_2s (c), GMC_state2s (state));
@@ -351,20 +393,43 @@ connection_change_state (struct MeshConnection* c,
 }
 
 
+/**
+ * Callback called when a queued ACK message is sent.
+ *
+ * @param cls Closure (FC).
+ * @param c Connection this message was on.
+ * @param q Queue handler this call invalidates.
+ * @param type Type of message sent.
+ * @param fwd Was this a FWD going message?
+ * @param size Size of the message.
+ */
+static void
+ack_sent (void *cls,
+          struct MeshConnection *c,
+          struct MeshConnectionQueue *q,
+          uint16_t type, int fwd, size_t size)
+{
+  struct MeshFlowControl *fc = cls;
+
+  fc->ack_msg = NULL;
+}
+
+
 /**
  * Send an ACK on the connection, informing the predecessor about
  * the available buffer space. Should not be called in case the peer
- * is origin (no predecessor).
+ * is origin (no predecessor) in the @c fwd direction.
  *
  * Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
  * the ACK itself goes "back" (dest->root).
  *
  * @param c Connection on which to send the ACK.
  * @param buffer How much space free to advertise?
- * @param fwd Is this FWD ACK? (Going dest->owner)
+ * @param fwd Is this FWD ACK? (Going dest -> root)
+ * @param force Don't optimize out.
  */
 static void
-send_ack (struct MeshConnection *c, unsigned int buffer, int fwd)
+send_ack (struct MeshConnection *c, unsigned int buffer, int fwd, int force)
 {
   struct MeshFlowControl *next_fc;
   struct MeshFlowControl *prev_fc;
@@ -386,9 +451,9 @@ send_ack (struct MeshConnection *c, unsigned int buffer, int fwd)
               "connection send %s ack on %s\n",
               fwd ? "FWD" : "BCK", GMC_2s (c));
 
-  /* Check if we need to transmit the ACK */
+  /* Check if we need to transmit the ACK. */
   delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv;
-  if (3 < delta && buffer < delta)
+  if (3 < delta && buffer < delta && GNUNET_NO == force)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -404,12 +469,28 @@ send_ack (struct MeshConnection *c, unsigned int buffer, int fwd)
        " last pid %u, last ack %u, qmax %u, q %u\n",
        prev_fc->last_pid_recv, prev_fc->last_ack_sent,
        next_fc->queue_max, next_fc->queue_n);
-  if (ack == prev_fc->last_ack_sent)
+  if (ack == prev_fc->last_ack_sent && GNUNET_NO == force)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
     return;
   }
 
+  /* Check if message is already in queue */
+  if (NULL != prev_fc->ack_msg)
+  {
+    if (GMC_is_pid_bigger (ack, prev_fc->last_ack_sent))
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG, " canceling old ACK\n");
+      GMC_cancel (prev_fc->ack_msg);
+      /* GMC_cancel triggers ack_sent(), which clears fc->ack_msg */
+    }
+    else
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG, " same ACK already in queue\n");
+      return;
+    }
+  }
+
   prev_fc->last_ack_sent = ack;
 
   /* Build ACK message and send on connection */
@@ -418,7 +499,8 @@ send_ack (struct MeshConnection *c, unsigned int buffer, int fwd)
   msg.ack = htonl (ack);
   msg.cid = c->id;
 
-  GMC_send_prebuilt_message (&msg.header, c, NULL, !fwd);
+  prev_fc->ack_msg = GMC_send_prebuilt_message (&msg.header, c, !fwd,
+                                                &ack_sent, prev_fc);
 }
 
 
@@ -427,14 +509,14 @@ send_ack (struct MeshConnection *c, unsigned int buffer, int fwd)
  *
  * Calculates the average time and connection packet tracking.
  *
- * @param cls Closure.
+ * @param cls Closure (ConnectionQueue Handle).
  * @param c Connection this message was on.
  * @param type Type of message sent.
  * @param fwd Was this a FWD going message?
  * @param size Size of the message.
  * @param wait Time spent waiting for core (only the time for THIS message)
  */
-static void 
+static void
 message_sent (void *cls,
               struct MeshConnection *c, uint16_t type,
               int fwd, size_t size,
@@ -442,11 +524,24 @@ message_sent (void *cls,
 {
   struct MeshConnectionPerformance *p;
   struct MeshFlowControl *fc;
+  struct MeshConnectionQueue *q = cls;
   double usecsperbyte;
 
   fc = fwd ? &c->fwd_fc : &c->bck_fc;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "!  sent %s\n", GNUNET_MESH_DEBUG_M2S (type));
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "!  sent %s %s\n",
+       fwd ? "FWD" : "BCK",
+       GNUNET_MESH_DEBUG_M2S (type));
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "!  C_P- %p %u\n", c, c->pending_messages);
+  if (NULL != q)
+  {
+    if (NULL != q->cont)
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "!  calling cont\n");
+      q->cont (q->cont_cls, c, q, type, fwd, size);
+    }
+    GNUNET_free (q);
+  }
   c->pending_messages--;
   if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
   {
@@ -459,12 +554,22 @@ message_sent (void *cls,
   {
     case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED:
       fc->last_pid_sent++;
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
       fc->queue_n--;
       LOG (GNUNET_ERROR_TYPE_DEBUG,
            "!   accounting pid %u\n",
            fc->last_pid_sent);
-      GMC_send_ack (c, fwd);
+      GMC_send_ack (c, fwd, GNUNET_NO);
       break;
+
+    case GNUNET_MESSAGE_TYPE_MESH_POLL:
+      fc->poll_msg = NULL;
+      break;
+
+    case GNUNET_MESSAGE_TYPE_MESH_ACK:
+      fc->ack_msg = NULL;
+      break;
+
     default:
       break;
   }
@@ -492,16 +597,6 @@ message_sent (void *cls,
     p->avg /= p->size;
   }
   p->idx = (p->idx + 1) % AVG_MSGS;
-
-//   if (NULL != c->t)
-//   {
-//     c->t->pending_messages--;
-//     if (GNUNET_YES == c->t->destroy && 0 == t->pending_messages)
-//     {
-//       LOG (GNUNET_ERROR_TYPE_DEBUG, "*  destroying tunnel!\n");
-//       GMT_destroy (c->t);
-//     }
-//   }
 }
 
 
@@ -517,6 +612,7 @@ get_prev_hop (const struct MeshConnection *c)
 {
   GNUNET_PEER_Id id;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Get prev hop, own pos %u\n", c->own_pos);
   if (0 == c->own_pos || c->path->length < 2)
     id = c->path->peers[0];
   else
@@ -570,10 +666,12 @@ get_hop (struct MeshConnection *c, int fwd)
  * @param c Connection to check.
  * @param sender Peer identity of neighbor.
  *
- * @return GNUNET_YES in case the sender is the 'prev' hop and therefore
- *         the traffic is 'FWD'. GNUNET_NO for BCK. GNUNET_SYSERR for errors.
+ * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
+ *         the traffic is 'FWD'.
+ *         #GNUNET_NO for BCK.
+ *         #GNUNET_SYSERR for errors.
  */
-static int 
+static int
 is_fwd (const struct MeshConnection *c,
         const struct GNUNET_PeerIdentity *sender)
 {
@@ -596,7 +694,7 @@ is_fwd (const struct MeshConnection *c,
  * or a first CONNECTION_ACK directed to us.
  *
  * @param connection Connection to confirm.
- * @param fwd Should we send it FWD?
+ * @param fwd Should we send it FWD? (root->dest)
  *            (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
  */
 static void
@@ -605,12 +703,13 @@ send_connection_ack (struct MeshConnection *connection, int fwd)
   struct MeshTunnel3 *t;
 
   t = connection->t;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection ack\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection %s ACK\n",
+       !fwd ? "FWD" : "BCK");
   GMP_queue_add (get_hop (connection, fwd), NULL,
                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK,
                  sizeof (struct GNUNET_MESH_ConnectionACK),
-                 connection, NULL, fwd,
-                 &message_sent, NULL);
+                 connection, fwd, &message_sent, NULL);
+  connection->pending_messages++;
   if (MESH_TUNNEL3_NEW == GMT_get_state (t))
     GMT_change_state (t, MESH_TUNNEL3_WAITING);
   if (MESH_CONNECTION_READY != connection->state)
@@ -639,7 +738,7 @@ send_broken (struct MeshConnection *c,
   msg.cid = c->id;
   msg.peer1 = *id1;
   msg.peer2 = *id2;
-  GMC_send_prebuilt_message (&msg.header, c, NULL, fwd);
+  GMC_send_prebuilt_message (&msg.header, c, fwd, NULL, NULL);
 }
 
 
@@ -649,6 +748,8 @@ send_broken (struct MeshConnection *c,
  *
  * @param c Connection to keep alive..
  * @param fwd Is this a FWD keepalive? (owner -> dest).
+ *
+ * FIXME use only one type, register in GMC_send_prebuilt_message()
  */
 static void
 connection_keepalive (struct MeshConnection *c, int fwd)
@@ -670,7 +771,7 @@ connection_keepalive (struct MeshConnection *c, int fwd)
   msg->header.type = htons (type);
   msg->cid = c->id;
 
-  GMC_send_prebuilt_message (&msg->header, c, NULL, fwd);
+  GMC_send_prebuilt_message (&msg->header, c, fwd, NULL, NULL);
 }
 
 
@@ -678,7 +779,7 @@ connection_keepalive (struct MeshConnection *c, int fwd)
  * Send CONNECTION_{CREATE/ACK} packets for a connection.
  *
  * @param c Connection for which to send the message.
- * @param fwd If GNUNET_YES, send CREATE, otherwise send ACK.
+ * @param fwd If #GNUNET_YES, send CREATE, otherwise send ACK.
  */
 static void
 connection_recreate (struct MeshConnection *c, int fwd)
@@ -711,6 +812,7 @@ connection_maintain (struct MeshConnection *c, int fwd)
   {
     case MESH_CONNECTION_NEW:
       GNUNET_break (0);
+      /* fall-through */
     case MESH_CONNECTION_SENT:
       connection_recreate (c, fwd);
       break;
@@ -727,13 +829,16 @@ static void
 connection_fwd_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct MeshConnection *c = cls;
+  struct GNUNET_TIME_Relative delay;
 
   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
 
   connection_maintain (c, GNUNET_YES);
-  c->fwd_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
+  delay = c->state == MESH_CONNECTION_READY ?
+          refresh_connection_time : create_connection_time;
+  c->fwd_maintenance_task = GNUNET_SCHEDULER_add_delayed (delay,
                                                           &connection_fwd_keepalive,
                                                           c);
 }
@@ -743,13 +848,16 @@ static void
 connection_bck_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct MeshConnection *c = cls;
+  struct GNUNET_TIME_Relative delay;
 
   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
 
   connection_maintain (c, GNUNET_NO);
-  c->bck_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
+  delay = c->state == MESH_CONNECTION_READY ?
+          refresh_connection_time : create_connection_time;
+  c->bck_maintenance_task = GNUNET_SCHEDULER_add_delayed (delay,
                                                           &connection_bck_keepalive,
                                                           c);
 }
@@ -788,34 +896,83 @@ connection_unlock_queue (struct MeshConnection *c, int fwd)
 /**
  * Cancel all transmissions that belong to a certain connection.
  *
- * @param c Connection which to cancel.
+ * If the connection is scheduled for destruction and no more messages are left,
+ * the connection will be destroyed by the continuation call.
+ *
+ * @param c Connection which to cancel. Might be destroyed during this call.
  * @param fwd Cancel fwd traffic?
  */
 static void
 connection_cancel_queues (struct MeshConnection *c, int fwd)
 {
-
   struct MeshFlowControl *fc;
   struct MeshPeer *peer;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       " *** Cancel %s queues for connection %s\n",
+       fwd ? "FWD" : "BCK", GMC_2s (c));
   if (NULL == c)
   {
     GNUNET_break (0);
     return;
   }
 
-  peer = get_hop (c, fwd);
-  GMP_queue_cancel (peer, c);
-
   fc = fwd ? &c->fwd_fc : &c->bck_fc;
   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
   {
     GNUNET_SCHEDULER_cancel (fc->poll_task);
     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
+    LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Cancel POLL in ccq for fc %p\n", fc);
   }
+  peer = get_hop (c, fwd);
+  GMP_queue_cancel (peer, c);
 }
 
 
+/**
+ * Function called if a connection has been stalled for a while,
+ * possibly due to a missed ACK. Poll the neighbor about its ACK status.
+ *
+ * @param cls Closure (poll ctx).
+ * @param tc TaskContext.
+ */
+static void
+connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Callback called when a queued POLL message is sent.
+ *
+ * @param cls Closure (FC).
+ * @param c Connection this message was on.
+ * @param q Queue handler this call invalidates.
+ * @param type Type of message sent.
+ * @param fwd Was this a FWD going message?
+ * @param size Size of the message.
+ */
+static void
+poll_sent (void *cls,
+           struct MeshConnection *c,
+           struct MeshConnectionQueue *q,
+           uint16_t type, int fwd, size_t size)
+{
+  struct MeshFlowControl *fc = cls;
+
+  if (2 == c->destroy)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL canceled on shutdown\n");
+    return;
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       " *** POLL sent for , scheduling new one!\n");
+  fc->poll_msg = NULL;
+  fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
+  fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
+                                                &connection_poll, fc);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, " task %u\n", fc->poll_task);
+
+}
+
 /**
  * Function called if a connection has been stalled for a while,
  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
@@ -840,15 +997,14 @@ connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Polling!\n");
   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** connection [%s]\n", GMC_2s (c));
   LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   %s\n",
-              fc == &c->fwd_fc ? "FWD" : "BCK");
+       fc == &c->fwd_fc ? "FWD" : "BCK");
 
   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL);
   msg.header.size = htons (sizeof (msg));
-  LOG (GNUNET_ERROR_TYPE_DEBUG, " *** pid (%u)!\n", fc->last_pid_sent);
-  GMC_send_prebuilt_message (&msg.header, c, NULL, fc == &c->fwd_fc);
-  fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
-  fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
-                                                &connection_poll, fc);
+  msg.pid = htonl (fc->last_pid_sent);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, " *** last pid sent: %u!\n", fc->last_pid_sent);
+  fc->poll_msg = GMC_send_prebuilt_message (&msg.header, c, fc == &c->fwd_fc,
+                                            &poll_sent, fc);
 }
 
 
@@ -933,12 +1089,12 @@ connection_reset_timeout (struct MeshConnection *c, int fwd)
   if (GNUNET_SCHEDULER_NO_TASK != *ti)
     GNUNET_SCHEDULER_cancel (*ti);
 
-  if (GMC_is_origin (c, fwd)) /* Endpoint */
+  if (GMC_is_origin (c, fwd)) /* Startpoint */
   {
     f  = fwd ? &connection_fwd_keepalive : &connection_bck_keepalive;
     *ti = GNUNET_SCHEDULER_add_delayed (refresh_connection_time, f, c);
   }
-  else /* Relay */
+  else /* Relay, endpoint. */
   {
     struct GNUNET_TIME_Relative delay;
 
@@ -953,26 +1109,27 @@ connection_reset_timeout (struct MeshConnection *c, int fwd)
  * Add the connection to the list of both neighbors.
  *
  * @param c Connection.
+ *
+ * @return #GNUNET_OK if everything went fine
+ *         #GNUNET_SYSERR if the was an error and @c c is malformed.
  */
-static void
+static int
 register_neighbors (struct MeshConnection *c)
 {
-  struct MeshPeer *peer;
+  struct MeshPeer *next_peer;
+  struct MeshPeer *prev_peer;
 
-  peer = get_next_hop (c);
-  if (GNUNET_NO == GMP_is_neighbor (peer))
-  {
-    GMC_destroy (c);
-    return;
-  }
-  GMP_add_connection (peer, c);
-  peer = get_prev_hop (c);
-  if (GNUNET_NO == GMP_is_neighbor (peer))
-  {
-    GMC_destroy (c);
-    return;
-  }
-  GMP_add_connection (peer, c);
+  next_peer = get_next_hop (c);
+  prev_peer = get_prev_hop (c);
+
+  if (GNUNET_NO == GMP_is_neighbor (next_peer)
+      || GNUNET_NO == GMP_is_neighbor (prev_peer))
+    return GNUNET_SYSERR;
+
+  GMP_add_connection (next_peer, c);
+  GMP_add_connection (prev_peer, c);
+
+  return GNUNET_OK;
 }
 
 
@@ -1103,12 +1260,15 @@ GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer,
         LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating connection\n");
     c = GMC_new (cid, NULL, path_duplicate (path), own_pos);
     if (NULL == c)
+    {
+      path_destroy (path);
       return GNUNET_OK;
+    }
     connection_reset_timeout (c, GNUNET_YES);
   }
   else
   {
-    path = NULL;
+    path = path_duplicate (c->path);
   }
   if (MESH_CONNECTION_NEW == c->state)
     connection_change_state (c, MESH_CONNECTION_SENT);
@@ -1121,7 +1281,7 @@ GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer,
   if (c->own_pos == size - 1)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
-    GMP_add_path_to_origin (orig_peer, path, GNUNET_YES);
+    GMP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_YES);
 
     add_to_peer (c, orig_peer);
     if (MESH_TUNNEL3_NEW == GMT_get_state (c->t))
@@ -1132,16 +1292,19 @@ GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer,
       connection_change_state (c, MESH_CONNECTION_ACK);
 
     /* Keep tunnel alive in direction dest->owner*/
-    connection_reset_timeout (c, GNUNET_NO);
+    c->bck_maintenance_task =
+            GNUNET_SCHEDULER_add_delayed (create_connection_time,
+                                          &connection_bck_keepalive, c);
   }
   else
   {
     /* It's for somebody else! Retransmit. */
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
     GMP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
-    GMP_add_path_to_origin (orig_peer, path, GNUNET_NO);
-    GMC_send_prebuilt_message (message, c, NULL, GNUNET_YES);
+    GMP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
+    GMC_send_prebuilt_message (message, c, GNUNET_YES, NULL, NULL);
   }
+  path_destroy (path);
   return GNUNET_OK;
 }
 
@@ -1164,6 +1327,7 @@ GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer,
   struct MeshConnection *c;
   struct MeshPeerPath *p;
   struct MeshPeer *pi;
+  enum MeshConnectionState oldstate;
   int fwd;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
@@ -1180,15 +1344,14 @@ GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer,
     return GNUNET_OK;
   }
 
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n",
-              GNUNET_i2s (peer));
+  oldstate = c->state;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n", GNUNET_i2s (peer));
   pi = GMP_get (peer);
   if (get_next_hop (c) == pi)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
     fwd = GNUNET_NO;
-    if (MESH_CONNECTION_SENT == c->state)
+    if (MESH_CONNECTION_SENT == oldstate)
       connection_change_state (c, MESH_CONNECTION_ACK);
   }
   else if (get_prev_hop (c) == pi)
@@ -1202,6 +1365,7 @@ GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer,
     GNUNET_break_op (0);
     return GNUNET_OK;
   }
+
   connection_reset_timeout (c, fwd);
 
   /* Add path to peers? */
@@ -1218,26 +1382,50 @@ GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer,
   /* Message for us as creator? */
   if (GMC_is_origin (c, GNUNET_YES))
   {
+    if (GNUNET_NO != fwd)
+    {
+      GNUNET_break_op (0);
+      return GNUNET_OK;
+    }
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
+
+    /* If just created, cancel the short timeout and start a long one */
+    if (MESH_CONNECTION_SENT == oldstate)
+      connection_reset_timeout (c, GNUNET_YES);
+
+    /* Change connection and tunnel state */
     connection_change_state (c, MESH_CONNECTION_READY);
-    GMT_change_state (c->t, MESH_TUNNEL3_READY);
+    if (MESH_TUNNEL3_WAITING == GMT_get_state (c->t))
+      GMT_change_state (c->t, MESH_TUNNEL3_READY);
+
+    /* Send ACK (~TCP ACK)*/
     send_connection_ack (c, GNUNET_YES);
-    GMT_send_queued_data (c->t, GNUNET_YES);
     return GNUNET_OK;
   }
 
   /* Message for us as destination? */
   if (GMC_is_terminal (c, GNUNET_YES))
   {
+    if (GNUNET_YES != fwd)
+    {
+      GNUNET_break_op (0);
+      return GNUNET_OK;
+    }
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
-    connection_change_state (c, MESH_CONNECTION_READY);
-    GMT_change_state (c->t, MESH_TUNNEL3_READY);
-    GMT_send_queued_data (c->t, GNUNET_NO);
+
+    /* If just created, cancel the short timeout and start a long one */
+    if (MESH_CONNECTION_ACK == oldstate)
+      connection_reset_timeout (c, GNUNET_NO);
+
+    /* Change tunnel state */
+    if (MESH_TUNNEL3_WAITING == GMT_get_state (c->t))
+      GMT_change_state (c->t, MESH_TUNNEL3_READY);
+
     return GNUNET_OK;
   }
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
-  GMC_send_prebuilt_message (message, c, NULL, fwd);
+  GMC_send_prebuilt_message (message, c, fwd, NULL, NULL);
   return GNUNET_OK;
 }
 
@@ -1286,7 +1474,7 @@ GMC_handle_broken (void* cls,
   }
   else
   {
-    GMC_send_prebuilt_message (message, c, NULL, fwd);
+    GMC_send_prebuilt_message (message, c, fwd, NULL, NULL);
     c->destroy = GNUNET_YES;
   }
 
@@ -1329,6 +1517,7 @@ GMC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
      */
     GNUNET_STATISTICS_update (stats, "# control on unknown tunnel",
                               1, GNUNET_NO);
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection unknown: already destroyed?\n");
     return GNUNET_OK;
   }
   fwd = is_fwd (c, peer);
@@ -1337,8 +1526,9 @@ GMC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
     GNUNET_break_op (0);
     return GNUNET_OK;
   }
-  GMC_send_prebuilt_message (message, c, NULL, fwd);
+  GMC_send_prebuilt_message (message, c, fwd, NULL, NULL);
   c->destroy = GNUNET_YES;
+  c->state = MESH_CONNECTION_DESTROYED;
 
   return GNUNET_OK;
 }
@@ -1347,7 +1537,7 @@ GMC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
  * Generic handler for mesh network encrypted traffic.
  *
  * @param peer Peer identity this notification is about.
- * @param message Encrypted message.
+ * @param msg Encrypted message.
  *
  * @return GNUNET_OK to keep the connection open,
  *         GNUNET_SYSERR to close it (signal serious error)
@@ -1377,15 +1567,17 @@ handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer,
   }
   type = ntohs (msg->header.type);
   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "got a %s message from %s\n",
-              GNUNET_MESH_DEBUG_M2S (type), GNUNET_i2s (peer));
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "got a %s message (#%u) from %s\n",
+       GNUNET_MESH_DEBUG_M2S (type), ntohl (msg->pid), GNUNET_i2s (peer));
 
   /* Check connection */
   c = connection_get (&msg->cid);
   if (NULL == c)
   {
     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING connection unknown\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "WARNING connection %s unknown\n",
+         GNUNET_h2s (&msg->cid));
     return GNUNET_OK;
   }
 
@@ -1405,13 +1597,14 @@ handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer,
     }
     else
     {
+      /* Unexpected peer sending traffic on a connection. */
       GNUNET_break_op (0);
       return GNUNET_OK;
     }
   }
-  fc = fwd ? &c->bck_fc : &c->fwd_fc;
 
   /* Check PID */
+  fc = fwd ? &c->bck_fc : &c->fwd_fc;
   pid = ntohl (msg->pid);
   if (GMC_is_pid_bigger (pid, fc->last_ack_sent))
   {
@@ -1429,7 +1622,7 @@ handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer,
                 pid, fc->last_pid_recv + 1);
     return GNUNET_OK;
   }
-  if (MESH_CONNECTION_SENT == c->state)
+  if (MESH_CONNECTION_SENT == c->state || MESH_CONNECTION_ACK == c->state)
     connection_change_state (c, MESH_CONNECTION_READY);
   connection_reset_timeout (c, fwd);
   fc->last_pid_recv = pid;
@@ -1447,8 +1640,8 @@ handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer,
       return GNUNET_OK;
     }
     fc->last_pid_recv = pid;
-    GMT_handle_encrypted (c->t, msg, fwd);
-    GMC_send_ack (c, fwd);
+    GMT_handle_encrypted (c->t, msg);
+    GMC_send_ack (c, fwd, GNUNET_NO);
     return GNUNET_OK;
   }
 
@@ -1460,12 +1653,109 @@ handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer,
   {
     GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
     LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
-    GMC_send_ack (c, fwd);
+    GMC_send_ack (c, fwd, GNUNET_NO);
     return GNUNET_OK;
   }
+
   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
+  GMC_send_prebuilt_message (&msg->header, c, fwd, NULL, NULL);
+
+  return GNUNET_OK;
+}
+
+/**
+ * Generic handler for mesh network encrypted traffic.
+ *
+ * @param peer Peer identity this notification is about.
+ * @param msg Encrypted message.
+ *
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_mesh_kx (const struct GNUNET_PeerIdentity *peer,
+                const struct GNUNET_MESH_KX *msg)
+{
+  struct MeshConnection *c;
+  struct MeshPeer *neighbor;
+  GNUNET_PEER_Id peer_id;
+  size_t size;
+  uint16_t type;
+  int fwd;
+
+  /* Check size */
+  size = ntohs (msg->header.size);
+  if (size <
+      sizeof (struct GNUNET_MESH_Encrypted) +
+      sizeof (struct GNUNET_MessageHeader))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_OK;
+  }
+  type = ntohs (msg->header.type);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "got a %s message from %s\n",
+              GNUNET_MESH_DEBUG_M2S (type), GNUNET_i2s (peer));
 
-  GMC_send_prebuilt_message (&msg->header, c, NULL, fwd);
+  /* Check connection */
+  c = connection_get (&msg->cid);
+  if (NULL == c)
+  {
+    GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING connection unknown\n");
+    return GNUNET_OK;
+  }
+
+  /* Check if origin is as expected */
+  neighbor = get_prev_hop (c);
+  peer_id = GNUNET_PEER_search (peer);
+  if (peer_id == GMP_get_short_id (neighbor))
+  {
+    fwd = GNUNET_YES;
+  }
+  else
+  {
+    neighbor = get_next_hop (c);
+    if (peer_id == GMP_get_short_id (neighbor))
+    {
+      fwd = GNUNET_NO;
+    }
+    else
+    {
+      /* Unexpected peer sending traffic on a connection. */
+      GNUNET_break_op (0);
+      return GNUNET_OK;
+    }
+  }
+
+  /* Count as connection confirmation. */
+  if (MESH_CONNECTION_SENT == c->state || MESH_CONNECTION_ACK == c->state)
+    connection_change_state (c, MESH_CONNECTION_READY);
+  connection_reset_timeout (c, fwd);
+  if (NULL != c->t)
+  {
+    if (MESH_TUNNEL3_WAITING == GMT_get_state (c->t))
+      GMT_change_state (c->t, MESH_TUNNEL3_READY);
+  }
+
+  /* Is this message for us? */
+  if (GMC_is_terminal (c, fwd))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
+    GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
+    if (NULL == c->t)
+    {
+      GNUNET_break (0);
+      return GNUNET_OK;
+    }
+    GMT_handle_kx (c->t, &msg[1].header);
+    return GNUNET_OK;
+  }
+
+  /* Message not for us: forward to next hop */
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
+  GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
+  GMC_send_prebuilt_message (&msg->header, c, fwd, NULL, NULL);
 
   return GNUNET_OK;
 }
@@ -1490,6 +1780,25 @@ GMC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
 }
 
 
+/**
+ * Core handler for key exchange traffic (ephemeral key, ping, pong).
+ *
+ * @param cls Closure (unused).
+ * @param message Message received.
+ * @param peer Peer who sent the message.
+ *
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+int
+GMC_handle_kx (void *cls, const struct GNUNET_PeerIdentity *peer,
+               const struct GNUNET_MessageHeader *message)
+{
+  return handle_mesh_kx (peer,
+                         (struct GNUNET_MESH_KX *) message);
+}
+
+
 /**
  * Core handler for mesh network traffic point-to-point acks.
  *
@@ -1589,8 +1898,9 @@ GMC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
   int fwd;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a POLL packet from %s!\n",
-              GNUNET_i2s (peer));
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got a POLL packet from %s!\n",
+       GNUNET_i2s (peer));
 
   msg = (struct GNUNET_MESH_Poll *) message;
 
@@ -1612,12 +1922,12 @@ GMC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
   id = GNUNET_PEER_search (peer);
   if (GMP_get_short_id (get_next_hop (c)) == id)
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD ACK\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
     fc = &c->fwd_fc;
   }
   else if (GMP_get_short_id (get_prev_hop (c)) == id)
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK ACK\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
     fc = &c->bck_fc;
   }
   else
@@ -1627,11 +1937,10 @@ GMC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
   }
 
   pid = ntohl (msg->pid);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n",
-              pid, fc->last_pid_recv);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n", pid, fc->last_pid_recv);
   fc->last_pid_recv = pid;
-  fwd = fc == &c->fwd_fc;
-  GMC_send_ack (c, fwd);
+  fwd = fc == &c->bck_fc;
+  GMC_send_ack (c, fwd, GNUNET_YES);
 
   return GNUNET_OK;
 }
@@ -1687,7 +1996,7 @@ GMC_handle_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer,
     return GNUNET_OK;
 
   GNUNET_STATISTICS_update (stats, "# keepalives forwarded", 1, GNUNET_NO);
-  GMC_send_prebuilt_message (message, c, NULL, fwd);
+  GMC_send_prebuilt_message (message, c, fwd, NULL, NULL);
 
   return GNUNET_OK;
 }
@@ -1698,10 +2007,11 @@ GMC_handle_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer,
  * the direction and the position of the peer.
  *
  * @param c Which connection to send the hop-by-hop ACK.
- * @param fwd Is this a fwd ACK? (will go dest->root)
+ * @param fwd Is this a fwd ACK? (will go dest->root).
+ * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
  */
 void
-GMC_send_ack (struct MeshConnection *c, int fwd)
+GMC_send_ack (struct MeshConnection *c, int fwd, int force)
 {
   unsigned int buffer;
 
@@ -1715,11 +2025,17 @@ GMC_send_ack (struct MeshConnection *c, int fwd)
     return;
   }
 
+  if (GNUNET_NO != c->destroy)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  being destroyed, why bother...\n");
+    return;
+  }
+
   /* Get available buffer space */
   if (GMC_is_terminal (c, fwd))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all channels\n");
-    buffer = GMT_get_buffer (c->t, fwd);
+    buffer = GMT_get_channels_buffer (c->t);
   }
   else
   {
@@ -1727,22 +2043,20 @@ GMC_send_ack (struct MeshConnection *c, int fwd)
     buffer = GMC_get_buffer (c, fwd);
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
+  if (0 == buffer && GNUNET_NO == force)
+    return;
 
   /* Send available buffer space */
   if (GMC_is_origin (c, fwd))
   {
     GNUNET_assert (NULL != c->t);
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channels...\n");
-    if (0 < buffer)
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "  really sending!\n");
-      GMT_unchoke_channels (c->t, fwd);
-    }
+    GMT_unchoke_channels (c->t);
   }
   else
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
-    send_ack (c, buffer, fwd);
+    send_ack (c, buffer, fwd, force);
   }
 }
 
@@ -1785,16 +2099,41 @@ GMC_init (const struct GNUNET_CONFIGURATION_Handle *c)
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
+  create_connection_time = GNUNET_TIME_UNIT_SECONDS;
   connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_YES);
 }
 
+
+/**
+ * Destroy each connection on shutdown.
+ *
+ * @param cls Closure (unused).
+ * @param key Current key code (CID, unused).
+ * @param value Value in the hash map (connection)
+ *
+ * @return #GNUNET_YES, because we should continue to iterate,
+ */
+static int
+shutdown_iterator (void *cls,
+                   const struct GNUNET_HashCode *key,
+                   void *value)
+{
+  struct MeshConnection *c = value;
+
+  GMC_destroy (c);
+  return GNUNET_YES;
+}
+
+
 /**
  * Shut down the connections subsystem.
  */
 void
 GMC_shutdown (void)
 {
+  GNUNET_CONTAINER_multihashmap_iterate (connections, &shutdown_iterator, NULL);
   GNUNET_CONTAINER_multihashmap_destroy (connections);
+  connections = NULL;
 }
 
 
@@ -1828,10 +2167,15 @@ GMC_new (const struct GNUNET_HashCode *cid,
   if (0 == own_pos)
   {
     c->fwd_maintenance_task =
-            GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
+            GNUNET_SCHEDULER_add_delayed (create_connection_time,
                                           &connection_fwd_keepalive, c);
   }
-  register_neighbors (c);
+  if (GNUNET_OK != register_neighbors (c))
+  {
+    GMC_destroy (c);
+    return NULL;
+  }
+
   return c;
 }
 
@@ -1842,17 +2186,48 @@ GMC_destroy (struct MeshConnection *c)
   if (NULL == c)
     return;
 
+  if (2 == c->destroy) /* cancel queues -> GMP_queue_cancel -> q_destroy -> */
+    return;            /* -> message_sent -> GMC_destroy. Don't loop. */
+  c->destroy = 2;
+
   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying connection %s\n", GMC_2s (c));
+  LOG (GNUNET_ERROR_TYPE_DEBUG, " fc's f: %p, b: %p\n",
+       &c->fwd_fc, &c->bck_fc);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
+       c->fwd_fc.poll_task, c->bck_fc.poll_task);
 
   /* Cancel all traffic */
   connection_cancel_queues (c, GNUNET_YES);
   connection_cancel_queues (c, GNUNET_NO);
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
+       c->fwd_fc.poll_task, c->bck_fc.poll_task);
+
   /* Cancel maintainance task (keepalive/timeout) */
   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_maintenance_task)
     GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
   if (GNUNET_SCHEDULER_NO_TASK != c->bck_maintenance_task)
     GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
+  if (GNUNET_SCHEDULER_NO_TASK != c->fwd_fc.poll_task)
+  {
+    GNUNET_SCHEDULER_cancel (c->fwd_fc.poll_task);
+    LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL FWD canceled\n");
+  }
+  if (GNUNET_SCHEDULER_NO_TASK != c->bck_fc.poll_task)
+  {
+    GNUNET_SCHEDULER_cancel (c->bck_fc.poll_task);
+    LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL BCK canceled\n");
+  }
+  if (NULL != c->fwd_fc.poll_msg)
+  {
+    GMC_cancel (c->fwd_fc.poll_msg);
+    LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg FWD canceled\n");
+  }
+  if (NULL != c->bck_fc.poll_msg)
+  {
+    GMC_cancel (c->bck_fc.poll_msg);
+    LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg BCK canceled\n");
+  }
 
   /* Unregister from neighbors */
   unregister_neighbors (c);
@@ -1861,6 +2236,13 @@ GMC_destroy (struct MeshConnection *c)
   GNUNET_STATISTICS_update (stats, "# connections", -1, GNUNET_NO);
   if (NULL != c->t)
     GMT_remove_connection (c->t, c);
+
+  if (GNUNET_NO == GMC_is_origin (c, GNUNET_YES))
+    path_destroy (c->path);
+
+  GNUNET_break (GNUNET_YES ==
+                GNUNET_CONTAINER_multihashmap_remove (connections, &c->id, c));
+
   GNUNET_free (c);
 }
 
@@ -1938,7 +2320,7 @@ GMC_get_buffer (struct MeshConnection *c, int fwd)
 }
 
 /**
- * Get how many messages have we allowed to send to us from a direction..
+ * Get how many messages have we allowed to send to us from a direction.
  *
  * @param c Connection.
  * @param fwd Are we asking about traffic from FWD (BCK messages)?
@@ -1990,7 +2372,7 @@ GMC_get_qn (struct MeshConnection *c, int fwd)
 void
 GMC_allow (struct MeshConnection *c, unsigned int buffer, int fwd)
 {
-  send_ack (c, buffer, fwd);
+  send_ack (c, buffer, fwd, GNUNET_NO);
 }
 
 
@@ -2007,22 +2389,31 @@ GMC_notify_broken (struct MeshConnection *c,
 {
   int fwd;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       " notify broken on %s due to %s disconnect\n",
+       GMC_2s (c), GMP_2s (peer));
+
   fwd = peer == get_prev_hop (c);
 
-  connection_cancel_queues (c, !fwd);
-  if (GMC_is_terminal (c, fwd))
+  if (GNUNET_YES == GMC_is_terminal (c, fwd))
   {
     /* Local shutdown, no one to notify about this. */
     GMC_destroy (c);
     return;
   }
-
-  send_broken (c, &my_full_id, GMP_get_id (peer), fwd);
+  if (GNUNET_NO == c->destroy)
+    send_broken (c, &my_full_id, GMP_get_id (peer), fwd);
 
   /* Connection will have at least one pending message
    * (the one we just scheduled), so no point in checking whether to
    * destroy immediately. */
   c->destroy = GNUNET_YES;
+  c->state = MESH_CONNECTION_DESTROYED;
+
+  /**
+   * Cancel all queues, if no message is left, connection will be destroyed.
+   */
+  connection_cancel_queues (c, !fwd);
 
   return;
 }
@@ -2034,7 +2425,7 @@ GMC_notify_broken (struct MeshConnection *c,
  * @param c Connection.
  * @param fwd Is this about fwd traffic?
  *
- * @return GNUNET_YES if origin, GNUNET_NO if relay/terminal.
+ * @return #GNUNET_YES if origin, #GNUNET_NO if relay/terminal.
  */
 int
 GMC_is_origin (struct MeshConnection *c, int fwd)
@@ -2054,7 +2445,7 @@ GMC_is_origin (struct MeshConnection *c, int fwd)
  * @param fwd Is this about fwd traffic?
  *            Note that the ROOT is the terminal for BCK traffic!
  *
- * @return GNUNET_YES if terminal, GNUNET_NO if relay/origin.
+ * @return #GNUNET_YES if terminal, #GNUNET_NO if relay/origin.
  */
 int
 GMC_is_terminal (struct MeshConnection *c, int fwd)
@@ -2069,7 +2460,7 @@ GMC_is_terminal (struct MeshConnection *c, int fwd)
  * @param c Connection.
  * @param fwd Is this about fwd traffic?
  *
- * @return GNUNET_YES in case it's OK.
+ * @return #GNUNET_YES in case it's OK to send.
  */
 int
 GMC_is_sendable (struct MeshConnection *c, int fwd)
@@ -2089,16 +2480,21 @@ GMC_is_sendable (struct MeshConnection *c, int fwd)
  * @param message Message to send. Function makes a copy of it.
  *                If message is not hop-by-hop, decrements TTL of copy.
  * @param c Connection on which this message is transmitted.
- * @param ch Channel on which this message is transmitted, or NULL.
  * @param fwd Is this a fwd message?
+ * @param cont Continuation called once message is sent. Can be NULL.
+ * @param cont_cls Closure for @c cont.
+ *
+ * @return Handle to cancel the message before it's sent.
+ *         NULL on error or if @c cont is NULL.
+ *         Invalid on @c cont call.
  */
-void
+struct MeshConnectionQueue *
 GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
-                           struct MeshConnection *c,
-                           struct MeshChannel *ch,
-                           int fwd)
+                           struct MeshConnection *c, int fwd,
+                           GMC_sent cont, void *cont_cls)
 {
   struct MeshFlowControl *fc;
+  struct MeshConnectionQueue *q;
   void *data;
   size_t size;
   uint16_t type;
@@ -2108,7 +2504,7 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
   data = GNUNET_malloc (size);
   memcpy (data, message, size);
   type = ntohs (message->type);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Send %s (%u) on connection %s\n",
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Send %s (%u bytes) on connection %s\n",
               GNUNET_MESH_DEBUG_M2S (type), size, GMC_2s (c));
 
   fc = fwd ? &c->fwd_fc : &c->bck_fc;
@@ -2116,6 +2512,7 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
   switch (type)
   {
     struct GNUNET_MESH_Encrypted *emsg;
+    struct GNUNET_MESH_KX        *kmsg;
     struct GNUNET_MESH_ACK       *amsg;
     struct GNUNET_MESH_Poll      *pmsg;
     struct GNUNET_MESH_ConnectionDestroy *dmsg;
@@ -2128,22 +2525,28 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
       if (0 == ttl)
       {
         GNUNET_break_op (0);
-        return;
+        GNUNET_free (data);
+        return NULL;
       }
       emsg->cid = c->id;
       emsg->ttl = htonl (ttl - 1);
-      emsg->pid = htonl (fwd ? c->fwd_fc.next_pid++ : c->bck_fc.next_pid++);
+      emsg->pid = htonl (fc->next_pid++);
       LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
       fc->queue_n++;
       LOG (GNUNET_ERROR_TYPE_DEBUG, "pid %u\n", ntohl (emsg->pid));
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid %u\n", fc->last_pid_sent);
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack %u\n", fc->last_ack_recv);
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack recv %u\n", fc->last_ack_recv);
       if (GMC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
       {
         GMC_start_poll (c, fwd);
       }
       break;
 
+    case GNUNET_MESSAGE_TYPE_MESH_KX:
+      kmsg = (struct GNUNET_MESH_KX *) data;
+      kmsg->cid = c->id;
+      break;
+
     case GNUNET_MESSAGE_TYPE_MESH_ACK:
       amsg = (struct GNUNET_MESH_ACK *) data;
       amsg->cid = c->id;
@@ -2154,7 +2557,6 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
     case GNUNET_MESSAGE_TYPE_MESH_POLL:
       pmsg = (struct GNUNET_MESH_Poll *) data;
       pmsg->cid = c->id;
-      pmsg->pid = htonl (fwd ? c->fwd_fc.last_pid_sent : c->bck_fc.last_pid_sent);
       LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid));
       droppable = GNUNET_NO;
       break;
@@ -2188,14 +2590,56 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
                 "queue full: %u/%u\n",
                 fc->queue_n, fc->queue_max);
     if (GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED == type)
+    {
       fc->queue_n--;
-    return; /* Drop this message */
+      fc->next_pid--;
+    }
+    GNUNET_free (data);
+    return NULL; /* Drop this message */
   }
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u\n", c, c->pending_messages);
   c->pending_messages++;
 
-  GMP_queue_add (get_hop (c, fwd), data, type, size, c, ch, fwd,
-                 &message_sent, NULL);
+  if (NULL == cont)
+  {
+    (void) GMP_queue_add (get_hop (c, fwd), data, type, size, c, fwd,
+                          &message_sent, NULL);
+    return NULL;
+  }
+
+  q = GNUNET_new (struct MeshConnectionQueue);
+  q->q = GMP_queue_add (get_hop (c, fwd), data, type, size, c, fwd,
+                        &message_sent, q);
+  if (NULL == q->q)
+  {
+    GNUNET_break (0);
+    GNUNET_free (data);
+    GNUNET_free (q);
+    return NULL;
+  }
+  q->cont = cont;
+  q->cont_cls = cont_cls;
+  return q;
+}
+
+
+/**
+ * Cancel a previously sent message while it's in the queue.
+ *
+ * ONLY can be called before the continuation given to the send function
+ * is called. Once the continuation is called, the message is no longer in the
+ * queue.
+ *
+ * @param q Handle to the queue.
+ */
+void
+GMC_cancel (struct MeshConnectionQueue *q)
+{
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "!  GMC cancel message\n");
+
+  /* queue destroy calls message_sent, which calls q->cont and frees q */
+  GMP_queue_destroy (q->q, GNUNET_YES);
 }
 
 
@@ -2216,8 +2660,10 @@ GMC_send_create (struct MeshConnection *connection)
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection create\n");
   GMP_queue_add (get_next_hop (connection), NULL,
                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE,
-                 size, connection, NULL,
-                 GNUNET_YES, &message_sent, NULL);
+                 size, connection, GNUNET_YES, &message_sent, NULL);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u (create)\n",
+       connection, connection->pending_messages);
+  connection->pending_messages++;
   state = GMT_get_state (connection->t);
   if (MESH_TUNNEL3_SEARCHING == state || MESH_TUNNEL3_NEW == state)
     GMT_change_state (connection->t, MESH_TUNNEL3_WAITING);
@@ -2251,10 +2697,11 @@ GMC_send_destroy (struct MeshConnection *c)
               GMC_2s (c));
 
   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_YES))
-    GMC_send_prebuilt_message (&msg.header, c, NULL, GNUNET_YES);
+    GMC_send_prebuilt_message (&msg.header, c, GNUNET_YES, NULL, NULL);
   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_NO))
-    GMC_send_prebuilt_message (&msg.header, c, NULL, GNUNET_NO);
+    GMC_send_prebuilt_message (&msg.header, c, GNUNET_NO, NULL, NULL);
   c->destroy = GNUNET_YES;
+  c->state = MESH_CONNECTION_DESTROYED;
 }
 
 
@@ -2274,10 +2721,15 @@ GMC_start_poll (struct MeshConnection *c, int fwd)
   struct MeshFlowControl *fc;
 
   fc = fwd ? &c->fwd_fc : &c->bck_fc;
-  if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
+  LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL %s requested\n",
+       fwd ? "FWD" : "BCK");
+  if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task || NULL != fc->poll_msg)
   {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   not needed (%u, %p)\n",
+         fc->poll_task, fc->poll_msg);
     return;
   }
+  LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL started on request\n");
   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
                                                 &connection_poll,
                                                 fc);
@@ -2321,4 +2773,4 @@ GMC_2s (struct MeshConnection *c)
     return buf;
   }
   return GNUNET_h2s (&c->id);
-}
\ No newline at end of file
+}