adding extended proxy support for http(s) client
[oweals/gnunet.git] / src / psyc / psyc_api.c
index 20394bbce9658db484ab54112dc0e77c69f6e0e0..16e8106d453730c55852fe1aa891323e748593d4 100644 (file)
@@ -48,11 +48,29 @@ struct OperationHandle
   struct GNUNET_MessageHeader *msg;
 };
 
+
+/**
+ * Handle for a pending PSYC transmission operation.
+ */
+struct GNUNET_PSYC_ChannelTransmitHandle
+{
+  struct GNUNET_PSYC_Channel *ch;
+  GNUNET_PSYC_TransmitNotifyModifier notify_mod;
+  GNUNET_PSYC_TransmitNotifyData notify_data;
+  void *notify_cls;
+  enum MessageState state;
+};
+
 /**
  * Handle to access PSYC channel operations for both the master and slaves.
  */
 struct GNUNET_PSYC_Channel
 {
+  /**
+   * Transmission handle;
+   */
+  struct GNUNET_PSYC_ChannelTransmitHandle tmit;
+
   /**
    * Configuration to use.
    */
@@ -123,6 +141,11 @@ struct GNUNET_PSYC_Channel
    */
   uint64_t recv_message_id;
 
+  /**
+   * Public key of the slave from which a message is being received.
+   */
+  struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
+
   /**
    * State of the currently being received message from the PSYC service.
    */
@@ -170,19 +193,6 @@ struct GNUNET_PSYC_Channel
 };
 
 
-/**
- * Handle for a pending PSYC transmission operation.
- */
-struct GNUNET_PSYC_MasterTransmitHandle
-{
-  struct GNUNET_PSYC_Master *master;
-  GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod;
-  GNUNET_PSYC_MasterTransmitNotify notify_data;
-  void *notify_cls;
-  enum MessageState state;
-};
-
-
 /**
  * Handle for the master of a PSYC channel.
  */
@@ -190,8 +200,6 @@ struct GNUNET_PSYC_Master
 {
   struct GNUNET_PSYC_Channel ch;
 
-  struct GNUNET_PSYC_MasterTransmitHandle *tmit;
-
   GNUNET_PSYC_MasterStartCallback start_cb;
 
   uint64_t max_message_id;
@@ -204,6 +212,10 @@ struct GNUNET_PSYC_Master
 struct GNUNET_PSYC_Slave
 {
   struct GNUNET_PSYC_Channel ch;
+
+  GNUNET_PSYC_SlaveJoinCallback join_cb;
+
+  uint64_t max_message_id;
 };
 
 
@@ -251,7 +263,7 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
 static void
-master_transmit_data (struct GNUNET_PSYC_Master *mst);
+channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
 
 
 /**
@@ -302,7 +314,8 @@ recv_reset (struct GNUNET_PSYC_Channel *ch)
   ch->recv_state = MSG_STATE_START;
   ch->recv_flags = 0;
   ch->recv_message_id = 0;
-  ch->recv_mod_value_size =0;
+  //FIXME: ch->recv_slave_key = { 0 };
+  ch->recv_mod_value_size = 0;
   ch->recv_mod_value_size_expected = 0;
 }
 
@@ -323,7 +336,7 @@ recv_error (struct GNUNET_PSYC_Channel *ch)
 
 
 /**
- * Queue an incoming message part for transmission to the PSYC service.
+ * Queue a message part for transmission to the PSYC service.
  *
  * The message part is added to the current message buffer.
  * When this buffer is full, it is added to the transmission queue.
@@ -377,10 +390,11 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
     op->msg->size = sizeof (*op->msg) + size;
     memcpy (&op->msg[1], msg, size);
   }
+
   if (NULL != op
-      && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
-                  < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
+      && (GNUNET_YES == end
+          || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
+              < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
   {
     /* End of message or buffer is full, add it to transmission queue. */
     op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
@@ -390,6 +404,9 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
     ch->tmit_ack_pending++;
   }
 
