adding extended proxy support for http(s) client
[oweals/gnunet.git] / src / psyc / psyc_api.c
index a5a01fa923e4cf72bcc98d451865b2a8803e1fad..16e8106d453730c55852fe1aa891323e748593d4 100644 (file)
@@ -45,7 +45,20 @@ struct OperationHandle
 {
   struct OperationHandle *prev;
   struct OperationHandle *next;
-  const struct GNUNET_MessageHeader *msg;
+  struct GNUNET_MessageHeader *msg;
+};
+
+
+/**
+ * Handle for a pending PSYC transmission operation.
+ */
+struct GNUNET_PSYC_ChannelTransmitHandle
+{
+  struct GNUNET_PSYC_Channel *ch;
+  GNUNET_PSYC_TransmitNotifyModifier notify_mod;
+  GNUNET_PSYC_TransmitNotifyData notify_data;
+  void *notify_cls;
+  enum MessageState state;
 };
 
 /**
@@ -53,6 +66,11 @@ struct OperationHandle
  */
 struct GNUNET_PSYC_Channel
 {
+  /**
+   * Transmission handle;
+   */
+  struct GNUNET_PSYC_ChannelTransmitHandle tmit;
+
   /**
    * Configuration to use.
    */
@@ -78,6 +96,11 @@ struct GNUNET_PSYC_Channel
    */
   struct OperationHandle *tmit_tail;
 
+  /**
+   * Message being transmitted to the PSYC service.
+   */
+  struct OperationHandle *tmit_msg;
+
   /**
    * Message to send on reconnect.
    */
@@ -118,6 +141,11 @@ struct GNUNET_PSYC_Channel
    */
   uint64_t recv_message_id;
 
+  /**
+   * Public key of the slave from which a message is being received.
+   */
+  struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
+
   /**
    * State of the currently being received message from the PSYC service.
    */
@@ -138,11 +166,6 @@ struct GNUNET_PSYC_Channel
    */
   uint32_t recv_mod_value_size;
 
-  /**
-   * Buffer space available for transmitting the next data fragment.
-   */
-  uint16_t tmit_size; // FIXME
-
   /**
    * Is transmission paused?
    */
@@ -151,7 +174,7 @@ struct GNUNET_PSYC_Channel
   /**
    * Are we still waiting for a PSYC_TRANSMIT_ACK?
    */
-  uint8_t tmit_ack_pending; // FIXME
+  uint8_t tmit_ack_pending;
 
   /**
    * Are we polling for incoming messages right now?
@@ -170,19 +193,6 @@ struct GNUNET_PSYC_Channel
 };
 
 
-/**
- * Handle for a pending PSYC transmission operation.
- */
-struct GNUNET_PSYC_MasterTransmitHandle
-{
-  struct GNUNET_PSYC_Master *master;
-  GNUNET_PSYC_MasterTransmitNotify notify_mod;
-  GNUNET_PSYC_MasterTransmitNotify notify_data;
-  void *notify_cls;
-  enum MessageState state;
-};
-
-
 /**
  * Handle for the master of a PSYC channel.
  */
@@ -190,8 +200,6 @@ struct GNUNET_PSYC_Master
 {
   struct GNUNET_PSYC_Channel ch;
 
-  struct GNUNET_PSYC_MasterTransmitHandle *tmit;
-
   GNUNET_PSYC_MasterStartCallback start_cb;
 
   uint64_t max_message_id;
@@ -204,6 +212,10 @@ struct GNUNET_PSYC_Master
 struct GNUNET_PSYC_Slave
 {
   struct GNUNET_PSYC_Channel ch;
+
+  GNUNET_PSYC_SlaveJoinCallback join_cb;
+
+  uint64_t max_message_id;
 };
 
 
@@ -246,16 +258,14 @@ struct GNUNET_PSYC_StateQuery
 };
 
 
-/**
- * Try again to connect to the PSYC service.
- *
- * @param cls Handle to the PSYC service.
- * @param tc Scheduler context
- */
 static void
 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
+static void
+channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
+
+
 /**
  * Reschedule a connect attempt to the service.
  *
@@ -304,7 +314,8 @@ recv_reset (struct GNUNET_PSYC_Channel *ch)
   ch->recv_state = MSG_STATE_START;
   ch->recv_flags = 0;
   ch->recv_message_id = 0;
-  ch->recv_mod_value_size =0;
+  //FIXME: ch->recv_slave_key = { 0 };
+  ch->recv_mod_value_size = 0;
   ch->recv_mod_value_size_expected = 0;
 }
 
@@ -312,8 +323,6 @@ recv_reset (struct GNUNET_PSYC_Channel *ch)
 static void
 recv_error (struct GNUNET_PSYC_Channel *ch)
 {
-  recv_reset (ch);
-
   GNUNET_PSYC_MessageCallback message_cb
     = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
     ? ch->hist_message_cb
@@ -321,83 +330,182 @@ recv_error (struct GNUNET_PSYC_Channel *ch)
 
   if (NULL != message_cb)
     message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
+
+  recv_reset (ch);
 }
 
+
+/**
+ * Queue a message part for transmission to the PSYC service.
+ *
+ * The message part is added to the current message buffer.
+ * When this buffer is full, it is added to the transmission queue.
+ *
+ * @param ch Channel struct for the client.
+ * @param msg Modifier message part, or NULL when there's no more modifiers.
+ * @param end End of message.
+ */
+static void
+queue_message (struct GNUNET_PSYC_Channel *ch,
+               const struct GNUNET_MessageHeader *msg,
+               uint8_t end)
+{
+  uint16_t size = msg ? ntohs (msg->size) : 0;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Queueing message of type %u and size %u (end: %u)).\n",
+       ntohs (msg->type), size, end);
+
+  struct OperationHandle *op = ch->tmit_msg;
+  if (NULL != op)
+  {
+    if (NULL == msg
+        || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < op->msg->size + size)
+    {
+      /* End of message or buffer is full, add it to transmission queue
+       * and start with empty buffer */
+      op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+      op->msg->size = htons (op->msg->size);
+      GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+      ch->tmit_msg = op = NULL;
+      ch->tmit_ack_pending++;
+    }
+    else
+    {
+      /* Message fits in current buffer, append */
+      ch->tmit_msg = op
+        = GNUNET_realloc (op, sizeof (*op) + op->msg->size + size);
+      op->msg = (struct GNUNET_MessageHeader *) &op[1];
+      memcpy ((char *) op->msg + op->msg->size, msg, size);
+      op->msg->size += size;
+    }
+  }
+
+  if (NULL == op && NULL != msg)
+  {
+    /* Empty buffer, copy over message. */
+    ch->tmit_msg = op
+      = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + size);
+    op->msg = (struct GNUNET_MessageHeader *) &op[1];
+    op->msg->size = sizeof (*op->msg) + size;
+    memcpy (&op->msg[1], msg, size);
+  }
+
+  if (NULL != op
+      && (GNUNET_YES == end
+          || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
+              < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
+  {
+    /* End of message or buffer is full, add it to transmission queue. */
+    op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+    op->msg->size = htons (op->msg->size);
+    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+    ch->tmit_msg = op = NULL;
+    ch->tmit_ack_pending++;
+  }
+
+  if (GNUNET_YES == end)
+    ch->in_transmit = GNUNET_NO;
+
+  transmit_next (ch);
+}
+
+
 /**
  * Request a modifier from a client to transmit.
  *
  * @param mst Master handle.
  */
 static void
-master_transmit_mod (struct GNUNET_PSYC_Master *mst)
+channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
 {
-  struct GNUNET_PSYC_Channel *ch = &mst->ch;
-  uint16_t max_data_size
-    = ch->tmit_size > sizeof (struct GNUNET_MessageHeader)
-    ? GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - ch->tmit_size
-    : GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD - ch->tmit_size;
-  uint16_t data_size = max_data_size;
+  uint16_t max_data_size, data_size;
+  char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
+  struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
+  int notify_ret;
+
+  switch (ch->tmit.state)
+  {
+  case MSG_STATE_MODIFIER:
+  {
+    struct GNUNET_PSYC_MessageModifier *mod
+      = (struct GNUNET_PSYC_MessageModifier *) msg;
+    max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
+    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
+    msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
+    notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
+                                      &mod->oper, &mod->value_size);
+    mod->name_size = strnlen ((char *) &mod[1], data_size);
+    if (mod->name_size < data_size)
+    {
+      mod->value_size = htonl (mod->value_size);
+      mod->name_size = htons (mod->name_size);
+    }
+    else if (0 < data_size)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
+      notify_ret = GNUNET_SYSERR;
+    }
+    break;
+  }
+  case MSG_STATE_MOD_CONT:
+  {
+    max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
+    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
+    msg->size = sizeof (struct GNUNET_MessageHeader);
+    notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
+                                      &data_size, &msg[1], NULL, NULL);
+    break;
+  }
+  default:
+    GNUNET_assert (0);
+  }
 
