adding extended proxy support for http(s) client
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
index d3f203ebf9d3cadba989eaaad8a593428d7ba124..fb3aa65f7cda91a09d19f949b3d8d069228676ac 100644 (file)
@@ -24,6 +24,8 @@
  * @author Gabor X Toth
  */
 
+#include <inttypes.h>
+
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_constants.h"
@@ -56,10 +58,12 @@ static struct GNUNET_SERVER_NotificationContext *nc;
 static struct GNUNET_PSYCSTORE_Handle *store;
 
 /**
- * channel's pub_key_hash -> struct Channel
+ * All connected masters and slaves.
+ * Channel's pub_key_hash -> struct Channel
  */
 static struct GNUNET_CONTAINER_MultiHashMap *clients;
 
+
 /**
  * Message in the transmission queue.
  */
@@ -68,11 +72,61 @@ struct TransmitMessage
   struct TransmitMessage *prev;
   struct TransmitMessage *next;
 
+  /**
+   * Buffer with message to be transmitted.
+   */
   char *buf;
-  uint16_t size;
-  uint8_t status;
+
+  /**
+   * Size of @a buf
+   */
+  uint16_t size
+;
+  /**
+   * @see enum MessageState
+   */
+  uint8_t state;
 };
 
+
+/**
+ * Cache for received message fragments.
+ * Message fragments are only sent to clients after all modifiers arrived.
+ *
+ * chan_key -> MultiHashMap chan_msgs
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
+
+
+/**
+ * Entry in the chan_msgs hashmap of @a recv_cache:
+ * fragment_id -> FragmentEntry
+ */
+struct FragmentEntry
+{
+  struct GNUNET_MULTICAST_MessageHeader *mmsg;
+  uint16_t ref_count;
+};
+
+
+/**
+ * Entry in the @a recv_msgs hash map of a @a Channel.
+ * message_id -> FragmentCache
+ */
+struct FragmentCache
+{
+  /**
+   * Total size of header fragments (METHOD & MODIFIERs)
+   */
+  uint64_t header_size;
+
+  /**
+   * Fragment IDs stored in @a recv_cache.
+   */
+  struct GNUNET_CONTAINER_Heap *fragments;
+};
+
+
 /**
  * Common part of the client context for both a master and slave channel.
  */
@@ -83,36 +137,110 @@ struct Channel
   struct TransmitMessage *tmit_head;
   struct TransmitMessage *tmit_tail;
 
-  char *tmit_buf;
+  /**
+   * Received fragments not yet sent to the client.
+   * message_id -> FragmentCache
+   */
+  struct GNUNET_CONTAINER_MultiHashMap *recv_msgs;
+
+  /**
+   * FIXME
+   */
   GNUNET_SCHEDULER_TaskIdentifier tmit_task;
-  uint32_t tmit_mod_count;
-  uint32_t tmit_mod_recvd;
-  uint16_t tmit_size;
-  uint8_t tmit_status;
 
+  /**
+   * Public key of the channel.
+   */
+  struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+
+  /**
+   * Hash of @a pub_key.
+   */
+  struct GNUNET_HashCode pub_key_hash;
+
+  /**
+   * Expected value size for the modifier being received from the PSYC service.
+   */
+  uint32_t tmit_mod_value_size_expected;
+
+  /**
+   * Actual value size for the modifier being received from the PSYC service.
+   */
+  uint32_t tmit_mod_value_size;
+
+  /**
+   * @see enum MessageState
+   */
+  uint8_t tmit_state;
+
+  /**
+   * FIXME
+   */
   uint8_t in_transmit;
+
+  /**
+   * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
+   */
   uint8_t is_master;
+
+  /**
+   * Ready to receive messages from client? #GNUNET_YES or #GNUNET_NO
+   */
+  uint8_t ready;
+
+  /**
+   * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
+   */
+  uint8_t disconnected;
 };
 
+
 /**
  * Client context for a channel master.
  */
 struct Master
 {
+  /**
+   * Channel struct common for Master and Slave
+   */
   struct Channel channel;
-  struct GNUNET_CRYPTO_EccPrivateKey priv_key;
-  struct GNUNET_CRYPTO_EccPublicSignKey pub_key;
-  struct GNUNET_HashCode pub_key_hash;
 
+  /**
+   * Private key of the channel.
+   */
+  struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
+
+  /**
+   * Handle for the multicast origin.
+   */
   struct GNUNET_MULTICAST_Origin *origin;
+
+  /**
+   * Transmit handle for multicast.
+   */
   struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
 
+  /**
+   * Maximum message ID for this channel.
+   *
+   * Incremented before sending a message, thus the message_id in messages sent
+   * starts from 1.
+   */
   uint64_t max_message_id;
+
+  /**
+   * ID of the last message that contains any state operations.
+   * 0 if there is no such message.
+   */
   uint64_t max_state_message_id;
+
+  /**
+   * Maximum group generation for this channel.
+   */
   uint64_t max_group_generation;
 
   /**
-   * enum GNUNET_PSYC_Policy
+   * @see enum GNUNET_PSYC_Policy
    */
   uint32_t policy;
 };
