/**
* Client connection to service.
*/
- struct GNUNET_CLIENT_MANAGER_Connection *client;
+ struct GNUNET_MQ_Handle *mq;
/**
* Message currently being received from the client.
*/
struct GNUNET_MessageHeader *msg;
+ /**
+ * Envelope for @a msg
+ */
+ struct GNUNET_MQ_Envelope *env;
+
/**
* Callback to request next modifier from client.
*/
* Create a transmission handle.
*/
struct GNUNET_PSYC_TransmitHandle *
-GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
+GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq)
{
struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
- tmit->client = client;
+ tmit->mq = mq;
return tmit;
}
{
/* End of message or buffer is full, add it to transmission queue
* and start with empty buffer */
- tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
tmit->msg->size = htons (tmit->msg->size);
- GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
+ GNUNET_MQ_send (tmit->mq, tmit->env);
+ tmit->env = NULL;
tmit->msg = NULL;
tmit->acks_pending++;
}
else
{
/* Message fits in current buffer, append */
- tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
tmit->msg->size += size;
}
if (NULL == tmit->msg && NULL != msg)
{
/* Empty buffer, copy over message. */
- tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
+ tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
+ GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ /* store current message size in host byte order
+ * then later switch it to network byte order before sending */
tmit->msg->size = sizeof (*tmit->msg) + size;
+
GNUNET_memcpy (&tmit->msg[1], msg, size);
}
< tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
{
/* End of message or buffer is full, add it to transmission queue. */
- tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
tmit->msg->size = htons (tmit->msg->size);
- GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
+ GNUNET_MQ_send (tmit->mq, tmit->env);
+ tmit->env = NULL;
tmit->msg = NULL;
tmit->acks_pending++;
}
size_t size = strlen (method_name) + 1;
struct GNUNET_PSYC_MessageMethod *pmeth;
- tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
+
+ tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
+ GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ /* store current message size in host byte order
+ * then later switch it to network byte order before sending */
tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
if (NULL != notify_mod)
}
tmit->acks_pending--;
+ if (GNUNET_YES == tmit->paused)
+ return;
+
switch (tmit->state)
{
case GNUNET_PSYC_MESSAGE_STATE_MODIFIER: