multicast: add new test case to Makefile.am
[oweals/gnunet.git] / src / multicast / gnunet-service-multicast.c
index 69a0d3fd7745f22c3517cd05e4a41f1c9b1576e3..b068f13083e72b557ca586532d857b2aeb125a1a 100644 (file)
@@ -28,7 +28,6 @@
 #include "gnunet_signatures.h"
 #include "gnunet_applications.h"
 #include "gnunet_statistics_service.h"
-#include "gnunet_core_service.h"
 #include "gnunet_cadet_service.h"
 #include "gnunet_multicast_service.h"
 #include "multicast.h"
 static const struct GNUNET_CONFIGURATION_Handle *cfg;
 
 /**
- * Server handle.
+ * Service handle.
  */
-static struct GNUNET_SERVER_Handle *server;
-
-/**
- * Core handle.
- * Only used during initialization.
- */
-static struct GNUNET_CORE_Handle *core;
+static struct GNUNET_SERVICE_Handle *service;
 
 /**
  * CADET handle.
@@ -64,11 +57,6 @@ static struct GNUNET_PeerIdentity this_peer;
  */
 static struct GNUNET_STATISTICS_Handle *stats;
 
-/**
- * Notification context, simplifies client broadcasts.
- */
-static struct GNUNET_SERVER_NotificationContext *nc;
-
 /**
  * All connected origin clients.
  * Group's pub_key_hash -> struct Origin * (uniq)
@@ -109,7 +97,7 @@ static struct GNUNET_CONTAINER_MultiHashMap *replay_req_cadet;
 /**
  * Incoming replay requests from clients.
  * Group's pub_key_hash ->
- *   H(fragment_id, message_id, fragment_offset, flags) -> struct GNUNET_SERVER_Client *
+ *   H(fragment_id, message_id, fragment_offset, flags) -> struct GNUNET_SERVICE_Client *
  */
 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_client;
 
@@ -142,7 +130,7 @@ struct Channel
    *
    * Only set for outgoing channels.
    */
-  struct Group *grp;
+  struct Group *group;
 
   /**
    * CADET channel.
@@ -174,6 +162,16 @@ struct Channel
    */
   struct GNUNET_PeerIdentity peer;
 
+  /**
+   * Current window size, set by cadet_notify_window_change()
+   */
+  int32_t window_size;
+
+  /**
+   * Is the connection established?
+   */
+  int8_t is_connected;
+
   /**
    * Is the remote peer admitted to the group?
    * @see enum JoinStatus
@@ -200,11 +198,12 @@ struct ClientList
 {
   struct ClientList *prev;
   struct ClientList *next;
-  struct GNUNET_SERVER_Client *client;
+  struct GNUNET_SERVICE_Client *client;
 };
 
+
 /**
- * Common part of the client context for both an origin and member.
+ * Client context for an origin or member.
  */
 struct Group
 {
@@ -222,29 +221,44 @@ struct Group
   struct GNUNET_HashCode pub_key_hash;
 
   /**
-   * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)?
+   * CADET port hash.
    */
-  uint8_t is_origin;
+  struct GNUNET_HashCode cadet_port_hash;
 
   /**
    * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
    */
   uint8_t disconnected;
+
+  /**
+   * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)?
+   */
+  uint8_t is_origin;
+
+  union {
+    struct Origin *origin;
+    struct Member *member;
+  };
 };
 
 
 /**
- * Client context for a group's origin.
+* Client context for a group's origin.
  */
 struct Origin
 {
-  struct Group grp;
+  struct Group group;
 
   /**
    * Private key of the group.
    */
   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
 
+  /**
+   * CADET port.
+   */
+  struct GNUNET_CADET_Port *cadet_port;
+
   /**
    * Last message fragment ID sent to the group.
    */
@@ -257,7 +271,7 @@ struct Origin
  */
 struct Member
 {
-  struct Group grp;
+  struct Group group;
 
   /**
    * Private key of the member.
@@ -314,6 +328,15 @@ struct Member
 };
 
 
+/**
+ * Client context.
+ */
+struct Client {
+  struct GNUNET_SERVICE_Client *client;
+  struct Group *group;
+};
+
+
 struct ReplayRequestKey
 {
   uint64_t fragment_id;
@@ -323,20 +346,25 @@ struct ReplayRequestKey
 };
 
 
+static struct Channel *
+cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer);
+
+static void
+cadet_channel_destroy (struct Channel *chn);
+
+static void
+client_send_join_decision (struct Member *mem,
+                           const struct MulticastJoinDecisionMessageHeader *hdcsn);
+
+
 /**
  * Task run during shutdown.
  *
  * @param cls unused
- * @param tc unused
  */
 static void
-shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls)
 {
-  if (NULL != core)
-  {
-    GNUNET_CORE_disconnect (core);
-    core = NULL;
-  }
   if (NULL != cadet)
   {
     GNUNET_CADET_disconnect (cadet);
@@ -357,8 +385,14 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 static void
 cleanup_origin (struct Origin *orig)
 {
-  struct Group *grp = &orig->grp;
+  struct Group *grp = &orig->group;
   GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig);
+  if (NULL != orig->cadet_port)
+  {
+    GNUNET_CADET_close_port (orig->cadet_port);
+    orig->cadet_port = NULL;
+  }
+  GNUNET_free (orig);
 }
 
 
@@ -368,7 +402,7 @@ cleanup_origin (struct Origin *orig)
 static void
 cleanup_member (struct Member *mem)
 {
-  struct Group *grp = &mem->grp;
+  struct Group *grp = &mem->group;
   struct GNUNET_CONTAINER_MultiHashMap *
     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
                                                  &grp->pub_key_hash);
@@ -387,6 +421,7 @@ cleanup_member (struct Member *mem)
     mem->join_dcsn = NULL;
   }
   GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem);
+  GNUNET_free (mem);
 }
 
 
@@ -397,10 +432,8 @@ static void
 cleanup_group (struct Group *grp)
 {
   (GNUNET_YES == grp->is_origin)
-    ? cleanup_origin ((struct Origin *) grp)
-    : cleanup_member ((struct Member *) grp);
-
-  GNUNET_free (grp);
+    ? cleanup_origin (grp->origin)
+    : cleanup_member (grp->member);
 }
 
 
@@ -433,7 +466,7 @@ replay_req_remove_cadet (struct Channel *chn)
 {
   struct GNUNET_CONTAINER_MultiHashMap *
     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
-                                                        &chn->grp->pub_key_hash);
+                                                        &chn->group->pub_key_hash);
   if (NULL == grp_replay_req)
     return GNUNET_NO;
 
@@ -448,6 +481,7 @@ replay_req_remove_cadet (struct Channel *chn)
     if (c == chn)
     {
       GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn);
+      GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
       return GNUNET_YES;
     }
   }
@@ -466,7 +500,7 @@ replay_req_remove_cadet (struct Channel *chn)
  *         #GNUNET_NO when reached end of hashmap.
  */
 static int
-replay_req_remove_client (struct Group *grp, struct GNUNET_SERVER_Client *client)
+replay_req_remove_client (struct Group *grp, struct GNUNET_SERVICE_Client *client)
 {
   struct GNUNET_CONTAINER_MultiHashMap *
     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
@@ -477,14 +511,15 @@ replay_req_remove_client (struct Group *grp, struct GNUNET_SERVER_Client *client
   struct GNUNET_CONTAINER_MultiHashMapIterator *
     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
   struct GNUNET_HashCode key;
-  const struct GNUNET_SERVER_Client *c;
+  const struct GNUNET_SERVICE_Client *c;
   while (GNUNET_YES
          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
                                                          (const void **) &c))
   {
     if (c == client)
     {
-      GNUNET_CONTAINER_multihashmap_remove (replay_req_client, &key, client);
+      GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, client);
+      GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
       return GNUNET_YES;
     }
   }
@@ -493,78 +528,21 @@ replay_req_remove_client (struct Group *grp, struct GNUNET_SERVER_Client *client
 }
 
 
