remove legacy core api code (now dead)
[oweals/gnunet.git] / src / multicast / multicast_api.c
index 25af614afc94b959036ed16a4382262ab94811fd..7cfe708359db25b447ffc246f3ffcb21ed38c69f 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2012, 2013 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
  * @file multicast/multicast_api.c
- * @brief multicast service; establish tunnels to distant peers
+ * @brief Multicast service; implements multicast groups using CADET connections.
  * @author Christian Grothoff
  * @author Gabor X Toth
  */
+
 #include "platform.h"
 #include "gnunet_util_lib.h"
-#include "gnunet_signatures.h"
 #include "gnunet_multicast_service.h"
 #include "multicast.h"
 
 #define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
 
 
-/**
- * Started origins.
- * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin
- */
-static struct GNUNET_CONTAINER_MultiHashMap *origins;
-
-/**
- * Joined members.
- * group_key_hash -> struct GNUNET_MULTICAST_Member
- */
-static struct GNUNET_CONTAINER_MultiHashMap *members;
-
-
 /**
  * Handle for a request to send a message to all multicast group members
  * (from the origin).
  */
-struct GNUNET_MULTICAST_OriginMessageHandle
+struct GNUNET_MULTICAST_OriginTransmitHandle
 {
   GNUNET_MULTICAST_OriginTransmitNotify notify;
   void *notify_cls;
@@ -62,47 +49,95 @@ struct GNUNET_MULTICAST_OriginMessageHandle
 };
 
 
-struct GNUNET_MULTICAST_Group
+/**
+ * Handle for a message to be delivered from a member to the origin.
+ */
+struct GNUNET_MULTICAST_MemberTransmitHandle
 {
-  uint8_t is_origin;
+  GNUNET_MULTICAST_MemberTransmitNotify notify;
+  void *notify_cls;
+  struct GNUNET_MULTICAST_Member *member;
+
+  uint64_t request_id;
+  uint64_t fragment_offset;
 };
 
-/**
- * Handle for the origin of a multicast group.
- */
-struct GNUNET_MULTICAST_Origin
+
+struct GNUNET_MULTICAST_Group
 {
-  struct GNUNET_MULTICAST_Group grp;
+  /**
+   * Configuration to use.
+   */
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
 
-  struct GNUNET_MULTICAST_OriginMessageHandle msg_handle;
-  struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
+  /**
+   * Client connection to the service.
+   */
+  struct GNUNET_MQ_Handle *mq;
+
+  /**
+   * Message to send on connect.
+   */
+  struct GNUNET_MQ_Envelope *connect_env;
+
+  /**
+   * Time to wait until we try to reconnect on failure.
+   */
+  struct GNUNET_TIME_Relative reconnect_delay;
 
-  GNUNET_MULTICAST_JoinCallback join_cb;
-  GNUNET_MULTICAST_MembershipTestCallback mem_test_cb;
+  /**
+   * Task for reconnecting when the listener fails.
+   */
+  struct GNUNET_SCHEDULER_Task *reconnect_task;
+
+  GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
   GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
   GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
-  GNUNET_MULTICAST_RequestCallback request_cb;
   GNUNET_MULTICAST_MessageCallback message_cb;
-  void *cls;
+  void *cb_cls;
 
-  uint64_t next_fragment_id;
+  /**
+   * Function called after disconnected from the service.
+   */
+  GNUNET_ContinuationCallback disconnect_cb;
+
+  /**
+   * Closure for @a disconnect_cb.
+   */
+  void *disconnect_cls;
+
+  /**
+   * Are we currently transmitting a message?
+   */
+  uint8_t in_transmit;
+
+  /**
+   * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
+   */
+  uint8_t acks_pending;
+
+  /**
+   * Is this the origin or a member?
+   */
+  uint8_t is_origin;
 
-  struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
-  struct GNUNET_HashCode pub_key_hash;
+  /**
+   * Is this channel in the process of disconnecting from the service?
+   * #GNUNET_YES or #GNUNET_NO
+   */
+  uint8_t is_disconnecting;
 };
 
 
 /**
- * Handle for a message to be delivered from a member to the origin.
+ * Handle for the origin of a multicast group.
  */
-struct GNUNET_MULTICAST_MemberRequestHandle
+struct GNUNET_MULTICAST_Origin
 {
-  GNUNET_MULTICAST_MemberTransmitNotify notify;
-  void *notify_cls;
-  struct GNUNET_MULTICAST_Member *member;
+  struct GNUNET_MULTICAST_Group grp;
+  struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
 
-  uint64_t request_id;
-  uint64_t fragment_offset;
+  GNUNET_MULTICAST_RequestCallback request_cb;
 };
 
 
@@ -112,43 +147,38 @@ struct GNUNET_MULTICAST_MemberRequestHandle
 struct GNUNET_MULTICAST_Member
 {
   struct GNUNET_MULTICAST_Group grp;
+  struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
 
-  struct GNUNET_MULTICAST_MemberRequestHandle req_handle;
+  GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
 
-  struct GNUNET_CRYPTO_EddsaPublicKey group_key;
-  struct GNUNET_CRYPTO_EddsaPrivateKey member_key;
-  struct GNUNET_PeerIdentity origin;
-  struct GNUNET_PeerIdentity relays;
-  uint32_t relay_count;
-  struct GNUNET_MessageHeader *join_request;
-  GNUNET_MULTICAST_JoinCallback join_cb;
-  GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
-  GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
-  GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
-  GNUNET_MULTICAST_MessageCallback message_cb;
-  void *cls;
+  /**
+   * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
+   */
+  struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
 
   uint64_t next_fragment_id;
-  struct GNUNET_HashCode group_key_hash;
 };
 
 
 /**
  * Handle that identifies a join request.
  *
- * Used to match calls to #GNUNET_MULTICAST_JoinCallback to the
+ * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the
  * corresponding calls to #GNUNET_MULTICAST_join_decision().
  */
 struct GNUNET_MULTICAST_JoinHandle
 {
-};
+  struct GNUNET_MULTICAST_Group *group;
 
+  /**
+   * Public key of the member requesting join.
+   */
+  struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
 
-/**
- * Handle to pass back for the answer of a membership test.
- */
-struct GNUNET_MULTICAST_MembershipTestHandle
-{
+  /**
+   * Peer identity of the member requesting join.
+   */
+  struct GNUNET_PeerIdentity peer;
 };
 
 