+  if (GNUNET_YES == end)
+    ch->in_transmit = GNUNET_NO;
+
   transmit_next (ch);
 }
 
@@ -400,15 +417,14 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
  * @param mst Master handle.
  */
 static void
-master_transmit_mod (struct GNUNET_PSYC_Master *mst)
+channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
 {
-  struct GNUNET_PSYC_Channel *ch = &mst->ch;
   uint16_t max_data_size, data_size;
   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
   int notify_ret;
 
-  switch (mst->tmit->state)
+  switch (ch->tmit.state)
   {
   case MSG_STATE_MODIFIER:
   {
@@ -417,13 +433,12 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
     max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
-    notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
-                                        &data_size, &mod[1], &mod->oper);
+    notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
+                                      &mod->oper, &mod->value_size);
     mod->name_size = strnlen ((char *) &mod[1], data_size);
     if (mod->name_size < data_size)
     {
-      mod->oper = htons (mod->oper);
-      mod->value_size = htons (data_size - 1 - mod->name_size);
+      mod->value_size = htonl (mod->value_size);
       mod->name_size = htons (mod->name_size);
     }
     else if (0 < data_size)
@@ -436,10 +451,10 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
   case MSG_STATE_MOD_CONT:
   {
     max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
-    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);    
+    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
     msg->size = sizeof (struct GNUNET_MessageHeader);
-    notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
-                                        &data_size, &msg[1], NULL);
+    notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
+                                      &data_size, &msg[1], NULL, NULL);
     break;
   }
   default:
@@ -454,27 +469,28 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
       ch->tmit_paused = GNUNET_YES;
       return;
     }
-    mst->tmit->state = MSG_STATE_MOD_CONT;
+    ch->tmit.state = MSG_STATE_MOD_CONT;
     break;
 
   case GNUNET_YES:
     if (0 == data_size)
     {
       /* End of modifiers. */
-      mst->tmit->state = MSG_STATE_DATA;
+      ch->tmit.state = MSG_STATE_DATA;
       if (0 == ch->tmit_ack_pending)
-        master_transmit_data (mst);
+        channel_transmit_data (ch);
 
       return;
     }
-    mst->tmit->state = MSG_STATE_MODIFIER;
+    ch->tmit.state = MSG_STATE_MODIFIER;
     break;
 
   default:
     LOG (GNUNET_ERROR_TYPE_ERROR,
-         "MasterTransmitNotify returned error when requesting a modifier.\n");
+         "MasterTransmitNotifyModifier returned error "
+         "when requesting a modifier.\n");
 
-    mst->tmit->state = MSG_STATE_START;
+    ch->tmit.state = MSG_STATE_CANCEL;
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
     msg->size = htons (sizeof (*msg));
 
@@ -489,7 +505,7 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
     queue_message (ch, msg, GNUNET_NO);
   }
 
-  master_transmit_mod (mst);
+  channel_transmit_mod (ch);
 }
 
 
@@ -499,17 +515,16 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
  * @param mst Master handle.
  */
 static void
-master_transmit_data (struct GNUNET_PSYC_Master *mst)
+channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
 {
-  struct GNUNET_PSYC_Channel *ch = &mst->ch;
   uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
 
   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
 
-  int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
-                                           &data_size, &msg[1]);
+  int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
+                                         &data_size, &msg[1]);
   switch (notify_ret)
   {
   case GNUNET_NO:
@@ -522,14 +537,14 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
     break;
 
   case GNUNET_YES:
-    mst->tmit->state = MSG_STATE_START;
+    ch->tmit.state = MSG_STATE_END;
     break;
 
   default:
     LOG (GNUNET_ERROR_TYPE_ERROR,
          "MasterTransmitNotify returned error when requesting data.\n");
 
-    mst->tmit->state = MSG_STATE_START;
+    ch->tmit.state = MSG_STATE_CANCEL;
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
     msg->size = htons (sizeof (*msg));
     queue_message (ch, msg, GNUNET_YES);
@@ -553,6 +568,86 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
 }
 
 