-  struct GNUNET_MessageHeader *msg;
-  struct OperationHandle *op
-    = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size);
-  op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
-  msg->type
-    = MSG_STATE_MODIFIER == mst->tmit->state
-    ? htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER)
-    : htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
-
-  int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
-                                           &data_size, &msg[1]);
   switch (notify_ret)
   {
   case GNUNET_NO:
-    if (0 != data_size)
-      mst->tmit->state = MSG_STATE_MOD_CONT;
+    if (0 == data_size)
+    { /* Transmission paused, nothing to send. */
+      ch->tmit_paused = GNUNET_YES;
+      return;
+    }
+    ch->tmit.state = MSG_STATE_MOD_CONT;
     break;
 
   case GNUNET_YES:
-    mst->tmit->state = (0 == data_size) ? MSG_STATE_DATA : MSG_STATE_MODIFIER;
+    if (0 == data_size)
+    {
+      /* End of modifiers. */
+      ch->tmit.state = MSG_STATE_DATA;
+      if (0 == ch->tmit_ack_pending)
+        channel_transmit_data (ch);
+
+      return;
+    }
+    ch->tmit.state = MSG_STATE_MODIFIER;
     break;
 
   default:
     LOG (GNUNET_ERROR_TYPE_ERROR,
-         "MasterTransmitNotify returned error when requesting a modifier.\n");
+         "MasterTransmitNotifyModifier returned error "
+         "when requesting a modifier.\n");
 
