-fix (C) notices
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
index 35579fcfc230e7dbb21a572afe675ca9e86daeca..75a94bcb7f9d507ae1e9a0ee30685738f7fe60d6 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * This file is part of GNUnet
- * (C) 2013 Christian Grothoff (and other contributing authors)
+ * Copyright (C) 2013 GNUnet e.V.
  *
  * GNUnet is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
  *
  * You should have received a copy of the GNU General Public License
  * along with GNUnet; see the file COPYING.  If not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 
 /**
@@ -98,15 +98,14 @@ struct TransmitMessage
   uint16_t size;
 
   /**
-   * @see enum MessageState
+   * Type of first message part.
    */
-  uint8_t state;
+  uint16_t first_ptype;
 
   /**
-   * Whether a message ACK has already been sent to the client.
-   * #GNUNET_YES or #GNUNET_NO
+   * Type of last message part.
    */
-  uint8_t ack_sent;
+  uint16_t last_ptype;
 
   /* Followed by message */
 };
@@ -170,6 +169,11 @@ struct FragmentQueue
    */
   uint8_t state;
 
+  /**
+   * Whether the state is already modified in PSYCstore.
+   */
+  uint8_t state_is_modified;
+
   /**
    * Is the message queued for delivery to the client?
    * i.e. added to the recv_msgs queue
@@ -181,21 +185,37 @@ struct FragmentQueue
 /**
  * List of connected clients.
  */
-struct ClientListItem
+struct Client
 {
-  struct ClientListItem *prev;
-  struct ClientListItem *next;
+  struct Client *prev;
+  struct Client *next;
+
   struct GNUNET_SERVER_Client *client;
 };
 
 
+struct Operation
+{
+  struct Operation *prev;
+  struct Operation *next;
+
+  struct GNUNET_SERVER_Client *client;
+  struct Channel *chn;
+  uint64_t op_id;
+  uint32_t flags;
+};
+
+
 /**
  * Common part of the client context for both a channel master and slave.
  */
 struct Channel
 {
-  struct ClientListItem *clients_head;
-  struct ClientListItem *clients_tail;
+  struct Client *clients_head;
+  struct Client *clients_tail;
+
+  struct Operation *op_head;
+  struct Operation *op_tail;
 
   struct TransmitMessage *tmit_head;
   struct TransmitMessage *tmit_tail;
@@ -249,16 +269,6 @@ struct Channel
    */
   uint32_t tmit_mod_value_size;
 
-  /**
-   * @see enum MessageState
-   */
-  uint8_t tmit_state;
-
-  /**
-   * FIXME: needed?
-   */
-  uint8_t in_transmit;
-
   /**
    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
    */
@@ -305,7 +315,7 @@ struct Master
 
   /**
    * Incoming join requests from multicast.
-   * member_key -> struct GNUNET_MULTICAST_JoinHandle *
+   * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
    */
   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
 
@@ -399,17 +409,33 @@ struct Slave
    * Maximum request ID for this channel.
    */
   uint64_t max_request_id;
+
+  /**
+   * Join flags.
+   */
+  enum GNUNET_PSYC_SlaveJoinFlags join_flags;
 };
 
 
 static void
 transmit_message (struct Channel *chn);
 
+static uint64_t
+message_queue_run (struct Channel *chn);
 
 static uint64_t
 message_queue_drop (struct Channel *chn);
 
 
+static void
+schedule_transmit_message (void *cls,
+                           const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Channel *chn = cls;
+  transmit_message (chn);
+}
+
+
 /**
  * Task run during shutdown.
  *
@@ -432,6 +458,28 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 }
 
 
+static struct Operation *
+op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
+        uint64_t op_id, uint32_t flags)
+{
+  struct Operation *op = GNUNET_malloc (sizeof (*op));
+  op->client = client;
+  op->chn = chn;
+  op->op_id = op_id;
+  op->flags = flags;
+  GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
+  return op;
+}
+
+
+static void
+op_remove (struct Operation *op)
+{
+  GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
+  GNUNET_free (op);
+}
+
+
 /**
  * Clean up master data structures after a client disconnected.
  */
@@ -443,7 +491,7 @@ cleanup_master (struct Master *mst)
   if (NULL != mst->origin)
     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
-  GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
+  GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
 }
 
 
@@ -483,7 +531,7 @@ cleanup_slave (struct Slave *slv)
     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
     slv->member = NULL;
   }
-  GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
+  GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
 }
 
 
@@ -493,8 +541,12 @@ cleanup_slave (struct Slave *slv)
 static void
 cleanup_channel (struct Channel *chn)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Cleaning up channel %s. master? %u\n",
+              chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
   message_queue_drop (chn);
-  GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
+  GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
+  chn->recv_frags = NULL;
 
   if (NULL != chn->store_op)
   {
@@ -538,7 +590,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
               GNUNET_h2s (&chn->pub_key_hash));
 
-  struct ClientListItem *cli = chn->clients_head;
+  struct Client *cli = chn->clients_head;
   while (NULL != cli)
   {
     if (cli->client == client)
@@ -550,8 +602,24 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
     cli = cli->next;
   }
 