-/**
- * Called whenever a client is disconnected.
- *
- * Frees our resources associated with that client.
- *
- * @param cls  Closure.
- * @param client  Client handle.
- */
-static void
-client_notify_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
-{
-  if (NULL == client)
-    return;
-
-  struct Group *grp
-    = GNUNET_SERVER_client_get_user_context (client, struct Group);
-
-  if (NULL == grp)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "%p User context is NULL in client_disconnect()\n", grp);
-    GNUNET_break (0);
-    return;
-  }
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Client (%s) disconnected from group %s\n",
-              grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
-              GNUNET_h2s (&grp->pub_key_hash));
-
-  struct ClientList *cl = grp->clients_head;
-  while (NULL != cl)
-  {
-    if (cl->client == client)
-    {
-      GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl);
-      GNUNET_free (cl);
-      break;
-    }
-    cl = cl->next;
-  }
-
-  while (GNUNET_YES == replay_req_remove_client (grp, client));
-
-  if (NULL == grp->clients_head)
-  { /* Last client disconnected. */
-#if FIXME
-    if (NULL != grp->tmit_head)
-    { /* Send pending messages via CADET before cleanup. */
-      transmit_message (grp);
-    }
-    else
-#endif
-    {
-      cleanup_group (grp);
-    }
-  }
-}
-
-
 /**
  * Send message to a client.
  */
 static void
-client_send (struct GNUNET_SERVER_Client *client,
+client_send (struct GNUNET_SERVICE_Client *client,
              const struct GNUNET_MessageHeader *msg)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "%p Sending message to client.\n", client);
 
-  GNUNET_SERVER_notification_context_add (nc, client);
-  GNUNET_SERVER_notification_context_unicast (nc, client, msg, GNUNET_NO);
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_copy (msg);
+
+  GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
+                  env);
 }
 
 
@@ -575,14 +553,17 @@ static void
 client_send_group (const struct Group *grp,
                    const struct GNUNET_MessageHeader *msg)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "%p Sending message to all clients of the group.\n", grp);
 
   struct ClientList *cl = grp->clients_head;
   while (NULL != cl)
   {
-    GNUNET_SERVER_notification_context_add (nc, cl->client);
-    GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO);
+    struct GNUNET_MQ_Envelope *
+      env = GNUNET_MQ_msg_copy (msg);
+
+    GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cl->client),
+                    env);
     cl = cl->next;
   }
 }
@@ -598,7 +579,7 @@ client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
   const struct GNUNET_MessageHeader *msg = cls;
   struct Member *orig = origin;
 
-  client_send_group (&orig->grp, msg);
+  client_send_group (&orig->group, msg);
   return GNUNET_YES;
 }
 
@@ -615,7 +596,7 @@ client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
 
   if (NULL != mem->join_dcsn)
   { /* Only send message to admitted members */
-    client_send_group (&mem->grp, msg);
+    client_send_group (&mem->group, msg);
   }
   return GNUNET_YES;
 }
@@ -693,6 +674,9 @@ client_send_origin (struct GNUNET_HashCode *pub_key_hash,
 static void
 client_send_ack (struct GNUNET_HashCode *pub_key_hash)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Sending message ACK to client.\n");
+
   static struct GNUNET_MessageHeader *msg = NULL;
   if (NULL == msg)
   {
@@ -711,36 +695,6 @@ struct CadetTransmitClosure
 };
 
 
-/**
- * CADET is ready to transmit a message.
- */
-size_t
-cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
-{
-  if (0 == buf_size)
-  {
-    /* FIXME: connection closed */
-    return 0;
-  }
-  struct CadetTransmitClosure *tcls = cls;
-  struct Channel *chn = tcls->chn;
-  uint16_t msg_size = ntohs (tcls->msg->size);
-  GNUNET_assert (msg_size <= buf_size);
-  memcpy (buf, tcls->msg, msg_size);
-  GNUNET_free (tcls);
-
-  if (0 == chn->msgs_pending)
-  {
-    GNUNET_break (0);
-  }
-  else if (0 == --chn->msgs_pending)
-  {
-    client_send_ack (&chn->group_pub_hash);
-  }
-  return msg_size;
-}
-
-
 /**
  * Send a message to a CADET channel.
  *
@@ -750,49 +704,22 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
 static void
 cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
 {
-  struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls));
-  tcls->chn = chn;
-  tcls->msg = msg;
-
-  chn->msgs_pending++;
-  chn->tmit_handle
-    = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
-                                          GNUNET_TIME_UNIT_FOREVER_REL,
-                                          ntohs (msg->size),
-                                          &cadet_notify_transmit_ready,
-                                          (void *) msg);
-  GNUNET_assert (NULL != chn->tmit_handle);
-}
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_copy (msg);
 
+  GNUNET_MQ_send (GNUNET_CADET_get_mq (chn->channel), env);
 
-/**
- * Create new outgoing CADET channel.
- *
- * @param peer
- *        Peer to connect to.
- * @param group_pub_key
- *        Public key of group the channel belongs to.
- * @param group_pub_hash
- *        Hash of @a group_pub_key.
- *
- * @return Channel.
- */
-static struct Channel *
-cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
-{
-  struct Channel *chn = GNUNET_malloc (sizeof (*chn));
-  chn->grp = grp;
-  chn->group_pub_key = grp->pub_key;
-  chn->group_pub_hash = grp->pub_key_hash;
-  chn->peer = *peer;
-  chn->direction = DIR_OUTGOING;
-  chn->join_status = JOIN_WAITING;
-  chn->channel = GNUNET_CADET_channel_create (cadet, chn, &chn->peer,
-                                              GNUNET_APPLICATION_TYPE_MULTICAST,
-                                              GNUNET_CADET_OPTION_RELIABLE);
-  GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  return chn;
+  if (0 < chn->window_size)
+  {
+    client_send_ack (&chn->group_pub_hash);
+  }
+  else
+  {
+    chn->msgs_pending++;
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "%p Queuing message. Pending messages: %u\n",
+                chn, chn->msgs_pending);
+  }
 }
 
 
@@ -802,14 +729,14 @@ cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
 static void
 cadet_send_join_request (struct Member *mem)
 {
-  mem->origin_channel = cadet_channel_create (&mem->grp, &mem->origin);
+  mem->origin_channel = cadet_channel_create (&mem->group, &mem->origin);
   cadet_send_channel (mem->origin_channel, &mem->join_req->header);
 
   uint32_t i;
   for (i = 0; i < mem->relay_count; i++)
   {
     struct Channel *
-      chn = cadet_channel_create (&mem->grp, &mem->relays[i]);
+      chn = cadet_channel_create (&mem->group, &mem->relays[i]);
     cadet_send_channel (chn, &mem->join_req->header);
   }
 }
@@ -823,9 +750,21 @@ cadet_send_join_decision_cb (void *cls,
   const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
   struct Channel *chn = channel;
 
+  const struct MulticastJoinDecisionMessage *dcsn =
+    (struct MulticastJoinDecisionMessage *) &hdcsn[1];
+
   if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key))
       && 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer)))
   {
+    if (GNUNET_YES == ntohl (dcsn->is_admitted))
+    {
+      chn->join_status = JOIN_ADMITTED;
+    }
+    else
+    {
+      chn->join_status = JOIN_REFUSED;
+    }
+
     cadet_send_channel (chn, &hdcsn->header);
     return GNUNET_NO;
   }
@@ -876,6 +815,7 @@ cadet_send_children (struct GNUNET_HashCode *pub_key_hash,
 }
 
 
+#if 0      // unused as yet
 /**
  * Send message to all connected parents.
  */
@@ -889,137 +829,694 @@ cadet_send_parents (struct GNUNET_HashCode *pub_key_hash,
                                                      cadet_send_cb, (void *) msg);
   return n;
 }
+#endif
 
 
 /**
- * Handle a connecting client starting an origin.
+ * CADET channel connect handler.
+ *
+ * @see GNUNET_CADET_ConnectEventHandler()
  */
