remove legacy core api code (now dead)
[oweals/gnunet.git] / src / multicast / multicast_api.c
index 76c7fb004f4fa2994a6b5b0d8a3f0b29082ba407..7cfe708359db25b447ffc246f3ffcb21ed38c69f 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2012, 2013 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -73,12 +73,22 @@ struct GNUNET_MULTICAST_Group
   /**
    * Client connection to the service.
    */
-  struct GNUNET_CLIENT_MANAGER_Connection *client;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
-   * Message to send on reconnect.
+   * Message to send on connect.
    */
-  struct GNUNET_MessageHeader *connect_msg;
+  struct GNUNET_MQ_Envelope *connect_env;
+
+  /**
+   * Time to wait until we try to reconnect on failure.
+   */
+  struct GNUNET_TIME_Relative reconnect_delay;
+
+  /**
+   * Task for reconnecting when the listener fails.
+   */
+  struct GNUNET_SCHEDULER_Task *reconnect_task;
 
   GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
   GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
@@ -101,6 +111,11 @@ struct GNUNET_MULTICAST_Group
    */
   uint8_t in_transmit;
 
+  /**
+   * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
+   */
+  uint8_t acks_pending;
+
   /**
    * Is this the origin or a member?
    */
@@ -158,7 +173,7 @@ struct GNUNET_MULTICAST_JoinHandle
   /**
    * Public key of the member requesting join.
    */
-  struct GNUNET_CRYPTO_EcdsaPublicKey member_key;
+  struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
 
   /**
    * Peer identity of the member requesting join.
@@ -182,37 +197,32 @@ struct GNUNET_MULTICAST_ReplayHandle
  */
 struct GNUNET_MULTICAST_MemberReplayHandle
 {
-
-  GNUNET_MULTICAST_ResultCallback result_cb;
-  void *result_cls;
 };
 
 
-/**
- * Send first message to the service after connecting.
- */
 static void
-group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp)
-{
-  uint16_t cmsg_size = ntohs (grp->connect_msg->size);
-  struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size);
-  memcpy (cmsg, grp->connect_msg, cmsg_size);
-  GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg);
-}
+origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
+
+static void
+member_to_origin (struct GNUNET_MULTICAST_Member *mem);
 
 
 /**
- * Got disconnected from service.  Reconnect.
+ * Check join request message.
  */
-static void
-group_recv_disconnect (void *cls,
-                        struct GNUNET_CLIENT_MANAGER_Connection *client,
-                        const struct GNUNET_MessageHeader *msg)
+static int
+check_group_join_request (void *cls,
+                          const struct MulticastJoinRequestMessage *jreq)
 {
-  struct GNUNET_MULTICAST_Group *
-    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
-  GNUNET_CLIENT_MANAGER_reconnect (client);
-  group_send_connect_msg (grp);
+  uint16_t size = ntohs (jreq->header.size);
+
+  if (sizeof (*jreq) == size)
+    return GNUNET_OK;
+
+  if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
+    return GNUNET_OK;
+
+  return GNUNET_SYSERR;
 }
 
 