@@ -123,25 +251,62 @@ struct Master
  */
 struct Slave
 {
+  /**
+   * Channel struct common for Master and Slave
+   */
   struct Channel channel;
-  struct GNUNET_CRYPTO_EccPrivateKey slave_key;
-  struct GNUNET_CRYPTO_EccPublicSignKey chan_key;
-  struct GNUNET_HashCode chan_key_hash;
 
+  /**
+   * Private key of the slave.
+   */
+  struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
+
+  /**
+   * Handle for the multicast member.
+   */
   struct GNUNET_MULTICAST_Member *member;
+
+  /**
+   * Transmit handle for multicast.
+   */
   struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
 
+  /**
+   * Peer identity of the origin.
+   */
   struct GNUNET_PeerIdentity origin;
+
+  /**
+   * Number of items in @a relays.
+   */
+  uint32_t relay_count;
+
+  /**
+   * Relays that multicast can use to connect.
+   */
   struct GNUNET_PeerIdentity *relays;
+
+  /**
+   * Join request to be transmitted to the master on join.
+   */
   struct GNUNET_MessageHeader *join_req;
 
+  /**
+   * Maximum message ID for this channel.
+   */
   uint64_t max_message_id;
-  uint64_t max_request_id;
 
-  uint32_t relay_count;
+  /**
+   * Maximum request ID for this channel.
+   */
+  uint64_t max_request_id;
 };
 
 
+static inline void
+transmit_message (struct Channel *ch, uint8_t inc_msg_id);
+
+
 /**
  * Task run during shutdown.
  *
@@ -163,6 +328,34 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   }
 }
 
+
+static void
+client_cleanup (struct Channel *ch)
+{
+  /* FIXME: fragment_cache_clear */
+
+  if (ch->is_master)
+  {
+    struct Master *mst = (struct Master *) ch;
+    if (NULL != mst->origin)
+      GNUNET_MULTICAST_origin_stop (mst->origin);
+    GNUNET_CONTAINER_multihashmap_remove (clients, &ch->pub_key_hash, mst);
+  }
+  else
+  {
+    struct Slave *slv = (struct Slave *) ch;
+    if (NULL != slv->join_req)
+      GNUNET_free (slv->join_req);
+    if (NULL != slv->relays)
+      GNUNET_free (slv->relays);
+    if (NULL != slv->member)
+      GNUNET_MULTICAST_member_part (slv->member);
+  }
+
+  GNUNET_free (ch);
+}
+
+
 /**
  * Called whenever a client is disconnected.
  * Frees our resources associated with that client.
@@ -176,73 +369,67 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
   if (NULL == client)
     return;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
-
   struct Channel *ch
     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch);
+
   if (NULL == ch)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "User context is NULL in client_disconnect()\n");
+                "%p User context is NULL in client_disconnect()\n", ch);
     GNUNET_break (0);
     return;
   }
 
-  if (NULL != ch->tmit_buf)
-  {
-    GNUNET_free (ch->tmit_buf);
-    ch->tmit_buf = NULL;
-  }
+  ch->disconnected = GNUNET_YES;
 
-  if (ch->is_master)
+  /* Send pending messages to multicast before cleanup. */
+  if (NULL != ch->tmit_head)
   {
-    struct Master *mst = (struct Master *) ch;
-    if (NULL != mst->origin)
-      GNUNET_MULTICAST_origin_stop (mst->origin);
+    transmit_message (ch, GNUNET_NO);
   }
   else
   {
-    struct Slave *slv = (struct Slave *) ch;
-    if (NULL != slv->join_req)
-      GNUNET_free (slv->join_req);
-    if (NULL != slv->relays)
-      GNUNET_free (slv->relays);
-    if (NULL != slv->member)
-      GNUNET_MULTICAST_member_part (slv->member);
+    client_cleanup (ch);
   }
-
-  GNUNET_free (ch);
 }
 