-    mst->tmit->state = MSG_STATE_START;
+    ch->tmit.state = MSG_STATE_CANCEL;
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
     msg->size = htons (sizeof (*msg));
 
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
-    transmit_next (ch);
+    queue_message (ch, msg, GNUNET_YES);
     return;
   }
 
-  if ((GNUNET_NO == notify_ret && 0 == data_size))
-  {
-    /* Transmission paused, nothing to send. */
-    ch->tmit_paused = GNUNET_YES;
-    GNUNET_free (op);
-  }
-
   if (0 < data_size)
   {
-    GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
-    msg->size = htons (sizeof (*msg) + data_size);
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
-  }
-
-  /* End of message. */
-  if (GNUNET_YES == notify_ret)
-  {
-    op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
-    op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
-    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
-    msg->size = htons (sizeof (*msg));
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+    GNUNET_assert (data_size <= max_data_size);
+    msg->size = htons (msg->size + data_size);
+    queue_message (ch, msg, GNUNET_NO);
   }
 
-  transmit_next (ch);
+  channel_transmit_mod (ch);
 }
 
 
@@ -407,18 +515,16 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
  * @param mst Master handle.
  */
 static void
-master_transmit_data (struct GNUNET_PSYC_Master *mst)
+channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
 {
-  struct GNUNET_PSYC_Channel *ch = &mst->ch;
-  struct GNUNET_MessageHeader *msg;
   uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
-  struct OperationHandle *op
-    = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size);
-  op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
+  char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
+  struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
+
   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
 
-  int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
-                                           &data_size, &msg[1]);
+  int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
+                                         &data_size, &msg[1]);
   switch (notify_ret)
   {
   case GNUNET_NO:
@@ -426,24 +532,22 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
     {
       /* Transmission paused, nothing to send. */
       ch->tmit_paused = GNUNET_YES;
-      GNUNET_free (op);
+      return;
     }
     break;
 
   case GNUNET_YES:
-    mst->tmit->state = MSG_STATE_START;
+    ch->tmit.state = MSG_STATE_END;
     break;
 
   default:
     LOG (GNUNET_ERROR_TYPE_ERROR,
          "MasterTransmitNotify returned error when requesting data.\n");
 
-    mst->tmit->state = MSG_STATE_START;
+    ch->tmit.state = MSG_STATE_CANCEL;
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
     msg->size = htons (sizeof (*msg));
-
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
-    transmit_next (ch);
+    queue_message (ch, msg, GNUNET_YES);
     return;
   }
 
