Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / cadet / cadet_api_new.c
index eb8bc2549faa903d90e8bdc904f3d57314306db8..2d5d853b3700fc210e0be7cdec10438f1fd6700a 100644 (file)
@@ -146,7 +146,7 @@ struct GNUNET_CADET_Channel
   void *ctx;
 
   /**
-   * Message Queue for the channel.
+   * Message Queue for the channel (which we are implementing).
    */
   struct GNUNET_MQ_Handle *mq;
 
@@ -156,7 +156,9 @@ struct GNUNET_CADET_Channel
   struct GNUNET_SCHEDULER_Task *mq_cont;
 
   /**
-   * Pending envelope in case we don't have an ACK from the service.
+   * Pending envelope with a message to be transmitted to the
+   * service as soon as we are allowed to.  Should only be
+   * non-NULL if @e allow_send is 0.
    */
   struct GNUNET_MQ_Envelope *pending_env;
 
@@ -181,7 +183,7 @@ struct GNUNET_CADET_Channel
   enum GNUNET_CADET_ChannelOption options;
 
   /**
-   * Are we allowed to send to the service?
+   * How many messages are we allowed to send to the service right now?
    */
   unsigned int allow_send;
 
@@ -193,6 +195,12 @@ struct GNUNET_CADET_Channel
  */
 struct GNUNET_CADET_Port
 {
+
+  /**
+   * Port "number"
+   */
+  struct GNUNET_HashCode id;
+
   /**
    * Handle to the CADET session this port belongs to.
    */
@@ -208,11 +216,6 @@ struct GNUNET_CADET_Port
    */
   void *cls;
 
-  /**
-   * Port "number"
-   */
-  struct GNUNET_HashCode id;
-
   /**
    * Handler for incoming channels on this port
    */
@@ -229,7 +232,7 @@ struct GNUNET_CADET_Port
   GNUNET_CADET_WindowSizeEventHandler window_changes;
 
   /**
-   * Handler called when an incoming channel is destroyed..
+   * Handler called when an incoming channel is destroyed.
    */
   GNUNET_CADET_DisconnectEventHandler disconnects;
 
@@ -327,7 +330,7 @@ destroy_channel (struct GNUNET_CADET_Channel *ch)
   struct GNUNET_CADET_Handle *h = ch->cadet;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       " destroy_channel %X of %p\n",
+       "Destroying channel %X of %p\n",
        ch->ccn,
        h);
   GNUNET_assert (GNUNET_YES ==
@@ -343,6 +346,9 @@ destroy_channel (struct GNUNET_CADET_Channel *ch)
   if (NULL != ch->disconnects)
     ch->disconnects (ch->ctx,
                      ch);
+  if (NULL != ch->pending_env)
+    GNUNET_MQ_discard (ch->pending_env);
+  GNUNET_MQ_destroy (ch->mq);
   GNUNET_free (ch);
 }
 
@@ -373,6 +379,28 @@ reconnect_cbk (void *cls)
 }
 
 
+/**
+ * Function called during #reconnect() to destroy
+ * all channels that are still open.
+ *
+ * @param cls the `struct GNUNET_CADET_Handle`
+ * @param cid chanenl ID
+ * @param value a `struct GNUNET_CADET_Channel` to destroy
+ * @return #GNUNET_OK (continue to iterate)
+ */
+static int
+destroy_channel_on_reconnect_cb (void *cls,
+                                 uint32_t cid,
+                                 void *value)
+{
+  /* struct GNUNET_CADET_Handle *handle = cls; */
+  struct GNUNET_CADET_Channel *ch = value;
+
+  destroy_channel (ch);
+  return GNUNET_OK;
+}
+
+
 /**
  * Reconnect to the service, retransmit all infomation to try to restore the
  * original state.
@@ -384,12 +412,17 @@ reconnect_cbk (void *cls)
 static void
 schedule_reconnect (struct GNUNET_CADET_Handle *h)
 {
-  if (NULL == h->reconnect_task)
-  {
-    h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
-                                                      &reconnect_cbk, h);
-    h->reconnect_time = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
-  }
+  if (NULL != h->reconnect_task)
+    return;
+  GNUNET_CONTAINER_multihashmap32_iterate (h->channels,
+                                           &destroy_channel_on_reconnect_cb,
+                                           h);
+  h->reconnect_task
+    = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
+                                    &reconnect_cbk,
+                                    h);
+  h->reconnect_time
+    = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
 }
 
 
@@ -402,25 +435,44 @@ static void
 notify_window_size (struct GNUNET_CADET_Channel *ch)
 {
   if (NULL != ch->window_changes)
-    ch->window_changes (ch->ctx, ch,
+    ch->window_changes (ch->ctx,
+                        ch, /* FIXME: remove 'ch'? */
                         ch->allow_send);
 }
 
 
 /**
- * Allow the MQ implementation to send the next message.
+ * Transmit the next message from our queue.
  *
  * @param cls Closure (channel whose mq to activate).
  */
 static void