-void
-join_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+
+/**
+ * Master receives a join request from a slave.
+ */
+static void
+join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
          const struct GNUNET_MessageHeader *join_req,
          struct GNUNET_MULTICAST_JoinHandle *jh)
 {
 
 }
 
-void
+
+static void
 membership_test_cb (void *cls,
-                    const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+                    const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
                     uint64_t message_id, uint64_t group_generation,
                     struct GNUNET_MULTICAST_MembershipTestHandle *mth)
 {
 
 }
 
-void
+
+static void
 replay_fragment_cb (void *cls,
-                    const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+                    const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
                     uint64_t fragment_id, uint64_t flags,
                     struct GNUNET_MULTICAST_ReplayHandle *rh)
-{
 
+{
 }
 
-void
+
+static void
 replay_message_cb (void *cls,
-                   const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+                   const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
                    uint64_t message_id,
                    uint64_t fragment_offset,
                    uint64_t flags,
@@ -251,23 +438,378 @@ replay_message_cb (void *cls,
 
 }
 
-void
-request_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
-            const struct GNUNET_MessageHeader *req,
-            enum GNUNET_MULTICAST_MessageFlags flags)
+
+static void
+fragment_store_result (void *cls, int64_t result, const char *err_msg)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "fragment_store() returned %l (%s)\n", result, err_msg);
+}
+
+
+static void
+message_to_client (struct Channel *ch,
+                   const struct GNUNET_MULTICAST_MessageHeader *mmsg)
 {
+  uint16_t size = ntohs (mmsg->header.size);
+  struct GNUNET_PSYC_MessageHeader *pmsg;
+  uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Sending message to client. "
+              "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
+              ch, GNUNET_ntohll (mmsg->fragment_id),
+              GNUNET_ntohll (mmsg->message_id));
+
+  pmsg = GNUNET_malloc (psize);
+  pmsg->header.size = htons (psize);
+  pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+  pmsg->message_id = mmsg->message_id;
+
+  memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
 
+  GNUNET_SERVER_notification_context_add (nc, ch->client);
+  GNUNET_SERVER_notification_context_unicast (nc, ch->client,
+                                              (const struct GNUNET_MessageHeader *) pmsg,
+                                              GNUNET_NO);
+  GNUNET_free (pmsg);
 }
 
-void
+
+/**
+ * Convert an uint64_t in network byte order to a HashCode
+ * that can be used as key in a MultiHashMap
+ */
+static inline void
+hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
+{
+  /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
+
+  n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
+  n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
+
+  *key = (struct GNUNET_HashCode) {{ 0 }};
+  *((uint64_t *) key)
+    = (n << 32) | (n >> 32);
+}
+
+
+/**
+ * Convert an uint64_t in host byte order to a HashCode
+ * that can be used as key in a MultiHashMap
+ */
+static inline void
+hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
+{
+#if __BYTE_ORDER == __BIG_ENDIAN
+  hash_key_from_nll (key, n);
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+  *key = (struct GNUNET_HashCode) {{ 0 }};
+  *((uint64_t *) key) = n;
+#else
+  #error byteorder undefined
+#endif
+}
+
+
+static void
+fragment_cache_insert (struct Channel *ch,
+                       const struct GNUNET_HashCode *msg_id,
+                       struct FragmentCache *frag_cache,
+                       const struct GNUNET_MULTICAST_MessageHeader *mmsg,
+                       uint16_t last_part_type)
+{
+  uint16_t size = ntohs (mmsg->header.size);
+  struct GNUNET_CONTAINER_MultiHashMap
+    *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
+                                                    &ch->pub_key_hash);
+
+  if (NULL == frag_cache)
+  {
+    frag_cache = GNUNET_new (struct FragmentCache);
+    frag_cache->fragments
+      = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+
+    if (NULL == ch->recv_msgs)
+    {
+      ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+    }
+    GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+
+    if (NULL == chan_msgs)
+    {
+      chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+      GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs,
+                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+    }
+  }
+
+  struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode);
+  hash_key_from_nll (frag_id, mmsg->fragment_id);
+  struct FragmentEntry
+    *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
+  if (NULL == frag_entry)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%p Adding message fragment to cache. "
+                "fragment_id: %" PRIu64 ", "
+                "header_size: %" PRIu64 " + %" PRIu64 ").\n",
+                ch, GNUNET_ntohll (mmsg->fragment_id),
+                frag_cache->header_size, size);
+    frag_entry = GNUNET_new (struct FragmentEntry);
+    frag_entry->ref_count = 1;
+    frag_entry->mmsg = GNUNET_malloc (size);
+    memcpy (frag_entry->mmsg, mmsg, size);
+    GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+  }
+  else
+  {
+    frag_entry->ref_count++;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%p Message fragment already in cache. "
+                "fragment_id: %" PRIu64 ", ref_count: %u\n",
+                ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count);
+  }
+
+  switch (last_part_type)
+  {
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+    frag_cache->header_size += size;
+  }
+  GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id,
+                                GNUNET_ntohll (mmsg->fragment_id));
+}
+
+
+static void
+fragment_cache_clear (struct Channel *ch,
+                      const struct GNUNET_HashCode *msg_id,
+                      struct FragmentCache *frag_cache,
+                      uint8_t send_to_client)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Clearing message fragment cache.\n", ch);
+
+  struct GNUNET_CONTAINER_MultiHashMap
+    *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
+                                                    &ch->pub_key_hash);
+  GNUNET_assert (NULL != chan_msgs);
+  struct GNUNET_HashCode *frag_id;
+
+  while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments)))
+  {
+    struct FragmentEntry
+      *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
+    if (frag_entry != NULL)
+    {
+      if (GNUNET_YES == send_to_client)
+      {
+        message_to_client (ch, frag_entry->mmsg);
+      }
+      if (1 == frag_entry->ref_count)
+      {
+        GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry);
+        GNUNET_free (frag_entry->mmsg);
+        GNUNET_free (frag_entry);
+      }
+      else
+      {
+        frag_entry->ref_count--;
+      }
+    }
+    GNUNET_free (frag_id);
+  }
+
+  GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache);
+  GNUNET_CONTAINER_heap_destroy (frag_cache->fragments);
+  GNUNET_free (frag_cache);
+}
+
+
+/**
+ * Incoming message fragment from multicast.
+ *
+ * Store it using PSYCstore and send it to the client of the channel.
+ */
+static void
 message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
 {
+  struct Channel *ch = cls;
+  uint16_t type = ntohs (msg->type);
+  uint16_t size = ntohs (msg->size);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Received message of type %u and size %u from multicast.\n",
+              ch, type, size);
+
+  switch (type)
+  {
+  case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
+  {
+    GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key,
+                                     (const struct
+                                      GNUNET_MULTICAST_MessageHeader *) msg,
+                                     0, NULL, NULL);
+
+#if TODO
+    /* FIXME: apply modifiers to state in PSYCstore */
+    GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key,
+                                   GNUNET_ntohll (mmsg->message_id),
+                                   meth->mod_count, mods,
+                                   rcb, rcb_cls);
+#endif
+
+    const struct GNUNET_MULTICAST_MessageHeader
+      *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
+
+    uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg),
+                                                    (const char *) &mmsg[1]);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Last message part type %u\n", ptype);
+
+    if (GNUNET_NO == ptype)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "%p Received message with invalid parts from multicast. "
+                  "Dropping message.\n", ch);
+      GNUNET_break_op (0);
+      break;
+    }
+
+    struct GNUNET_HashCode msg_id;
+    hash_key_from_nll (&msg_id, mmsg->message_id);
+
+    struct FragmentCache *frag_cache = NULL;
+    if (NULL != ch->recv_msgs)
+      frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id);
+
+    switch (ptype)
+    {
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+      /* FIXME: check state flag / max_state_message_id */
+      if (NULL == frag_cache)
+      {
+        message_to_client (ch, mmsg);
+        break;
+      }
+      else
+      {
+        if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size)
+        { /* first data fragment after the header, send cached fragments */
+          fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_YES);
+          message_to_client (ch, mmsg);
+          break;
+        }
+        else
+        { /* still missing fragments from the header, cache data fragment */
+          /* fall thru */
+        }
+      }
+
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+      /* not all modifiers arrived yet, cache fragment */
+      fragment_cache_insert (ch, &msg_id, frag_cache, mmsg, ptype);
+      break;
+
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+      if (NULL != frag_cache)
+      { /* fragments not yet sent to client, remove from cache */
+        fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_NO);
+      }
+      else
+      {
+        message_to_client (ch, mmsg);
+      }
+      break;
+    }
+    break;
+  }
+  default:
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "%p Dropping unknown message of type %u and size %u.\n",
+                ch, type, size);
+  }
+}
+
+
+/**
+ * Incoming request fragment from multicast for a master.
+ *
+ * @param cls          Master.
+ * @param slave_key    Sending slave's public key.
+ * @param msg          The message.
+ * @param flags                Request flags.
+ */
+static void
+request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
+            const struct GNUNET_MessageHeader *msg,
+            enum GNUNET_MULTICAST_MessageFlags flags)
+{
+  struct Master *mst = cls;
+  struct Channel *ch = &mst->channel;
+
+  uint16_t type = ntohs (msg->type);
+  uint16_t size = ntohs (msg->size);
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received message of type %u from multicast.\n",
-              ntohs (msg->type));
+              "%p Received request of type %u and size %u from multicast.\n",
+              ch, type, size);
+
+  switch (type)
+  {
+  case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
+  {
+    const struct GNUNET_MULTICAST_RequestHeader *req
+      = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
+
+    /* FIXME: see message_cb() */
+    if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req),
+                                                    (const char *) &req[1]))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "%p Dropping message with invalid parts "
+                  "received from multicast.\n", ch);
+      GNUNET_break_op (0);
+      break;
+    }
+
+    struct GNUNET_PSYC_MessageHeader *pmsg;
+    uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
+    pmsg = GNUNET_malloc (psize);
+    pmsg->header.size = htons (psize);
+    pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+    pmsg->message_id = req->request_id;
+    pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
+
+    memcpy (&pmsg[1], &req[1], size - sizeof (*req));
+
+    GNUNET_SERVER_notification_context_add (nc, ch->client);
+    GNUNET_SERVER_notification_context_unicast (nc, ch->client,
+                                                (const struct GNUNET_MessageHeader *) pmsg,
+                                                GNUNET_NO);
+    GNUNET_free (pmsg);
+    break;
+  }
+  default:
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%p Dropping unknown request of type %u and size %u.\n",
+                ch, type, size);
+    GNUNET_break_op (0);
+  }
 }
 