@@ -451,20 +555,96 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
   {
     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
     msg->size = htons (sizeof (*msg) + data_size);
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+    queue_message (ch, msg, !notify_ret);
   }
 
   /* End of message. */
   if (GNUNET_YES == notify_ret)
   {
-    op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
-    op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
     msg->size = htons (sizeof (*msg));
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+    queue_message (ch, msg, GNUNET_YES);
   }
+}
 
-  transmit_next (ch);
+
+/**
+ * Send a message to a channel.
+ *
+ * @param ch Handle to the PSYC channel.
+ * @param method_name Which method should be invoked.
+ * @param notify_mod Function to call to obtain modifiers.
+ * @param notify_data Function to call to obtain fragments of the data.
+ * @param notify_cls Closure for @a notify_mod and @a notify_data.
+ * @param flags Flags for the message being transmitted.
+ * @return Transmission handle, NULL on error (i.e. more than one request queued).
+ */
+static struct GNUNET_PSYC_ChannelTransmitHandle *
+channel_transmit (struct GNUNET_PSYC_Channel *ch,
+                  const char *method_name,
+                  GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+                  GNUNET_PSYC_TransmitNotifyData notify_data,
+                  void *notify_cls,
+                  uint32_t flags)
+{
+  if (GNUNET_NO != ch->in_transmit)
+    return NULL;
+  ch->in_transmit = GNUNET_YES;
+
+  size_t size = strlen (method_name) + 1;
+  struct GNUNET_PSYC_MessageMethod *pmeth;
+  struct OperationHandle *op;
+
+  ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
+                                     + sizeof (*pmeth) + size);
+  op->msg = (struct GNUNET_MessageHeader *) &op[1];
+  op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
+
+  pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
+  pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
+  pmeth->header.size = htons (sizeof (*pmeth) + size);
+  pmeth->flags = htonl (flags);
+  memcpy (&pmeth[1], method_name, size);
+
+  ch->tmit.ch = ch;
+  ch->tmit.notify_mod = notify_mod;
+  ch->tmit.notify_data = notify_data;
+  ch->tmit.notify_cls = notify_cls;
+  ch->tmit.state = MSG_STATE_MODIFIER;
+
+  channel_transmit_mod (ch);
+  return &ch->tmit;
+}
+
+
+/**
+ * Resume transmission to the channel.
+ *
+ * @param th Handle of the request that is being resumed.
+ */
+static void
+channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th)
+{
+  struct GNUNET_PSYC_Channel *ch = th->ch;
+  if (0 == ch->tmit_ack_pending)
+  {
+    ch->tmit_paused = GNUNET_NO;
+    channel_transmit_data (ch);
+  }
+}
+
+
+/**
+ * Abort transmission request to channel.
+ *
+ * @param th Handle of the request that is being aborted.
+ */
+static void
+channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
+{
+  struct GNUNET_PSYC_Channel *ch = th->ch;
+  if (GNUNET_NO == ch->in_transmit)
+    return;
 }
 
 
@@ -476,57 +656,66 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
  */
 static void
 handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
-                     const struct GNUNET_PSYC_MessageHeader *pmsg)
+                     const struct GNUNET_PSYC_MessageHeader *msg)
 {
-  const struct GNUNET_MessageHeader *msg;
-  uint16_t msize = ntohs (pmsg->header.size);
-  uint16_t pos = 0;
-  uint16_t size = 0;
-  uint16_t type, size_eq, size_min;
+  uint16_t size = ntohs (msg->header.size);
+  uint32_t flags = ntohl (msg->flags);
+
+  GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
+                           (struct GNUNET_MessageHeader *) msg);
 
   if (MSG_STATE_START == ch->recv_state)
   {
-    ch->recv_message_id = GNUNET_ntohll (pmsg->message_id);
-    ch->recv_flags = ntohl (pmsg->flags);
+    ch->recv_message_id = GNUNET_ntohll (msg->message_id);
+    ch->recv_flags = flags;
+    ch->recv_slave_key = msg->slave_key;
+    ch->recv_mod_value_size = 0;
+    ch->recv_mod_value_size_expected = 0;
   }
-  else if (GNUNET_ntohll (pmsg->message_id) != ch->recv_message_id)
+  else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
   {
+    // FIXME
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
-         GNUNET_ntohll (pmsg->message_id), ch->recv_message_id);
+         GNUNET_ntohll (msg->message_id), ch->recv_message_id);
     GNUNET_break_op (0);
     recv_error (ch);
