multicast: add new test case to Makefile.am
[oweals/gnunet.git] / src / multicast / gnunet-service-multicast.c
index de65e0ab743a773531cffefefca95a81ce17dfbd..b068f13083e72b557ca586532d857b2aeb125a1a 100644 (file)
@@ -162,6 +162,16 @@ struct Channel
    */
   struct GNUNET_PeerIdentity peer;
 
+  /**
+   * Current window size, set by cadet_notify_window_change()
+   */
+  int32_t window_size;
+
+  /**
+   * Is the connection established?
+   */
+  int8_t is_connected;
+
   /**
    * Is the remote peer admitted to the group?
    * @see enum JoinStatus
@@ -336,6 +346,17 @@ struct ReplayRequestKey
 };
 
 
+static struct Channel *
+cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer);
+
+static void
+cadet_channel_destroy (struct Channel *chn);
+
+static void
+client_send_join_decision (struct Member *mem,
+                           const struct MulticastJoinDecisionMessageHeader *hdcsn);
+
+
 /**
  * Task run during shutdown.
  *
@@ -497,7 +518,7 @@ replay_req_remove_client (struct Group *grp, struct GNUNET_SERVICE_Client *clien
   {
     if (c == client)
     {
-      GNUNET_CONTAINER_multihashmap_remove (replay_req_client, &key, client);
+      GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, client);
       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
       return GNUNET_YES;
     }
@@ -653,6 +674,9 @@ client_send_origin (struct GNUNET_HashCode *pub_key_hash,
 static void
 client_send_ack (struct GNUNET_HashCode *pub_key_hash)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Sending message ACK to client.\n");
+
   static struct GNUNET_MessageHeader *msg = NULL;
   if (NULL == msg)
   {
@@ -671,36 +695,6 @@ struct CadetTransmitClosure
 };
 
 
-/**
- * CADET is ready to transmit a message.
- */
-size_t
-cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
-{
-  if (0 == buf_size)
-  {
-    /* FIXME: connection closed */
-    return 0;
-  }
-  struct CadetTransmitClosure *tcls = cls;
-  struct Channel *chn = tcls->chn;
-  uint16_t msg_size = ntohs (tcls->msg->size);
-  GNUNET_assert (msg_size <= buf_size);
-  GNUNET_memcpy (buf, tcls->msg, msg_size);
-  GNUNET_free (tcls);
-
-  if (0 == chn->msgs_pending)
-  {
-    GNUNET_break (0);
-  }
-  else if (0 == --chn->msgs_pending)
-  {
-    client_send_ack (&chn->group_pub_hash);
-  }
-  return msg_size;
-}
-
-
 /**
  * Send a message to a CADET channel.
  *
@@ -710,53 +704,22 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
 static void
 cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
 {
-  uint16_t msg_size = ntohs (msg->size);
-  struct GNUNET_MessageHeader *msg_copy = GNUNET_malloc (msg_size);
-  GNUNET_memcpy (msg_copy, msg, msg_size);
-
-  struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls));
-  tcls->chn = chn;
-  tcls->msg = msg_copy;
-
-  chn->msgs_pending++;
-  chn->tmit_handle
-    = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
-                                          GNUNET_TIME_UNIT_FOREVER_REL,
-                                          msg_size,
-                                          &cadet_notify_transmit_ready,
-                                          tcls);
-  GNUNET_assert (NULL != chn->tmit_handle);
-}
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_copy (msg);
 
+  GNUNET_MQ_send (GNUNET_CADET_get_mq (chn->channel), env);
 
-/**
- * Create new outgoing CADET channel.
- *
- * @param peer
- *        Peer to connect to.
- * @param group_pub_key
- *        Public key of group the channel belongs to.
- * @param group_pub_hash
- *        Hash of @a group_pub_key.
- *
- * @return Channel.
- */
-static struct Channel *
-cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
-{
-  struct Channel *chn = GNUNET_malloc (sizeof (*chn));
-  chn->group = grp;
-  chn->group_pub_key = grp->pub_key;
-  chn->group_pub_hash = grp->pub_key_hash;
-  chn->peer = *peer;
-  chn->direction = DIR_OUTGOING;
-  chn->join_status = JOIN_WAITING;
-  chn->channel = GNUNET_CADET_channel_create (cadet, chn, &chn->peer,
-                                              &grp->cadet_port_hash,
-                                              GNUNET_CADET_OPTION_RELIABLE);
-  GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  return chn;
+  if (0 < chn->window_size)
+  {
+    client_send_ack (&chn->group_pub_hash);
+  }
+  else
+  {
+    chn->msgs_pending++;
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "%p Queuing message. Pending messages: %u\n",
+                chn, chn->msgs_pending);
+  }
 }
 
 