+  struct Operation *op = chn->op_head;
+  while (NULL != op)
+  {
+    if (op->client == client)
+    {
+      op->client = NULL;
+      break;
+    }
+    op = op->next;
+  }
+
   if (NULL == chn->clients_head)
   { /* Last client disconnected. */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%p Last client (%s) disconnected from channel %s\n",
+                chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
+                GNUNET_h2s (&chn->pub_key_hash));
+    chn->is_disconnected = GNUNET_YES;
     if (NULL != chn->tmit_head)
     { /* Send pending messages to multicast before cleanup. */
       transmit_message (chn);
@@ -571,10 +639,10 @@ static void
 client_send_msg (const struct Channel *chn,
                  const struct GNUNET_MessageHeader *msg)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Sending message to clients.\n", chn);
 
-  struct ClientListItem *cli = chn->clients_head;
+  struct Client *cli = chn->clients_head;
   while (NULL != cli)
   {
     GNUNET_SERVER_notification_context_add (nc, cli->client);
@@ -584,12 +652,52 @@ client_send_msg (const struct Channel *chn,
 }
 
 
+/**
+ * Send a result code back to the client.
+ *
+ * @param client
+ *        Client that should receive the result code.
+ * @param result_code
+ *        Code to transmit.
+ * @param op_id
+ *        Operation ID in network byte order.
+ * @param data
+ *        Data payload or NULL.
+ * @param data_size
+ *        Size of @a data.
+ */
+static void
+client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
+                    int64_t result_code, const void *data, uint16_t data_size)
+{
+  struct GNUNET_OperationResultMessage *res;
+
+  res = GNUNET_malloc (sizeof (*res) + data_size);
+  res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
+  res->header.size = htons (sizeof (*res) + data_size);
+  res->result_code = GNUNET_htonll (result_code);
+  res->op_id = op_id;
+  if (0 < data_size)
+    memcpy (&res[1], data, data_size);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "%p Sending result to client for operation #%" PRIu64 ": "
+              "%" PRId64 " (size: %u)\n",
+             client, GNUNET_ntohll (op_id), result_code, data_size);
+
+  GNUNET_SERVER_notification_context_add (nc, client);
+  GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
+                                              GNUNET_NO);
+  GNUNET_free (res);
+}
+
+
 /**
  * Closure for join_mem_test_cb()
  */
 struct JoinMemTestClosure
 {
-  struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
+  struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
   struct Channel *chn;
   struct GNUNET_MULTICAST_JoinHandle *jh;
   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
@@ -600,22 +708,29 @@ struct JoinMemTestClosure
  * Membership test result callback used for join requests.
  */
 static void
-join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
+join_mem_test_cb (void *cls, int64_t result,
+                  const char *err_msg, uint16_t err_msg_size)
 {
   struct JoinMemTestClosure *jcls = cls;
 
   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
   { /* Pass on join request to client if this is a master channel */
     struct Master *mst = (struct Master *) jcls->chn;
-    struct GNUNET_HashCode slave_key_hash;
-    GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
-                        &slave_key_hash);
-    GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
+    struct GNUNET_HashCode slave_pub_hash;
+    GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
+                        &slave_pub_hash);
+    GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh,
                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
     client_send_msg (jcls->chn, &jcls->join_msg->header);
   }
   else
   {
+    if (GNUNET_SYSERR == result)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Could not perform membership test (%.*s)\n",
+                  err_msg_size, err_msg);
+    }
     // FIXME: add relays
     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
   }
@@ -629,7 +744,7 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
  */
 static void
 mcast_recv_join_request (void *cls,
-                         const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+                         const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
                          const struct GNUNET_MessageHeader *join_msg,
                          struct GNUNET_MULTICAST_JoinHandle *jh)
 {
@@ -655,17 +770,17 @@ mcast_recv_join_request (void *cls,
     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
   req->header.size = htons (sizeof (*req) + join_msg_size);
   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
-  req->slave_key = *slave_key;
+  req->slave_pub_key = *slave_pub_key;
   if (0 < join_msg_size)
     memcpy (&req[1], join_msg, join_msg_size);
 
   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
-  jcls->slave_key = *slave_key;
+  jcls->slave_pub_key = *slave_pub_key;
   jcls->chn = chn;
   jcls->jh = jh;
   jcls->join_msg = req;
 
-  GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
+  GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
                                     chn->max_message_id, 0,
                                     &join_mem_test_cb, jcls);
 }