+    return;
   }
-  else if (ntohl (pmsg->flags) != ch->recv_flags)
+  else if (flags != ch->recv_flags)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "Unexpected message flags. Got: %lu, expected: %lu\n",
-         ntohl (pmsg->flags), ch->recv_flags);
+         flags, ch->recv_flags);
     GNUNET_break_op (0);
     recv_error (ch);
+    return;
   }
 
-  for (pos = 0; sizeof (*pmsg) + pos < msize; pos += size)
+  uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
+
+  for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
   {
-    msg = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
-    size = ntohs (msg->size);
-    type = ntohs (msg->type);
+    const struct GNUNET_MessageHeader *pmsg
+      = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
+    psize = ntohs (pmsg->size);
+    ptype = ntohs (pmsg->type);
     size_eq = size_min = 0;
 
-    if (msize < sizeof (*pmsg) + pos + size)
+    if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "Discarding message of type %u with invalid size. "
-                  "(%u < %u + %u + %u)\n", ntohs (msg->type),
-                  msize, sizeof (*msg), pos, size);
-      break;
+                  "Dropping message of type %u with invalid size %u.\n",
+                  ptype, psize);
+      recv_error (ch);
+      return;
     }
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Received message part of type %u and size %u from PSYC.\n",
-                ntohs (msg->type), size);
 
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Received message part from PSYC.\n");
+    GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
 
-    switch (type)
+    switch (ptype)
     {
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
@@ -534,6 +723,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
       break;
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
       size_min = sizeof (struct GNUNET_MessageHeader);
       break;
@@ -541,116 +731,104 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
       size_eq = sizeof (struct GNUNET_MessageHeader);
       break;
+    default:
+      GNUNET_break_op (0);
+      recv_error (ch);
+      return;
     }
 
-    if (! ((0 < size_eq && size == size_eq)
-           || (0 < size_min && size_min <= size)))
+    if (! ((0 < size_eq && psize == size_eq)
+           || (0 < size_min && size_min <= psize)))
     {
-      GNUNET_break (0);
-      reschedule_connect (ch);
+      GNUNET_break_op (0);
+      recv_error (ch);
       return;
     }
 
-    switch (type)
+    switch (ptype)
     {
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
     {
       struct GNUNET_PSYC_MessageMethod *meth
-        = (struct GNUNET_PSYC_MessageMethod *) msg;
+        = (struct GNUNET_PSYC_MessageMethod *) pmsg;
 
-      if (MSG_STATE_HEADER != ch->recv_state)
+      if (MSG_STATE_START != ch->recv_state)
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding out of order message method.\n");
+             "Dropping out of order message method (%u).\n",
+             ch->recv_state);
         /* It is normal to receive an incomplete message right after connecting,
          * but should not happen later.
          * FIXME: add a check for this condition.
          */
         GNUNET_break_op (0);
         recv_error (ch);
-        break;
+        return;
       }
 
-      if ('\0' != (char *) meth + msg->size - 1)
+      if ('\0' != *((char *) meth + psize - 1))
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding message with malformed method. "
+             "Dropping message with malformed method. "
              "Message ID: %" PRIu64 "\n", ch->recv_message_id);
         GNUNET_break_op (0);
         recv_error (ch);
-        break;
+        return;
       }
-      GNUNET_PSYC_MessageCallback message_cb
-        = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
-        ? ch->hist_message_cb
-        : ch->message_cb;
-
-      if (NULL != message_cb)
-        message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
-
       ch->recv_state = MSG_STATE_METHOD;
       break;
     }
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
     {
-      if (MSG_STATE_MODIFIER != ch->recv_state)
+      if (!(MSG_STATE_METHOD == ch->recv_state
+            || MSG_STATE_MODIFIER == ch->recv_state
+            || MSG_STATE_MOD_CONT == ch->recv_state))
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding out of order message modifier.\n");
+             "Dropping out of order message modifier (%u).\n",
+             ch->recv_state);
         GNUNET_break_op (0);
         recv_error (ch);
-        break;
+        return;
       }
 
       struct GNUNET_PSYC_MessageModifier *mod