@@ -787,7 +750,7 @@ cadet_send_join_decision_cb (void *cls,
   const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
   struct Channel *chn = channel;
 
-  const struct MulticastJoinDecisionMessage *dcsn = 
+  const struct MulticastJoinDecisionMessage *dcsn =
     (struct MulticastJoinDecisionMessage *) &hdcsn[1];
 
   if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key))
@@ -870,31 +833,74 @@ cadet_send_parents (struct GNUNET_HashCode *pub_key_hash,
 
 
 /**
- * New incoming CADET channel.
+ * CADET channel connect handler.
+ *
+ * @see GNUNET_CADET_ConnectEventHandler()
  */
 static void *
-cadet_notify_channel_new (void *cls,
-                          struct GNUNET_CADET_Channel *channel,
-                          const struct GNUNET_PeerIdentity *initiator,
-                          const struct GNUNET_HashCode *port,
-                          enum GNUNET_CADET_ChannelOption options)
+cadet_notify_connect (void *cls,
+                      struct GNUNET_CADET_Channel *channel,
+                      const struct GNUNET_PeerIdentity *source)
+{
+  struct Channel *chn = GNUNET_malloc (sizeof *chn);
+  chn->group = cls;
+  chn->channel = channel;
+  chn->direction = DIR_INCOMING;
+  chn->join_status = JOIN_NOT_ASKED;
+
+  GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn,
+                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  return chn;
+}
+
+
+/**
+ * CADET window size change handler.
+ *
+ * @see GNUNET_CADET_WindowSizeEventHandler()
+ */
+static void
+cadet_notify_window_change (void *cls,
+                            const struct GNUNET_CADET_Channel *channel,
+                            int window_size)
 {
-  return NULL;
+  struct Channel *chn = cls;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "%p Window size changed to %d.  Pending messages: %u\n",
+              chn, window_size, chn->msgs_pending);
+
+  chn->is_connected = GNUNET_YES;
+  chn->window_size = (int32_t) window_size;
+
+  for (int i = 0; i < window_size; i++)
+  {
+    if (0 < chn->msgs_pending)
+    {
+      client_send_ack (&chn->group_pub_hash);
+      chn->msgs_pending--;
+    }
+    else
+    {
+      break;
+    }
+  }
 }
 
 
 /**
- * CADET channel is being destroyed.
+ * CADET channel disconnect handler.
+ *
+ * @see GNUNET_CADET_DisconnectEventHandler()
  */
 static void