+/**
+ * Send a message to a channel.
+ *
+ * @param ch Handle to the PSYC channel.
+ * @param method_name Which method should be invoked.
+ * @param notify_mod Function to call to obtain modifiers.
+ * @param notify_data Function to call to obtain fragments of the data.
+ * @param notify_cls Closure for @a notify_mod and @a notify_data.
+ * @param flags Flags for the message being transmitted.
+ * @return Transmission handle, NULL on error (i.e. more than one request queued).
+ */
+static struct GNUNET_PSYC_ChannelTransmitHandle *
+channel_transmit (struct GNUNET_PSYC_Channel *ch,
+                  const char *method_name,
+                  GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+                  GNUNET_PSYC_TransmitNotifyData notify_data,
+                  void *notify_cls,
+                  uint32_t flags)
+{
+  if (GNUNET_NO != ch->in_transmit)
+    return NULL;
+  ch->in_transmit = GNUNET_YES;
+
+  size_t size = strlen (method_name) + 1;
+  struct GNUNET_PSYC_MessageMethod *pmeth;
+  struct OperationHandle *op;
+
+  ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
+                                     + sizeof (*pmeth) + size);
+  op->msg = (struct GNUNET_MessageHeader *) &op[1];
+  op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
+
+  pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
+  pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
+  pmeth->header.size = htons (sizeof (*pmeth) + size);
+  pmeth->flags = htonl (flags);
+  memcpy (&pmeth[1], method_name, size);
+
+  ch->tmit.ch = ch;
+  ch->tmit.notify_mod = notify_mod;
+  ch->tmit.notify_data = notify_data;
+  ch->tmit.notify_cls = notify_cls;
+  ch->tmit.state = MSG_STATE_MODIFIER;
+
+  channel_transmit_mod (ch);
+  return &ch->tmit;
+}
+
+
+/**
+ * Resume transmission to the channel.
+ *
+ * @param th Handle of the request that is being resumed.
+ */
+static void
+channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th)
+{
+  struct GNUNET_PSYC_Channel *ch = th->ch;
+  if (0 == ch->tmit_ack_pending)
+  {
+    ch->tmit_paused = GNUNET_NO;
+    channel_transmit_data (ch);
+  }
+}
+
+
+/**
+ * Abort transmission request to channel.
+ *
+ * @param th Handle of the request that is being aborted.
+ */
+static void
+channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
+{
+  struct GNUNET_PSYC_Channel *ch = th->ch;
+  if (GNUNET_NO == ch->in_transmit)
+    return;
+}
+
+
 /**
  * Handle incoming message from the PSYC service.
  *
@@ -564,14 +659,22 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
                      const struct GNUNET_PSYC_MessageHeader *msg)
 {
   uint16_t size = ntohs (msg->header.size);
+  uint32_t flags = ntohl (msg->flags);
+
+  GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
+                           (struct GNUNET_MessageHeader *) msg);
 
   if (MSG_STATE_START == ch->recv_state)
   {
     ch->recv_message_id = GNUNET_ntohll (msg->message_id);
-    ch->recv_flags = ntohl (msg->flags);
+    ch->recv_flags = flags;
+    ch->recv_slave_key = msg->slave_key;
+    ch->recv_mod_value_size = 0;
+    ch->recv_mod_value_size_expected = 0;
   }
   else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
   {
+    // FIXME
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
          GNUNET_ntohll (msg->message_id), ch->recv_message_id);
@@ -579,11 +682,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
     recv_error (ch);
     return;
   }
-  else if (ntohl (msg->flags) != ch->recv_flags)
+  else if (flags != ch->recv_flags)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "Unexpected message flags. Got: %lu, expected: %lu\n",
-         ntohl (msg->flags), ch->recv_flags);
+         flags, ch->recv_flags);
     GNUNET_break_op (0);
     recv_error (ch);
     return;
@@ -599,19 +702,19 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
     ptype = ntohs (pmsg->type);
     size_eq = size_min = 0;
 
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Received message part of type %u and size %u from PSYC.\n",
-                ptype, psize);
-
     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "Discarding message of type %u with invalid size %u.\n",
+                  "Dropping message of type %u with invalid size %u.\n",
                   ptype, psize);
       recv_error (ch);
       return;
     }
 
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Received message part from PSYC.\n");
+    GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
+
     switch (ptype)
     {
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
@@ -652,7 +755,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
       if (MSG_STATE_START != ch->recv_state)
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding out of order message method.\n");
+             "Dropping out of order message method (%u).\n",
+             ch->recv_state);
         /* It is normal to receive an incomplete message right after connecting,
          * but should not happen later.
          * FIXME: add a check for this condition.
@@ -665,7 +769,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
       if ('\0' != *((char *) meth + psize - 1))
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding message with malformed method. "
+             "Dropping message with malformed method. "
              "Message ID: %" PRIu64 "\n", ch->recv_message_id);
         GNUNET_break_op (0);
         recv_error (ch);
@@ -681,7 +785,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
             || MSG_STATE_MOD_CONT == ch->recv_state))
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding out of order message modifier.\n");
+             "Dropping out of order message modifier (%u).\n",
+             ch->recv_state);
         GNUNET_break_op (0);
         recv_error (ch);
         return;
@@ -691,14 +796,14 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
 
       uint16_t name_size = ntohs (mod->name_size);
-      ch->recv_mod_value_size_expected = ntohs (mod->value_size);
+      ch->recv_mod_value_size_expected = ntohl (mod->value_size);
       ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
 
       if (psize < sizeof (*mod) + name_size + 1
           || '\0' != *((char *) &mod[1] + name_size)
           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
       {
-        LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n");
+        LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
         GNUNET_break_op (0);
         recv_error (ch);
         return;
@@ -715,7 +820,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding out of order message modifier continuation.\n");
+             "Dropping out of order message modifier continuation "
+             "!(%u == %u || %u == %u) || %lu < %lu.\n",
+             MSG_STATE_MODIFIER, ch->recv_state,
+             MSG_STATE_MOD_CONT, ch->recv_state,
+             ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
         GNUNET_break_op (0);
         recv_error (ch);
         return;
@@ -728,7 +837,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
           || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding out of order message data fragment.\n");
+             "Dropping out of order message data fragment "
+             "(%u < %u || %lu != %lu).\n",
+             ch->recv_state, MSG_STATE_METHOD,
+             ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
+
         GNUNET_break_op (0);
         recv_error (ch);
         return;
@@ -757,6 +870,46 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
 }
 
 
+/**
+ * Handle incoming message acknowledgement from the PSYC service.
+ *
+ * @param ch The channel the acknowledgement is sent to.
+ */
+static void
+handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
+{
+  if (0 == ch->tmit_ack_pending)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
+    GNUNET_break (0);
+    return;
+  }
+  ch->tmit_ack_pending--;
+
+  switch (ch->tmit.state)
+  {
+  case MSG_STATE_MODIFIER:
+  case MSG_STATE_MOD_CONT:
+    if (GNUNET_NO == ch->tmit_paused)
+      channel_transmit_mod (ch);
+    break;
+
+  case MSG_STATE_DATA:
+    if (GNUNET_NO == ch->tmit_paused)
+      channel_transmit_data (ch);
+    break;
+
+  case MSG_STATE_END:
+  case MSG_STATE_CANCEL:
+    break;
+
+  default:
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Ignoring message ACK in state %u.\n", ch->tmit.state);
+  }
+}
+
+
 /**
  * Type of a function to call when we receive a message
  * from the service.
@@ -775,7 +928,7 @@ message_handler (void *cls,
 
   if (NULL == msg)
   {
-    GNUNET_break (0);
+    // timeout / disconnected from server, reconnect
     reschedule_connect (ch);
     return;
   }
@@ -824,63 +977,15 @@ message_handler (void *cls,
   }
   case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
   {
-#if TODO
     struct CountersResult *cres = (struct CountersResult *) msg;
     slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
-    if (NULL != slv->join_ack_cb)
-      mst->join_ack_cb (ch->cb_cls, mst->max_message_id);
-#endif
+    if (NULL != slv->join_cb)
+      slv->join_cb (ch->cb_cls, slv->max_message_id);
     break;
   }
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
   {
-    if (0 == ch->tmit_ack_pending)
-    {
-      LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
-      GNUNET_break (0);
-      break;
-    }
-    ch->tmit_ack_pending--;
-
-    if (ch->is_master)
-    {
-      GNUNET_assert (NULL != mst->tmit);
-      switch (mst->tmit->state)
-      {
-      case MSG_STATE_MODIFIER:
-      case MSG_STATE_MOD_CONT:
-        if (GNUNET_NO == ch->tmit_paused)
-          master_transmit_mod (mst);
-        break;
-
-      case MSG_STATE_DATA:
-        if (GNUNET_NO == ch->tmit_paused)
-          master_transmit_data (mst);
-        break;
-
-      case MSG_STATE_END:
-      case MSG_STATE_CANCEL:
-        if (NULL != mst->tmit)
-        {
-          GNUNET_free (mst->tmit);
-          mst->tmit = NULL;
-        }
-        else
-        {
-          LOG (GNUNET_ERROR_TYPE_WARNING,
-               "Ignoring message ACK, there's no transmission going on.\n");
-          GNUNET_break (0);
-        }
-        break;
-      default:
-        LOG (GNUNET_ERROR_TYPE_DEBUG,
-             "Ignoring message ACK in state %u.\n", mst->tmit->state);
-      }
-    }
-    else
-    {
-      /* TODO: slave */
-    }
+    handle_psyc_message_ack (ch);
     break;
   }
 