@@ -220,16 +230,13 @@ group_recv_disconnect (void *cls,
  * Receive join request from service.
  */
 static void
-group_recv_join_request (void *cls,
-                          struct GNUNET_CLIENT_MANAGER_Connection *client,
-                          const struct GNUNET_MessageHeader *msg)
+handle_group_join_request (void *cls,
+                           const struct MulticastJoinRequestMessage *jreq)
 {
-  struct GNUNET_MULTICAST_Group *grp;
-  const struct MulticastJoinRequestMessage *jreq;
+  struct GNUNET_MULTICAST_Group *grp = cls;
   struct GNUNET_MULTICAST_JoinHandle *jh;
-  const struct GNUNET_MessageHeader *jmsg;
+  const struct GNUNET_MessageHeader *jmsg = NULL;
 
-  grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
   if (NULL == grp)
   {
     GNUNET_break (0);
@@ -237,17 +244,28 @@ group_recv_join_request (void *cls,
   }
   if (NULL == grp->join_req_cb)
     return;
-  /* FIXME: this fails to check that 'msg' is well-formed! */
-  jreq = (const struct MulticastJoinRequestMessage *) msg;
+
   if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
     jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
-  else
-    jmsg = NULL;
+
   jh = GNUNET_malloc (sizeof (*jh));
   jh->group = grp;
-  jh->member_key = jreq->member_key;
+  jh->member_pub_key = jreq->member_pub_key;
   jh->peer = jreq->peer;
-  grp->join_req_cb (grp->cb_cls, &jreq->member_key, jmsg, jh);
+  grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
+
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
+}
+
+
+/**
+ * Check multicast message.
+ */
+static int
+check_group_message (void *cls,
+                     const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+{
+  return GNUNET_OK;
 }
 
 
@@ -255,14 +273,10 @@ group_recv_join_request (void *cls,
  * Receive multicast message from service.
  */
 static void
-group_recv_message (void *cls,
-                    struct GNUNET_CLIENT_MANAGER_Connection *client,
-                    const struct GNUNET_MessageHeader *msg)
+handle_group_message (void *cls,
+                      const struct GNUNET_MULTICAST_MessageHeader *mmsg)
 {
-  struct GNUNET_MULTICAST_Group *
-    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
-  struct GNUNET_MULTICAST_MessageHeader *
-    mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
+  struct GNUNET_MULTICAST_Group *grp = cls;
 
   if (GNUNET_YES == grp->is_disconnecting)
     return;
@@ -273,23 +287,65 @@ group_recv_message (void *cls,
 
   if (NULL != grp->message_cb)
     grp->message_cb (grp->cb_cls, mmsg);
+
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
 }
 
 
 /**
- * Origin receives uniquest request from a member.
+ * Receive message/request fragment acknowledgement from service.
  */
 static void
-origin_recv_request (void *cls,
-                     struct GNUNET_CLIENT_MANAGER_Connection *client,
-                     const struct GNUNET_MessageHeader *msg)
+handle_group_fragment_ack (void *cls,
+                           const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_MULTICAST_Group *grp = cls;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
+       grp, grp->in_transmit, grp->acks_pending);
+
+  if (0 == grp->acks_pending)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "%p Ignoring extraneous fragment ACK.\n", grp);
+    return;
+  }
+  grp->acks_pending--;
+
+  if (GNUNET_YES != grp->in_transmit)
+    return;
+
+  if (GNUNET_YES == grp->is_origin)
+    origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
+  else
+    member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
+
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
+}
+
+
+/**
+ * Check unicast request.
+ */
+static int
+check_origin_request (void *cls,
+                      const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+  return GNUNET_OK;
+}
+
+
+/**
+ * Origin receives unicast request from a member.
+ */
+static void
+handle_origin_request (void *cls,
+                       const struct GNUNET_MULTICAST_RequestHeader *req)
 {
   struct GNUNET_MULTICAST_Group *grp;
-  struct GNUNET_MULTICAST_Origin *
-    orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+  struct GNUNET_MULTICAST_Origin *orig = cls;
   grp = &orig->grp;
-  struct GNUNET_MULTICAST_RequestHeader *
-    req = (struct GNUNET_MULTICAST_RequestHeader *) msg;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Calling request callback with a request of size %u.\n",
@@ -297,6 +353,8 @@ origin_recv_request (void *cls,
 
   if (NULL != orig->request_cb)
     orig->request_cb (grp->cb_cls, req);
+
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
 }
 
 
@@ -304,14 +362,11 @@ origin_recv_request (void *cls,
  * Receive multicast replay request from service.
  */
 static void
-group_recv_replay_request (void *cls,
-                           struct GNUNET_CLIENT_MANAGER_Connection *client,
-                           const struct GNUNET_MessageHeader *msg)
+handle_group_replay_request (void *cls,
+                             const struct MulticastReplayRequestMessage *rep)
+
 {
-  struct GNUNET_MULTICAST_Group *
-    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
-  struct MulticastReplayRequestMessage *
-    rep = (struct MulticastReplayRequestMessage *) msg;
+  struct GNUNET_MULTICAST_Group *grp = cls;
 
   if (GNUNET_YES == grp->is_disconnecting)
     return;
@@ -325,7 +380,7 @@ group_recv_replay_request (void *cls,
       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
       rh->grp = grp;
       rh->req = *rep;
-      grp->replay_frag_cb (grp->cb_cls, &rep->member_key,
+      grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
                            GNUNET_ntohll (rep->fragment_id),
                            GNUNET_ntohll (rep->flags), rh);
     }
@@ -337,51 +392,78 @@ group_recv_replay_request (void *cls,
       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
       rh->grp = grp;
       rh->req = *rep;
-      grp->replay_msg_cb (grp->cb_cls, &rep->member_key,
+      grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
                           GNUNET_ntohll (rep->message_id),
                           GNUNET_ntohll (rep->fragment_offset),
                           GNUNET_ntohll (rep->flags), rh);
     }
   }
+
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
 }
 
 
 /**
- * Receive multicast replay request from service.
+ * Check replay response.
+ */
+static int
+check_member_replay_response (void *cls,
+                              const struct MulticastReplayResponseMessage *res)
+{
+  uint16_t size = ntohs (res->header.size);
+
+  if (sizeof (*res) == size)
+    return GNUNET_OK;
+
+  if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
+    return GNUNET_OK;
+
+  return GNUNET_SYSERR;
+}
+
+
+/**
+ * Receive replay response from service.
  */
 static void
-member_recv_replay_response (void *cls,
-                            struct GNUNET_CLIENT_MANAGER_Connection *client,
-                            const struct GNUNET_MessageHeader *msg)
+handle_member_replay_response (void *cls,
+                               const struct MulticastReplayResponseMessage *res)
 {
   struct GNUNET_MULTICAST_Group *grp;
-  struct GNUNET_MULTICAST_Member *
-    mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+  struct GNUNET_MULTICAST_Member *mem = cls;
   grp = &mem->grp;
-  struct MulticastReplayResponseMessage *
-    res = (struct MulticastReplayResponseMessage *) msg;
 
   if (GNUNET_YES == grp->is_disconnecting)
     return;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
+
+  // FIXME: return result
+}
+
+
+/**
+ * Check join decision.
+ */
+static int
+check_member_join_decision (void *cls,
+                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
+{
+  return GNUNET_OK; // checked in handle below
 }
 
+
 /**
  * Member receives join decision.
  */
 static void
-member_recv_join_decision (void *cls,
-                           struct GNUNET_CLIENT_MANAGER_Connection *client,
-                           const struct GNUNET_MessageHeader *msg)
+handle_member_join_decision (void *cls,
+                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
 {
   struct GNUNET_MULTICAST_Group *grp;
-  struct GNUNET_MULTICAST_Member *
-    mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+  struct GNUNET_MULTICAST_Member *mem = cls;
   grp = &mem->grp;
 
-  const struct MulticastJoinDecisionMessageHeader *
-    hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg;
   const struct MulticastJoinDecisionMessage *
     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
 
@@ -432,91 +514,59 @@ member_recv_join_decision (void *cls,
   // FIXME:
   //if (GNUNET_YES != is_admitted)
   //  GNUNET_MULTICAST_member_part (mem);
-}
-
-
-/**
- * Message handlers for an origin.
- */
-static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
-{
-  { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
-
-  { group_recv_message, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
-    sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
-
-  { origin_recv_request, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
-    sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
-
-  { group_recv_join_request, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
-    sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
-
-  { group_recv_replay_request, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
-    sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
-
-  { NULL, NULL, 0, 0, GNUNET_NO }
-};
-
-
-/**
- * Message handlers for a member.
- */
-static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
-{
-  { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
-
-  { group_recv_message, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
-    sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
-
-  { group_recv_join_request, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
-    sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
 
-  { member_recv_join_decision, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
-    sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES },
-
-  { group_recv_replay_request, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
-    sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
-
-  { member_recv_replay_response, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
-    sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
-
-  { NULL, NULL, 0, 0, GNUNET_NO }
-};
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
+}
 
 
 static void
 group_cleanup (struct GNUNET_MULTICAST_Group *grp)
 {
-  GNUNET_free (grp->connect_msg);
+  if (NULL != grp->connect_env)
+  {
+    GNUNET_MQ_discard (grp->connect_env);
+    grp->connect_env = NULL;
+  }
+  if (NULL != grp->mq)
+  {
+    GNUNET_MQ_destroy (grp->mq);
+    grp->mq = NULL;
+  }
   if (NULL != grp->disconnect_cb)
+  {
     grp->disconnect_cb (grp->disconnect_cls);
+    grp->disconnect_cb = NULL;
+  }
+  GNUNET_free (grp);
 }
 
 
 static void
-origin_cleanup (void *cls)
+group_disconnect (struct GNUNET_MULTICAST_Group *grp,
+                  GNUNET_ContinuationCallback cb,
+                  void *cls)
 {
-  struct GNUNET_MULTICAST_Origin *orig = cls;
-  group_cleanup (&orig->grp);
-  GNUNET_free (orig);
-}
-
+  grp->is_disconnecting = GNUNET_YES;
+  grp->disconnect_cb = cb;
+  grp->disconnect_cls = cls;
 
-static void
-member_cleanup (void *cls)
-{
-  struct GNUNET_MULTICAST_Member *mem = cls;
-  group_cleanup (&mem->grp);
-  GNUNET_free (mem);
+  if (NULL != grp->mq)
+  {
+    struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq);
+    if (NULL != last)
+    {
+      GNUNET_MQ_notify_sent (last,
+                             (GNUNET_MQ_NotifyCallback) group_cleanup, grp);
+    }
+    else
+    {
+      group_cleanup (grp);
+    }
+  }
+  else
+  {
+    group_cleanup (grp);
+  }
 }
 
 
@@ -559,14 +609,12 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
   uint16_t relay_size = relay_count * sizeof (*relays);
 
-  struct MulticastJoinDecisionMessageHeader * hdcsn;
+  struct MulticastJoinDecisionMessageHeader *hdcsn;
   struct MulticastJoinDecisionMessage *dcsn;
-  hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn)
-                         + relay_size + join_resp_size);
-  hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
-                              + relay_size + join_resp_size);
-  hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
-  hdcsn->member_key = join->member_key;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
+                               GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
+  hdcsn->member_pub_key = join->member_pub_key;
   hdcsn->peer = join->peer;
 
   dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
@@ -575,11 +623,11 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
   dcsn->is_admitted = htonl (is_admitted);
   dcsn->relay_count = htonl (relay_count);
   if (0 < relay_size)
-    memcpy (&dcsn[1], relays, relay_size);
+    GNUNET_memcpy (&dcsn[1], relays, relay_size);
   if (0 < join_resp_size)
-    memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
+    GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
 
-  GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
+  GNUNET_MQ_send (grp->mq, env);
   GNUNET_free (join);
   return NULL;
 }
@@ -602,28 +650,23 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
                                   enum GNUNET_MULTICAST_ReplayErrorCode ec)
 {
   uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
-  struct MulticastReplayResponseMessage *
-    res = GNUNET_malloc (sizeof (*res) + msg_size);
-  *res = (struct MulticastReplayResponseMessage) {
-    .header = {
-      .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE),
-      .size = htons (sizeof (*res) + msg_size),
-    },
-    .fragment_id = rh->req.fragment_id,
-    .message_id = rh->req.message_id,
-    .fragment_offset = rh->req.fragment_offset,
-    .flags = rh->req.flags,
-    .error_code = htonl (ec),
-  };
+  struct MulticastReplayResponseMessage *res;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_extra (res, msg_size,
+                               GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
+  res->fragment_id = rh->req.fragment_id;
+  res->message_id = rh->req.message_id;
+  res->fragment_offset = rh->req.fragment_offset;
+  res->flags = rh->req.flags;
+  res->error_code = htonl (ec);
 
   if (GNUNET_MULTICAST_REC_OK == ec)
   {
     GNUNET_assert (NULL != msg);
-    memcpy (&res[1], msg, msg_size);
+    GNUNET_memcpy (&res[1], msg, msg_size);
   }
 
-  GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &res->header);
-  GNUNET_free (res);
+  GNUNET_MQ_send (rh->grp->mq, env);
 
   if (GNUNET_MULTICAST_REC_OK != ec)
     GNUNET_free (rh);
@@ -641,18 +684,16 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
 void
 GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
 {
-  struct MulticastReplayResponseMessage end = {
-    .header = {
-      .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END),
-      .size = htons (sizeof (end)),
-    },
-    .fragment_id = rh->req.fragment_id,
-    .message_id = rh->req.message_id,
-    .fragment_offset = rh->req.fragment_offset,
-    .flags = rh->req.flags,
-  };
+  struct MulticastReplayResponseMessage *end;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
+
+  end->fragment_id = rh->req.fragment_id;
+  end->message_id = rh->req.message_id;
+  end->fragment_offset = rh->req.fragment_offset;
+  end->flags = rh->req.flags;
 
-  GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &end.header);
+  GNUNET_MQ_send (rh->grp->mq, env);
   GNUNET_free (rh);
 }
 
@@ -675,6 +716,83 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
 }
 
 
+static void
+origin_connect (struct GNUNET_MULTICAST_Origin *orig);
+
+
+static void
+origin_reconnect (void *cls)
+{
+  origin_connect (cls);
+}
+
+
+/**
+ * Origin client disconnected from service.
+ *
+ * Reconnect after backoff period.
+ */
+static void
+origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_MULTICAST_Origin *orig = cls;
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Origin client disconnected (%d), re-connecting\n",
+       (int) error);
+  if (NULL != grp->mq)
+  {
+    GNUNET_MQ_destroy (grp->mq);
+    grp->mq = NULL;
+  }
+
+  grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
+                                                      origin_reconnect,
+                                                      orig);
+  grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
+}
+
+
+/**
+ * Connect to service as origin.
+ */
+static void
+origin_connect (struct GNUNET_MULTICAST_Origin *orig)
+{
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (group_message,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+                           struct GNUNET_MULTICAST_MessageHeader,
+                           grp),
+    GNUNET_MQ_hd_var_size (origin_request,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+                           struct GNUNET_MULTICAST_RequestHeader,
+                           orig),
+    GNUNET_MQ_hd_fixed_size (group_fragment_ack,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+                             struct GNUNET_MessageHeader,
+                             grp),
+    GNUNET_MQ_hd_var_size (group_join_request,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+                           struct MulticastJoinRequestMessage,
+                           grp),
+    GNUNET_MQ_hd_fixed_size (group_replay_request,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+                             struct MulticastReplayRequestMessage,
+                             grp),
+    GNUNET_MQ_handler_end ()
+  };
+
+  grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
+                                   handlers, origin_disconnected, orig);
+  GNUNET_assert (NULL != grp->mq);
+  GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
+}
+
+
 /**
  * Start a multicast group.
  *
@@ -725,16 +843,16 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
 {
   struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
-  struct MulticastOriginStartMessage *start = GNUNET_malloc (sizeof (*start));
 
-  start->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
-  start->header.size = htons (sizeof (*start));
+  struct MulticastOriginStartMessage *start;
+  grp->connect_env = GNUNET_MQ_msg (start,
+                                    GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
   start->max_fragment_id = max_fragment_id;
-  memcpy (&start->group_key, priv_key, sizeof (*priv_key));
+  start->group_key = *priv_key;
 
-  grp->connect_msg = (struct GNUNET_MessageHeader *) start;
-  grp->is_origin = GNUNET_YES;
   grp->cfg = cfg;
+  grp->is_origin = GNUNET_YES;
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
 
   grp->cb_cls = cls;
   grp->join_req_cb = join_request_cb;
@@ -744,10 +862,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
 
   orig->request_cb = request_cb;
 
-  grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", origin_handlers);
-  GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp));
-  group_send_connect_msg (grp);
-
+  origin_connect (orig);
   return orig;
 }
 
@@ -765,50 +880,56 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
 {
   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
 
-  grp->is_disconnecting = GNUNET_YES;
-  grp->disconnect_cb = stop_cb;
-  grp->disconnect_cls = stop_cls;
-
-  GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES,
-                                    &origin_cleanup, orig);
+  group_disconnect (grp, stop_cb, stop_cls);
 }
 
 
 static void
 origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
 {
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
+  GNUNET_assert (GNUNET_YES == grp->in_transmit);
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
-  struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
+  struct GNUNET_MULTICAST_MessageHeader *msg;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
+                               GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
+
   int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
 
   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
-         "OriginTransmitNotify() returned error or invalid message size.\n");
+         "%p OriginTransmitNotify() returned error or invalid message size.\n",
+         orig);
     /* FIXME: handle error */
-    GNUNET_free (msg);
+    GNUNET_MQ_discard (env);
     return;
   }
 
   if (GNUNET_NO == ret && 0 == buf_size)
   {
-    GNUNET_free (msg);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "%p OriginTransmitNotify() - transmission paused.\n", orig);
+    GNUNET_MQ_discard (env);
     return; /* Transmission paused. */
   }
 
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
   msg->header.size = htons (sizeof (*msg) + buf_size);
   msg->message_id = GNUNET_htonll (tmit->message_id);
   msg->group_generation = tmit->group_generation;
   msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
   tmit->fragment_offset += sizeof (*msg) + buf_size;
 