-cadet_notify_channel_end (void *cls,
-                          const struct GNUNET_CADET_Channel *channel,
-                          void *ctx)
+cadet_notify_disconnect (void *cls,
+                         const struct GNUNET_CADET_Channel *channel)
 {
-  if (NULL == ctx)
+  if (NULL == cls)
     return;
 
-  struct Channel *chn = ctx;
+  struct Channel *chn = cls;
   if (NULL != chn->group)
   {
     if (GNUNET_NO == chn->group->is_origin)
@@ -905,26 +911,417 @@ cadet_notify_channel_end (void *cls,
     }
   }
 
-  while (GNUNET_YES == replay_req_remove_cadet (chn));
+  int ret;
+  do
+  {
+    ret = replay_req_remove_cadet (chn);
+  }
+  while (GNUNET_YES == ret);
+
+  GNUNET_free (chn);
+}
+
+
+static int
+check_cadet_join_request (void *cls,
+                          const struct MulticastJoinRequestMessage *req)
+{
+  struct Channel *chn = cls;
+
+  if (NULL == chn
+      || JOIN_NOT_ASKED != chn->join_status)
+  {
+    return GNUNET_SYSERR;
+  }
+
+  uint16_t size = ntohs (req->header.size);
+  if (size < sizeof (*req))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (ntohl (req->purpose.size) != (size
+                                    - sizeof (req->header)
+                                    - sizeof (req->reserved)
+                                    - sizeof (req->signature)))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (GNUNET_OK !=
+      GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
+                                  &req->purpose, &req->signature,
+                                  &req->member_pub_key))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming join request message from CADET.
+ */
+static void
+handle_cadet_join_request (void *cls,
+                           const struct MulticastJoinRequestMessage *req)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+
+  struct GNUNET_HashCode group_pub_hash;
+  GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
+  chn->group_pub_key = req->group_pub_key;
+  chn->group_pub_hash = group_pub_hash;
+  chn->member_pub_key = req->member_pub_key;
+  chn->peer = req->peer;
+  chn->join_status = JOIN_WAITING;
+
+  client_send_all (&group_pub_hash, &req->header);
+}
+
+
+static int
+check_cadet_join_decision (void *cls,
+                           const struct MulticastJoinDecisionMessageHeader *hdcsn)
+{
+  uint16_t size = ntohs (hdcsn->header.size);
+  if (size < sizeof (struct MulticastJoinDecisionMessageHeader) +
+             sizeof (struct MulticastJoinDecisionMessage))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  if (NULL == chn->group || GNUNET_NO != chn->group->is_origin)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  switch (chn->join_status)
+  {
+  case JOIN_REFUSED:
+    return GNUNET_SYSERR;
+
+  case JOIN_ADMITTED:
+    return GNUNET_OK;
+
+  case JOIN_NOT_ASKED:
+  case JOIN_WAITING:
+    break;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming join decision message from CADET.
+ */
+static void
+handle_cadet_join_decision (void *cls,
+                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
+{
+  const struct MulticastJoinDecisionMessage *
+    dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
+
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+
+  // FIXME: do we need to copy chn->peer or compare it with hdcsn->peer?
+  struct Member *mem = (struct Member *) chn->group;
+  client_send_join_decision (mem, hdcsn);
+  if (GNUNET_YES == ntohl (dcsn->is_admitted))
+  {
+    chn->join_status = JOIN_ADMITTED;
+  }
+  else
+  {
+    chn->join_status = JOIN_REFUSED;
+    cadet_channel_destroy (chn);
+  }
+}
+
+
+static int
+check_cadet_message (void *cls,
+                     const struct GNUNET_MULTICAST_MessageHeader *msg)
+{
+  uint16_t size = ntohs (msg->header.size);
+  if (size < sizeof (*msg))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  if (ntohl (msg->purpose.size) != (size
+                                    - sizeof (msg->header)
+                                    - sizeof (msg->hop_counter)
+                                    - sizeof (msg->signature)))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (GNUNET_OK !=
+      GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
+                                  &msg->purpose, &msg->signature,
+                                  &chn->group_pub_key))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast message from CADET.
+ */
+static void
+handle_cadet_message (void *cls,
+                      const struct GNUNET_MULTICAST_MessageHeader *msg)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+  client_send_all (&chn->group_pub_hash, &msg->header);
+}
+
+
+static int
+check_cadet_request (void *cls,
+                     const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+  uint16_t size = ntohs (req->header.size);
+  if (size < sizeof (*req))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  if (ntohl (req->purpose.size) != (size
+                                    - sizeof (req->header)
+                                    - sizeof (req->member_pub_key)
+                                    - sizeof (req->signature)))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (GNUNET_OK !=
+      GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
+                                  &req->purpose, &req->signature,
+                                  &req->member_pub_key))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast request message from CADET.
+ */
+static void
+handle_cadet_request (void *cls,
+                      const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+  client_send_origin (&chn->group_pub_hash, &req->header);
+}
+
+
+static int
+check_cadet_replay_request (void *cls,
+                            const struct MulticastReplayRequestMessage *req)
+{
+  uint16_t size = ntohs (req->header.size);
+  if (size < sizeof (*req))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast replay request from CADET.
+ */
+static void
+handle_cadet_replay_request (void *cls,
+                             const struct MulticastReplayRequestMessage *req)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+
+  struct MulticastReplayRequestMessage rep = *req;
+  GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
+
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
+                                                        &chn->group->pub_key_hash);
+  if (NULL == grp_replay_req)
+  {
+    grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+    GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
+                                       &chn->group->pub_key_hash, grp_replay_req,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+  }
+  struct GNUNET_HashCode key_hash;
+  replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
+                   rep.flags, &key_hash);
+  GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
+                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+  client_send_random (&chn->group_pub_hash, &rep.header);
+}
+
+
+static int
+check_cadet_replay_response (void *cls,
+                             const struct MulticastReplayResponseMessage *res)
+{
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast replay response from CADET.
+ */
+static void
+handle_cadet_replay_response (void *cls,
+                              const struct MulticastReplayResponseMessage *res)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+
+  /* @todo FIXME: got replay error response, send request to other members */
+}
+
+
+static void
+group_set_cadet_port_hash (struct Group *grp)
+{
+  struct CadetPort {
+    struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+    uint32_t app_type;
+  } port = {
+    grp->pub_key,
+    GNUNET_APPLICATION_TYPE_MULTICAST,
+  };
+  GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash);
+}
+
+
+
+/**
+ * Create new outgoing CADET channel.
+ *
+ * @param peer
+ *        Peer to connect to.
+ * @param group_pub_key
+ *        Public key of group the channel belongs to.
+ * @param group_pub_hash
+ *        Hash of @a group_pub_key.
+ *
+ * @return Channel.
+ */
+static struct Channel *
+cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
+{
+  struct Channel *chn = GNUNET_malloc (sizeof (*chn));
+  chn->group = grp;
+  chn->group_pub_key = grp->pub_key;
+  chn->group_pub_hash = grp->pub_key_hash;
+  chn->peer = *peer;
+  chn->direction = DIR_OUTGOING;
+  chn->is_connected = GNUNET_NO;
+  chn->join_status = JOIN_WAITING;
+
+  struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
+    GNUNET_MQ_hd_var_size (cadet_message,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+                           struct GNUNET_MULTICAST_MessageHeader,
+                           chn),
 
-  GNUNET_free (chn);
+    GNUNET_MQ_hd_var_size (cadet_join_decision,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+                           struct MulticastJoinDecisionMessageHeader,
+                           chn),
+
+    GNUNET_MQ_hd_var_size (cadet_replay_request,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+                           struct MulticastReplayRequestMessage,
+                           chn),
+
+    GNUNET_MQ_hd_var_size (cadet_replay_response,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+                           struct MulticastReplayResponseMessage,
+                           chn),
+
+    GNUNET_MQ_handler_end ()
+  };
+
+  chn->channel = GNUNET_CADET_channel_creatE (cadet, chn, &chn->peer,
+                                              &grp->cadet_port_hash,
+                                              GNUNET_CADET_OPTION_RELIABLE,
+                                              cadet_notify_window_change,
+                                              cadet_notify_disconnect,
+                                              cadet_handlers);
+  GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
+                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  return chn;
 }
 
 