@@ -157,6 +187,8 @@ struct GNUNET_MULTICAST_MembershipTestHandle
  */
 struct GNUNET_MULTICAST_ReplayHandle
 {
+  struct GNUNET_MULTICAST_Group *grp;
+  struct MulticastReplayRequestMessage req;
 };
 
 
@@ -168,104 +200,373 @@ struct GNUNET_MULTICAST_MemberReplayHandle
 };
 
 
+static void
+origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
+
+static void
+member_to_origin (struct GNUNET_MULTICAST_Member *mem);
+
+
 /**
- * Iterator callback for calling message callbacks for all groups.
+ * Check join request message.
  */
 static int
-message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
-                   void *group)
+check_group_join_request (void *cls,
+                          const struct MulticastJoinRequestMessage *jreq)
 {
-  const struct GNUNET_MessageHeader *msg = cls;
-  struct GNUNET_MULTICAST_Group *grp = group;
+  uint16_t size = ntohs (jreq->header.size);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Calling message callback for a message of type %u and size %u.\n",
-              ntohs (msg->type), ntohs (msg->size));
+  if (sizeof (*jreq) == size)
+    return GNUNET_OK;
 
-  if (GNUNET_YES == grp->is_origin)
+  if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
+    return GNUNET_OK;
+
+  return GNUNET_SYSERR;
+}
+
+
+/**
+ * Receive join request from service.
+ */
+static void
+handle_group_join_request (void *cls,
+                           const struct MulticastJoinRequestMessage *jreq)
+{
+  struct GNUNET_MULTICAST_Group *grp = cls;
+  struct GNUNET_MULTICAST_JoinHandle *jh;
+  const struct GNUNET_MessageHeader *jmsg = NULL;
+
+  if (NULL == grp)
   {
-    struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp;
-    orig->message_cb (orig->cls, msg);
+    GNUNET_break (0);
+    return;
   }
-  else
+  if (NULL == grp->join_req_cb)
+    return;
+
+  if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
+    jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
+
+  jh = GNUNET_malloc (sizeof (*jh));
+  jh->group = grp;
+  jh->member_pub_key = jreq->member_pub_key;
+  jh->peer = jreq->peer;
+  grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
+
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
+}
+
+
+/**
+ * Check multicast message.
+ */
+static int
+check_group_message (void *cls,
+                     const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+{
+  return GNUNET_OK;
+}
+
+
+/**
+ * Receive multicast message from service.
+ */
+static void
+handle_group_message (void *cls,
+                      const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+{
+  struct GNUNET_MULTICAST_Group *grp = cls;
+
+  if (GNUNET_YES == grp->is_disconnecting)
+    return;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Calling message callback with a message of size %u.\n",
+              ntohs (mmsg->header.size));
+
+  if (NULL != grp->message_cb)
+    grp->message_cb (grp->cb_cls, mmsg);
+
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
+}
+
+
+/**
+ * Receive message/request fragment acknowledgement from service.
+ */
+static void
+handle_group_fragment_ack (void *cls,
+                           const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_MULTICAST_Group *grp = cls;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
+       grp, grp->in_transmit, grp->acks_pending);
+
+  if (0 == grp->acks_pending)
   {
-    struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp;
-    mem->message_cb (mem->cls, msg);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "%p Ignoring extraneous fragment ACK.\n", grp);
+    return;
   }
+  grp->acks_pending--;
+
+  if (GNUNET_YES != grp->in_transmit)
+    return;
 
-  return GNUNET_YES;
+  if (GNUNET_YES == grp->is_origin)
+    origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
+  else
+    member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
+
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
 }
 
 
 /**
- * Handle a multicast message from the service.
- *
- * Call message callbacks of all origins and members of the destination group.
- *
- * @param grp Destination group of the message.
- * @param msg The message.
+ * Check unicast request.
+ */
+static int
+check_origin_request (void *cls,
+                      const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+  return GNUNET_OK;
+}
+
+
+/**
+ * Origin receives unicast request from a member.
  */
 static void
-handle_multicast_message (struct GNUNET_MULTICAST_Group *grp,
-                          const struct GNUNET_MULTICAST_MessageHeader *msg)
+handle_origin_request (void *cls,
+                       const struct GNUNET_MULTICAST_RequestHeader *req)
 {
-  struct GNUNET_HashCode *hash;
+  struct GNUNET_MULTICAST_Group *grp;
+  struct GNUNET_MULTICAST_Origin *orig = cls;
+  grp = &orig->grp;
 
-  if (GNUNET_YES == grp->is_origin)
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Calling request callback with a request of size %u.\n",
+              ntohs (req->header.size));
+
+  if (NULL != orig->request_cb)
+    orig->request_cb (grp->cb_cls, req);
+
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
+}
+
+
+/**
+ * Receive multicast replay request from service.
+ */
+static void
+handle_group_replay_request (void *cls,
+                             const struct MulticastReplayRequestMessage *rep)
+
+{
+  struct GNUNET_MULTICAST_Group *grp = cls;
+
+  if (GNUNET_YES == grp->is_disconnecting)
+    return;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
+
+  if (0 != rep->fragment_id)
   {
-    struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp;
-    hash = &orig->pub_key_hash;
+    if (NULL != grp->replay_frag_cb)
+    {
+      struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
+      rh->grp = grp;
+      rh->req = *rep;
+      grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
+                           GNUNET_ntohll (rep->fragment_id),
+                           GNUNET_ntohll (rep->flags), rh);
+    }
   }
-  else
+  else if (0 != rep->message_id)
   {
-    struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp;
-    hash = &mem->group_key_hash;
+    if (NULL != grp->replay_msg_cb)
+    {
+      struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
+      rh->grp = grp;
+      rh->req = *rep;
+      grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
+                          GNUNET_ntohll (rep->message_id),
+                          GNUNET_ntohll (rep->fragment_offset),
+                          GNUNET_ntohll (rep->flags), rh);
+    }
   }
 
-  if (origins != NULL)
-    GNUNET_CONTAINER_multihashmap_get_multiple (origins, hash, message_callback,
-                                                (void *) msg);
-  if (members != NULL)
-    GNUNET_CONTAINER_multihashmap_get_multiple (members, hash, message_callback,
-                                                (void *) msg);
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
 }
 
 
 /**
- * Iterator callback for calling request callbacks of origins.
+ * Check replay response.
  */
 static int
-request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
-                  void *origin)
+check_member_replay_response (void *cls,
+                              const struct MulticastReplayResponseMessage *res)
 {
-  const struct GNUNET_MULTICAST_RequestHeader *req = cls;
-  struct GNUNET_MULTICAST_Origin *orig = origin;
+  uint16_t size = ntohs (res->header.size);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Calling request callback for a request of type %u and size %u.\n",
-              ntohs (req->header.type), ntohs (req->header.size));
+  if (sizeof (*res) == size)
+    return GNUNET_OK;
+
+  if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
+    return GNUNET_OK;
 