-static void
-client_recv_origin_start (void *cls, struct GNUNET_SERVER_Client *client,
-                          const struct GNUNET_MessageHeader *m)
+static void *
+cadet_notify_connect (void *cls,
+                      struct GNUNET_CADET_Channel *channel,
+                      const struct GNUNET_PeerIdentity *source)
 {
-  const struct MulticastOriginStartMessage *
-    msg = (const struct MulticastOriginStartMessage *) m;
+  struct Channel *chn = GNUNET_malloc (sizeof *chn);
+  chn->group = cls;
+  chn->channel = channel;
+  chn->direction = DIR_INCOMING;
+  chn->join_status = JOIN_NOT_ASKED;
 
-  struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
-  struct GNUNET_HashCode pub_key_hash;
+  GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn,
+                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  return chn;
+}
 
-  GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key);
-  GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
 
-  struct Origin *
-    orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash);
-  struct Group *grp;
+/**
+ * CADET window size change handler.
+ *
+ * @see GNUNET_CADET_WindowSizeEventHandler()
+ */
+static void
+cadet_notify_window_change (void *cls,
+                            const struct GNUNET_CADET_Channel *channel,
+                            int window_size)
+{
+  struct Channel *chn = cls;
 
-  if (NULL == orig)
-  {
-    orig = GNUNET_new (struct Origin);
-    orig->priv_key = msg->group_key;
-    orig->max_fragment_id = GNUNET_ntohll (msg->max_fragment_id);
-    grp = &orig->grp;
-    grp->is_origin = GNUNET_YES;
-    grp->pub_key = pub_key;
-    grp->pub_key_hash = pub_key_hash;
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "%p Window size changed to %d.  Pending messages: %u\n",
+              chn, window_size, chn->msgs_pending);
 
-    GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
-                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
-  }
-  else
+  chn->is_connected = GNUNET_YES;
+  chn->window_size = (int32_t) window_size;
+
+  for (int i = 0; i < window_size; i++)
   {
-    grp = &orig->grp;
+    if (0 < chn->msgs_pending)
+    {
+      client_send_ack (&chn->group_pub_hash);
+      chn->msgs_pending--;
+    }
+    else
+    {
+      break;
+    }
   }
-
-  struct ClientList *cl = GNUNET_new (struct ClientList);
-  cl->client = client;
-  GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Client connected as origin to group %s.\n",
-              orig, GNUNET_h2s (&grp->pub_key_hash));
-
-  GNUNET_SERVER_client_set_user_context (client, grp);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
 /**
- * Handle a connecting client joining a group.
+ * CADET channel disconnect handler.
+ *
+ * @see GNUNET_CADET_DisconnectEventHandler()
  */
 static void
-client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client,
-                         const struct GNUNET_MessageHeader *m)
+cadet_notify_disconnect (void *cls,
+                         const struct GNUNET_CADET_Channel *channel)
 {
-  const struct MulticastMemberJoinMessage *
-    msg = (const struct MulticastMemberJoinMessage *) m;
-  uint16_t msg_size = ntohs (msg->header.size);
-
-  struct GNUNET_CRYPTO_EcdsaPublicKey mem_pub_key;
-  struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash;
-
-  GNUNET_CRYPTO_ecdsa_key_get_public (&msg->member_key, &mem_pub_key);
-  GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash);
-  GNUNET_CRYPTO_hash (&msg->group_pub_key, sizeof (msg->group_pub_key), &pub_key_hash);
-
-  struct GNUNET_CONTAINER_MultiHashMap *
-    grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash);
-  struct Member *mem = NULL;
-  struct Group *grp;
+  if (NULL == cls)
+    return;
 