+/**
+ * Destroy outgoing CADET channel.
+ */
 static void
-group_set_cadet_port_hash (struct Group *grp)
+cadet_channel_destroy (struct Channel *chn)
 {
-  struct CadetPort {
-    struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
-    uint32_t app_type;
-  } port = {
-    grp->pub_key,
-    GNUNET_APPLICATION_TYPE_MULTICAST,
-  };
-  GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash);
+  GNUNET_CADET_channel_destroy (chn->channel);
+  GNUNET_CONTAINER_multihashmap_remove_all (channels_out, &chn->group_pub_hash);
+  GNUNET_free (chn);
 }
 
-
 /**
  * Handle a connecting client starting an origin.
  */
@@ -961,8 +1358,44 @@ handle_client_origin_start (void *cls,
                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
 
     group_set_cadet_port_hash (grp);
-    orig->cadet_port = GNUNET_CADET_open_port (cadet, &grp->cadet_port_hash,
-                                               cadet_notify_channel_new, NULL);
+
+    struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
+      GNUNET_MQ_hd_var_size (cadet_message,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+                             struct GNUNET_MULTICAST_MessageHeader,
+                             grp),
+
+      GNUNET_MQ_hd_var_size (cadet_request,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+                             struct GNUNET_MULTICAST_RequestHeader,
+                             grp),
+
+      GNUNET_MQ_hd_var_size (cadet_join_request,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+                             struct MulticastJoinRequestMessage,
+                             grp),
+
+      GNUNET_MQ_hd_var_size (cadet_replay_request,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+                             struct MulticastReplayRequestMessage,
+                             grp),
+
+      GNUNET_MQ_hd_var_size (cadet_replay_response,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+                             struct MulticastReplayResponseMessage,
+                             grp),
+
+      GNUNET_MQ_handler_end ()
+    };
+
+
+    orig->cadet_port = GNUNET_CADET_open_porT (cadet,
+                                               &grp->cadet_port_hash,
+                                               cadet_notify_connect,
+                                               NULL,
+                                               cadet_notify_window_change,
+                                               cadet_notify_disconnect,
+                                               cadet_handlers);
   }
   else
   {
@@ -1258,10 +1691,8 @@ handle_client_multicast_message (void *cls,
   }
 
   client_send_all (&grp->pub_key_hash, &out->header);
-  if (0 == cadet_send_children (&grp->pub_key_hash, &out->header))
-  {
-    client_send_ack (&grp->pub_key_hash);
-  }
+  cadet_send_children (&grp->pub_key_hash, &out->header);
+  client_send_ack (&grp->pub_key_hash);
   GNUNET_free (out);
 
   GNUNET_SERVICE_client_continue (client);
@@ -1543,278 +1974,6 @@ handle_client_replay_response (void *cls,
 }
 
 
-/**
- * Incoming join request message from CADET.
- */
-int
-cadet_recv_join_request (void *cls,
-                         struct GNUNET_CADET_Channel *channel,
-                         void **ctx,
-                         const struct GNUNET_MessageHeader *m)
-{
-  GNUNET_CADET_receive_done(channel);
-  const struct MulticastJoinRequestMessage *
-    req = (const struct MulticastJoinRequestMessage *) m;
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (*req))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (NULL != *ctx)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (ntohl (req->purpose.size) != (size
-                                    - sizeof (req->header)
-                                    - sizeof (req->reserved)
-                                    - sizeof (req->signature)))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (GNUNET_OK !=
-      GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
-                                  &req->purpose, &req->signature,
-                                  &req->member_pub_key))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-
-  struct GNUNET_HashCode group_pub_hash;
-  GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
-
-  struct Channel *chn = GNUNET_malloc (sizeof *chn);
-  chn->channel = channel;
-  chn->group_pub_key = req->group_pub_key;
-  chn->group_pub_hash = group_pub_hash;
-  chn->member_pub_key = req->member_pub_key;
-  chn->peer = req->peer;
-  chn->join_status = JOIN_WAITING;
-  GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  *ctx = chn;
-
-  client_send_all (&group_pub_hash, m);
-  return GNUNET_OK;
-}
-
-
-/**
- * Incoming join decision message from CADET.
- */
-int
-cadet_recv_join_decision (void *cls,
-                          struct GNUNET_CADET_Channel *channel,
-                          void **ctx,
-                          const struct GNUNET_MessageHeader *m)
-{
-  GNUNET_CADET_receive_done (channel);
-  const struct MulticastJoinDecisionMessageHeader *
-    hdcsn = (const struct MulticastJoinDecisionMessageHeader *) m;
-  const struct MulticastJoinDecisionMessage *
-    dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (struct MulticastJoinDecisionMessageHeader) +
-             sizeof (struct MulticastJoinDecisionMessage))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  struct Channel *chn = *ctx;
-  if (NULL == chn)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (NULL == chn->group || GNUNET_NO != chn->group->is_origin)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  switch (chn->join_status)
-  {
-  case JOIN_REFUSED:
-    return GNUNET_SYSERR;
-
-  case JOIN_ADMITTED:
-    return GNUNET_OK;
-
-  case JOIN_NOT_ASKED:
-  case JOIN_WAITING:
-    break;
-  }
-
-  // FIXME: do we need to copy chn->peer or compare it with hdcsn->peer?
-  struct Member *mem = (struct Member *) chn->group;
-  client_send_join_decision (mem, hdcsn);
-  if (GNUNET_YES == ntohl (dcsn->is_admitted))
-  {
-    chn->join_status = JOIN_ADMITTED;
-    return GNUNET_OK;
-  }
-  else
-  {
-    chn->join_status = JOIN_REFUSED;
-    return GNUNET_SYSERR;
-  }
-}
-
-/**
- * Incoming multicast message from CADET.
- */
-int
-cadet_recv_message (void *cls,
-                    struct GNUNET_CADET_Channel *channel,
-                    void **ctx,
-                    const struct GNUNET_MessageHeader *m)
-{
-  GNUNET_CADET_receive_done(channel);
-  const struct GNUNET_MULTICAST_MessageHeader *
-    msg = (const struct GNUNET_MULTICAST_MessageHeader *) m;
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (*msg))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  struct Channel *chn = *ctx;
-  if (NULL == chn)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (ntohl (msg->purpose.size) != (size
-                                    - sizeof (msg->header)
-                                    - sizeof (msg->hop_counter)
-                                    - sizeof (msg->signature)))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (GNUNET_OK !=
-      GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
-                                  &msg->purpose, &msg->signature,
-                                  &chn->group_pub_key))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-
-  client_send_all (&chn->group_pub_hash, m);
-  return GNUNET_OK;
-}
-
-
-/**
- * Incoming multicast request message from CADET.
- */
-int
-cadet_recv_request (void *cls,
-                    struct GNUNET_CADET_Channel *channel,
-                    void **ctx,
-                    const struct GNUNET_MessageHeader *m)
-{
-  GNUNET_CADET_receive_done(channel);
-  const struct GNUNET_MULTICAST_RequestHeader *
-    req = (const struct GNUNET_MULTICAST_RequestHeader *) m;
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (*req))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  struct Channel *chn = *ctx;
-  if (NULL == chn)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (ntohl (req->purpose.size) != (size
-                                    - sizeof (req->header)
-                                    - sizeof (req->member_pub_key)
-                                    - sizeof (req->signature)))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (GNUNET_OK !=
-      GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
-                                  &req->purpose, &req->signature,
-                                  &req->member_pub_key))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-
-  client_send_origin (&chn->group_pub_hash, m);
-  return GNUNET_OK;
-}
-
-
-/**
- * Incoming multicast replay request from CADET.
- */
-int
-cadet_recv_replay_request (void *cls,
-                           struct GNUNET_CADET_Channel *channel,
-                           void **ctx,
-                           const struct GNUNET_MessageHeader *m)
-{
-  GNUNET_CADET_receive_done(channel);
-  struct MulticastReplayRequestMessage rep;
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (rep))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  struct Channel *chn = *ctx;
-
-  GNUNET_memcpy (&rep, m, sizeof (rep));
-  GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
-
-  struct GNUNET_CONTAINER_MultiHashMap *
-    grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
-                                                        &chn->group->pub_key_hash);
-  if (NULL == grp_replay_req)
-  {
-    grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
-    GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
-                                       &chn->group->pub_key_hash, grp_replay_req,
-                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
-  }
-  struct GNUNET_HashCode key_hash;
-  replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
-                   rep.flags, &key_hash);
-  GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
-  client_send_random (&chn->group_pub_hash, &rep.header);
-  return GNUNET_OK;
-}
-
-
-/**
- * Incoming multicast replay response from CADET.
- */
-int
-cadet_recv_replay_response (void *cls,
-                            struct GNUNET_CADET_Channel *channel,
-                            void **ctx,
-                            const struct GNUNET_MessageHeader *m)
-{
-  GNUNET_CADET_receive_done(channel);
-  //struct Channel *chn = *ctx;
-
-  /* @todo FIXME: got replay error response, send request to other members */
-
-  return GNUNET_OK;
-}
-
-
 /**
  * A new client connected.
  *
@@ -1898,32 +2057,6 @@ client_notify_disconnect (void *cls,
 }
 
 
-/**
- * Message handlers for CADET.
- */
-static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
-  { cadet_recv_join_request,
-    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 },
-
-  { cadet_recv_join_decision,
-    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 },
-
-  { cadet_recv_message,
-    GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
-
-  { cadet_recv_request,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
-
-  { cadet_recv_replay_request,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
-
-  { cadet_recv_replay_response,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
-
-  { NULL, 0, 0 }
-};
-
-
 /**
  * Service started.
  *
@@ -1949,9 +2082,8 @@ run (void *cls,
   replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
   replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
 
-  cadet = GNUNET_CADET_connect (cfg, NULL,
-                                cadet_notify_channel_end,
-                                cadet_handlers);
+  cadet = GNUNET_CADET_connecT (cfg);
+
   GNUNET_assert (NULL != cadet);
 
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,