-  GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
+  grp->acks_pending++;
+  GNUNET_MQ_send (grp->mq, env);
+
+  if (GNUNET_YES == ret)
+    grp->in_transmit = GNUNET_NO;
 }
 
 
@@ -837,15 +958,15 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
                                 GNUNET_MULTICAST_OriginTransmitNotify notify,
                                 void *notify_cls)
 {
-/* FIXME
-  if (GNUNET_YES == orig->grp.in_transmit)
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+  if (GNUNET_YES == grp->in_transmit)
     return NULL;
-  orig->grp.in_transmit = GNUNET_YES;
-*/
+  grp->in_transmit = GNUNET_YES;
 
   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
   tmit->origin = orig;
   tmit->message_id = message_id;
+  tmit->fragment_offset = 0;
   tmit->group_generation = group_generation;
   tmit->notify = notify;
   tmit->notify_cls = notify_cls;
@@ -864,6 +985,9 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
 void
 GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
 {
+  struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
+  if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+    return;
   origin_to_all (th->origin);
 }
 
@@ -877,6 +1001,85 @@ GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHan
 void
 GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
 {
+  th->origin->grp.in_transmit = GNUNET_NO;
+}
+
+
+static void
+member_connect (struct GNUNET_MULTICAST_Member *mem);
+
+
+static void
+member_reconnect (void *cls)
+{
+  member_connect (cls);
+}
+
+
+/**
+ * Member client disconnected from service.
+ *
+ * Reconnect after backoff period.
+ */
+static void
+member_disconnected (void *cls, enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_MULTICAST_Member *mem = cls;
+  struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Member client disconnected (%d), re-connecting\n",
+       (int) error);
+  GNUNET_MQ_destroy (grp->mq);
+  grp->mq = NULL;
+
+  grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
+                                                      member_reconnect,
+                                                      mem);
+  grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
+}
+
+
+/**
+ * Connect to service as member.
+ */
+static void
+member_connect (struct GNUNET_MULTICAST_Member *mem)
+{
+  struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (group_message,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+                           struct GNUNET_MULTICAST_MessageHeader,
+                           grp),
+    GNUNET_MQ_hd_fixed_size (group_fragment_ack,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+                             struct GNUNET_MessageHeader,
+                             grp),
+    GNUNET_MQ_hd_var_size (group_join_request,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+                           struct MulticastJoinRequestMessage,
+                           grp),
+    GNUNET_MQ_hd_var_size (member_join_decision,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+                           struct MulticastJoinDecisionMessageHeader,
+                           mem),
+    GNUNET_MQ_hd_fixed_size (group_replay_request,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+                             struct MulticastReplayRequestMessage,
+                             grp),
+    GNUNET_MQ_hd_var_size (member_replay_response,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+                           struct MulticastReplayResponseMessage,
+                           mem),
+    GNUNET_MQ_handler_end ()
+  };
+
+  grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
+                                   handlers, member_disconnected, mem);
+  GNUNET_assert (NULL != grp->mq);
+  GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
 }
 
 