-  orig->request_cb (orig->cls, &req->member_key,
-                    (const struct GNUNET_MessageHeader *) req, 0);
-  return GNUNET_YES;
+  return GNUNET_SYSERR;
 }
 
 
 /**
- * Handle a multicast request from the service.
- *
- * Call request callbacks of all origins of the destination group.
- *
- * @param grp Destination group of the message.
- * @param msg The message.
+ * Receive replay response from service.
+ */
+static void
+handle_member_replay_response (void *cls,
+                               const struct MulticastReplayResponseMessage *res)
+{
+  struct GNUNET_MULTICAST_Group *grp;
+  struct GNUNET_MULTICAST_Member *mem = cls;
+  grp = &mem->grp;
+
+  if (GNUNET_YES == grp->is_disconnecting)
+    return;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
+
+  // FIXME: return result
+}
+
+
+/**
+ * Check join decision.
+ */
+static int
+check_member_join_decision (void *cls,
+                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
+{
+  return GNUNET_OK; // checked in handle below
+}
+
+
+/**
+ * Member receives join decision.
  */
 static void
-handle_multicast_request (const struct GNUNET_HashCode *group_key_hash,
-                          const struct GNUNET_MULTICAST_RequestHeader *req)
+handle_member_join_decision (void *cls,
+                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
+{
+  struct GNUNET_MULTICAST_Group *grp;
+  struct GNUNET_MULTICAST_Member *mem = cls;
+  grp = &mem->grp;
+
+  const struct MulticastJoinDecisionMessage *
+    dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
+
+  uint16_t dcsn_size = ntohs (dcsn->header.size);
+  int is_admitted = ntohl (dcsn->is_admitted);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "%p Member got join decision from multicast: %d\n",
+       mem, is_admitted);
+
+  const struct GNUNET_MessageHeader *join_resp = NULL;
+  uint16_t join_resp_size = 0;
+
+  uint16_t relay_count = ntohl (dcsn->relay_count);
+  const struct GNUNET_PeerIdentity *relays = NULL;
+  uint16_t relay_size = relay_count * sizeof (*relays);
+  if (0 < relay_count)
+  {
+    if (dcsn_size < sizeof (*dcsn) + relay_size)
+    {
+      GNUNET_break_op (0);
+      is_admitted = GNUNET_SYSERR;
+    }
+    else
+    {
+      relays = (struct GNUNET_PeerIdentity *) &dcsn[1];
+    }
+  }
+
+  if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size)
+  {
+    join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size);
+    join_resp_size = ntohs (join_resp->size);
+  }
+  if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received invalid join decision message from multicast: %u < %u + %u + %u\n",
+         dcsn_size , sizeof (*dcsn), relay_size, join_resp_size);
+    GNUNET_break_op (0);
+    is_admitted = GNUNET_SYSERR;
+  }
+
+  if (NULL != mem->join_dcsn_cb)
+    mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer,
+                       relay_count, relays, join_resp);
+
+  // FIXME:
+  //if (GNUNET_YES != is_admitted)
+  //  GNUNET_MULTICAST_member_part (mem);
+
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
+}
+
+
+static void
+group_cleanup (struct GNUNET_MULTICAST_Group *grp)
 {
-  if (NULL != origins)
-    GNUNET_CONTAINER_multihashmap_get_multiple (origins, group_key_hash,
-                                                request_callback, (void *) req);
+  if (NULL != grp->connect_env)
+  {
+    GNUNET_MQ_discard (grp->connect_env);
+    grp->connect_env = NULL;
+  }
+  if (NULL != grp->mq)
+  {
+    GNUNET_MQ_destroy (grp->mq);
+    grp->mq = NULL;
+  }
+  if (NULL != grp->disconnect_cb)
+  {
+    grp->disconnect_cb (grp->disconnect_cls);
+    grp->disconnect_cb = NULL;
+  }
+  GNUNET_free (grp);
+}
+
+
+static void
+group_disconnect (struct GNUNET_MULTICAST_Group *grp,
+                  GNUNET_ContinuationCallback cb,
+                  void *cls)
+{
+  grp->is_disconnecting = GNUNET_YES;
+  grp->disconnect_cb = cb;
+  grp->disconnect_cls = cls;
+
+  if (NULL != grp->mq)
+  {
+    struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq);
+    if (NULL != last)
+    {
+      GNUNET_MQ_notify_sent (last,
+                             (GNUNET_MQ_NotifyCallback) group_cleanup, grp);
+    }
+    else
+    {
+      group_cleanup (grp);
+    }
+  }
+  else
+  {
+    group_cleanup (grp);
+  }
 }
 
 
@@ -273,61 +574,102 @@ handle_multicast_request (const struct GNUNET_HashCode *group_key_hash,
  * Function to call with the decision made for a join request.
  *
  * Must be called once and only once in response to an invocation of the
- * #GNUNET_MULTICAST_JoinCallback.
+ * #GNUNET_MULTICAST_JoinRequestCallback.
  *
- * @param jh Join request handle.
- * @param is_admitted #GNUNET_YES if joining is approved,
- *        #GNUNET_NO if it is disapproved
- * @param relay_count Number of relays given.
- * @param relays Array of suggested peers that might be useful relays to use
+ * @param join
+ *        Join request handle.
+ * @param is_admitted
+ *        #GNUNET_YES    if the join is approved,
+ *        #GNUNET_NO     if it is disapproved,
+ *        #GNUNET_SYSERR if we cannot answer the request.
+ * @param relay_count
+ *        Number of relays given.
+ * @param relays
+ *        Array of suggested peers that might be useful relays to use
  *        when joining the multicast group (essentially a list of peers that
  *        are already part of the multicast group and might thus be willing
  *        to help with routing).  If empty, only this local peer (which must
  *        be the multicast origin) is a good candidate for building the
  *        multicast tree.  Note that it is unnecessary to specify our own
  *        peer identity in this array.
- * @param join_response Message to send in response to the joining peer;
+ * @param join_resp
+ *        Message to send in response to the joining peer;
  *        can also be used to redirect the peer to a different group at the
  *        application layer; this response is to be transmitted to the
  *        peer that issued the request even if admission is denied.
  */
 struct GNUNET_MULTICAST_ReplayHandle *
-GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh,
+GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
                                 int is_admitted,
-                                unsigned int relay_count,
+                                uint16_t relay_count,
                                 const struct GNUNET_PeerIdentity *relays,
-                                const struct GNUNET_MessageHeader *join_response)
+                                const struct GNUNET_MessageHeader *join_resp)
 {
+  struct GNUNET_MULTICAST_Group *grp = join->group;
+  uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
+  uint16_t relay_size = relay_count * sizeof (*relays);
+
+  struct MulticastJoinDecisionMessageHeader *hdcsn;
+  struct MulticastJoinDecisionMessage *dcsn;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
+                               GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
+  hdcsn->member_pub_key = join->member_pub_key;
+  hdcsn->peer = join->peer;
+
+  dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
+  dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
+  dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
+  dcsn->is_admitted = htonl (is_admitted);
+  dcsn->relay_count = htonl (relay_count);
+  if (0 < relay_size)
+    GNUNET_memcpy (&dcsn[1], relays, relay_size);
+  if (0 < join_resp_size)
+    GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
+
+  GNUNET_MQ_send (grp->mq, env);
+  GNUNET_free (join);
   return NULL;
 }
 
 