@@ -676,15 +791,20 @@ mcast_recv_join_request (void *cls,
  */
 static void
 mcast_recv_join_decision (void *cls, int is_admitted,
-                        const struct GNUNET_PeerIdentity *peer,
-                        uint16_t relay_count,
-                        const struct GNUNET_PeerIdentity *relays,
-                        const struct GNUNET_MessageHeader *join_resp)
+                          const struct GNUNET_PeerIdentity *peer,
+                          uint16_t relay_count,
+                          const struct GNUNET_PeerIdentity *relays,
+                          const struct GNUNET_MessageHeader *join_resp)
 {
   struct Slave *slv = cls;
   struct Channel *chn = &slv->chn;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Got join decision: %d\n", slv, is_admitted);
+  if (GNUNET_YES == chn->is_ready)
+  {
+    /* Already admitted */
+    return;
+  }
 
   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
   struct GNUNET_PSYC_JoinDecisionMessage *
@@ -697,47 +817,96 @@ mcast_recv_join_decision (void *cls, int is_admitted,
 
   client_send_msg (chn, &dcsn->header);
 
-  if (GNUNET_YES == is_admitted)
+  if (GNUNET_YES == is_admitted
+      && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
   {
     chn->is_ready = GNUNET_YES;
   }
-  else
-  {
-    slv->member = NULL;
-  }
 }
 
 
+static int
+store_recv_fragment_replay (void *cls,
+                            struct GNUNET_MULTICAST_MessageHeader *msg,
+                            enum GNUNET_PSYCSTORE_MessageFlags flags)
+{
+  struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
+
+  GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
+ */
 static void
-mcast_recv_membership_test (void *cls,
-                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
-                            uint64_t message_id, uint64_t group_generation,
-                            struct GNUNET_MULTICAST_MembershipTestHandle *mth)
+store_recv_fragment_replay_result (void *cls, int64_t result,
+                                   const char *err_msg, uint16_t err_msg_size)
 {
+  struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
+              rh, result, err_msg_size, err_msg);
+
+  switch (result)
+  {
+  case GNUNET_YES:
+    break;
 
+  case GNUNET_NO:
+    GNUNET_MULTICAST_replay_response (rh, NULL,
+                                      GNUNET_MULTICAST_REC_NOT_FOUND);
+    break;
+
+  case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
+    GNUNET_MULTICAST_replay_response (rh, NULL,
+                                      GNUNET_MULTICAST_REC_ACCESS_DENIED);
+    break;
+
+  case GNUNET_SYSERR:
+    GNUNET_MULTICAST_replay_response (rh, NULL,
+                                      GNUNET_MULTICAST_REC_INTERNAL_ERROR);
+    break;
+  }
+  GNUNET_MULTICAST_replay_response_end (rh);
 }
 
 
+/**
+ * Incoming fragment replay request from multicast.
+ */
 static void
 mcast_recv_replay_fragment (void *cls,
-                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
                             uint64_t fragment_id, uint64_t flags,
                             struct GNUNET_MULTICAST_ReplayHandle *rh)
 
 {
-
+  struct Channel *chn = cls;
+  GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
+                                 fragment_id, fragment_id,
+                                 &store_recv_fragment_replay,
+                                 &store_recv_fragment_replay_result, rh);
 }
 
 
+/**
+ * Incoming message replay request from multicast.
+ */
 static void
 mcast_recv_replay_message (void *cls,
-                           const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+                           const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
                            uint64_t message_id,
                            uint64_t fragment_offset,
                            uint64_t flags,
                            struct GNUNET_MULTICAST_ReplayHandle *rh)
 {
-
+  struct Channel *chn = cls;
+  GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
+                                message_id, message_id, 1, NULL,
+                                &store_recv_fragment_replay,
+                                &store_recv_fragment_replay_result, rh);
 }
 
 
@@ -779,29 +948,57 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
 
 
 /**
- * Send multicast message to all clients connected to the channel.
+ * Initialize PSYC message header.
  */
-static void
-client_send_mcast_msg (struct Channel *chn,
-                       const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+static inline void
+psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
+               const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
+{
+  uint16_t size = ntohs (mmsg->header.size);
+  uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+
+  pmsg->header.size = htons (psize);
+  pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+  pmsg->message_id = mmsg->message_id;
+  pmsg->fragment_offset = mmsg->fragment_offset;
+  pmsg->flags = htonl (flags);
+
+  memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+}
+
+
+/**
+ * Create a new PSYC message from a multicast message for sending it to clients.
+ */
+static inline struct GNUNET_PSYC_MessageHeader *
+psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
 {
   struct GNUNET_PSYC_MessageHeader *pmsg;
   uint16_t size = ntohs (mmsg->header.size);
   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
 
+  pmsg = GNUNET_malloc (psize);
+  psyc_msg_init (pmsg, mmsg, flags);
+  return pmsg;
+}
+
+
+/**
+ * Send multicast message to all clients connected to the channel.
+ */
+static void
+client_send_mcast_msg (struct Channel *chn,
+                       const struct GNUNET_MULTICAST_MessageHeader *mmsg,
+                       uint32_t flags)
+{
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Sending multicast message to client. "
               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
               chn, GNUNET_ntohll (mmsg->fragment_id),
               GNUNET_ntohll (mmsg->message_id));
 
-  pmsg = GNUNET_malloc (psize);
-  pmsg->header.size = htons (psize);
-  pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
-  pmsg->message_id = mmsg->message_id;
-  pmsg->fragment_offset = mmsg->fragment_offset;
-
-  memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+  struct GNUNET_PSYC_MessageHeader *
+    pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
   client_send_msg (chn, &pmsg->header);
   GNUNET_free (pmsg);
 }