-        = (struct GNUNET_PSYC_MessageModifier *) msg;
+        = (struct GNUNET_PSYC_MessageModifier *) pmsg;
 
       uint16_t name_size = ntohs (mod->name_size);
-      ch->recv_mod_value_size_expected = ntohs (mod->value_size);
-      ch->recv_mod_value_size = size - sizeof (*mod) - name_size - 1;
+      ch->recv_mod_value_size_expected = ntohl (mod->value_size);
+      ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
 
-      if (size < sizeof (*mod) + name_size + 1
-          || '\0' != (char *) &mod[1] + mod->name_size
+      if (psize < sizeof (*mod) + name_size + 1
+          || '\0' != *((char *) &mod[1] + name_size)
           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
       {
-        LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n");
+        LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
         GNUNET_break_op (0);
-        break;
+        recv_error (ch);
+        return;
       }
-
       ch->recv_state = MSG_STATE_MODIFIER;
-
-      GNUNET_PSYC_MessageCallback message_cb
-        = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
-        ? ch->hist_message_cb
-        : ch->message_cb;
-
-      if (NULL != message_cb)
-        message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
-
       break;
     }
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
     {
-      ch->recv_mod_value_size += size - sizeof (*msg);
+      ch->recv_mod_value_size += psize - sizeof (*pmsg);
 
-      if (MSG_STATE_MODIFIER != ch->recv_state
+      if (!(MSG_STATE_MODIFIER == ch->recv_state
+            || MSG_STATE_MOD_CONT == ch->recv_state)
           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding out of order message modifier continuation.\n");
+             "Dropping out of order message modifier continuation "
+             "!(%u == %u || %u == %u) || %lu < %lu.\n",
+             MSG_STATE_MODIFIER, ch->recv_state,
+             MSG_STATE_MOD_CONT, ch->recv_state,
+             ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
         GNUNET_break_op (0);
-        recv_reset (ch);
-        break;
+        recv_error (ch);
+        return;
       }
-
-      GNUNET_PSYC_MessageCallback message_cb
-        = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
-        ? ch->hist_message_cb
-        : ch->message_cb;
-
-      if (NULL != message_cb)
-        message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
       break;
     }
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
@@ -659,15 +837,30 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
           || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding out of order message data fragment.\n");
+             "Dropping out of order message data fragment "
+             "(%u < %u || %lu != %lu).\n",
+             ch->recv_state, MSG_STATE_METHOD,
+             ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
+
         GNUNET_break_op (0);
-        recv_reset (ch);
-        break;
+        recv_error (ch);
+        return;
       }
-
       ch->recv_state = MSG_STATE_DATA;
       break;
     }
+    }
+
+    GNUNET_PSYC_MessageCallback message_cb
+      = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
+      ? ch->hist_message_cb
+      : ch->message_cb;
+
+    if (NULL != message_cb)
+      message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg);
+
+    switch (ptype)
+    {
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
       recv_reset (ch);
@@ -677,6 +870,46 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
 }
 
 