-/**
- * Call informing multicast about the decision taken for a membership test.
- *
- * @param mth Handle that was given for the query.
- * @param result #GNUNET_YES if peer was a member, #GNUNET_NO if peer was not a member,
- *        #GNUNET_SYSERR if we cannot answer the membership test.
- */
-void
-GNUNET_MULTICAST_membership_test_result (struct GNUNET_MULTICAST_MembershipTestHandle *mth,
-                                         int result)
-{
-}
-
-
 /**
  * Replay a message fragment for the multicast group.
  *
- * @param rh Replay handle identifying which replay operation was requested.
- * @param msg Replayed message fragment, NULL if unknown/error.
- * @param ec Error code.
+ * @param rh
+ *        Replay handle identifying which replay operation was requested.
+ * @param msg
+ *        Replayed message fragment, NULL if not found / an error occurred.
+ * @param ec
+ *        Error code.  See enum GNUNET_MULTICAST_ReplayErrorCode
+ *        If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
  */
 void
 GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
                                   const struct GNUNET_MessageHeader *msg,
                                   enum GNUNET_MULTICAST_ReplayErrorCode ec)
 {
+  uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
+  struct MulticastReplayResponseMessage *res;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_extra (res, msg_size,
+                               GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
+  res->fragment_id = rh->req.fragment_id;
+  res->message_id = rh->req.message_id;
+  res->fragment_offset = rh->req.fragment_offset;
+  res->flags = rh->req.flags;
+  res->error_code = htonl (ec);
+
+  if (GNUNET_MULTICAST_REC_OK == ec)
+  {
+    GNUNET_assert (NULL != msg);
+    GNUNET_memcpy (&res[1], msg, msg_size);
+  }
+
+  GNUNET_MQ_send (rh->grp->mq, env);
+
+  if (GNUNET_MULTICAST_REC_OK != ec)
+    GNUNET_free (rh);
 }
 
 
@@ -336,20 +678,35 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
  *
  * Invalidates the replay handle.
  *
- * @param rh Replay session to end.
+ * @param rh
+ *        Replay session to end.
  */
 void
 GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
 {
+  struct MulticastReplayResponseMessage *end;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
+
+  end->fragment_id = rh->req.fragment_id;
+  end->message_id = rh->req.message_id;
+  end->fragment_offset = rh->req.fragment_offset;
+  end->flags = rh->req.flags;
+
+  GNUNET_MQ_send (rh->grp->mq, env);
+  GNUNET_free (rh);
 }
 
 
 /**
  * Replay a message for the multicast group.
  *
- * @param rh Replay handle identifying which replay operation was requested.
- * @param notify Function to call to get the message.
- * @param notify_cls Closure for @a notify.
+ * @param rh
+ *        Replay handle identifying which replay operation was requested.
+ * @param notify
+ *        Function to call to get the message.
+ * @param notify_cls
+ *        Closure for @a notify.
  */
 void
 GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
@@ -359,6 +716,83 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
 }
 
 
+static void
+origin_connect (struct GNUNET_MULTICAST_Origin *orig);
+
+
+static void
+origin_reconnect (void *cls)
+{
+  origin_connect (cls);
+}
+
+
+/**
+ * Origin client disconnected from service.
+ *
+ * Reconnect after backoff period.
+ */
+static void
+origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_MULTICAST_Origin *orig = cls;
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Origin client disconnected (%d), re-connecting\n",
+       (int) error);
+  if (NULL != grp->mq)
+  {
+    GNUNET_MQ_destroy (grp->mq);
+    grp->mq = NULL;
+  }
+
+  grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
+                                                      origin_reconnect,
+                                                      orig);
+  grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
+}
+
+
+/**
+ * Connect to service as origin.
+ */
+static void
+origin_connect (struct GNUNET_MULTICAST_Origin *orig)
+{
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (group_message,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+                           struct GNUNET_MULTICAST_MessageHeader,
+                           grp),
+    GNUNET_MQ_hd_var_size (origin_request,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+                           struct GNUNET_MULTICAST_RequestHeader,
+                           orig),
+    GNUNET_MQ_hd_fixed_size (group_fragment_ack,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+                             struct GNUNET_MessageHeader,
+                             grp),
+    GNUNET_MQ_hd_var_size (group_join_request,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+                           struct MulticastJoinRequestMessage,
+                           grp),
+    GNUNET_MQ_hd_fixed_size (group_replay_request,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+                             struct MulticastReplayRequestMessage,
+                             grp),
+    GNUNET_MQ_handler_end ()
+  };
+
+  grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
+                                   handlers, origin_disconnected, orig);
+  GNUNET_assert (NULL != grp->mq);
+  GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
+}
+
+
 /**
  * Start a multicast group.
  *
@@ -371,28 +805,36 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
  * candidate will be given a response.  Members in the group can send messages
  * to the origin (one at a time).
  *
- * @param cfg Configuration to use.
- * @param priv_key ECC key that will be used to sign messages for this
+ * @param cfg
+ *        Configuration to use.
+ * @param priv_key
+ *        ECC key that will be used to sign messages for this
  *        multicast session; public key is used to identify the multicast group;
- * @param next_fragment_id Next fragment ID to continue counting fragments from
- *        when restarting the origin.  0 for a new group.
- * @param join_cb Function called to approve / disapprove joining of a peer.
- * @param mem_test_cb Function multicast can use to test group membership.
- * @param replay_frag_cb Function that can be called to replay a message fragment.
- * @param replay_msg_cb Function that can be called to replay a message.
- * @param request_cb Function called with message fragments from group members.
- * @param message_cb Function called with the message fragments sent to the
+ * @param max_fragment_id
+ *        Maximum fragment ID already sent to the group.
+ *        0 for a new group.
+ * @param join_request_cb
+ *        Function called to approve / disapprove joining of a peer.
+ * @param replay_frag_cb
+ *        Function that can be called to replay a message fragment.
+ * @param replay_msg_cb
+ *        Function that can be called to replay a message.
+ * @param request_cb
+ *        Function called with message fragments from group members.
+ * @param message_cb
+ *        Function called with the message fragments sent to the
  *        network by GNUNET_MULTICAST_origin_to_all().  These message fragments
  *        should be stored for answering replay requests later.
- * @param cls Closure for the various callbacks that follow.
+ * @param cls
+ *        Closure for the various callbacks that follow.
+ *
  * @return Handle for the origin, NULL on error.
  */
 struct GNUNET_MULTICAST_Origin *
 GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