@@ -933,7 +1136,7 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan
  */
 struct GNUNET_MULTICAST_Member *
 GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
-                              const struct GNUNET_CRYPTO_EddsaPublicKey *group_key,
+                              const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
                               const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
                               const struct GNUNET_PeerIdentity *origin,
                               uint16_t relay_count,
@@ -951,22 +1154,21 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
 
   uint16_t relay_size = relay_count * sizeof (*relays);
   uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
-  struct MulticastMemberJoinMessage *
-    join = GNUNET_malloc (sizeof (*join) + relay_size + join_msg_size);
-  join->header.size = htons (sizeof (*join) + relay_size + join_msg_size);
-  join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
-  join->group_key = *group_key;
+  struct MulticastMemberJoinMessage *join;
+  grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
+                                          GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
+  join->group_pub_key = *group_pub_key;
   join->member_key = *member_key;
   join->origin = *origin;
   join->relay_count = ntohl (relay_count);
   if (0 < relay_size)
-    memcpy (&join[1], relays, relay_size);
+    GNUNET_memcpy (&join[1], relays, relay_size);
   if (0 < join_msg_size)
-    memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
+    GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
 
-  grp->connect_msg = (struct GNUNET_MessageHeader *) join;
-  grp->is_origin = GNUNET_NO;
   grp->cfg = cfg;
+  grp->is_origin = GNUNET_NO;
+  grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
 
   mem->join_dcsn_cb = join_decision_cb;
   grp->join_req_cb = join_request_cb;
@@ -975,10 +1177,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
   grp->message_cb = message_cb;
   grp->cb_cls = cls;
 
-  grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", member_handlers);
-  GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp));
-  group_send_connect_msg (grp);
-
+  member_connect (mem);
   return mem;
 }
 
@@ -1002,18 +1201,13 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
 
-  grp->is_disconnecting = GNUNET_YES;
-  grp->disconnect_cb = part_cb;
-  grp->disconnect_cls = part_cls;
-
   mem->join_dcsn_cb = NULL;
   grp->join_req_cb = NULL;
   grp->message_cb = NULL;
   grp->replay_msg_cb = NULL;
   grp->replay_frag_cb = NULL;
 
-  GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES,
-                                    member_cleanup, mem);
+  group_disconnect (grp, part_cb, part_cls);
 }
 
 
@@ -1024,17 +1218,16 @@ member_replay_request (struct GNUNET_MULTICAST_Member *mem,
                        uint64_t fragment_offset,
                        uint64_t flags)
 {
-  struct MulticastReplayRequestMessage rep = {
-    .header = {
-      .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST),
-      .size = htons (sizeof (rep)),
-    },
-    .fragment_id = GNUNET_htonll (fragment_id),
-    .message_id = GNUNET_htonll (message_id),
-    .fragment_offset = GNUNET_htonll (fragment_offset),
-    .flags = GNUNET_htonll (flags),
-  };
-  GNUNET_CLIENT_MANAGER_transmit (mem->grp.client, &rep.header);
+  struct MulticastReplayRequestMessage *rep;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
+
+  rep->fragment_id = GNUNET_htonll (fragment_id);
+  rep->message_id = GNUNET_htonll (message_id);
+  rep->fragment_offset = GNUNET_htonll (fragment_offset);
+  rep->flags = GNUNET_htonll (flags);
+
+  GNUNET_MQ_send (mem->grp.mq, env);
 }
 
 