@@ -1106,8 +1211,6 @@ void
 GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
 {
   disconnect (master);
-  if (NULL != master->tmit)
-    GNUNET_free (master->tmit);
   GNUNET_free (master);
 }
 
@@ -1162,41 +1265,14 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
 struct GNUNET_PSYC_MasterTransmitHandle *
 GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
                              const char *method_name,
-                             GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod,
-                             GNUNET_PSYC_MasterTransmitNotify notify_data,
+                             GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+                             GNUNET_PSYC_TransmitNotifyData notify_data,
                              void *notify_cls,
                              enum GNUNET_PSYC_MasterTransmitFlags flags)
 {
-  GNUNET_assert (NULL != master);
-  struct GNUNET_PSYC_Channel *ch = &master->ch;
-  if (GNUNET_NO != ch->in_transmit)
-    return NULL;
-  ch->in_transmit = GNUNET_YES;
-
-  size_t size = strlen (method_name) + 1;
-  struct GNUNET_PSYC_MessageMethod *pmeth;
-  struct OperationHandle *op;
-
-  ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
-                                     + sizeof (*pmeth) + size);
-  op->msg = (struct GNUNET_MessageHeader *) &op[1];
-  op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
-
-  pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
-  pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
-  pmeth->header.size = htons (sizeof (*pmeth) + size);
-  pmeth->flags = htonl (flags);
-  memcpy (&pmeth[1], method_name, size);
-
-  master->tmit = GNUNET_malloc (sizeof (*master->tmit));
-  master->tmit->master = master;
-  master->tmit->notify_mod = notify_mod;
-  master->tmit->notify_data = notify_data;
-  master->tmit->notify_cls = notify_cls;
-  master->tmit->state = MSG_STATE_MODIFIER;
-
-  master_transmit_mod (master);
-  return master->tmit;
+  return (struct GNUNET_PSYC_MasterTransmitHandle *)
+    channel_transmit (&master->ch, method_name, notify_mod, notify_data,
+                      notify_cls, flags);
 }
 
 