-cadet_mq_send_continue (void *cls)
+cadet_mq_send_now (void *cls)
 {
   struct GNUNET_CADET_Channel *ch = cls;
+  struct GNUNET_MQ_Envelope *env = ch->pending_env;
 
   ch->mq_cont = NULL;
+  if (0 == ch->allow_send)
+  {
+    /* how did we get here? */
+    GNUNET_break (0);
+    return;
+  }
+  if (NULL == env)
+  {
+    /* how did we get here? */
+    GNUNET_break (0);
+    return;
+  }
+  ch->allow_send--;
+  ch->pending_env = NULL;
+  GNUNET_MQ_send (ch->cadet->mq,
+                  env);
   GNUNET_MQ_impl_send_continue (ch->mq);
 }
 
+
 /**
  * Implement sending functionality of a message queue for
  * us sending messages to a peer.
@@ -444,7 +496,6 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
   struct GNUNET_MQ_Envelope *env;
   struct GNUNET_CADET_LocalData *cadet_msg;
 
-
   if (NULL == h->mq)
   {
     /* We're currently reconnecting, pretend this worked */
@@ -460,26 +511,16 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
     GNUNET_MQ_impl_send_continue (mq);
     return;
   }
-
   env = GNUNET_MQ_msg_nested_mh (cadet_msg,
                                  GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
                                  msg);
   cadet_msg->ccn = ch->ccn;
-
+  GNUNET_assert (NULL == ch->pending_env);
+  ch->pending_env = env;
   if (0 < ch->allow_send)
-  {
-    /* Service has allowed this message, just send it and continue accepting */
-    GNUNET_MQ_send (h->mq, env);
-    ch->allow_send--;
-    ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
-    // notify_window_size (ch); /* FIXME add "verbose" setting? */
-  }
-  else
-  {
-    /* Service has NOT allowed this message, queue it and wait for an ACK */
-    GNUNET_assert (NULL == ch->pending_env);
-    ch->pending_env = env;
-  }
+    ch->mq_cont
+      = GNUNET_SCHEDULER_add_now (&cadet_mq_send_now,
+                                  ch);
 }
 
 