@@ -1051,21 +1244,17 @@ member_replay_request (struct GNUNET_MULTICAST_Member *mem,
  * @param flags
  *        Additional flags for the replay request.
  *        It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
- * @param result_cb
- *        Function to call when the replayed message fragment arrives.
- * @param result_cls
- *        Closure for @a result_cb.
  *
  * @return Replay request handle.
  */
 struct GNUNET_MULTICAST_MemberReplayHandle *
 GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
                                          uint64_t fragment_id,
-                                         uint64_t flags,
-                                         GNUNET_MULTICAST_ResultCallback result_cb,
-                                         void *result_cls)
+                                         uint64_t flags)
 {
   member_replay_request (mem, fragment_id, 0, 0, flags);
+  // FIXME: return something useful
+  return NULL;
 }
 
 
@@ -1084,10 +1273,6 @@ GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
  * @param flags
  *        Additional flags for the replay request.
  *        It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
- * @param result_cb
- *        Function to call for each replayed message fragment.
- * @param result_cls
- *        Closure for @a result_cb.
  *
  * @return Replay request handle, NULL on error.
  */
@@ -1095,23 +1280,11 @@ struct GNUNET_MULTICAST_MemberReplayHandle *
 GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
                                         uint64_t message_id,
                                         uint64_t fragment_offset,