-                               uint64_t next_fragment_id,
-                               GNUNET_MULTICAST_JoinCallback join_cb,
-                               GNUNET_MULTICAST_MembershipTestCallback mem_test_cb,
+                               uint64_t max_fragment_id,
+                               GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
                                GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
                                GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
                                GNUNET_MULTICAST_RequestCallback request_cb,
@@ -400,29 +842,27 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                void *cls)
 {
   struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
-  orig->grp.is_origin = GNUNET_YES;
-  orig->priv_key = *priv_key;
-  orig->next_fragment_id = next_fragment_id;
-  orig->join_cb = join_cb;
-  orig->mem_test_cb = mem_test_cb;
-  orig->replay_frag_cb = replay_frag_cb;
-  orig->replay_msg_cb = replay_msg_cb;
-  orig->request_cb = request_cb;
-  orig->message_cb = message_cb;
-  orig->cls = cls;
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
 
-  GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &orig->pub_key);
-  GNUNET_CRYPTO_hash (&orig->pub_key, sizeof (orig->pub_key),
-                      &orig->pub_key_hash);
+  struct MulticastOriginStartMessage *start;
+  grp->connect_env = GNUNET_MQ_msg (start,
+                                    GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
+  start->max_fragment_id = max_fragment_id;
+  start->group_key = *priv_key;
 
-  if (NULL == origins)
-    origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+  grp->cfg = cfg;
+  grp->is_origin = GNUNET_YES;
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
 
-  GNUNET_CONTAINER_multihashmap_put (origins, &orig->pub_key_hash, orig,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  grp->cb_cls = cls;
+  grp->join_req_cb = join_request_cb;
+  grp->replay_frag_cb = replay_frag_cb;
+  grp->replay_msg_cb = replay_msg_cb;
+  grp->message_cb = message_cb;
 
-  /* FIXME: send ORIGIN_START to service */
+  orig->request_cb = request_cb;
 
+  origin_connect (orig);
   return orig;
 }
 
@@ -430,130 +870,216 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
 /**
  * Stop a multicast group.
  *
- * @param origin Multicast group to stop.
+ * @param origin
+ *        Multicast group to stop.
  */
 void
-GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig)
+GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
+                              GNUNET_ContinuationCallback stop_cb,
+                              void *stop_cls)
 {
-  GNUNET_CONTAINER_multihashmap_remove (origins, &orig->pub_key_hash, orig);
-  GNUNET_free (orig);
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+
+  group_disconnect (grp, stop_cb, stop_cls);
 }
 
 
-/* FIXME: for now just call clients' callbacks
- *        without sending anything to multicast. */
 static void
-schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
 {
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_origin_to_all()\n");
-  struct GNUNET_MULTICAST_Origin *orig = cls;
-  struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+  struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
+  GNUNET_assert (GNUNET_YES == grp->in_transmit);
+
+  size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
+  struct GNUNET_MULTICAST_MessageHeader *msg;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
+                               GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
 
-  size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
-  char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
-  struct GNUNET_MULTICAST_MessageHeader *msg
-    = (struct GNUNET_MULTICAST_MessageHeader *) buf;
-  int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]);
+  int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
 
   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
-      || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
+      || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
-         "OriginTransmitNotify() returned error or invalid message size.\n");
+         "%p OriginTransmitNotify() returned error or invalid message size.\n",
+         orig);
     /* FIXME: handle error */
+    GNUNET_MQ_discard (env);
     return;
   }
 
   if (GNUNET_NO == ret && 0 == buf_size)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "%p OriginTransmitNotify() - transmission paused.\n", orig);
+    GNUNET_MQ_discard (env);
     return; /* Transmission paused. */
+  }
 
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
   msg->header.size = htons (sizeof (*msg) + buf_size);
-  msg->message_id = GNUNET_htonll (mh->message_id);
-  msg->group_generation = mh->group_generation;
-
-  /* FIXME: add fragment ID and signature in the service instead of here */
-  msg->fragment_id = GNUNET_ntohll (orig->next_fragment_id++);
-  msg->fragment_offset = GNUNET_ntohll (mh->fragment_offset);
-  mh->fragment_offset += sizeof (*msg) + buf_size;
-  msg->purpose.size = htonl (sizeof (*msg) + buf_size
-                             - sizeof (msg->header)
-                             - sizeof (msg->hop_counter)
-                             - sizeof (msg->signature));
-  msg->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
-
-  if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &msg->purpose,
-                                           &msg->signature))
-  {
-    /* FIXME: handle error */
-    return;
-  }
+  msg->message_id = GNUNET_htonll (tmit->message_id);
+  msg->group_generation = tmit->group_generation;
+  msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
+  tmit->fragment_offset += sizeof (*msg) + buf_size;
 
-  /* FIXME: send msg to the service and only then call handle_multicast_message
-   *        with the returned signed message.
-   */
-  handle_multicast_message (&orig->grp, msg);
+  grp->acks_pending++;
+  GNUNET_MQ_send (grp->mq, env);
 