-  if (NULL != grp_mem)
-  {
-    mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash);
-  }
-  if (NULL == mem)
+  struct Channel *chn = cls;
+  if (NULL != chn->group)
   {
-    mem = GNUNET_new (struct Member);
-    mem->priv_key = msg->member_key;
-    mem->pub_key = mem_pub_key;
-    mem->pub_key_hash = mem_pub_key_hash;
-    mem->max_fragment_id = 0; // FIXME
-
-    grp = &mem->grp;
-    grp->is_origin = GNUNET_NO;
-    grp->pub_key = msg->group_pub_key;
-    grp->pub_key_hash = pub_key_hash;
-
-    if (NULL == grp_mem)
+    if (GNUNET_NO == chn->group->is_origin)
     {
-      grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
-      GNUNET_CONTAINER_multihashmap_put (group_members, &grp->pub_key_hash, grp_mem,
-                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+      struct Member *mem = (struct Member *) chn->group;
+      if (chn == mem->origin_channel)
+        mem->origin_channel = NULL;
     }
-    GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem,
-                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
-    GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
-                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   }
-  else
+
+  int ret;
+  do
   {
-    grp = &mem->grp;
+    ret = replay_req_remove_cadet (chn);
   }
+  while (GNUNET_YES == ret);
 
-  struct ClientList *cl = GNUNET_new (struct ClientList);
-  cl->client = client;
-  GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
+  GNUNET_free (chn);
+}
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Client connected to group %s..\n",
-              mem, GNUNET_h2s (&grp->pub_key_hash));
+
+static int
+check_cadet_join_request (void *cls,
+                          const struct MulticastJoinRequestMessage *req)
+{
+  struct Channel *chn = cls;
+
+  if (NULL == chn
+      || JOIN_NOT_ASKED != chn->join_status)
+  {
+    return GNUNET_SYSERR;
+  }
+
+  uint16_t size = ntohs (req->header.size);
+  if (size < sizeof (*req))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (ntohl (req->purpose.size) != (size
+                                    - sizeof (req->header)
+                                    - sizeof (req->reserved)
+                                    - sizeof (req->signature)))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (GNUNET_OK !=
+      GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
+                                  &req->purpose, &req->signature,
+                                  &req->member_pub_key))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming join request message from CADET.
+ */
+static void
+handle_cadet_join_request (void *cls,
+                           const struct MulticastJoinRequestMessage *req)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+
+  struct GNUNET_HashCode group_pub_hash;
+  GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
+  chn->group_pub_key = req->group_pub_key;
+  chn->group_pub_hash = group_pub_hash;
+  chn->member_pub_key = req->member_pub_key;
+  chn->peer = req->peer;
+  chn->join_status = JOIN_WAITING;
+
+  client_send_all (&group_pub_hash, &req->header);
+}
+
+
+static int
+check_cadet_join_decision (void *cls,
+                           const struct MulticastJoinDecisionMessageHeader *hdcsn)
+{
+  uint16_t size = ntohs (hdcsn->header.size);
+  if (size < sizeof (struct MulticastJoinDecisionMessageHeader) +
+             sizeof (struct MulticastJoinDecisionMessage))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  if (NULL == chn->group || GNUNET_NO != chn->group->is_origin)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  switch (chn->join_status)
+  {
+  case JOIN_REFUSED:
+    return GNUNET_SYSERR;
+
+  case JOIN_ADMITTED:
+    return GNUNET_OK;
+
+  case JOIN_NOT_ASKED:
+  case JOIN_WAITING:
+    break;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming join decision message from CADET.
+ */
+static void
+handle_cadet_join_decision (void *cls,
+                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
+{
+  const struct MulticastJoinDecisionMessage *
+    dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
+
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+
+  // FIXME: do we need to copy chn->peer or compare it with hdcsn->peer?
+  struct Member *mem = (struct Member *) chn->group;
+  client_send_join_decision (mem, hdcsn);
+  if (GNUNET_YES == ntohl (dcsn->is_admitted))
+  {
+    chn->join_status = JOIN_ADMITTED;
+  }
+  else
+  {
+    chn->join_status = JOIN_REFUSED;
+    cadet_channel_destroy (chn);
+  }
+}
+
+
+static int
+check_cadet_message (void *cls,
+                     const struct GNUNET_MULTICAST_MessageHeader *msg)
+{
+  uint16_t size = ntohs (msg->header.size);
+  if (size < sizeof (*msg))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  if (ntohl (msg->purpose.size) != (size
+                                    - sizeof (msg->header)
+                                    - sizeof (msg->hop_counter)
+                                    - sizeof (msg->signature)))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (GNUNET_OK !=
+      GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
+                                  &msg->purpose, &msg->signature,
+                                  &chn->group_pub_key))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast message from CADET.
+ */
+static void
+handle_cadet_message (void *cls,
+                      const struct GNUNET_MULTICAST_MessageHeader *msg)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+  client_send_all (&chn->group_pub_hash, &msg->header);
+}
+
+
+static int
+check_cadet_request (void *cls,
+                     const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+  uint16_t size = ntohs (req->header.size);
+  if (size < sizeof (*req))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  if (ntohl (req->purpose.size) != (size
+                                    - sizeof (req->header)
+                                    - sizeof (req->member_pub_key)
+                                    - sizeof (req->signature)))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (GNUNET_OK !=
+      GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
+                                  &req->purpose, &req->signature,
+                                  &req->member_pub_key))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast request message from CADET.
+ */
+static void
+handle_cadet_request (void *cls,
+                      const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+  client_send_origin (&chn->group_pub_hash, &req->header);
+}
+
+
+static int
+check_cadet_replay_request (void *cls,
+                            const struct MulticastReplayRequestMessage *req)
+{
+  uint16_t size = ntohs (req->header.size);
+  if (size < sizeof (*req))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast replay request from CADET.
+ */
+static void
+handle_cadet_replay_request (void *cls,
+                             const struct MulticastReplayRequestMessage *req)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+
+  struct MulticastReplayRequestMessage rep = *req;
+  GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
+
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
+                                                        &chn->group->pub_key_hash);
+  if (NULL == grp_replay_req)
+  {
+    grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+    GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
+                                       &chn->group->pub_key_hash, grp_replay_req,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+  }
+  struct GNUNET_HashCode key_hash;
+  replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
+                   rep.flags, &key_hash);
+  GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
+                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+  client_send_random (&chn->group_pub_hash, &rep.header);
+}
+
+
+static int
+check_cadet_replay_response (void *cls,
+                             const struct MulticastReplayResponseMessage *res)
+{
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast replay response from CADET.
+ */
+static void
+handle_cadet_replay_response (void *cls,
+                              const struct MulticastReplayResponseMessage *res)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+
+  /* @todo FIXME: got replay error response, send request to other members */
+}
+
+
+static void
+group_set_cadet_port_hash (struct Group *grp)
+{
+  struct CadetPort {
+    struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+    uint32_t app_type;
+  } port = {
+    grp->pub_key,
+    GNUNET_APPLICATION_TYPE_MULTICAST,
+  };
+  GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash);
+}
+
+
+
+/**
+ * Create new outgoing CADET channel.
+ *
+ * @param peer
+ *        Peer to connect to.
+ * @param group_pub_key
+ *        Public key of group the channel belongs to.
+ * @param group_pub_hash
+ *        Hash of @a group_pub_key.
+ *
+ * @return Channel.
+ */
+static struct Channel *
+cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
+{
+  struct Channel *chn = GNUNET_malloc (sizeof (*chn));
+  chn->group = grp;
+  chn->group_pub_key = grp->pub_key;
+  chn->group_pub_hash = grp->pub_key_hash;
+  chn->peer = *peer;
+  chn->direction = DIR_OUTGOING;
+  chn->is_connected = GNUNET_NO;
+  chn->join_status = JOIN_WAITING;
+
+  struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
+    GNUNET_MQ_hd_var_size (cadet_message,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+                           struct GNUNET_MULTICAST_MessageHeader,
+                           chn),
+
+    GNUNET_MQ_hd_var_size (cadet_join_decision,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+                           struct MulticastJoinDecisionMessageHeader,
+                           chn),
+
+    GNUNET_MQ_hd_var_size (cadet_replay_request,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+                           struct MulticastReplayRequestMessage,
+                           chn),
+
+    GNUNET_MQ_hd_var_size (cadet_replay_response,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+                           struct MulticastReplayResponseMessage,
+                           chn),
+
+    GNUNET_MQ_handler_end ()
+  };
+
+  chn->channel = GNUNET_CADET_channel_creatE (cadet, chn, &chn->peer,
+                                              &grp->cadet_port_hash,
+                                              GNUNET_CADET_OPTION_RELIABLE,
+                                              cadet_notify_window_change,
+                                              cadet_notify_disconnect,
+                                              cadet_handlers);
+  GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
+                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  return chn;
+}
+
+
+/**
+ * Destroy outgoing CADET channel.
+ */
+static void
+cadet_channel_destroy (struct Channel *chn)
+{
+  GNUNET_CADET_channel_destroy (chn->channel);
+  GNUNET_CONTAINER_multihashmap_remove_all (channels_out, &chn->group_pub_hash);
+  GNUNET_free (chn);
+}
+
+/**
+ * Handle a connecting client starting an origin.
+ */
+static void
+handle_client_origin_start (void *cls,
+                            const struct MulticastOriginStartMessage *msg)
+{
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+
+  struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+  struct GNUNET_HashCode pub_key_hash;
+
+  GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key);
+  GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
+
+  struct Origin *
+    orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash);
+  struct Group *grp;
+
+  if (NULL == orig)
+  {
+    orig = GNUNET_new (struct Origin);
+    orig->priv_key = msg->group_key;
+    orig->max_fragment_id = GNUNET_ntohll (msg->max_fragment_id);
+
+    grp = c->group = &orig->group;
+    grp->origin = orig;
+    grp->is_origin = GNUNET_YES;
+    grp->pub_key = pub_key;
+    grp->pub_key_hash = pub_key_hash;
+
+    GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+
+    group_set_cadet_port_hash (grp);
+
+    struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
+      GNUNET_MQ_hd_var_size (cadet_message,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+                             struct GNUNET_MULTICAST_MessageHeader,
+                             grp),
+
+      GNUNET_MQ_hd_var_size (cadet_request,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+                             struct GNUNET_MULTICAST_RequestHeader,
+                             grp),
+
+      GNUNET_MQ_hd_var_size (cadet_join_request,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+                             struct MulticastJoinRequestMessage,
+                             grp),
+
+      GNUNET_MQ_hd_var_size (cadet_replay_request,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+                             struct MulticastReplayRequestMessage,
+                             grp),
+
+      GNUNET_MQ_hd_var_size (cadet_replay_response,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+                             struct MulticastReplayResponseMessage,
+                             grp),
+
+      GNUNET_MQ_handler_end ()
+    };
+
+
+    orig->cadet_port = GNUNET_CADET_open_porT (cadet,
+                                               &grp->cadet_port_hash,
+                                               cadet_notify_connect,
+                                               NULL,
+                                               cadet_notify_window_change,
+                                               cadet_notify_disconnect,
+                                               cadet_handlers);
+  }
+  else
+  {
+    grp = &orig->group;
+  }
+
+  struct ClientList *cl = GNUNET_new (struct ClientList);
+  cl->client = client;
+  GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Client connected as origin to group %s.\n",
+              orig, GNUNET_h2s (&grp->pub_key_hash));
+  GNUNET_SERVICE_client_continue (client);
+}
+
+
+static int
+check_client_member_join (void *cls,
+                          const struct MulticastMemberJoinMessage *msg)
+{
+  uint16_t msg_size = ntohs (msg->header.size);
+  struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
+  uint32_t relay_count = ntohl (msg->relay_count);
+  uint16_t relay_size = relay_count * sizeof (*relays);
+  struct GNUNET_MessageHeader *join_msg = NULL;
+  uint16_t join_msg_size = 0;
+  if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader)
+      <= msg_size)
+  {
+    join_msg = (struct GNUNET_MessageHeader *)
+      (((char *) &msg[1]) + relay_size);
+    join_msg_size = ntohs (join_msg->size);
+  }
+  return
+    msg_size == (sizeof (*msg) + relay_size + join_msg_size)
+    ? GNUNET_OK
+    : GNUNET_SYSERR;
+}
+
+
+/**
+ * Handle a connecting client joining a group.
+ */
+static void
+handle_client_member_join (void *cls,
+                           const struct MulticastMemberJoinMessage *msg)
+{
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+
+  uint16_t msg_size = ntohs (msg->header.size);
+
+  struct GNUNET_CRYPTO_EcdsaPublicKey mem_pub_key;
+  struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash;
+
+  GNUNET_CRYPTO_ecdsa_key_get_public (&msg->member_key, &mem_pub_key);
+  GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash);
+  GNUNET_CRYPTO_hash (&msg->group_pub_key, sizeof (msg->group_pub_key), &pub_key_hash);
+
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash);
+  struct Member *mem = NULL;
+  struct Group *grp;
+
+  if (NULL != grp_mem)
+  {
+    mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash);
+  }
+  if (NULL == mem)
+  {
+    mem = GNUNET_new (struct Member);
+    mem->origin = msg->origin;
+    mem->priv_key = msg->member_key;
+    mem->pub_key = mem_pub_key;
+    mem->pub_key_hash = mem_pub_key_hash;
+    mem->max_fragment_id = 0; // FIXME
+
+    grp = c->group = &mem->group;
+    grp->member = mem;
+    grp->is_origin = GNUNET_NO;
+    grp->pub_key = msg->group_pub_key;
+    grp->pub_key_hash = pub_key_hash;
+    group_set_cadet_port_hash (grp);
+
+    if (NULL == grp_mem)
+    {
+      grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+      GNUNET_CONTAINER_multihashmap_put (group_members, &grp->pub_key_hash, grp_mem,
+                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+    }
+    GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+    GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  }
+  else
+  {
+    grp = &mem->group;
+  }
+
+  struct ClientList *cl = GNUNET_new (struct ClientList);
+  cl->client = client;
+  GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Client connected to group %s..\n",
+              mem, GNUNET_h2s (&grp->pub_key_hash));
   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p ..as member %s (%s).\n",
               mem, GNUNET_h2s (&mem->pub_key_hash), str);
   GNUNET_free (str);
 