@@ -1208,12 +1284,7 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
 void
 GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
 {
-  struct GNUNET_PSYC_Channel *ch = &th->master->ch;
-  if (0 == ch->tmit_ack_pending)
-  {
-    ch->tmit_paused = GNUNET_NO;
-    master_transmit_data (th->master);
-  }
+  channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
 }
 
 
@@ -1225,10 +1296,7 @@ GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
 void
 GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
 {
-  struct GNUNET_PSYC_Master *master = th->master;
-  struct GNUNET_PSYC_Channel *ch = &master->ch;
-  if (GNUNET_NO != ch->in_transmit)
-    return;
+  channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
 }
 
 
@@ -1282,15 +1350,15 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
 {
   struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
   struct GNUNET_PSYC_Channel *ch = &slv->ch;
-  struct SlaveJoinRequest *req = GNUNET_malloc (sizeof (*req)
-                                                + relay_count * sizeof (*relays));
+  struct SlaveJoinRequest *req
+    = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
   req->header.size = htons (sizeof (*req)
                             + relay_count * sizeof (*relays));
   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
   req->channel_key = *channel_key;
   req->slave_key = *slave_key;
   req->origin = *origin;
-  req->relay_count = relay_count;
+  req->relay_count = htonl (relay_count);
   memcpy (&req[1], relays, relay_count * sizeof (*relays));
 
   ch->message_cb = message_cb;
@@ -1303,6 +1371,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
   ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
   ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
 
+  slv->join_cb = slave_joined_cb;
   return slv;
 }
 