-  if (GNUNET_NO == ret)
-    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
-                                  (GNUNET_TIME_UNIT_SECONDS, 1),
-                                  schedule_origin_to_all, orig);
+  if (GNUNET_YES == ret)
+    grp->in_transmit = GNUNET_NO;
 }
 
 
 /**
  * Send a message to the multicast group.
  *
- * @param origin Handle to the multicast group.
- * @param message_id Application layer ID for the message.  Opaque to multicast.
- * @param group_generation Group generation of the message.  Documented in
- *             `struct GNUNET_MULTICAST_MessageHeader`.
- * @param notify Function to call to get the message.
- * @param notify_cls Closure for @a notify.
- * @return NULL on error (i.e. request already pending).
+ * @param orig
+ *        Handle to the multicast group.
+ * @param message_id
+ *        Application layer ID for the message.  Opaque to multicast.
+ * @param group_generation
+ *        Group generation of the message.
+ *        Documented in struct GNUNET_MULTICAST_MessageHeader.
+ * @param notify
+ *        Function to call to get the message.
+ * @param notify_cls
+ *        Closure for @a notify.
+ *
+ * @return Message handle on success,
+ *         NULL on error (i.e. another request is already pending).
  */
-struct GNUNET_MULTICAST_OriginMessageHandle *
-GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin,
+struct GNUNET_MULTICAST_OriginTransmitHandle *
+GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
                                 uint64_t message_id,
                                 uint64_t group_generation,
                                 GNUNET_MULTICAST_OriginTransmitNotify notify,
                                 void *notify_cls)
 {
-  struct GNUNET_MULTICAST_OriginMessageHandle *mh = &origin->msg_handle;
-  mh->origin = origin;
-  mh->message_id = message_id;
-  mh->group_generation = group_generation;
-  mh->notify = notify;
-  mh->notify_cls = notify_cls;
-
-  /* FIXME: remove delay, it's there only for testing */
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
-                                (GNUNET_TIME_UNIT_SECONDS, 1),
-                                schedule_origin_to_all, origin);
-  return &origin->msg_handle;
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+  if (GNUNET_YES == grp->in_transmit)
+    return NULL;
+  grp->in_transmit = GNUNET_YES;
+
+  struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
+  tmit->origin = orig;
+  tmit->message_id = message_id;
+  tmit->fragment_offset = 0;
+  tmit->group_generation = group_generation;
+  tmit->notify = notify;
+  tmit->notify_cls = notify_cls;
+
+  origin_to_all (orig);
+  return tmit;
 }
 
 
 /**
  * Resume message transmission to multicast group.
  *
- * @param mh Request to cancel.
+ * @param th
+ *        Transmission to cancel.
  */
 void
-GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginMessageHandle *mh)
+GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
 {
-  GNUNET_SCHEDULER_add_now (schedule_origin_to_all, mh->origin);
+  struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
+  if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+    return;
+  origin_to_all (th->origin);
 }
 
 
 /**
  * Cancel request for message transmission to multicast group.
  *
- * @param mh Request to cancel.
+ * @param th
+ *        Transmission to cancel.
  */
 void
-GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginMessageHandle *mh)
+GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
+{
+  th->origin->grp.in_transmit = GNUNET_NO;
+}
+
+
+static void
+member_connect (struct GNUNET_MULTICAST_Member *mem);
+
+
+static void
+member_reconnect (void *cls)
+{
+  member_connect (cls);
+}
+
+
+/**
+ * Member client disconnected from service.
+ *
+ * Reconnect after backoff period.
+ */
+static void
+member_disconnected (void *cls, enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_MULTICAST_Member *mem = cls;
+  struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Member client disconnected (%d), re-connecting\n",
+       (int) error);
+  GNUNET_MQ_destroy (grp->mq);
+  grp->mq = NULL;
+
+  grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
+                                                      member_reconnect,
+                                                      mem);
+  grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
+}
+
+
+/**
+ * Connect to service as member.
+ */
+static void
+member_connect (struct GNUNET_MULTICAST_Member *mem)
 {
+  struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (group_message,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+                           struct GNUNET_MULTICAST_MessageHeader,
+                           grp),
+    GNUNET_MQ_hd_fixed_size (group_fragment_ack,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+                             struct GNUNET_MessageHeader,
+                             grp),
+    GNUNET_MQ_hd_var_size (group_join_request,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+                           struct MulticastJoinRequestMessage,
+                           grp),
+    GNUNET_MQ_hd_var_size (member_join_decision,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+                           struct MulticastJoinDecisionMessageHeader,
+                           mem),
+    GNUNET_MQ_hd_fixed_size (group_replay_request,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+                             struct MulticastReplayRequestMessage,
+                             grp),
+    GNUNET_MQ_hd_var_size (member_replay_response,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+                           struct MulticastReplayResponseMessage,
+                           mem),
+    GNUNET_MQ_handler_end ()
+  };
+
+  grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
+                                   handlers, member_disconnected, mem);
+  GNUNET_assert (NULL != grp->mq);
+  GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
 }
 
 
@@ -565,83 +1091,93 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginMessageHand
  * @a message_cb is invoked with a (failure) response and then with NULL.  If
  * the join succeeds, outstanding (state) messages and ongoing multicast
  * messages will be given to the @a message_cb until the member decides to part
- * the group.  The @a test_cb and @a replay_cb functions may be called at
- * anytime by the multicast service to support relaying messages to other
- * members of the group.
- *
- * @param cfg Configuration to use.
- * @param group_key ECC public key that identifies the group to join.
- * @param member_key ECC key that identifies the member and used to sign
- *        requests sent to the origin.
- * @param origin Peer ID of the origin to send unicast requsets to.  If NULL,
+ * the group.  The @a replay_cb function may be called at any time by the
+ * multicast service to support relaying messages to other members of the group.
+ *
+ * @param cfg
+ *        Configuration to use.
+ * @param group_key
+ *        ECC public key that identifies the group to join.
+ * @param member_key
+ *        ECC key that identifies the member
+ *        and used to sign requests sent to the origin.
+ * @param origin
+ *        Peer ID of the origin to send unicast requsets to.  If NULL,
  *        unicast requests are sent back via multiple hops on the reverse path
  *        of multicast messages.
- * @param relay_count Number of peers in the @a relays array.
- * @param relays Peer identities of members of the group, which serve as relays
+ * @param relay_count
+ *        Number of peers in the @a relays array.
+ * @param relays
+ *        Peer identities of members of the group, which serve as relays
  *        and can be used to join the group at. and send the @a join_request to.
  *        If empty, the @a join_request is sent directly to the @a origin.
- * @param join_request  Application-dependent join request to be passed to the peer
- *        @a relay (might, for example, contain a user, bind user
- *        identity/pseudonym to peer identity, application-level message to
- *        origin, etc.).
- * @param join_cb Function called to approve / disapprove joining of a peer.
- * @param mem_test_cb Function multicast can use to test group membership.
- * @param replay_frag_cb Function that can be called to replay message fragments
+ * @param join_msg
+ *        Application-dependent join message to be passed to the peer @a origin.
+ * @param join_request_cb
+ *        Function called to approve / disapprove joining of a peer.
+ * @param join_decision_cb
+ *        Function called to inform about the join decision.
+ * @param replay_frag_cb
+ *        Function that can be called to replay message fragments
  *        this peer already knows from this group. NULL if this
  *        client is unable to support replay.