-  GNUNET_SERVER_client_set_user_context (client, grp);
-
   if (NULL != mem->join_dcsn)
   { /* Already got a join decision, send it to client. */
-    GNUNET_SERVER_notification_context_add (nc, client);
-    GNUNET_SERVER_notification_context_unicast (nc, client,
-                                                (struct GNUNET_MessageHeader *)
-                                                mem->join_dcsn,
-                                                GNUNET_NO);
+    struct GNUNET_MQ_Envelope *
+      env = GNUNET_MQ_msg_copy (&mem->join_dcsn->header);
+
+    GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
+                    env);
   }
   else
   { /* First client of the group, send join request. */
@@ -1035,25 +1532,20 @@ client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client,
         (((char *) &msg[1]) + relay_size);
       join_msg_size = ntohs (join_msg->size);
     }
-    if (sizeof (*msg) + relay_size + join_msg_size != msg_size)
-    {
-      GNUNET_break (0);
-      GNUNET_SERVER_client_disconnect (client);
-      return;
-    }
 
+    uint16_t req_msg_size = sizeof (struct MulticastJoinRequestMessage) + join_msg_size;
     struct MulticastJoinRequestMessage *
-      req = GNUNET_malloc (sizeof (*req) + join_msg_size);
-    req->header.size = htons (sizeof (*req) + join_msg_size);
+      req = GNUNET_malloc (req_msg_size);
+    req->header.size = htons (req_msg_size);
     req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST);
     req->group_pub_key = grp->pub_key;
     req->peer = this_peer;
     GNUNET_CRYPTO_ecdsa_key_get_public (&mem->priv_key, &req->member_pub_key);
     if (0 < join_msg_size)
-      memcpy (&req[1], join_msg, join_msg_size);
+      GNUNET_memcpy (&req[1], join_msg, join_msg_size);
 
     req->member_pub_key = mem->pub_key;
-    req->purpose.size = htonl (msg_size
+    req->purpose.size = htonl (req_msg_size
                                - sizeof (req->header)
                                - sizeof (req->reserved)
                                - sizeof (req->signature));
@@ -1075,7 +1567,7 @@ client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client,
       cadet_send_join_request (mem);
     }
   }
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVICE_client_continue (client);
 }
 
 