@@ -832,6 +1029,7 @@ client_send_mcast_req (struct Master *mst,
   pmsg->message_id = req->request_id;
   pmsg->fragment_offset = req->fragment_offset;
   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
+  pmsg->slave_pub_key = req->member_pub_key;
 
   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
   client_send_msg (chn, &pmsg->header);
@@ -842,7 +1040,7 @@ client_send_mcast_req (struct Master *mst,
 /**
  * Insert a multicast message fragment into the queue belonging to the message.
  *
- * @param chn           Channel.
+ * @param chn          Channel.
  * @param mmsg         Multicast message fragment.
  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
  * @param first_ptype  First PSYC message part type in @a mmsg.
@@ -946,8 +1144,8 @@ fragment_queue_insert (struct Channel *chn,
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "%p Header of message %" PRIu64 " is NOT complete yet: "
                   "%" PRIu64 " != %" PRIu64 "\n",
-                  chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
-                  fragq->header_size);
+                  chn, GNUNET_ntohll (mmsg->message_id),
+                  frag_offset, fragq->header_size);
     }
   }
 
@@ -960,8 +1158,8 @@ fragment_queue_insert (struct Channel *chn,
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "%p Message %" PRIu64 " is NOT complete yet: "
                   "%" PRIu64 " != %" PRIu64 "\n",
-                  chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
-                  fragq->size);
+                  chn, GNUNET_ntohll (mmsg->message_id),
+                  frag_offset, fragq->size);
     break;
 
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
@@ -997,11 +1195,15 @@ fragment_queue_insert (struct Channel *chn,
  * Send fragments of a message in order to client, after all modifiers arrived
  * from multicast.
  *
- * @param chn      Channel.
- * @param msg_id  ID of the message @a fragq belongs to.
- * @param fragq   Fragment queue of the message.
- * @param drop    Drop message without delivering to client?
- *                #GNUNET_YES or #GNUNET_NO.
+ * @param chn
+ *        Channel.
+ * @param msg_id
+ *        ID of the message @a fragq belongs to.
+ * @param fragq
+ *        Fragment queue of the message.
+ * @param drop
+ *        Drop message without delivering to client?
+ *        #GNUNET_YES or #GNUNET_NO.
  */
 static void
 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
@@ -1029,7 +1231,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
     {
       if (GNUNET_NO == drop)
       {
-        client_send_mcast_msg (chn, cache_entry->mmsg);
+        client_send_mcast_msg (chn, cache_entry->mmsg, 0);
       }
       if (cache_entry->ref_count <= 1)
       {
@@ -1069,6 +1271,55 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
 }
 
 
+struct StateModifyClosure
+{
+  struct Channel *chn;
+  uint64_t msg_id;
+  struct GNUNET_HashCode msg_id_hash;
+};
+
+
+void
+store_recv_state_modify_result (void *cls, int64_t result,
+                                const char *err_msg, uint16_t err_msg_size)
+{
+  struct StateModifyClosure *mcls = cls;
+  struct Channel *chn = mcls->chn;
+  uint64_t msg_id = mcls->msg_id;
+
+  struct FragmentQueue *
+    fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
+              chn, result, err_msg_size, err_msg);
+
+  switch (result)
+  {
+  case GNUNET_OK:
+  case GNUNET_NO:
+    if (NULL != fragq)
+      fragq->state_is_modified = GNUNET_YES;
+    if (chn->max_state_message_id < msg_id)
+      chn->max_state_message_id = msg_id;
+    if (chn->max_message_id < msg_id)
+      chn->max_message_id = msg_id;
+
+    if (NULL != fragq)
+      fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
+    GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
+    message_queue_run (chn);
+    break;
+
+  default:
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
+                chn, result, err_msg_size, err_msg);
+    /** @todo FIXME: handle state_modify error */
+  }
+}
+
+
 /**
  * Run message queue.
  *
@@ -1089,6 +1340,7 @@ message_queue_run (struct Channel *chn)
               "%p Running message queue.\n", chn);
   uint64_t n = 0;
   uint64_t msg_id;
+
   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
                                                     &msg_id))
   {
@@ -1108,37 +1360,58 @@ message_queue_run (struct Channel *chn)
       break;
     }
 
-    if (MSG_FRAG_STATE_HEADER == fragq->state)
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%p Fragment queue entry:  state: %u, state delta: "
+                "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
+                chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
+
+    if (MSG_FRAG_STATE_DATA <= fragq->state)
     {
       /* Check if there's a missing message before the current one */
       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
       {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
+
         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
-            && msg_id - 1 != chn->max_message_id)
+            && (chn->max_message_id != msg_id - 1
+                && chn->max_message_id != msg_id))
         {
           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                       "%p Out of order message. "
-                      "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
-                      chn, msg_id, chn->max_message_id);
+                      "(%" PRIu64 " != %" PRIu64 " - 1)\n",
+                      chn, chn->max_message_id, msg_id);
           break;
+          // FIXME: keep track of messages processed in this queue run,
+          //        and only stop after reaching the end
         }
       }
       else
       {
-        if (msg_id - fragq->state_delta != chn->max_state_message_id)
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
+        if (GNUNET_YES != fragq->state_is_modified)
         {
-          GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                      "%p Out of order stateful message. "
-                      "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
-                      chn, msg_id, fragq->state_delta, chn->max_state_message_id);
-          break;
+          if (msg_id - fragq->state_delta != chn->max_state_message_id)
+          {
+            GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                        "%p Out of order stateful message. "
+                        "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
+                        chn, msg_id, fragq->state_delta, chn->max_state_message_id);
+            break;
+            // FIXME: keep track of messages processed in this queue run,
+            //        and only stop after reaching the end
+          }
+
+          struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
+          mcls->chn = chn;
+          mcls->msg_id = msg_id;
+          mcls->msg_id_hash = msg_id_hash;
+
+          /* Apply modifiers to state in PSYCstore */
+          GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
+                                         fragq->state_delta,
+                                         store_recv_state_modify_result, mcls);
+          break; // continue after asynchronous state modify result
         }
-#if TODO
-        /* FIXME: apply modifiers to state in PSYCstore */
-        GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
-                                       store_recv_state_modify_result, cls);
-#endif
-        chn->max_state_message_id = msg_id;
       }
       chn->max_message_id = msg_id;
     }
@@ -1146,6 +1419,7 @@ message_queue_run (struct Channel *chn)
     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
     n++;
   }
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
   return n;
@@ -1178,7 +1452,7 @@ message_queue_drop (struct Channel *chn)
 
     struct FragmentQueue *
       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
-
+    GNUNET_assert (NULL != fragq);
     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
     n++;
@@ -1190,15 +1464,16 @@ message_queue_drop (struct Channel *chn)
 
 
 /**
- * Handle the result of a GNUNET_PSYCSTORE_fragment_store() operation.
+ * Received result of GNUNET_PSYCSTORE_fragment_store().
  */
 static void
-store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg)
+store_recv_fragment_store_result (void *cls, int64_t result,
+                                  const char *err_msg, uint16_t err_msg_size)
 {
   struct Channel *chn = cls;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n",
-              chn, result, err_msg);
+              "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
+              chn, result, err_msg_size, err_msg);
 }
 
 