@@ -515,12 +556,16 @@ cadet_mq_error_handler (void *cls,
 {
   struct GNUNET_CADET_Channel *ch = cls;
 
-  GNUNET_break_op (0);
-  if (GNUNET_MQ_ERROR_NO_MATCH)
+  GNUNET_break (0);
+  if (GNUNET_MQ_ERROR_NO_MATCH == error)
   {
     /* Got a message we did not understand, still try to continue! */
     GNUNET_CADET_receive_done (ch);
   }
+  else
+  {
+    schedule_reconnect (ch->cadet);
+  }
 }
 
 
@@ -537,11 +582,14 @@ cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
 {
   struct GNUNET_CADET_Channel *ch = impl_state;
 
-  LOG (GNUNET_ERROR_TYPE_WARNING,
-       "Cannot cancel mq message on channel %X of %p\n",
-       ch->ccn.channel_of_client,
-       ch->cadet);
-  GNUNET_break (0);
+  GNUNET_assert (NULL != ch->pending_env);
+  GNUNET_MQ_discard (ch->pending_env);
+  ch->pending_env = NULL;
+  if (NULL != ch->mq_cont)
+  {
+    GNUNET_SCHEDULER_cancel (ch->mq_cont);
+    ch->mq_cont = NULL;
+  }
 }
 
 
@@ -568,7 +616,8 @@ handle_channel_created (void *cls,
     GNUNET_break (0);
     return;
   }
-  port = find_port (h, port_number);
+  port = find_port (h,
+                    port_number);
   if (NULL == port)
   {
     /* We could have closed the port but the service didn't know about it yet
@@ -577,7 +626,6 @@ handle_channel_created (void *cls,
     struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg;
     struct GNUNET_MQ_Envelope *env;
 
-    GNUNET_break (0);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "No handler for incoming channel %X (on port %s, recently closed?)\n",
          ntohl (ccn.channel_of_client),
@@ -633,16 +681,18 @@ handle_channel_destroy (void *cls,
   struct GNUNET_CADET_Handle *h = cls;
   struct GNUNET_CADET_Channel *ch;
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received channel destroy for channel %X from CADET service\n",
-       ntohl (msg->ccn.channel_of_client));
   ch = find_channel (h,
                      msg->ccn);
   if (NULL == ch)
   {
-    GNUNET_break (0);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received channel destroy for unknown channel %X from CADET service (recently close?)\n",
+         ntohl (msg->ccn.channel_of_client));
     return;
   }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received channel destroy for channel %X from CADET service\n",
+       ntohl (msg->ccn.channel_of_client));
   destroy_channel (ch);
 }
 
@@ -659,25 +709,14 @@ static int
 check_local_data (void *cls,
                   const struct GNUNET_CADET_LocalData *message)
 {
-  struct GNUNET_CADET_Handle *h = cls;
-  struct GNUNET_CADET_Channel *ch;
   uint16_t size;
 
   size = ntohs (message->header.size);
   if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size)
   {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-
-  ch = find_channel (h,
-                     message->ccn);
-  if (NULL == ch)
-  {
-    GNUNET_break_op (0);
+    GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-
   return GNUNET_OK;
 }
 
@@ -702,12 +741,13 @@ handle_local_data (void *cls,
                      message->ccn);
   if (NULL == ch)
   {
-    GNUNET_break_op (0);
-    reconnect (h);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Unknown channel %X for incoming data (recently closed?)\n",
+         ntohl (message->ccn.channel_of_client));
     return;
   }
 
-  payload = (struct GNUNET_MessageHeader *) &message[1];
+  payload = (const struct GNUNET_MessageHeader *) &message[1];
   type = ntohs (payload->type);
   fwd = ntohl (ch->ccn.channel_of_client) <= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -752,19 +792,16 @@ handle_local_ack (void *cls,
          ntohl (ch->ccn.channel_of_client),
          ch->allow_send);
     notify_window_size (ch);
+    return;
   }
-  else
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Got an ACK on mq channel %X, sending pending message!\n",
-         ntohl (ch->ccn.channel_of_client));
-    GNUNET_MQ_send (h->mq,
-                    ch->pending_env);
-    ch->allow_send--;
-    ch->pending_env = NULL;
-    ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue,
-                                            ch);
-  }
+  if (NULL != ch->mq_cont)
+    return; /* already working on it! */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got an ACK on mq channel %X, sending pending message!\n",
+       ntohl (ch->ccn.channel_of_client));
+  ch->mq_cont
+    = GNUNET_SCHEDULER_add_now (&cadet_mq_send_now,
+                                ch);
 }
 
 
@@ -782,48 +819,15 @@ handle_mq_error (void *cls,
 {
   struct GNUNET_CADET_Handle *h = cls;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MQ ERROR: %u\n", error);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "MQ ERROR: %u\n",
+              error);
   GNUNET_MQ_destroy (h->mq);
   h->mq = NULL;
   reconnect (h);
 }
 
 