@@ -1083,7 +1575,7 @@ static void
 client_send_join_decision (struct Member *mem,
                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
 {
-  client_send_group (&mem->grp, &hdcsn->header);
+  client_send_group (&mem->group, &hdcsn->header);
 
   const struct MulticastJoinDecisionMessage *
     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
@@ -1091,7 +1583,7 @@ client_send_join_decision (struct Member *mem,
   { /* Member admitted, store join_decision. */
     uint16_t dcsn_size = ntohs (dcsn->header.size);
     mem->join_dcsn = GNUNET_malloc (dcsn_size);
-    memcpy (mem->join_dcsn, dcsn, dcsn_size);
+    GNUNET_memcpy (mem->join_dcsn, dcsn, dcsn_size);
   }
   else
   { /* Refused entry, but replay would be still possible for past members. */
@@ -1099,22 +1591,29 @@ client_send_join_decision (struct Member *mem,
 }
 
 
+static int
+check_client_join_decision (void *cls,
+                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
+{
+  return GNUNET_OK;
+}
+
+
 /**
  * Join decision from client.
  */
 static void
-client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
-                           const struct GNUNET_MessageHeader *m)
+handle_client_join_decision (void *cls,
+                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
 {
-  struct Group *
-    grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
-  const struct MulticastJoinDecisionMessageHeader *
-    hdcsn = (const struct MulticastJoinDecisionMessageHeader *) m;
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+  struct Group *grp = c->group;
 
   if (NULL == grp)
   {
     GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_SERVICE_client_drop (client);
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1143,7 +1642,15 @@ client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
   { /* Look for remote member */
     cadet_send_join_decision (grp, hdcsn);
   }
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVICE_client_continue (client);
+}
+
+
+static int
+check_client_multicast_message (void *cls,
+                                const struct GNUNET_MULTICAST_MessageHeader *msg)
+{
+  return GNUNET_OK;
 }
 
 
@@ -1151,25 +1658,25 @@ client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
  * Incoming message from a client.
  */
 static void
-client_recv_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
-                               const struct GNUNET_MessageHeader *m)
+handle_client_multicast_message (void *cls,
+                                 const struct GNUNET_MULTICAST_MessageHeader *msg)
 {
-  struct Group *
-    grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
-  struct GNUNET_MULTICAST_MessageHeader *out;
-  struct Origin *orig;
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+  struct Group *grp = c->group;
 
   if (NULL == grp)
   {
     GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_SERVICE_client_drop (client);
     return;
   }
   GNUNET_assert (GNUNET_YES == grp->is_origin);
-  orig = (struct Origin *) grp;
+  struct Origin *orig = grp->origin;
 
   /* FIXME: yucky, should use separate message structs for P2P and CS! */
-  out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (m);
+  struct GNUNET_MULTICAST_MessageHeader *
+    out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (&msg->header);
   out->fragment_id = GNUNET_htonll (++orig->max_fragment_id);
   out->purpose.size = htonl (ntohs (out->header.size)
                              - sizeof (out->header)
@@ -1184,13 +1691,19 @@ client_recv_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
   }
 
   client_send_all (&grp->pub_key_hash, &out->header);
-  if (0 == cadet_send_children (&grp->pub_key_hash, &out->header))
-  {
-    client_send_ack (&grp->pub_key_hash);
-  }
+  cadet_send_children (&grp->pub_key_hash, &out->header);
+  client_send_ack (&grp->pub_key_hash);
   GNUNET_free (out);
 
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVICE_client_continue (client);
+}
+
+
+static int
+check_client_multicast_request (void *cls,
+                                const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+  return GNUNET_OK;
 }
 
 
@@ -1198,23 +1711,25 @@ client_recv_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
  * Incoming request from a client.
  */
 static void
-client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
-                               const struct GNUNET_MessageHeader *m)
+handle_client_multicast_request (void *cls,
+                                 const struct GNUNET_MULTICAST_RequestHeader *req)
 {
-  struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
-  struct Member *mem;
-  struct GNUNET_MULTICAST_RequestHeader *out;
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+  struct Group *grp = c->group;
+
   if (NULL == grp)
   {
     GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_SERVICE_client_drop (client);
     return;
   }
   GNUNET_assert (GNUNET_NO == grp->is_origin);
-  mem = (struct Member *) grp;
+  struct Member *mem = grp->member;
 
   /* FIXME: yucky, should use separate message structs for P2P and CS! */
-  out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (m);
+  struct GNUNET_MULTICAST_RequestHeader *
+    out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (&req->header);
   out->member_pub_key = mem->pub_key;
   out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id);
   out->purpose.size = htonl (ntohs (out->header.size)
@@ -1240,7 +1755,7 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
     else
     {
       /* FIXME: not yet connected to origin */
-      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+      GNUNET_SERVICE_client_drop (client);
       GNUNET_free (out);
       return;
     }
@@ -1250,7 +1765,7 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
     client_send_ack (&grp->pub_key_hash);
   }
   GNUNET_free (out);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVICE_client_continue (client);
 }
 
 
@@ -1258,19 +1773,21 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
  * Incoming replay request from a client.
  */
 static void
-client_recv_replay_request (void *cls, struct GNUNET_SERVER_Client *client,
-                            const struct GNUNET_MessageHeader *m)
+handle_client_replay_request (void *cls,
+                              const struct MulticastReplayRequestMessage *rep)
 {
-  struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
-  struct Member *mem;
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+  struct Group *grp = c->group;
+
   if (NULL == grp)
   {
     GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_SERVICE_client_drop (client);
     return;
   }
   GNUNET_assert (GNUNET_NO == grp->is_origin);
-  mem = (struct Member *) grp;
+  struct Member *mem = grp->member;
 
   struct GNUNET_CONTAINER_MultiHashMap *
     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
@@ -1282,545 +1799,279 @@ client_recv_replay_request (void *cls, struct GNUNET_SERVER_Client *client,
                                        &grp->pub_key_hash, grp_replay_req,
                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
   }
-  struct MulticastReplayRequestMessage *
-    rep = (struct MulticastReplayRequestMessage *) m;
+
   struct GNUNET_HashCode key_hash;
   replay_key_hash (rep->fragment_id, rep->message_id, rep->fragment_offset,
                    rep->flags, &key_hash);
   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client,
                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
 
-  if (0 == client_send_origin (&grp->pub_key_hash, m))
+  if (0 == client_send_origin (&grp->pub_key_hash, &rep->header))
   { /* No local origin, replay from remote members / origin. */
     if (NULL != mem->origin_channel)
     {
-      cadet_send_channel (mem->origin_channel, m);
+      cadet_send_channel (mem->origin_channel, &rep->header);
     }
     else
     {
       /* FIXME: not yet connected to origin */
-      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-      return;
-    }
-  }
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
-static int
-cadet_send_replay_response_cb (void *cls,
-                               const struct GNUNET_HashCode *key_hash,
-                               void *value)
-{
-  struct Channel *chn = value;
-  struct GNUNET_MessageHeader *msg = cls;
-
-  cadet_send_channel (chn, msg);
-  return GNUNET_OK;
-}
-
-
-static int
-client_send_replay_response_cb (void *cls,
-                                const struct GNUNET_HashCode *key_hash,
-                                void *value)
-{
-  struct GNUNET_SERVER_Client *client = value;
-  struct GNUNET_MessageHeader *msg = cls;
-
-  client_send (client, msg);
-  return GNUNET_OK;
-}
-
-
-/**
- * End of replay response from a client.
- */
-static void
-client_recv_replay_response_end (void *cls, struct GNUNET_SERVER_Client *client,
-                                 const struct GNUNET_MessageHeader *m)
-{
-  struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
-  if (NULL == grp)
-  {
-    GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-    return;
-  }
-
-  struct MulticastReplayResponseMessage *
-    res = (struct MulticastReplayResponseMessage *) m;
-
-  struct GNUNET_HashCode key_hash;
-  replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
-                   res->flags, &key_hash);
-
-  struct GNUNET_CONTAINER_MultiHashMap *
-    grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
-                                                                &grp->pub_key_hash);
-  if (NULL != grp_replay_req_cadet)
-  {
-    GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash);
-  }
-  struct GNUNET_CONTAINER_MultiHashMap *
-    grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
-                                                               &grp->pub_key_hash);
-  if (NULL != grp_replay_req_client)
-  {
-    GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, &key_hash);
-  }
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
-/**
- * Incoming replay response from a client.
- *
- * Respond with a multicast message on success, or otherwise with an error code.
- */
-static void
-client_recv_replay_response (void *cls, struct GNUNET_SERVER_Client *client,
-                             const struct GNUNET_MessageHeader *m)
-{
-  struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
-  if (NULL == grp)
-  {
-    GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-    return;
-  }
-
-  struct MulticastReplayResponseMessage *
-    res = (struct MulticastReplayResponseMessage *) m;
-
-  const struct GNUNET_MessageHeader *msg = m;
-  if (GNUNET_MULTICAST_REC_OK == res->error_code)
-  {
-    msg = (struct GNUNET_MessageHeader *) &res[1];
-  }
-
-  struct GNUNET_HashCode key_hash;
-  replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
-                   res->flags, &key_hash);
-
-  struct GNUNET_CONTAINER_MultiHashMap *
-    grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
-                                                              &grp->pub_key_hash);
-  if (NULL != grp_replay_req_cadet)
-  {
-    GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, &key_hash,
-                                                cadet_send_replay_response_cb,
-                                                (void *) msg);
-  }
-  if (GNUNET_MULTICAST_REC_OK == res->error_code)
-  {
-    struct GNUNET_CONTAINER_MultiHashMap *
-      grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
-                                                                 &grp->pub_key_hash);
-    if (NULL != grp_replay_req_client)
-    {
-      GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, &key_hash,
-                                                  client_send_replay_response_cb,
-                                                  (void *) msg);
-    }
-  }
-  else
-  {
-    client_recv_replay_response_end (cls, client, m);
-    return;
-  }
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
-/**
- * A new client connected.
- */
-static void
-client_notify_connect (void *cls, struct GNUNET_SERVER_Client *client)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
-  /* FIXME: send connect ACK */
-}
-
-
-/**
- * Message handlers for the server.
- */
-static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
-  { client_recv_origin_start, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 },
-
-  { client_recv_member_join, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 },
-
-  { client_recv_join_decision, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 },
-
-  { client_recv_multicast_message, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
-
-  { client_recv_multicast_request, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
-
-  { client_recv_replay_request, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
-
-  { client_recv_replay_response, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
-
-  { client_recv_replay_response_end, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END, 0 },
-
-  { NULL, NULL, 0, 0 }
-};
-
-
-/**
- * New incoming CADET channel.
- */
-static void *
-cadet_notify_channel_new (void *cls,
-                          struct GNUNET_CADET_Channel *channel,
-                          const struct GNUNET_PeerIdentity *initiator,
-                          uint32_t port,
-                          enum GNUNET_CADET_ChannelOption options)
-{
-  return NULL;
-}
-
-
-/**
- * CADET channel is being destroyed.
- */
-static void
-cadet_notify_channel_end (void *cls,
-                          const struct GNUNET_CADET_Channel *channel,
-                          void *ctx)
-{
-  if (NULL == ctx)
-    return;
-
-  struct Channel *chn = ctx;
-  if (NULL != chn->grp)
-  {
-    if (GNUNET_NO == chn->grp->is_origin)
-    {
-      struct Member *mem = (struct Member *) chn->grp;
-      if (chn == mem->origin_channel)
-        mem->origin_channel = NULL;
-    }
-  }
-
-  while (GNUNET_YES == replay_req_remove_cadet (chn));
-
-  GNUNET_free (chn);
-}
-
-
-/**
- * Incoming join request message from CADET.
- */
-int
-cadet_recv_join_request (void *cls,
-                         struct GNUNET_CADET_Channel *channel,
-                         void **ctx,
-                         const struct GNUNET_MessageHeader *m)
-{
-  const struct MulticastJoinRequestMessage *
-    req = (const struct MulticastJoinRequestMessage *) m;
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (*req))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (NULL != *ctx)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (ntohl (req->purpose.size) != (size
-                                    - sizeof (req->header)
-                                    - sizeof (req->reserved)
-                                    - sizeof (req->signature)))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (GNUNET_OK !=
-      GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
-                                  &req->purpose, &req->signature,
-                                  &req->member_pub_key))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-
-  struct GNUNET_HashCode group_pub_hash;
-  GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
-
-  struct Channel *chn = GNUNET_malloc (sizeof *chn);
-  chn->channel = channel;
-  chn->group_pub_key = req->group_pub_key;
-  chn->group_pub_hash = group_pub_hash;
-  chn->member_pub_key = req->member_pub_key;
-  chn->peer = req->peer;
-  chn->join_status = JOIN_WAITING;
-  GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
-  client_send_all (&group_pub_hash, m);
-  return GNUNET_OK;
-}
-
-
-/**
- * Incoming join decision message from CADET.
- */
-int
-cadet_recv_join_decision (void *cls,
-                          struct GNUNET_CADET_Channel *channel,
-                          void **ctx,
-                          const struct GNUNET_MessageHeader *m)
-{
-  const struct MulticastJoinDecisionMessage *
-    dcsn = (const struct MulticastJoinDecisionMessage *) m;
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (*dcsn))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  struct Channel *chn = *ctx;
-  if (NULL == chn)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (NULL == chn->grp || GNUNET_NO != chn->grp->is_origin)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+      GNUNET_SERVICE_client_drop (client);
+      return;
+    }
   }