-void
+
+/**
+ * Response from PSYCstore with the current counter values for a channel master.
+ */
+static void
 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
                     uint64_t max_message_id, uint64_t max_group_generation,
                     uint64_t max_state_message_id)
@@ -291,6 +833,7 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
                                        join_cb, membership_test_cb,
                                        replay_fragment_cb, replay_message_cb,
                                        request_cb, message_cb, ch);
+    ch->ready = GNUNET_YES;
   }
   GNUNET_SERVER_notification_context_add (nc, ch->client);
   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
@@ -299,6 +842,9 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
 }
 
 
+/**
+ * Response from PSYCstore with the current counter values for a channel slave.
+ */
 void
 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
                    uint64_t max_message_id, uint64_t max_group_generation,
@@ -316,13 +862,14 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
   {
     slv->max_message_id = max_message_id;
     slv->member
-      = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key,
+      = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->slave_key,
                                       &slv->origin,
                                       slv->relay_count, slv->relays,
                                       slv->join_req, join_cb,
                                       membership_test_cb,
                                       replay_fragment_cb, replay_message_cb,
                                       message_cb, ch);
+    ch->ready = GNUNET_YES;
   }
 
   GNUNET_SERVER_notification_context_add (nc, ch->client);
@@ -332,6 +879,9 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
 }
 
 
+/**
+ * Handle a connecting client starting a channel master.
+ */
 static void
 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
                      const struct GNUNET_MessageHeader *msg)
@@ -339,24 +889,29 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
   const struct MasterStartRequest *req
     = (const struct MasterStartRequest *) msg;
   struct Master *mst = GNUNET_new (struct Master);
-  mst->channel.client = client;
-  mst->channel.is_master = GNUNET_YES;
+  struct Channel *ch = &mst->channel;
+  ch->client = client;
+  ch->is_master = GNUNET_YES;
   mst->policy = ntohl (req->policy);
   mst->priv_key = req->channel_key;
-  GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->priv_key,
-                                                  &mst->pub_key);
-  GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash);
+  GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key);
+  GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Master connected to channel %s.\n",
+              mst, GNUNET_h2s (&ch->pub_key_hash));
 
-  GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
-                                 master_counters_cb, mst);
+  GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst);
 
   GNUNET_SERVER_client_set_user_context (client, &mst->channel);
-  GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst,
+  GNUNET_CONTAINER_multihashmap_put (clients, &ch->pub_key_hash, mst,
                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
+/**
+ * Handle a connecting client joining as a channel slave.
+ */
 static void
 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
                    const struct GNUNET_MessageHeader *msg)
@@ -364,83 +919,153 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
   const struct SlaveJoinRequest *req
     = (const struct SlaveJoinRequest *) msg;
   struct Slave *slv = GNUNET_new (struct Slave);