-/**
- * Check that message received from CADET service is well-formed.
- *
- * @param cls the `struct GNUNET_CADET_Handle`
- * @param message the message we got
- * @return #GNUNET_OK if the message is well-formed,
- *         #GNUNET_SYSERR otherwise
- */
-static int
-check_get_peers (void *cls,
-                 const struct GNUNET_CADET_LocalInfoPeer *message)
-{
-  struct GNUNET_CADET_Handle *h = cls;
-  uint16_t size;
-
-  if (NULL == h->info_cb.peers_cb)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "  no handler for peesr monitor message!\n");
-    return GNUNET_SYSERR;
-  }
-
-  size = ntohs (message->header.size);
-  if (sizeof (struct GNUNET_CADET_LocalInfoPeer) > size)
-  {
-    h->info_cb.peers_cb (h->info_cls, NULL, -1, 0, 0);
-    h->info_cb.peers_cb = NULL;
-    h->info_cls = NULL;
-    return GNUNET_SYSERR;
-  }
-
-  return GNUNET_OK;
-}
-
-
 /**
  * Process a local reply about info on all tunnels, pass info to the user.
  *
@@ -835,9 +839,13 @@ handle_get_peers (void *cls,
                   const struct GNUNET_CADET_LocalInfoPeer *msg)
 {
   struct GNUNET_CADET_Handle *h = cls;
-  h->info_cb.peers_cb (h->info_cls, &msg->destination,
+
+  if (NULL == h->info_cb.peers_cb)
+    return;
+  h->info_cb.peers_cb (h->info_cls,
+                       &msg->destination,
                        (int) ntohs (msg->tunnel),
-                       (unsigned int ) ntohs (msg->paths),
+                       (unsigned int) ntohs (msg->paths),
                        0);
 }
 
@@ -854,62 +862,39 @@ static int
 check_get_peer (void *cls,
                 const struct GNUNET_CADET_LocalInfoPeer *message)
 {
-  struct GNUNET_CADET_Handle *h = cls;
-  const size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer);
-  struct GNUNET_PeerIdentity *paths_array;
+  size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer);
+  const struct GNUNET_PeerIdentity *paths_array;
   size_t esize;
   unsigned int epaths;
   unsigned int paths;
   unsigned int peers;
 
-  if (NULL == h->info_cb.peer_cb)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "  no handler for peer monitor message!\n");
-    goto clean_cls;
-  }
-
-  /* Verify message sanity */
   esize = ntohs (message->header.size);
   if (esize < msize)
   {
-    GNUNET_break_op (0);
-    h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
-    goto clean_cls;
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
   if (0 != ((esize - msize) % sizeof (struct GNUNET_PeerIdentity)))
   {
-    GNUNET_break_op (0);
-    h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
-    goto clean_cls;
-
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
   peers = (esize - msize) / sizeof (struct GNUNET_PeerIdentity);
-  epaths = (unsigned int) ntohs (message->paths);
-  paths_array = (struct GNUNET_PeerIdentity *) &message[1];
+  epaths = ntohs (message->paths);
+  paths_array = (const struct GNUNET_PeerIdentity *) &message[1];
   paths = 0;
-  for (int i = 0; i < peers; i++)
-  {
-    if (0 == memcmp (&paths_array[i], &message->destination,
+  for (unsigned int i = 0; i < peers; i++)
+    if (0 == memcmp (&paths_array[i],
+                     &message->destination,
                      sizeof (struct GNUNET_PeerIdentity)))
-    {
       paths++;
-    }
-  }
   if (paths != epaths)
   {
-    GNUNET_break_op (0);
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "p:%u, e: %u\n", paths, epaths);
-    h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL);
-    goto clean_cls;
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
-
   return GNUNET_OK;
-
-clean_cls:
-  h->info_cb.peer_cb = NULL;
-  h->info_cls = NULL;
-  return GNUNET_SYSERR;
 }
 
 
@@ -924,22 +909,26 @@ handle_get_peer (void *cls,
                  const struct GNUNET_CADET_LocalInfoPeer *message)
 {
   struct GNUNET_CADET_Handle *h = cls;
-  struct GNUNET_PeerIdentity *paths_array;
+  const struct GNUNET_PeerIdentity *paths_array;
   unsigned int paths;
   unsigned int path_length;
   int neighbor;
   unsigned int peers;
 
-  paths = (unsigned int) ntohs (message->paths);
-  paths_array = (struct GNUNET_PeerIdentity *) &message[1];
+  if (NULL == h->info_cb.peer_cb)
+    return;
+  paths = ntohs (message->paths);
+  paths_array = (const struct GNUNET_PeerIdentity *) &message[1];
   peers = (ntohs (message->header.size) - sizeof (*message))
           / sizeof (struct GNUNET_PeerIdentity);
   path_length = 0;
   neighbor = GNUNET_NO;
 
-  for (int i = 0; i < peers; i++)
+  for (unsigned int i = 0; i < peers; i++)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " %s\n", GNUNET_i2s (&paths_array[i]));
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                " %s\n",
+                GNUNET_i2s (&paths_array[i]));
     path_length++;
     if (0 == memcmp (&paths_array[i], &message->destination,
                      sizeof (struct GNUNET_PeerIdentity)))
@@ -951,7 +940,7 @@ handle_get_peer (void *cls,
   }
 
   /* Call Callback with tunnel info. */
-  paths_array = (struct GNUNET_PeerIdentity *) &message[1];
+  paths_array = (const struct GNUNET_PeerIdentity *) &message[1];
   h->info_cb.peer_cb (h->info_cls,
                       &message->destination,
                       (int) ntohs (message->tunnel),
@@ -961,40 +950,6 @@ handle_get_peer (void *cls,
 }
 
 
-/**
- * Check that message received from CADET service is well-formed.
- *
- * @param cls the `struct GNUNET_CADET_Handle`
- * @param msg the message we got
- * @return #GNUNET_OK if the message is well-formed,
- *         #GNUNET_SYSERR otherwise
- */
-static int
-check_get_tunnels (void *cls,
-                   const struct GNUNET_CADET_LocalInfoTunnel *msg)
-{
-  struct GNUNET_CADET_Handle *h = cls;
-  uint16_t size;
-
-  if (NULL == h->info_cb.tunnels_cb)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "  no handler for tunnels monitor message!\n");
-    return GNUNET_SYSERR;
-  }
-
-  size = ntohs (msg->header.size);
-  if (sizeof (struct GNUNET_CADET_LocalInfoTunnel) > size)
-  {
-    h->info_cb.tunnels_cb (h->info_cls, NULL, 0, 0, 0, 0);
-    h->info_cb.tunnels_cb = NULL;
-    h->info_cls = NULL;
-    return GNUNET_SYSERR;
-  }
-  return GNUNET_OK;
-}
-
-
 /**
  * Process a local reply about info on all tunnels, pass info to the user.
  *
@@ -1007,6 +962,8 @@ handle_get_tunnels (void *cls,
 {
   struct GNUNET_CADET_Handle *h = cls;
 
+  if (NULL == h->info_cb.tunnels_cb)
+    return;
   h->info_cb.tunnels_cb (h->info_cls,
                          &msg->destination,
                          ntohl (msg->channels),
@@ -1029,28 +986,18 @@ static int
 check_get_tunnel (void *cls,
                   const struct GNUNET_CADET_LocalInfoTunnel *msg)
 {
-  struct GNUNET_CADET_Handle *h = cls;
   unsigned int ch_n;
   unsigned int c_n;
   size_t esize;
   size_t msize;
 
-  if (NULL == h->info_cb.tunnel_cb)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "  no handler for tunnel monitor message!\n");
-    goto clean_cls;
-  }
-
   /* Verify message sanity */
   msize = ntohs (msg->header.size);
   esize = sizeof (struct GNUNET_CADET_LocalInfoTunnel);
   if (esize > msize)
   {
-    GNUNET_break_op (0);
-    h->info_cb.tunnel_cb (h->info_cls,
-                          NULL, 0, 0, NULL, NULL, 0, 0);
-    goto clean_cls;
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
   ch_n = ntohl (msg->channels);
   c_n = ntohl (msg->connections);
@@ -1065,17 +1012,9 @@ check_get_tunnel (void *cls,
                 (unsigned int) esize,
                 ch_n,
                 c_n);
-    h->info_cb.tunnel_cb (h->info_cls,
-                          NULL, 0, 0, NULL, NULL, 0, 0);
-    goto clean_cls;
+    return GNUNET_SYSERR;
   }