@@ -1214,17 +1489,26 @@ mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg
   uint16_t size = ntohs (mmsg->header.size);
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Received multicast message of size %u.\n",
-              chn, size);
+              "%p Received multicast message of size %u. "
+              "fragment_id=%" PRIu64 ", message_id=%" PRIu64
+              ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
+              chn, size,
+              GNUNET_ntohll (mmsg->fragment_id),
+              GNUNET_ntohll (mmsg->message_id),
+              GNUNET_ntohll (mmsg->fragment_offset),
+              GNUNET_ntohll (mmsg->flags));
 
   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
                                    &store_recv_fragment_store_result, chn);
 
   uint16_t first_ptype = 0, last_ptype = 0;
-  if (GNUNET_SYSERR
-      == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
-                                          (const char *) &mmsg[1],
-                                          &first_ptype, &last_ptype))
+  int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
+                                               (const char *) &mmsg[1],
+                                               &first_ptype, &last_ptype);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Message check result %d, first part type %u, last part type %u\n",
+              chn, check, first_ptype, last_ptype);
+  if (GNUNET_SYSERR == check)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "%p Dropping incoming multicast message with invalid parts.\n",
@@ -1233,10 +1517,6 @@ mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg
     return;
   }
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Message parts: first: type %u, last: type %u\n",
-              first_ptype, last_ptype);
-
   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
   message_queue_run (chn);
 }
@@ -1255,9 +1535,11 @@ mcast_recv_request (void *cls,
   struct Master *mst = cls;
   uint16_t size = ntohs (req->header.size);
 
+  char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Received multicast request of size %u.\n",
-              mst, size);
+              "%p Received multicast request of size %u from %s.\n",
+              mst, size, str);
+  GNUNET_free (str);
 
   uint16_t first_ptype = 0, last_ptype = 0;
   if (GNUNET_SYSERR
@@ -1293,7 +1575,7 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
   struct Channel *chn = &mst->chn;
   chn->store_op = NULL;
 
-  struct CountersResult res;
+  struct GNUNET_PSYC_CountersResultMessage res;
   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
   res.header.size = htons (sizeof (res));
   res.result_code = htonl (result);
@@ -1307,12 +1589,11 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
     mst->max_group_generation = max_group_generation;
     mst->origin
       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
-                                       &mcast_recv_join_request,
-                                       &mcast_recv_membership_test,
-                                       &mcast_recv_replay_fragment,
-                                       &mcast_recv_replay_message,
-                                       &mcast_recv_request,
-                                       &mcast_recv_message, chn);
+                                       mcast_recv_join_request,
+                                       mcast_recv_replay_fragment,
+                                       mcast_recv_replay_message,
+                                       mcast_recv_request,
+                                       mcast_recv_message, chn);
     chn->is_ready = GNUNET_YES;
   }
   else
@@ -1339,7 +1620,7 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
   struct Channel *chn = &slv->chn;
   chn->store_op = NULL;
 
-  struct CountersResult res;
+  struct GNUNET_PSYC_CountersResultMessage res;
   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
   res.header.size = htons (sizeof (res));
   res.result_code = htonl (result);
@@ -1354,12 +1635,11 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
                                       &slv->origin,
                                       slv->relay_count, slv->relays,
                                       &slv->join_msg->header,
-                                      &mcast_recv_join_request,
-                                      &mcast_recv_join_decision,
-                                      &mcast_recv_membership_test,
-                                      &mcast_recv_replay_fragment,
-                                      &mcast_recv_replay_message,
-                                      &mcast_recv_message, chn);
+                                      mcast_recv_join_request,
+                                      mcast_recv_join_decision,
+                                      mcast_recv_replay_fragment,
+                                      mcast_recv_replay_message,
+                                      mcast_recv_message, chn);
     if (NULL != slv->join_msg)
     {
       GNUNET_free (slv->join_msg);
@@ -1429,7 +1709,7 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
   {
     chn = &mst->chn;
 
-    struct CountersResult res;
+    struct GNUNET_PSYC_CountersResultMessage res;
     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
     res.header.size = htons (sizeof (res));
     res.result_code = htonl (GNUNET_OK);
@@ -1444,7 +1724,7 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
               "%p Client connected as master to channel %s.\n",
               mst, GNUNET_h2s (&chn->pub_key_hash));
 
-  struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
+  struct Client *cli = GNUNET_new (struct Client);
   cli->client = client;
   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
 
@@ -1465,11 +1745,11 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
   uint16_t req_size = ntohs (req->header.size);
 
   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
-  struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
+  struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
 
   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
-  GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
-  GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
+  GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
+  GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
 
   struct GNUNET_CONTAINER_MultiHashMap *
     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
@@ -1478,16 +1758,17 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
 
   if (NULL != chn_slv)
   {
-    slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
+    slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
   }
   if (NULL == slv)
   {
     slv = GNUNET_new (struct Slave);
     slv->priv_key = req->slave_key;
     slv->pub_key = slv_pub_key;
-    slv->pub_key_hash = slv_pub_key_hash;
+    slv->pub_key_hash = slv_pub_hash;
     slv->origin = req->origin;
     slv->relay_count = ntohl (req->relay_count);
+    slv->join_flags = ntohl (req->flags);
 
     const struct GNUNET_PeerIdentity *
       relays = (const struct GNUNET_PeerIdentity *) &req[1];
@@ -1497,9 +1778,11 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
         <= req_size)
     {
-      join_msg_size = ntohs (slv->join_msg->header.size);
+      struct GNUNET_PSYC_Message *
+        join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
+      join_msg_size = ntohs (join_msg->header.size);
       slv->join_msg = GNUNET_malloc (join_msg_size);
-      memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
+      memcpy (slv->join_msg, join_msg, join_msg_size);
     }
     if (sizeof (*req) + relay_size + join_msg_size != req_size)
     {
@@ -1508,6 +1791,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
                   sizeof (*req), relay_size, join_msg_size, req_size);
       GNUNET_break (0);
       GNUNET_SERVER_client_disconnect (client);
+      GNUNET_free (slv);
       return;
     }
     if (0 < slv->relay_count)