+  struct Channel *ch = &slv->channel;
   slv->channel.client = client;
   slv->channel.is_master = GNUNET_NO;
   slv->slave_key = req->slave_key;
-  slv->chan_key = req->channel_key;
-  GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key),
-                      &slv->chan_key_hash);
+  ch->pub_key = req->channel_key;
+  GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
+                      &ch->pub_key_hash);
   slv->origin = req->origin;
   slv->relay_count = ntohl (req->relay_count);
+  if (0 < slv->relay_count)
+  {
+    const struct GNUNET_PeerIdentity *relays
+      = (const struct GNUNET_PeerIdentity *) &req[1];
+    slv->relays
+      = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
+    uint32_t i;
+    for (i = 0; i < slv->relay_count; i++)
+      memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+  }
 
-  const struct GNUNET_PeerIdentity *relays
-    = (const struct GNUNET_PeerIdentity *) &req[1];
-  slv->relays
-    = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
-  uint32_t i;
-  for (i = 0; i < slv->relay_count; i++)
-    memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Slave connected to channel %s.\n",
+              slv, GNUNET_h2s (&ch->pub_key_hash));
 
-  GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
-                                 slave_counters_cb, slv);
+  GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv);
 
   GNUNET_SERVER_client_set_user_context (client, &slv->channel);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
+/**
+ * Send acknowledgement to a client.
+ *
+ * Sent after a message fragment has been passed on to multicast.
+ *
+ * @param ch The channel struct for the client.
+ */
 static void
-send_transmit_ack (struct Channel *ch)
+send_message_ack (struct Channel *ch)
 {
-  struct TransmitAck *res = GNUNET_malloc (sizeof (*res));
-  res->header.size = htons (sizeof (*res));
-  res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
-  res->buf_avail = htons (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - ch->tmit_size);
+  struct GNUNET_MessageHeader res;
+  res.size = htons (sizeof (res));
+  res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
 
   GNUNET_SERVER_notification_context_add (nc, ch->client);
-  GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
+  GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res,
                                               GNUNET_NO);
-  GNUNET_free (res);
 }
 
 
+/**
+ * Callback for the transmit functions of multicast.
+ */
 static int
 transmit_notify (void *cls, size_t *data_size, void *data)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n");
   struct Channel *ch = cls;
-  struct TransmitMessage *msg = ch->tmit_head;
+  struct TransmitMessage *tmit_msg = ch->tmit_head;
 
-  if (NULL == msg || *data_size < ntohs (msg->size))
+  if (NULL == tmit_msg || *data_size < tmit_msg->size)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%p transmit_notify: nothing to send.\n", ch);
     *data_size = 0;
     return GNUNET_NO;
   }
 
-  *data_size = ntohs (msg->size);
-  memcpy (data, msg->buf, *data_size);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
+
+  *data_size = tmit_msg->size;
+  memcpy (data, tmit_msg->buf, *data_size);
+
+  GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
+  GNUNET_free (tmit_msg);
 
-  GNUNET_free (ch->tmit_buf);
-  ch->tmit_buf = NULL;
-  GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
+  int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
+  send_message_ack (ch);
 
-  return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
+  if (0 == ch->tmit_task)
+  {
+    if (NULL != ch->tmit_head)
+    {
+      transmit_message (ch, GNUNET_NO);
+    }
+    else if (ch->disconnected)
+    {
+      /* FIXME: handle partial message (when still in_transmit) */
+      client_cleanup (ch);
+    }
+  }
+
+  return ret;
+}
+
+
+/**
+ * Callback for the transmit functions of multicast.
+ */
+static int
+master_transmit_notify (void *cls, size_t *data_size, void *data)
+{
+  int ret = transmit_notify (cls, data_size, data);
+
+  if (GNUNET_YES == ret)
+  {
+    struct Master *mst = cls;
+    mst->tmit_handle = NULL;
+  }
+  return ret;
 }
 
 
+/**
+ * Callback for the transmit functions of multicast.
+ */
+static int
+slave_transmit_notify (void *cls, size_t *data_size, void *data)
+{
+  int ret = transmit_notify (cls, data_size, data);
+
+  if (GNUNET_YES == ret)
+  {
+    struct Slave *slv = cls;
+    slv->tmit_handle = NULL;
+  }
+  return ret;
+}
+
+
+/**
+ * Transmit a message from a channel master to the multicast group.
+ */
 static void