-
   return GNUNET_OK;
-
-clean_cls:
-  h->info_cb.tunnel_cb = NULL;
-  h->info_cls = NULL;
-  return GNUNET_SYSERR;
 }
 
 
@@ -1095,6 +1034,9 @@ handle_get_tunnel (void *cls,
   const struct GNUNET_CADET_ConnectionTunnelIdentifier *conns;
   const struct GNUNET_CADET_ChannelTunnelNumber *chns;
 
+  if (NULL == h->info_cb.tunnel_cb)
+    return;
+
   ch_n = ntohl (msg->channels);
   c_n = ntohl (msg->connections);
 
@@ -1112,28 +1054,6 @@ handle_get_tunnel (void *cls,
 }
 
 
-/**
- * Function called during #reconnect() to destroy
- * all channels that are still open.
- *
- * @param cls the `struct GNUNET_CADET_Handle`
- * @param cid chanenl ID
- * @param value a `struct GNUNET_CADET_Channel` to destroy
- * @return #GNUNET_OK (continue to iterate)
- */
-static int
-destroy_channel_on_reconnect_cb (void *cls,
-                                 uint32_t cid,
-                                 void *value)
-{
-  /* struct GNUNET_CADET_Handle *handle = cls; */
-  struct GNUNET_CADET_Channel *ch = value;
-
-  destroy_channel (ch);
-  return GNUNET_OK;
-}
-
-
 /**
  * Reconnect to the service, retransmit all infomation to try to restore the
  * original state.
@@ -1160,18 +1080,18 @@ reconnect (struct GNUNET_CADET_Handle *h)
                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK,
                              struct GNUNET_CADET_LocalAck,
                              h),
-    GNUNET_MQ_hd_var_size (get_peers,
-                           GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS,
-                           struct GNUNET_CADET_LocalInfoPeer,
-                           h),
+    GNUNET_MQ_hd_fixed_size (get_peers,
+                             GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS,
+                             struct GNUNET_CADET_LocalInfoPeer,
+                             h),
     GNUNET_MQ_hd_var_size (get_peer,
                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER,
                            struct GNUNET_CADET_LocalInfoPeer,
                            h),
-    GNUNET_MQ_hd_var_size (get_tunnels,
-                           GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS,
-                           struct GNUNET_CADET_LocalInfoTunnel,
-                           h),
+    GNUNET_MQ_hd_fixed_size (get_tunnels,
+                             GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS,
+                             struct GNUNET_CADET_LocalInfoTunnel,
+                             h),
     GNUNET_MQ_hd_var_size (get_tunnel,
                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL,
                            struct GNUNET_CADET_LocalInfoTunnel,
@@ -1179,14 +1099,6 @@ reconnect (struct GNUNET_CADET_Handle *h)
     GNUNET_MQ_handler_end ()
   };
 
-  GNUNET_CONTAINER_multihashmap32_iterate (h->channels,
-                                           &destroy_channel_on_reconnect_cb,
-                                           h);
-  if (NULL != h->mq)
-  {
-    GNUNET_MQ_destroy (h->mq);
-    h->mq = NULL;
-  }
   h->mq = GNUNET_CLIENT_connect (h->cfg,
                                  "cadet",
                                  handlers,
@@ -1247,6 +1159,7 @@ destroy_port_cb (void *cls,
   /* struct GNUNET_CADET_Handle *handle = cls; */
   struct GNUNET_CADET_Port *port = value;
 
+  /* This is a warning, the app should have cleanly closed all open ports */
   GNUNET_break (0);
   GNUNET_CADET_close_port (port);
   return GNUNET_OK;
@@ -1309,6 +1222,7 @@ GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p)
                  GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports,
                                                        &p->id,
                                                        p));