+/**
+ * Handle incoming message acknowledgement from the PSYC service.
+ *
+ * @param ch The channel the acknowledgement is sent to.
+ */
+static void
+handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
+{
+  if (0 == ch->tmit_ack_pending)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
+    GNUNET_break (0);
+    return;
+  }
+  ch->tmit_ack_pending--;
+
+  switch (ch->tmit.state)
+  {
+  case MSG_STATE_MODIFIER:
+  case MSG_STATE_MOD_CONT:
+    if (GNUNET_NO == ch->tmit_paused)
+      channel_transmit_mod (ch);
+    break;
+
+  case MSG_STATE_DATA:
+    if (GNUNET_NO == ch->tmit_paused)
+      channel_transmit_data (ch);
+    break;
+
+  case MSG_STATE_END:
+  case MSG_STATE_CANCEL:
+    break;
+
+  default:
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Ignoring message ACK in state %u.\n", ch->tmit.state);
+  }
+}
+
+
 /**
  * Type of a function to call when we receive a message
  * from the service.
@@ -695,7 +928,7 @@ message_handler (void *cls,
 
   if (NULL == msg)
   {
-    GNUNET_break (0);
+    // timeout / disconnected from server, reconnect
     reschedule_connect (ch);
     return;
   }
@@ -717,27 +950,18 @@ message_handler (void *cls,
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
     size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
     break;
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
-    size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
-    break;
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
-    size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
-    break;
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
-    size_min = sizeof (struct GNUNET_MessageHeader);
-    break;
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
-  case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
     size_eq = sizeof (struct GNUNET_MessageHeader);
     break;
+  default:
+    GNUNET_break_op (0);
+    return;
   }
 
   if (! ((0 < size_eq && size == size_eq)
          || (0 < size_min && size_min <= size)))
   {
-    GNUNET_break (0);
-    reschedule_connect (ch);
+    GNUNET_break_op (0);
     return;
   }
 
@@ -753,70 +977,28 @@ message_handler (void *cls,
   }
   case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
   {
-#if TODO
     struct CountersResult *cres = (struct CountersResult *) msg;
     slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
-    if (NULL != slv->join_ack_cb)
-      mst->join_ack_cb (ch->cb_cls, mst->max_message_id);
-#endif
+    if (NULL != slv->join_cb)
+      slv->join_cb (ch->cb_cls, slv->max_message_id);
     break;
   }
-  case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
   {
-    ch->tmit_ack_pending = GNUNET_NO;
-
-    if (ch->is_master)
-    {
-      GNUNET_assert (NULL != mst->tmit);
-      switch (mst->tmit->state)
-      {
-      case MSG_STATE_MODIFIER:
-        if (GNUNET_NO == ch->tmit_paused)
-          master_transmit_mod (mst);
-        break;
-
-      case MSG_STATE_MOD_CONT:
-        if (GNUNET_NO == ch->tmit_paused)
-          master_transmit_mod (mst);
-        break;
-
-      case MSG_STATE_DATA:
-        if (GNUNET_NO == ch->tmit_paused)
-          master_transmit_data (mst);
-        break;
-
-      case MSG_STATE_END:
-      case MSG_STATE_CANCEL:
-        if (NULL != mst->tmit)
-        {
-          GNUNET_free (mst->tmit);
-          mst->tmit = NULL;
-        }
-        else
-        {
-          LOG (GNUNET_ERROR_TYPE_WARNING,
-               "Ignoring transmit ack, there's no transmission going on.\n");
-        }
-        break;
-      default:
-        LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Ignoring unexpected transmit ack.\n");
-      }
-    }
-    else
-    {
-      /* TODO: slave */
-    }
+    handle_psyc_message_ack (ch);
     break;
   }
 
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
-    handle_psyc_message(ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
+    handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
     break;
   }
 
-  GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
+  if (NULL != ch->client)
+  {
+    GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
+                           GNUNET_TIME_UNIT_FOREVER_REL);
+  }
 }
 
 
@@ -1069,30 +1251,6 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
 }
 
 