@@ -1328,9 +1397,8 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
  *
  * @param slave Slave handle.
  * @param method_name Which (PSYC) method should be invoked (on host).
- * @param env Environment containing transient variables for the message, or
- *            NULL.
- * @param notify Function to call when we are allowed to transmit (to get data).
+ * @param notify_mod Function to call to obtain modifiers.
+ * @param notify_data Function to call to obtain fragments of the data.
  * @param notify_cls Closure for @a notify.
  * @param flags Flags for the message being transmitted.
  * @return Transmission handle, NULL on error (i.e. more than one request
@@ -1339,12 +1407,14 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
 struct GNUNET_PSYC_SlaveTransmitHandle *
 GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
                             const char *method_name,
-                            const struct GNUNET_ENV_Environment *env,
-                            GNUNET_PSYC_SlaveTransmitNotify notify,
+                            GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+                            GNUNET_PSYC_TransmitNotifyData notify_data,
                             void *notify_cls,
                             enum GNUNET_PSYC_SlaveTransmitFlags flags)
 {
-  return NULL;
+  return (struct GNUNET_PSYC_SlaveTransmitHandle *)
+    channel_transmit (&slave->ch, method_name,
+                      notify_mod, notify_data, notify_cls, flags);
 }
 
 
@@ -1354,9 +1424,9 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
  * @param th Handle of the request that is being resumed.
  */
 void
-GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
+GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th)
 {
-
+  channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
 }
 
 
@@ -1368,7 +1438,7 @@ GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
 void
 GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
 {
-
+  channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
 }
 
 
@@ -1382,7 +1452,7 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
 struct GNUNET_PSYC_Channel *
 GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
 {
-  return (struct GNUNET_PSYC_Channel *) master;
+  return &master->ch;
 }
 
 
@@ -1395,7 +1465,7 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
 struct GNUNET_PSYC_Channel *
 GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
 {
-  return (struct GNUNET_PSYC_Channel *) slave;
+  return &slave->ch;
 }