+  GNUNET_free_non_null (p->handlers);
   GNUNET_free (p);
 }
 
@@ -1329,11 +1243,14 @@ GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel)
   struct GNUNET_CADET_LocalChannelDestroyMessage *msg;
   struct GNUNET_MQ_Envelope *env;
 
-  env = GNUNET_MQ_msg (msg,
-                       GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
-  msg->ccn = channel->ccn;
-  GNUNET_MQ_send (h->mq,
-                  env);
+  if (NULL != h->mq)
+  {
+    env = GNUNET_MQ_msg (msg,
+                         GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
+    msg->ccn = channel->ccn;
+    GNUNET_MQ_send (h->mq,
+                    env);
+  }
   destroy_channel (channel);
 }
 
@@ -1696,17 +1613,7 @@ GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
   p->cls = connects_cls;
   p->window_changes = window_changes;
   p->disconnects = disconnects;
-  if (NULL != handlers)
-  {
-    unsigned int i;
-
-    for (i=0;NULL != handlers[i].cb; i++) ;
-    p->handlers = GNUNET_new_array (i + 1,
-                                    struct GNUNET_MQ_MessageHandler);
-    GNUNET_memcpy ((struct GNUNET_MQ_MessageHandler *) p->handlers,
-                   handlers,
-                   i * sizeof (struct GNUNET_MQ_MessageHandler));
-  }
+  p->handlers = GNUNET_MQ_copy_handlers (handlers);
 
   GNUNET_assert (GNUNET_OK ==
                 GNUNET_CONTAINER_multihashmap_put (h->ports,