- * @param replay_msg_cb Function that can be called to replay message fragments
+ * @param replay_msg_cb
+ *        Function that can be called to replay message fragments
  *        this peer already knows from this group. NULL if this
  *        client is unable to support replay.
- * @param message_cb Function to be called for all message fragments we
+ * @param message_cb
+ *        Function to be called for all message fragments we
  *        receive from the group, excluding those our @a replay_cb
  *        already has.
- * @param cls Closure for callbacks.
+ * @param cls
+ *        Closure for callbacks.
+ *
  * @return Handle for the member, NULL on error.
  */
 struct GNUNET_MULTICAST_Member *
 GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
-                              const struct GNUNET_CRYPTO_EddsaPublicKey *group_key,
-                              const struct GNUNET_CRYPTO_EddsaPrivateKey *member_key,
+                              const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
+                              const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
                               const struct GNUNET_PeerIdentity *origin,
-                              uint32_t relay_count,
+                              uint16_t relay_count,
                               const struct GNUNET_PeerIdentity *relays,
-                              const struct GNUNET_MessageHeader *join_request,
-                              GNUNET_MULTICAST_JoinCallback join_cb,
-                              GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
+                              const struct GNUNET_MessageHeader *join_msg,
+                              GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
+                              GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
                               GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
                               GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
                               GNUNET_MULTICAST_MessageCallback message_cb,
                               void *cls)
 {
   struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
-  mem->group_key = *group_key;
-  mem->member_key = *member_key;
-  mem->origin = *origin;
-  mem->relay_count = relay_count;
-  mem->relays = *relays;
-  mem->join_cb = join_cb;
-  mem->member_test_cb = member_test_cb;
-  mem->replay_frag_cb = replay_frag_cb;
-  mem->message_cb = message_cb;
-  mem->cls = cls;
-
-  if (NULL != join_request)
-  {
-    uint16_t size = ntohs (join_request->size);
-    mem->join_request = GNUNET_malloc (size);
-    memcpy (mem->join_request, join_request, size);
-  }
-
-  GNUNET_CRYPTO_hash (&mem->group_key, sizeof (mem->group_key), &mem->group_key_hash);
-
-  if (NULL == members)
-    members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
-
-  GNUNET_CONTAINER_multihashmap_put (members, &mem->group_key_hash, mem,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
-  /* FIXME: send MEMBER_JOIN to service */
-
+  struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+
+  uint16_t relay_size = relay_count * sizeof (*relays);
+  uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
+  struct MulticastMemberJoinMessage *join;
+  grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
+                                          GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
+  join->group_pub_key = *group_pub_key;
+  join->member_key = *member_key;
+  join->origin = *origin;
+  join->relay_count = ntohl (relay_count);
+  if (0 < relay_size)
+    GNUNET_memcpy (&join[1], relays, relay_size);
+  if (0 < join_msg_size)
+    GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
+
+  grp->cfg = cfg;
+  grp->is_origin = GNUNET_NO;
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
+
+  mem->join_dcsn_cb = join_decision_cb;
+  grp->join_req_cb = join_request_cb;
+  grp->replay_frag_cb = replay_frag_cb;
+  grp->replay_msg_cb = replay_msg_cb;
+  grp->message_cb = message_cb;
+  grp->cb_cls = cls;
+
+  member_connect (mem);
   return mem;
 }
 
@@ -654,13 +1190,44 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
  * An application-dependent part message can be transmitted beforehand using
  * #GNUNET_MULTICAST_member_to_origin())
  *
- * @param member Membership handle.
+ * @param member
+ *        Membership handle.
  */
 void
-GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem)
+GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
+                              GNUNET_ContinuationCallback part_cb,
+                              void *part_cls)
 {
-  GNUNET_CONTAINER_multihashmap_remove (members, &mem->group_key_hash, mem);
-  GNUNET_free (mem);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
+  struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+
+  mem->join_dcsn_cb = NULL;
+  grp->join_req_cb = NULL;
+  grp->message_cb = NULL;
+  grp->replay_msg_cb = NULL;
+  grp->replay_frag_cb = NULL;
+
+  group_disconnect (grp, part_cb, part_cls);
+}
+
+
+void
+member_replay_request (struct GNUNET_MULTICAST_Member *mem,
+                       uint64_t fragment_id,
+                       uint64_t message_id,
+                       uint64_t fragment_offset,
+                       uint64_t flags)
+{
+  struct MulticastReplayRequestMessage *rep;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
+
+  rep->fragment_id = GNUNET_htonll (fragment_id);
+  rep->message_id = GNUNET_htonll (message_id);
+  rep->fragment_offset = GNUNET_htonll (fragment_offset);
+  rep->flags = GNUNET_htonll (flags);
+
+  GNUNET_MQ_send (mem->grp.mq, env);
 }
 
 
@@ -670,19 +1237,23 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem)
  * Useful if messages below the @e max_known_fragment_id given when joining are
  * needed and not known to the client.
  *
- * @param member Membership handle.
- * @param fragment_id ID of a message fragment that this client would like to
-          see replayed.
- * @param flags Additional flags for the replay request.  It is used and defined
- *        by the replay callback.  FIXME: which replay callback? FIXME: use enum?
- *        FIXME: why not pass reply cb here?
- * @return Replay request handle, NULL on error.
+ * @param member
+ *        Membership handle.
+ * @param fragment_id
+ *        ID of a message fragment that this client would like to see replayed.
+ * @param flags
+ *        Additional flags for the replay request.
+ *        It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
+ *
+ * @return Replay request handle.
  */
 struct GNUNET_MULTICAST_MemberReplayHandle *
-GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member,
+GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
                                          uint64_t fragment_id,
                                          uint64_t flags)
 {
+  member_replay_request (mem, fragment_id, 0, 0, flags);
+  // FIXME: return something useful
   return NULL;
 }
 
@@ -693,146 +1264,138 @@ GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member,
  * Useful if messages below the @e max_known_fragment_id given when joining are
  * needed and not known to the client.
  *