@@ -1518,7 +1802,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
 
     chn = &slv->chn;
     chn->is_master = GNUNET_NO;
-    chn->pub_key = req->channel_key;
+    chn->pub_key = req->channel_pub_key;
     chn->pub_key_hash = pub_key_hash;
     channel_init (chn);
 
@@ -1526,7 +1810,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
     {
       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
-                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
     }
     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
@@ -1539,7 +1823,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
   {
     chn = &slv->chn;
 
-    struct CountersResult res;
+    struct GNUNET_PSYC_CountersResultMessage res;
     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
     res.header.size = htons (sizeof (res));
     res.result_code = htonl (GNUNET_OK);
@@ -1549,7 +1833,12 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
                                                 GNUNET_NO);
 
-    if (NULL == slv->member)
+    if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
+    {
+      mcast_recv_join_decision (slv, GNUNET_YES,
+                                NULL, 0, NULL, NULL);
+    }
+    else if (NULL == slv->member)
     {
       slv->member
         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
@@ -1558,7 +1847,6 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
                                         &slv->join_msg->header,
                                         &mcast_recv_join_request,
                                         &mcast_recv_join_decision,
-                                        &mcast_recv_membership_test,
                                         &mcast_recv_replay_fragment,
                                         &mcast_recv_replay_message,
                                         &mcast_recv_message, chn);
@@ -1581,7 +1869,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
               "%p Client connected as slave to channel %s.\n",
               slv, GNUNET_h2s (&chn->pub_key_hash));
 
-  struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
+  struct Client *cli = GNUNET_new (struct Client);
   cli->client = client;
   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
 
@@ -1619,34 +1907,41 @@ static void
 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
                            const struct GNUNET_MessageHeader *msg)
 {
-  struct Channel *
-    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_assert (GNUNET_YES == chn->is_master);
-  struct Master *mst = (struct Master *) chn;
-
-  struct GNUNET_PSYC_JoinDecisionMessage *
-    dcsn = (struct GNUNET_PSYC_JoinDecisionMessage *) msg;
+  const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
+    = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
+  struct Channel *chn;
+  struct Master *mst;
   struct JoinDecisionClosure jcls;
+
+  chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  GNUNET_assert (GNUNET_YES == chn->is_master);
+  mst = (struct Master *) chn;
   jcls.is_admitted = ntohl (dcsn->is_admitted);
   jcls.msg
     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
     ? (struct GNUNET_MessageHeader *) &dcsn[1]
     : NULL;
 
-  struct GNUNET_HashCode slave_key_hash;
-  GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
-                      &slave_key_hash);
+  struct GNUNET_HashCode slave_pub_hash;
+  GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
+                      &slave_pub_hash);
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Got join decision (%d) from client for channel %s..\n",
               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p ..and slave %s.\n",
-              mst, GNUNET_h2s (&slave_key_hash));
+              mst, GNUNET_h2s (&slave_pub_hash));
 
-  GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
+  GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
                                               &mcast_send_join_decision, &jcls);
-  GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
+  GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -1684,6 +1979,8 @@ transmit_notify (void *cls, size_t *data_size, void *data)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "%p transmit_notify: nothing to send.\n", chn);
+    if (NULL != tmit_msg && *data_size < tmit_msg->size)
+      GNUNET_break (0);
     *data_size = 0;
     return GNUNET_NO;
   }
@@ -1694,9 +1991,13 @@ transmit_notify (void *cls, size_t *data_size, void *data)
   *data_size = tmit_msg->size;
   memcpy (data, &tmit_msg[1], *data_size);
 
-  int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
+  int ret
+    = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
+    ? GNUNET_NO
+    : GNUNET_YES;
 
-  if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
+  /* FIXME: handle disconnecting clients */
+  if (NULL != tmit_msg->client)
     send_message_ack (chn, tmit_msg->client);
 
   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
@@ -1704,12 +2005,13 @@ transmit_notify (void *cls, size_t *data_size, void *data)
 
   if (NULL != chn->tmit_head)
   {
-    transmit_message (chn);
+    GNUNET_SCHEDULER_add_now (schedule_transmit_message, chn);
   }
-  else if (GNUNET_YES == chn->is_disconnected)
+  else if (GNUNET_YES == chn->is_disconnected
+           && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
   {
     /* FIXME: handle partial message (when still in_transmit) */
-    cleanup_channel (chn);
+    return GNUNET_SYSERR;
   }
   return ret;
 }
@@ -1755,10 +2057,12 @@ slave_transmit_notify (void *cls, size_t *data_size, void *data)
 static void
 master_transmit_message (struct Master *mst)
 {
+  if (NULL == mst->chn.tmit_head)
+    return;
   if (NULL == mst->tmit_handle)
   {
     mst->tmit_handle
-      = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
+      = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->chn.tmit_head->id,
                                         mst->max_group_generation,
                                         master_transmit_notify, mst);
   }
@@ -1775,10 +2079,12 @@ master_transmit_message (struct Master *mst)
 static void
 slave_transmit_message (struct Slave *slv)
 {
+  if (NULL == slv->chn.tmit_head)
+    return;
   if (NULL == slv->tmit_handle)
   {
     slv->tmit_handle
-      = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
+      = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
                                            slave_transmit_notify, slv);
   }
   else
@@ -1801,14 +2107,16 @@ transmit_message (struct Channel *chn)
  * Queue a message from a channel master for sending to the multicast group.
  */
 static void
-master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
-                     uint16_t first_ptype, uint16_t last_ptype)
+master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
 