-/* FIXME: split up value into <64K chunks and transmit the continuations in
- *        MOD_CONT msgs */
-static int
-send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
-{
-  struct GNUNET_PSYC_Channel *ch = cls;
-  size_t name_size = strlen (mod->name) + 1;
-  struct GNUNET_PSYC_MessageModifier *pmod;
-  struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmod)
-                                              + name_size + mod->value_size);
-  pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
-  op->msg = (struct GNUNET_MessageHeader *) pmod;
-
-  pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
-  pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size);
-  pmod->name_size = htons (name_size);
-  memcpy (&pmod[1], mod->name, name_size);
-  memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
-
-  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
-  return GNUNET_YES;
-}
-
-
 /**
  * Send a message to call a method to all members in the PSYC channel.
  *
@@ -1107,39 +1265,14 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
 struct GNUNET_PSYC_MasterTransmitHandle *
 GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
                              const char *method_name,
-                             GNUNET_PSYC_MasterTransmitNotify notify_mod,
-                             GNUNET_PSYC_MasterTransmitNotify notify_data,
+                             GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+                             GNUNET_PSYC_TransmitNotifyData notify_data,
                              void *notify_cls,
                              enum GNUNET_PSYC_MasterTransmitFlags flags)
 {
-  GNUNET_assert (NULL != master);
-  struct GNUNET_PSYC_Channel *ch = &master->ch;
-  if (GNUNET_NO != ch->in_transmit)
-    return NULL;
-  ch->in_transmit = GNUNET_YES;
-
-  size_t size = strlen (method_name) + 1;
-  struct GNUNET_PSYC_MessageMethod *pmeth;
-  struct OperationHandle *op
-    = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size);
-  pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1];
-  op->msg = (struct GNUNET_MessageHeader *) pmeth;
-
-  pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
-  pmeth->header.size = htons (sizeof (*pmeth) + size);
-  pmeth->flags = htonl (flags);
-  memcpy (&pmeth[1], method_name, size);
-
-  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
-  transmit_next (ch);
-
-  master->tmit = GNUNET_malloc (sizeof (*master->tmit));
-  master->tmit->master = master;
-  master->tmit->notify_mod = notify_mod;
-  master->tmit->notify_data = notify_data;
-  master->tmit->notify_cls = notify_cls;
-  master->tmit->state = MSG_STATE_START; // FIXME
-  return master->tmit;
+  return (struct GNUNET_PSYC_MasterTransmitHandle *)
+    channel_transmit (&master->ch, method_name, notify_mod, notify_data,
+                      notify_cls, flags);
 }
 
 
@@ -1151,12 +1284,7 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
 void
 GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
 {
-  struct GNUNET_PSYC_Channel *ch = &th->master->ch;
-  if (GNUNET_NO == ch->tmit_ack_pending)
-  {
-    ch->tmit_paused = GNUNET_NO;
-    master_transmit_data (th->master);
-  }
+  channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
 }
 
 
@@ -1168,10 +1296,7 @@ GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
 void
 GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
 {
-  struct GNUNET_PSYC_Master *master = th->master;
-  struct GNUNET_PSYC_Channel *ch = &master->ch;
-  if (GNUNET_NO != ch->in_transmit)
-    return;
+  channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
 }
 
 
@@ -1225,15 +1350,15 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
 {
   struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
   struct GNUNET_PSYC_Channel *ch = &slv->ch;
-  struct SlaveJoinRequest *req = GNUNET_malloc (sizeof (*req)
-                                                + relay_count * sizeof (*relays));
+  struct SlaveJoinRequest *req
+    = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
   req->header.size = htons (sizeof (*req)
                             + relay_count * sizeof (*relays));
   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
   req->channel_key = *channel_key;
   req->slave_key = *slave_key;
   req->origin = *origin;
-  req->relay_count = relay_count;
+  req->relay_count = htonl (relay_count);
   memcpy (&req[1], relays, relay_count * sizeof (*relays));
 
   ch->message_cb = message_cb;
@@ -1246,6 +1371,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
   ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
   ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
 
+  slv->join_cb = slave_joined_cb;
   return slv;
 }
 
@@ -1271,9 +1397,8 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
  *
  * @param slave Slave handle.
  * @param method_name Which (PSYC) method should be invoked (on host).
- * @param env Environment containing transient variables for the message, or
- *            NULL.
- * @param notify Function to call when we are allowed to transmit (to get data).
+ * @param notify_mod Function to call to obtain modifiers.
+ * @param notify_data Function to call to obtain fragments of the data.
  * @param notify_cls Closure for @a notify.
  * @param flags Flags for the message being transmitted.
  * @return Transmission handle, NULL on error (i.e. more than one request
@@ -1282,12 +1407,14 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
 struct GNUNET_PSYC_SlaveTransmitHandle *
 GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
                             const char *method_name,
-                            const struct GNUNET_ENV_Environment *env,
-                            GNUNET_PSYC_SlaveTransmitNotify notify,
+                            GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+                            GNUNET_PSYC_TransmitNotifyData notify_data,
                             void *notify_cls,
                             enum GNUNET_PSYC_SlaveTransmitFlags flags)
 {
-  return NULL;
+  return (struct GNUNET_PSYC_SlaveTransmitHandle *)
+    channel_transmit (&slave->ch, method_name,
+                      notify_mod, notify_data, notify_cls, flags);
 }
 
 
@@ -1297,9 +1424,9 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
  * @param th Handle of the request that is being resumed.
  */
 void
-GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
+GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th)
 {
-
+  channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
 }
 
 
@@ -1311,7 +1438,7 @@ GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
 void
 GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
 {
-
+  channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
 }
 
 
@@ -1325,7 +1452,7 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
 struct GNUNET_PSYC_Channel *
 GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
 {
-  return (struct GNUNET_PSYC_Channel *) master;
+  return &master->ch;
 }
 
 
@@ -1338,7 +1465,7 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
 struct GNUNET_PSYC_Channel *
 GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
 {
-  return (struct GNUNET_PSYC_Channel *) slave;
+  return &slave->ch;
 }