-master_transmit_message (void *cls,
-                         const struct GNUNET_SCHEDULER_TaskContext *tc)
+master_transmit_message (struct Master *mst, uint8_t inc_msg_id)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
-  struct Master *mst = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
   mst->channel.tmit_task = 0;
   if (NULL == mst->tmit_handle)
   {
+    if (GNUNET_YES == inc_msg_id)
+      mst->max_message_id++;
     mst->tmit_handle
-      = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id,
+      = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
                                         mst->max_group_generation,
-                                        transmit_notify, mst);
+                                        master_transmit_notify, mst);
   }
   else
   {
@@ -449,17 +1074,20 @@ master_transmit_message (void *cls,
 }
 
 
+/**
+ * Transmit a message from a channel slave to the multicast group.
+ */
 static void
-slave_transmit_message (void *cls,
-                        const struct GNUNET_SCHEDULER_TaskContext *tc)
+slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id)
 {
-  struct Slave *slv = cls;
   slv->channel.tmit_task = 0;
   if (NULL == slv->tmit_handle)
   {
+    if (GNUNET_YES == inc_msg_id)
+      slv->max_message_id++;
     slv->tmit_handle
-      = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id,
-                                          transmit_notify, slv);
+      = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
+                                           slave_transmit_notify, slv);
   }
   else
   {
@@ -468,121 +1096,162 @@ slave_transmit_message (void *cls,
 }
 
 
-static int
-buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
+static inline void
+transmit_message (struct Channel *ch, uint8_t inc_msg_id)
 {
-  uint16_t size = ntohs (msg->size);
-  struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
-
-  if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
-    return GNUNET_SYSERR;
-
-  if (0 == ch->tmit_size)
-  {
-    ch->tmit_buf = GNUNET_malloc (size);
-    memcpy (ch->tmit_buf, msg, size);
-    ch->tmit_size = size;
-  }
-  else if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE <= ch->tmit_size + size)
-  {
-    ch->tmit_buf = GNUNET_realloc (ch->tmit_buf, ch->tmit_size + size);
-    memcpy (ch->tmit_buf + ch->tmit_size, msg, size);
-    ch->tmit_size += size;
-  }
-
-  if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE
-      < ch->tmit_size + sizeof (struct GNUNET_PSYC_MessageData))
-  {
-    struct TransmitMessage *tmit_msg = GNUNET_new (struct TransmitMessage);
-    tmit_msg->buf = (char *) msg;
-    tmit_msg->size = size;
-    tmit_msg->status = ch->tmit_status;
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
-    tmit_delay = GNUNET_TIME_UNIT_ZERO;
-  }
-
-  if (0 != ch->tmit_task)
-    GNUNET_SCHEDULER_cancel (ch->tmit_task);
+  ch->is_master
+    ? master_transmit_message ((struct Master *) ch, inc_msg_id)
+    : slave_transmit_message ((struct Slave *) ch, inc_msg_id);
+}
 
-  ch->tmit_task
-    = ch->is_master
-    ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch)
-    : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch);
 
-  return GNUNET_OK;
+static void
+transmit_error (struct Channel *ch)
+{
+  struct GNUNET_MessageHeader *msg;
+  struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg)
+                                                    + sizeof (*msg));
+  msg = (struct GNUNET_MessageHeader *) &tmit_msg[1];
+  msg->size = ntohs (sizeof (*msg));
+  msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
+
+  tmit_msg->buf = (char *) &tmit_msg[1];
+  tmit_msg->size = sizeof (*msg);
+  tmit_msg->state = ch->tmit_state;
+  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
+  transmit_message (ch, GNUNET_NO);
+
+  /* FIXME: cleanup */
 }
 
+
+/**
+ * Incoming message from a client.
+ */
 static void
-handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
-                        const struct GNUNET_MessageHeader *msg)
+handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
+                     const struct GNUNET_MessageHeader *msg)
 {
-  const struct GNUNET_PSYC_MessageMethod *meth
-    = (const struct GNUNET_PSYC_MessageMethod *) msg;
   struct Channel *ch
     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
   GNUNET_assert (NULL != ch);
 
-  if (GNUNET_NO != ch->in_transmit)
+  if (GNUNET_YES != ch->ready)
   {
-    // FIXME: already transmitting a message, send back error message.
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "%p Dropping message from client, channel is not ready yet.\n",
+                ch);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
   }
 