- * @param member Membership handle.
- * @param message_id ID of the message this client would like to see replayed.
- * @param fragment_offset Offset of the fragment within the message to replay.
- * @param flags Additional flags for the replay request.  It is used & defined
- *        by the replay callback.
- * @param result_cb Function to be called for the replayed message.
- * @param result_cb_cls Closure for @a result_cb.
+ * @param member
+ *        Membership handle.
+ * @param message_id
+ *        ID of the message this client would like to see replayed.
+ * @param fragment_offset
+ *        Offset of the fragment within the message to replay.
+ * @param flags
+ *        Additional flags for the replay request.
+ *        It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
+ *
  * @return Replay request handle, NULL on error.
  */
 struct GNUNET_MULTICAST_MemberReplayHandle *
-GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *member,
+GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
                                         uint64_t message_id,
                                         uint64_t fragment_offset,
-                                        uint64_t flags,
-                                        GNUNET_MULTICAST_ResultCallback result_cb,
-                                        void *result_cb_cls)
+                                        uint64_t flags)
 {
+  member_replay_request (mem, 0, message_id, fragment_offset, flags);
+  // FIXME: return something useful
   return NULL;
 }
 
 
-/**
- * Cancel a replay request.
- *
- * @param rh Request to cancel.
- */
-void
-GNUNET_MULTICAST_member_replay_cancel (struct GNUNET_MULTICAST_MemberReplayHandle *rh)
-{
-}
-
-
-/* FIXME: for now just send back to the client what it sent. */
 static void
-schedule_member_to_origin (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+member_to_origin (struct GNUNET_MULTICAST_Member *mem)
 {
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_member_to_origin()\n");
-  struct GNUNET_MULTICAST_Member *mem = cls;
-  struct GNUNET_MULTICAST_MemberRequestHandle *rh = &mem->req_handle;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
+  struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+  struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
+  GNUNET_assert (GNUNET_YES == grp->in_transmit);
 
-  size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
-  char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
-  struct GNUNET_MULTICAST_RequestHeader *req
-    = (struct GNUNET_MULTICAST_RequestHeader *) buf;
-  int ret = rh->notify (rh->notify_cls, &buf_size, &req[1]);
+  size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
+  struct GNUNET_MULTICAST_RequestHeader *req;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
+                               GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
+
+  int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
 
   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
-      || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
+      || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
-         "MemberTransmitNotify() returned error or invalid message size.\n");
+         "MemberTransmitNotify() returned error or invalid message size. "
+         "ret=%d, buf_size=%u\n", ret, buf_size);
     /* FIXME: handle error */
+    GNUNET_MQ_discard (env);
     return;
   }
 
   if (GNUNET_NO == ret && 0 == buf_size)
-    return; /* Transmission paused. */
-
-  req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
-  req->header.size = htons (sizeof (*req) + buf_size);
-  req->request_id = GNUNET_htonll (rh->request_id);
-
-  /* FIXME: add fragment ID and signature in the service instead of here */
-  req->fragment_id = GNUNET_ntohll (mem->next_fragment_id++);
-  req->fragment_offset = GNUNET_ntohll (rh->fragment_offset);
-  rh->fragment_offset += sizeof (*req) + buf_size;
-  req->purpose.size = htonl (sizeof (*req) + buf_size
-                             - sizeof (req->header)
-                             - sizeof (req->member_key)
-                             - sizeof (req->signature));
-  req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
-
-  if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->member_key, &req->purpose,
-                                           &req->signature))
   {
-    /* FIXME: handle error */
+    /* Transmission paused. */
+    GNUNET_MQ_discard (env);
     return;
   }
 
-  /* FIXME: send req to the service and only then call handle_multicast_request
-   *        with the returned request.
-   */
-  handle_multicast_request (&mem->group_key_hash, req);
+  req->header.size = htons (sizeof (*req) + buf_size);
+  req->request_id = GNUNET_htonll (tmit->request_id);
+  req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
+  tmit->fragment_offset += sizeof (*req) + buf_size;
 
-  if (GNUNET_NO == ret)
-    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
-                                  (GNUNET_TIME_UNIT_SECONDS, 1),
-                                  schedule_member_to_origin, mem);
+  GNUNET_MQ_send (grp->mq, env);
+
+  if (GNUNET_YES == ret)
+    grp->in_transmit = GNUNET_NO;
 }
 
 
 /**
  * Send a message to the origin of the multicast group.
  *
- * @param member Membership handle.
- * @param request_id Application layer ID for the request.  Opaque to multicast.
- * @param notify Callback to call to get the message.
- * @param notify_cls Closure for @a notify.
+ * @param mem
+ *        Membership handle.
+ * @param request_id
+ *        Application layer ID for the request.  Opaque to multicast.
+ * @param notify
+ *        Callback to call to get the message.
+ * @param notify_cls
+ *        Closure for @a notify.
+ *
  * @return Handle to cancel request, NULL on error (i.e. request already pending).
  */
-struct GNUNET_MULTICAST_MemberRequestHandle *
-GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member,
+struct GNUNET_MULTICAST_MemberTransmitHandle *
+GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
                                    uint64_t request_id,
                                    GNUNET_MULTICAST_MemberTransmitNotify notify,
                                    void *notify_cls)
 {
-  struct GNUNET_MULTICAST_MemberRequestHandle *rh = &member->req_handle;
-  rh->member = member;
-  rh->request_id = request_id;
-  rh->notify = notify;
-  rh->notify_cls = notify_cls;
-
-  /* FIXME: remove delay, it's there only for testing */
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
-                                (GNUNET_TIME_UNIT_SECONDS, 1),
-                                schedule_member_to_origin, member);
-  return &member->req_handle;
+  if (GNUNET_YES == mem->grp.in_transmit)
+    return NULL;
+  mem->grp.in_transmit = GNUNET_YES;
+
+  struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
+  tmit->member = mem;
+  tmit->request_id = request_id;
+  tmit->fragment_offset = 0;
+  tmit->notify = notify;
+  tmit->notify_cls = notify_cls;
+
+  member_to_origin (mem);
+  return tmit;
 }
 
 
 /**
  * Resume message transmission to origin.
  *
- * @param rh Request to cancel.
+ * @param th
+ *        Transmission to cancel.
  */
 void
-GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberRequestHandle *rh)
+GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
 {
-
+  struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
+  if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+    return;
+  member_to_origin (th->member);
 }
 
 
 /**
  * Cancel request for message transmission to origin.
  *
- * @param rh Request to cancel.
+ * @param th
+ *        Transmission to cancel.
  */
 void
-GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberRequestHandle *rh)
+GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
 {
+  th->member->grp.in_transmit = GNUNET_NO;
 }