-  if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+  if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
   {
     tmit_msg->id = ++mst->max_message_id;
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "%p master_queue_message: message_id=%" PRIu64 "\n",
+                mst, tmit_msg->id);
     struct GNUNET_PSYC_MessageMethod *pmeth
       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
 
@@ -1818,13 +2126,24 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
     }
     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
     {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "%p master_queue_message: state_delta=%" PRIu64 "\n",
+                  mst, tmit_msg->id - mst->max_state_message_id);
       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
                                           - mst->max_state_message_id);
+      mst->max_state_message_id = tmit_msg->id;
     }
     else
     {
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                    "%p master_queue_message: state not modified\n", mst);
       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
     }
+
+    if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
+    {
+      /// @todo add state_hash to PSYC header
+    }
   }
 }
 
@@ -1833,10 +2152,9 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
  * Queue a message from a channel slave for sending to the multicast group.
  */
 static void
-slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
-                     uint16_t first_ptype, uint16_t last_ptype)
+slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
 {
-  if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+  if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
   {
     struct GNUNET_PSYC_MessageMethod *pmeth
       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
@@ -1868,17 +2186,16 @@ queue_message (struct Channel *chn,
   memcpy (&tmit_msg[1], data, data_size);
   tmit_msg->client = client;
   tmit_msg->size = data_size;
-  tmit_msg->state = chn->tmit_state;
+  tmit_msg->first_ptype = first_ptype;
+  tmit_msg->last_ptype = last_ptype;
 
   /* FIXME: separate queue per message ID */
 
   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
 
   chn->is_master
-    ? master_queue_message ((struct Master *) chn, tmit_msg,
-                            first_ptype, last_ptype)
-    : slave_queue_message ((struct Slave *) chn, tmit_msg,
-                           first_ptype, last_ptype);
+    ? master_queue_message ((struct Master *) chn, tmit_msg)
+    : slave_queue_message ((struct Slave *) chn, tmit_msg);
   return tmit_msg;
 }
 
@@ -1932,7 +2249,9 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
   uint16_t size = ntohs (msg->size);
   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "%p Message payload too large: %u < %u.\n",
+                chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg));
     GNUNET_break (0);
     transmit_cancel (chn, client);
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
@@ -1966,35 +2285,241 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
 
 
 /**
- * Client requests to add a slave to the membership database.
+ * Received result of GNUNET_PSYCSTORE_membership_store()
  */
 static void
-client_recv_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
-                       const struct GNUNET_MessageHeader *msg)
+store_recv_membership_store_result (void *cls, int64_t result,
+                                    const char *err_msg, uint16_t err_msg_size)
 {
+  struct Operation *op = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
+              op->chn, result, err_msg_size, err_msg);
 
+  if (NULL != op->client)
+    client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
+  op_remove (op);
 }
 
 
 /**
- * Client requests to remove a slave from the membership database.
+ * Client requests to add/remove a slave in the membership database.
  */
 static void