-  switch (chn->join_status)
-  {
-  case JOIN_REFUSED:
-    return GNUNET_SYSERR;
+  GNUNET_SERVICE_client_continue (client);
+}
 
-  case JOIN_ADMITTED:
-    return GNUNET_OK;
 
-  case JOIN_NOT_ASKED:
-  case JOIN_WAITING:
-    break;
-  }
+static int
+cadet_send_replay_response_cb (void *cls,
+                               const struct GNUNET_HashCode *key_hash,
+                               void *value)
+{
+  struct Channel *chn = value;
+  struct GNUNET_MessageHeader *msg = cls;
 
-  struct MulticastJoinDecisionMessageHeader *
-    hdcsn = GNUNET_malloc (sizeof (*hdcsn) + size);
-  hdcsn->peer = chn->peer;
-  memcpy (&hdcsn[1], dcsn, sizeof (*hdcsn) + size);
+  cadet_send_channel (chn, msg);
+  return GNUNET_OK;
+}
 
-  struct Member *mem = (struct Member *) chn->grp;
-  client_send_join_decision (mem, hdcsn);
-  GNUNET_free (hdcsn);
-  if (GNUNET_YES == ntohs (dcsn->is_admitted))
-  {
-    chn->join_status = JOIN_ADMITTED;
-    return GNUNET_OK;
-  }
-  else
-  {
-    chn->join_status = JOIN_REFUSED;
-    return GNUNET_SYSERR;
-  }
+
+static int
+client_send_replay_response_cb (void *cls,
+                                const struct GNUNET_HashCode *key_hash,
+                                void *value)
+{
+  struct GNUNET_SERVICE_Client *client = value;
+  struct GNUNET_MessageHeader *msg = cls;
+
+  client_send (client, msg);
+  return GNUNET_OK;
 }
 
-/**
- * Incoming multicast message from CADET.
- */
-int
-cadet_recv_message (void *cls,
-                    struct GNUNET_CADET_Channel *channel,
-                    void **ctx,
-                    const struct GNUNET_MessageHeader *m)
-{
-  const struct GNUNET_MULTICAST_MessageHeader *
-    msg = (const struct GNUNET_MULTICAST_MessageHeader *) m;
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (*msg))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  struct Channel *chn = *ctx;
-  if (NULL == chn)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (ntohl (msg->purpose.size) != (size
-                                    - sizeof (msg->header)
-                                    - sizeof (msg->hop_counter)
-                                    - sizeof (msg->signature)))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (GNUNET_OK !=
-      GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
-                                  &msg->purpose, &msg->signature,
-                                  &chn->group_pub_key))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
 
-  client_send_all (&chn->group_pub_hash, m);
+static int
+check_client_replay_response_end (void *cls,
+                                  const struct MulticastReplayResponseMessage *res)
+{
   return GNUNET_OK;
 }
 
 
 /**
- * Incoming multicast request message from CADET.
+ * End of replay response from a client.
  */
-int
-cadet_recv_request (void *cls,
-                    struct GNUNET_CADET_Channel *channel,
-                    void **ctx,
-                    const struct GNUNET_MessageHeader *m)
-{
-  const struct GNUNET_MULTICAST_RequestHeader *
-    req = (const struct GNUNET_MULTICAST_RequestHeader *) m;
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (*req))
+static void
+handle_client_replay_response_end (void *cls,
+                                   const struct MulticastReplayResponseMessage *res)
+{
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+  struct Group *grp = c->group;
+
+  if (NULL == grp)
   {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (client);
+    return;
   }
-  struct Channel *chn = *ctx;
-  if (NULL == chn)
+
+  struct GNUNET_HashCode key_hash;
+  replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
+                   res->flags, &key_hash);
+
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
+                                                              &grp->pub_key_hash);
+  if (NULL != grp_replay_req_cadet)
   {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+    GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash);
   }
-  if (ntohl (req->purpose.size) != (size
-                                    - sizeof (req->header)
-                                    - sizeof (req->member_pub_key)
-                                    - sizeof (req->signature)))
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
+                                                               &grp->pub_key_hash);
+  if (NULL != grp_replay_req_client)
   {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+    GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, &key_hash);
   }
-  if (GNUNET_OK !=
-      GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
-                                  &req->purpose, &req->signature,
-                                  &req->member_pub_key))
+  GNUNET_SERVICE_client_continue (client);
+}
+
+
+static int
+check_client_replay_response (void *cls,
+                              const struct MulticastReplayResponseMessage *res)
+{
+  const struct GNUNET_MessageHeader *msg = &res->header;
+  if (GNUNET_MULTICAST_REC_OK == res->error_code)
   {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+    msg = GNUNET_MQ_extract_nested_mh (res);
+    if (NULL == msg)
+    {
+      return GNUNET_SYSERR;
+    }
   }
-
-  client_send_origin (&chn->group_pub_hash, m);
   return GNUNET_OK;
 }
 
 
 /**
- * Incoming multicast replay request from CADET.
+ * Incoming replay response from a client.
+ *
+ * Respond with a multicast message on success, or otherwise with an error code.
  */
-int
-cadet_recv_replay_request (void *cls,
-                           struct GNUNET_CADET_Channel *channel,
-                           void **ctx,
-                           const struct GNUNET_MessageHeader *m)
+static void
+handle_client_replay_response (void *cls,
+                               const struct MulticastReplayResponseMessage *res)
 {
-  struct MulticastReplayRequestMessage rep;
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (rep))
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+  struct Group *grp = c->group;
+
+  if (NULL == grp)
   {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (client);
+    return;
   }
-  struct Channel *chn = *ctx;
-
-  memcpy (&rep, m, sizeof (rep));
-  memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
 
-  struct GNUNET_CONTAINER_MultiHashMap *
-    grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
-                                                        &chn->grp->pub_key_hash);
-  if (NULL == grp_replay_req)
+  const struct GNUNET_MessageHeader *msg = &res->header;
+  if (GNUNET_MULTICAST_REC_OK == res->error_code)
   {
-    grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
-    GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
-                                       &chn->grp->pub_key_hash, grp_replay_req,
-                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+    msg = GNUNET_MQ_extract_nested_mh (res);
   }