-                                        uint64_t flags,
-                                        GNUNET_MULTICAST_ResultCallback result_cb,
-                                        void *result_cls)
+                                        uint64_t flags)
 {
   member_replay_request (mem, 0, message_id, fragment_offset, flags);
-}
-
-
-/**
- * Cancel a replay request.
- *
- * @param rh
- *        Request to cancel.
- */
-void
-GNUNET_MULTICAST_member_replay_cancel (struct GNUNET_MULTICAST_MemberReplayHandle *rh)
-{
+  // FIXME: return something useful
+  return NULL;
 }
 
 
@@ -1121,9 +1294,14 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
   LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
+  GNUNET_assert (GNUNET_YES == grp->in_transmit);
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
-  struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
+  struct GNUNET_MULTICAST_RequestHeader *req;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
+                               GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
+
   int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
 
   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
@@ -1133,24 +1311,26 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
          "MemberTransmitNotify() returned error or invalid message size. "
          "ret=%d, buf_size=%u\n", ret, buf_size);
     /* FIXME: handle error */
-    GNUNET_free (req);
+    GNUNET_MQ_discard (env);
     return;
   }
 
   if (GNUNET_NO == ret && 0 == buf_size)
   {
     /* Transmission paused. */
-    GNUNET_free (req);
+    GNUNET_MQ_discard (env);
     return;
   }
 
-  req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
   req->header.size = htons (sizeof (*req) + buf_size);
   req->request_id = GNUNET_htonll (tmit->request_id);
   req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
   tmit->fragment_offset += sizeof (*req) + buf_size;
 
-  GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
+  GNUNET_MQ_send (grp->mq, env);
+
+  if (GNUNET_YES == ret)
+    grp->in_transmit = GNUNET_NO;
 }
 
 
@@ -1174,15 +1354,14 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
                                    GNUNET_MULTICAST_MemberTransmitNotify notify,
                                    void *notify_cls)
 {
-/* FIXME
   if (GNUNET_YES == mem->grp.in_transmit)
     return NULL;
   mem->grp.in_transmit = GNUNET_YES;
-*/
 
   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
   tmit->member = mem;
   tmit->request_id = request_id;
+  tmit->fragment_offset = 0;
   tmit->notify = notify;
   tmit->notify_cls = notify_cls;
 
@@ -1200,6 +1379,9 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
 void
 GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
 {
+  struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
+  if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+    return;
   member_to_origin (th->member);
 }
 
@@ -1213,6 +1395,7 @@ GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmit
 void
 GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
 {
+  th->member->grp.in_transmit = GNUNET_NO;
 }