-client_recv_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
-                          const struct GNUNET_MessageHeader *msg)
+client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
+                              const struct GNUNET_MessageHeader *msg)
 {
+  struct Channel *
+    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+  GNUNET_assert (NULL != chn);
+
+  const struct ChannelMembershipStoreRequest *
+    req = (const struct ChannelMembershipStoreRequest *) msg;
 
+  struct Operation *op = op_add (chn, client, req->op_id, 0);
+
+  uint64_t announced_at = GNUNET_ntohll (req->announced_at);
+  uint64_t effective_since = GNUNET_ntohll (req->effective_since);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Received membership store request from client.\n", chn);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
+              chn, req->did_join, announced_at, effective_since);
+
+  GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
+                                     req->did_join, announced_at, effective_since,
+                                     0, /* FIXME: group_generation */
+                                     &store_recv_membership_store_result, op);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
 /**
- * Client requests channel history from PSYCstore.
+ * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
+ * in response to a history request from a client.
+ */
+static int
+store_recv_fragment_history (void *cls,
+                             struct GNUNET_MULTICAST_MessageHeader *mmsg,
+                             enum GNUNET_PSYCSTORE_MessageFlags flags)
+{
+  struct Operation *op = cls;
+  if (NULL == op->client)
+  { /* Requesting client already disconnected. */
+    return GNUNET_NO;
+  }
+  struct Channel *chn = op->chn;
+
+  struct GNUNET_PSYC_MessageHeader *pmsg;
+  uint16_t msize = ntohs (mmsg->header.size);
+  uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
+
+  struct GNUNET_OperationResultMessage *
+    res = GNUNET_malloc (sizeof (*res) + psize);
+  res->header.size = htons (sizeof (*res) + psize);
+  res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
+  res->op_id = op->op_id;
+  res->result_code = GNUNET_htonll (GNUNET_OK);
+
+  pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
+  GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
+  memcpy (&res[1], pmsg, psize);
+
+  /** @todo FIXME: send only to requesting client */
+  client_send_msg (chn, &res->header);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Received the result of GNUNET_PSYCSTORE_fragment_get(),
+ * in response to a history request from a client.
  */
 static void
-client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client,
-                           const struct GNUNET_MessageHeader *msg)
+store_recv_fragment_history_result (void *cls, int64_t result,
+                                    const char *err_msg, uint16_t err_msg_size)
 {
+  struct Operation *op = cls;
+  if (NULL == op->client)
+  { /* Requesting client already disconnected. */
+    return;
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p History replay #%" PRIu64 ": "
+              "PSYCSTORE returned %" PRId64 " (%.*s)\n",
+              op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
+
+  if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
+  {
+    /** @todo Multicast replay request for messages not found locally. */
+  }
 
+  client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
+  op_remove (op);
+}
+
+
+/**
+ * Client requests channel history.
+ */
+static void
+client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
+                            const struct GNUNET_MessageHeader *msg)
+{
+  struct Channel *
+    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+  GNUNET_assert (NULL != chn);
+
+  const struct GNUNET_PSYC_HistoryRequestMessage *
+    req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
+  uint16_t size = ntohs (msg->size);
+  const char *method_prefix = (const char *) &req[1];
+
+  if (size < sizeof (*req) + 1
+      || '\0' != method_prefix[size - sizeof (*req) - 1])
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "%p History replay #%" PRIu64 ": "
+                "invalid method prefix. size: %u < %u?\n",
+                chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
+
+  if (0 == req->message_limit)
+    GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
+                                  GNUNET_ntohll (req->start_message_id),
+                                  GNUNET_ntohll (req->end_message_id),
+                                  0, method_prefix,
+                                  &store_recv_fragment_history,
+                                  &store_recv_fragment_history_result, op);
+  else
+    GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
+                                         GNUNET_ntohll (req->message_limit),
+                                         method_prefix,
+                                         &store_recv_fragment_history,
+                                         &store_recv_fragment_history_result,
+                                         op);
+
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Received state var from PSYCstore, send it to client.
+ */
+static int
+store_recv_state_var (void *cls, const char *name,
+                      const void *value, uint32_t value_size)
+{
+  struct Operation *op = cls;
+  struct GNUNET_OperationResultMessage *res;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
+              op->chn, GNUNET_ntohll (op->op_id), name);
+
+  if (NULL != name) /* First part */
+  {
+    uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
+    struct GNUNET_PSYC_MessageModifier *mod;
+    res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
+    res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
+    res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+    res->op_id = op->op_id;
+
+    mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
+    mod->header.size = htons (sizeof (*mod) + name_size + value_size);
+    mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
+    mod->name_size = htons (name_size);
+    mod->value_size = htonl (value_size);
+    mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
+    memcpy (&mod[1], name, name_size);
+    memcpy (((char *) &mod[1]) + name_size, value, value_size);
+  }
+  else /* Continuation */
+  {
+    struct GNUNET_MessageHeader *mod;
+    res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
+    res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
+    res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+    res->op_id = op->op_id;
+
+    mod = (struct GNUNET_MessageHeader *) &res[1];
+    mod->size = htons (sizeof (*mod) + value_size);
+    mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
+    memcpy (&mod[1], value, value_size);
+  }
+
+  // FIXME: client might have been disconnected
+  GNUNET_SERVER_notification_context_add (nc, op->client);
+  GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
+                                              GNUNET_NO);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Received result of GNUNET_PSYCSTORE_state_get()
+ * or GNUNET_PSYCSTORE_state_get_prefix()
+ */
+static void
+store_recv_state_result (void *cls, int64_t result,
+                         const char *err_msg, uint16_t err_msg_size)
+{
+  struct Operation *op = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p state_get #%" PRIu64 ": "
+              "PSYCSTORE returned %" PRId64 " (%.*s)\n",
+              op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
+
+  // FIXME: client might have been disconnected
+  client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
+  op_remove (op);
 }
 
 
@@ -2005,7 +2530,27 @@ static void
 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
                        const struct GNUNET_MessageHeader *msg)
 {
+  struct Channel *
+    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+  GNUNET_assert (NULL != chn);
+
+  const struct StateRequest *
+    req = (const struct StateRequest *) msg;
 
+  uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
+  const char *name = (const char *) &req[1];
+  if (0 == name_size || '\0' != name[name_size - 1])
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  struct Operation *op = op_add (chn, client, req->op_id, 0);
+  GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
+                              &store_recv_state_var,
+                              &store_recv_state_result, op);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
@@ -2016,10 +2561,59 @@ static void
 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
                               const struct GNUNET_MessageHeader *msg)
 {
+  struct Channel *
+    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+  GNUNET_assert (NULL != chn);
+
+  const struct StateRequest *
+    req = (const struct StateRequest *) msg;
+
+  uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
+  const char *name = (const char *) &req[1];
+  if (0 == name_size || '\0' != name[name_size - 1])
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
 
+  struct Operation *op = op_add (chn, client, req->op_id, 0);
+  GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
+                                     &store_recv_state_var,
+                                     &store_recv_state_result, op);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
+static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
+  { &client_recv_master_start, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
+
+  { &client_recv_slave_join, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
+
+  { &client_recv_join_decision, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
+
+  { &client_recv_psyc_message, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
+
+  { &client_recv_membership_store, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
+
+  { &client_recv_history_replay, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
+
+  { &client_recv_state_get, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
+
+  { &client_recv_state_get_prefix, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
+
+  { NULL, NULL, 0, 0 }
+};
+
+
 /**
  * Initialize the PSYC service.
  *
@@ -2031,46 +2625,15 @@ static void
 run (void *cls, struct GNUNET_SERVER_Handle *server,
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
-  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
-    { &client_recv_master_start, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
-
-    { &client_recv_slave_join, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
-
-    { &client_recv_join_decision, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
-
-    { &client_recv_psyc_message, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
-
-    { &client_recv_slave_add, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
-
-    { &client_recv_slave_remove, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
-
-    { &client_recv_story_request, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
-
-    { &client_recv_state_get, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
-
-    { &client_recv_state_get_prefix, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
-
-    { NULL, NULL, 0, 0 }
-  };
-
   cfg = c;
   store = GNUNET_PSYCSTORE_connect (cfg);
   stats = GNUNET_STATISTICS_create ("psyc", cfg);
   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
-  recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+  recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
   nc = GNUNET_SERVER_notification_context_create (server, 1);
-  GNUNET_SERVER_add_handlers (server, handlers);
+  GNUNET_SERVER_add_handlers (server, server_handlers);
   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
                                 &shutdown_task, NULL);