+
   struct GNUNET_HashCode key_hash;
-  replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
-                   rep.flags, &key_hash);
-  GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
+                   res->flags, &key_hash);
 
-  client_send_random (&chn->group_pub_hash, &rep.header);
-  return GNUNET_OK;
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
+                                                              &grp->pub_key_hash);
+  if (NULL != grp_replay_req_cadet)
+  {
+    GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, &key_hash,
+                                                cadet_send_replay_response_cb,
+                                                (void *) msg);
+  }
+  if (GNUNET_MULTICAST_REC_OK == res->error_code)
+  {
+    struct GNUNET_CONTAINER_MultiHashMap *
+      grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
+                                                                 &grp->pub_key_hash);
+    if (NULL != grp_replay_req_client)
+    {
+      GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, &key_hash,
+                                                  client_send_replay_response_cb,
+                                                  (void *) msg);
+    }
+  }
+  else
+  {
+    handle_client_replay_response_end (c, res);
+    return;
+  }
+  GNUNET_SERVICE_client_continue (client);
 }
 
 
 /**
- * Incoming multicast replay response from CADET.
+ * A new client connected.
+ *
+ * @param cls NULL
+ * @param client client to add
+ * @param mq message queue for @a client
+ * @return @a client
  */
-int
-cadet_recv_replay_response (void *cls,
-                            struct GNUNET_CADET_Channel *channel,
-                            void **ctx,
-                            const struct GNUNET_MessageHeader *m)
+static void *
+client_notify_connect (void *cls,
+                       struct GNUNET_SERVICE_Client *client,
+                       struct GNUNET_MQ_Handle *mq)
 {
-  struct Channel *chn = *ctx;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
+  /* FIXME: send connect ACK */
 
-  /* @todo FIXME: got replay error response, send request to other members */
+  struct Client *c = GNUNET_new (struct Client);
+  c->client = client;
 
-  return GNUNET_OK;
+  return c;
 }
 
 
 /**
- * Message handlers for CADET.
+ * Called whenever a client is disconnected.
+ * Frees our resources associated with that client.
+ *
+ * @param cls closure
+ * @param client identification of the client
+ * @param app_ctx must match @a client
  */
-static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
-  { cadet_recv_join_request,
-    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 },
-
-  { cadet_recv_message,
-    GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
-
-  { cadet_recv_request,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
+static void
+client_notify_disconnect (void *cls,
+                          struct GNUNET_SERVICE_Client *client,
+                          void *app_ctx)
+{
+  struct Client *c = app_ctx;
+  struct Group *grp = c->group;
+  GNUNET_free (c);
 
-  { cadet_recv_replay_request,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
+  if (NULL == grp)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "%p User context is NULL in client_disconnect()\n", grp);
+    GNUNET_break (0);
+    return;
+  }
 
-  { cadet_recv_replay_response,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Client (%s) disconnected from group %s\n",
+              grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
+              GNUNET_h2s (&grp->pub_key_hash));
 
-  { NULL, 0, 0 }
-};
+  struct ClientList *cl = grp->clients_head;
+  while (NULL != cl)
+  {
+    if (cl->client == client)
+    {
+      GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl);
+      GNUNET_free (cl);
+      break;
+    }
+    cl = cl->next;
+  }
 
+  while (GNUNET_YES == replay_req_remove_client (grp, client));
 
-/**
- * Listening ports for CADET.
- */
-static const uint32_t cadet_ports[] = { GNUNET_APPLICATION_TYPE_MULTICAST, 0 };
+  if (NULL == grp->clients_head)
+  { /* Last client disconnected. */
+#if FIXME
+    if (NULL != grp->tmit_head)
+    { /* Send pending messages via CADET before cleanup. */
+      transmit_message (grp);
+    }
+    else
+#endif
+    {
+      cleanup_group (grp);
+    }
+  }
+}
 
 
 /**
- * Connected to core service.
+ * Service started.
+ *
+ * @param cls closure
+ * @param server the initialized server
+ * @param cfg configuration to use
  */
 static void
-core_connected_cb  (void *cls, const struct GNUNET_PeerIdentity *my_identity)
+run (void *cls,
+     const struct GNUNET_CONFIGURATION_Handle *c,
+     struct GNUNET_SERVICE_Handle *svc)
 {
-  this_peer = *my_identity;
+  cfg = c;
+  service = svc;
+  GNUNET_CRYPTO_get_peer_identity (cfg, &this_peer);
 
   stats = GNUNET_STATISTICS_create ("multicast", cfg);
   origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
@@ -1831,52 +2082,56 @@ core_connected_cb  (void *cls, const struct GNUNET_PeerIdentity *my_identity)
   replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
   replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
 
-  cadet = GNUNET_CADET_connect (cfg, NULL,
-                                &cadet_notify_channel_new,
-                                &cadet_notify_channel_end,
-                                cadet_handlers, cadet_ports);
-
-  nc = GNUNET_SERVER_notification_context_create (server, 1);
-  GNUNET_SERVER_add_handlers (server, server_handlers);
-  GNUNET_SERVER_disconnect_notify (server, &client_notify_disconnect, NULL);
-
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
-                                NULL);
-}
+  cadet = GNUNET_CADET_connecT (cfg);
 
+  GNUNET_assert (NULL != cadet);
 
-/**
- * Service started.
- *
- * @param cls closure
- * @param server the initialized server
- * @param cfg configuration to use
- */
-static void
-run (void *cls, struct GNUNET_SERVER_Handle *srv,
-     const struct GNUNET_CONFIGURATION_Handle *c)
-{
-  cfg = c;
-  server = srv;
-  GNUNET_SERVER_connect_notify (server, &client_notify_connect, NULL);
-  core = GNUNET_CORE_connect (cfg, NULL, &core_connected_cb, NULL, NULL,
-                              NULL, GNUNET_NO, NULL, GNUNET_NO, NULL);
+  GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+                                NULL);
 }
 
 
 /**
- * The main function for the multicast service.
- *
- * @param argc number of arguments from the command line
- * @param argv command line arguments
- * @return 0 ok, 1 on error
- */
-int
-main (int argc, char *const *argv)
-{
-  return (GNUNET_OK ==
-          GNUNET_SERVICE_run (argc, argv, "multicast",
-                              GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1;
-}
+ * Define "main" method using service macro.
+ */
+GNUNET_SERVICE_MAIN
+("multicast",
+ GNUNET_SERVICE_OPTION_NONE,
+ run,
+ client_notify_connect,
+ client_notify_disconnect,
+ NULL,
+ GNUNET_MQ_hd_fixed_size (client_origin_start,
+                          GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START,
+                          struct MulticastOriginStartMessage,
+                          NULL),
+ GNUNET_MQ_hd_var_size (client_member_join,
+                        GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN,
+                        struct MulticastMemberJoinMessage,
+                        NULL),
+ GNUNET_MQ_hd_var_size (client_join_decision,
+                        GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+                        struct MulticastJoinDecisionMessageHeader,
+                        NULL),
+ GNUNET_MQ_hd_var_size (client_multicast_message,
+                        GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+                        struct GNUNET_MULTICAST_MessageHeader,
+                        NULL),
+ GNUNET_MQ_hd_var_size (client_multicast_request,
+                        GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+                        struct GNUNET_MULTICAST_RequestHeader,
+                        NULL),
+ GNUNET_MQ_hd_fixed_size (client_replay_request,
+                          GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+                          struct MulticastReplayRequestMessage,
+                          NULL),
+ GNUNET_MQ_hd_var_size (client_replay_response,
+                        GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+                        struct MulticastReplayResponseMessage,
+                        NULL),
+ GNUNET_MQ_hd_var_size (client_replay_response_end,
+                        GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END,
+                        struct MulticastReplayResponseMessage,
+                        NULL));
 
 /* end of gnunet-service-multicast.c */