-  ch->in_transmit = GNUNET_YES;
-  ch->tmit_buf = NULL;
-  ch->tmit_size = 0;
-  ch->tmit_mod_recvd = 0;
-  ch->tmit_mod_count = ntohl (meth->mod_count);
-  ch->tmit_status = GNUNET_PSYC_DATA_CONT;
+  uint8_t inc_msg_id = GNUNET_NO;
+  uint16_t size = ntohs (msg->size);
+  uint16_t psize = 0, ptype = 0, pos = 0;
 
-  buffer_message (ch, msg);
+  if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
+    GNUNET_break (0);
+    transmit_error (ch);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Received message from client.\n", ch);
+  GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
 
-  if (0 == ch->tmit_mod_count)
-    send_transmit_ack (ch);
+  for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
+  {
+    const struct GNUNET_MessageHeader *pmsg
+      = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
+    psize = ntohs (pmsg->size);
+    ptype = ntohs (pmsg->type);
+    if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "%p Received invalid message part of type %u and size %u "
+                  "from client.\n", ch, ptype, psize);
+      GNUNET_break (0);
+      transmit_error (ch);
+      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+      return;
+    }
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%p Received message part from client.\n", ch);
+    GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
+
+    if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype)
+      inc_msg_id = GNUNET_YES;
+  }
+
+  size -= sizeof (*msg);
+  struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
+  tmit_msg->buf = (char *) &tmit_msg[1];
+  memcpy (tmit_msg->buf, &msg[1], size);
+  tmit_msg->size = size;
+  tmit_msg->state = ch->tmit_state;
+  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
+  transmit_message (ch, inc_msg_id);
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 };
 
 
+/**
+ * Client requests to add a slave to the membership database.
+ */
 static void
-handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
-                          const struct GNUNET_MessageHeader *msg)
+handle_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
+                  const struct GNUNET_MessageHeader *msg)
 {
-  const struct GNUNET_PSYC_MessageModifier *mod
-    = (const struct GNUNET_PSYC_MessageModifier *) msg;
-  struct Channel *ch
-    = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_assert (NULL != ch);
 
-  ch->tmit_mod_recvd++;
-  buffer_message (ch, msg);
+}
 
-  if (ch->tmit_mod_recvd == ch->tmit_mod_count)
-    send_transmit_ack (ch);
 
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-};
+/**
+ * Client requests to remove a slave from the membership database.
+ */
+static void
+handle_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
+                     const struct GNUNET_MessageHeader *msg)
+{
+
+}
 
 
+/**
+ * Client requests channel history from PSYCstore.
+ */
 static void
-handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
+handle_story_request (void *cls, struct GNUNET_SERVER_Client *client,
                       const struct GNUNET_MessageHeader *msg)
 {
-  const struct GNUNET_PSYC_MessageData *data
-    = (const struct GNUNET_PSYC_MessageData *) msg;
-  struct Channel *ch
-    = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_assert (NULL != ch);
 
-  ch->tmit_status = ntohs (data->status);
-  buffer_message (ch, msg);
-  send_transmit_ack (ch);
+}
 
-  if (GNUNET_PSYC_DATA_CONT != ch->tmit_status)
-    ch->in_transmit = GNUNET_NO;
 
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-};
+/**
+ * Client requests best matching state variable from PSYCstore.
+ */
+static void
+handle_state_get (void *cls, struct GNUNET_SERVER_Client *client,
+                  const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+
+/**
+ * Client requests state variables with a given prefix from PSYCstore.
+ */
+static void
+handle_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
+                         const struct GNUNET_MessageHeader *msg)
+{
+
+}
 
 
 /**
@@ -603,27 +1272,35 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
     { &handle_slave_join, NULL,
       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
 
-    { &handle_transmit_method, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 },
+    { &handle_psyc_message, NULL,
+      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
+
+    { &handle_slave_add, NULL,
+      GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
+
+    { &handle_slave_remove, NULL,
+      GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
 
-    { &handle_transmit_modifier, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 },
+    { &handle_story_request, NULL,
+      GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
 
-    { &handle_transmit_data, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 },
+    { &handle_state_get, NULL,
+      GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
 
-    { NULL, NULL, 0, 0 }
+    { &handle_state_get_prefix, NULL,
+      GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
   };
 
   cfg = c;
   store = GNUNET_PSYCSTORE_connect (cfg);
   stats = GNUNET_STATISTICS_create ("psyc", cfg);
   clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+  recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
   nc = GNUNET_SERVER_notification_context_create (server, 1);
   GNUNET_SERVER_add_handlers (server, handlers);
   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
-                                NULL);
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+                                &shutdown_task, NULL);
 }