fix build issues
[oweals/gnunet.git] / src / psycutil / psyc_message.c
index 303ba846684ce80e1c28c63890b7aea814394b7d..3421fd0ba8c384dbf2bc31610bc483034fb3053e 100644 (file)
@@ -39,13 +39,18 @@ struct GNUNET_PSYC_TransmitHandle
   /**
    * Client connection to service.
    */
-  struct GNUNET_CLIENT_MANAGER_Connection *client;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Message currently being received from the client.
    */
   struct GNUNET_MessageHeader *msg;
 
+  /**
+   * Envelope for @a msg
+   */
+  struct GNUNET_MQ_Envelope *env;
+
   /**
    * Callback to request next modifier from client.
    */
@@ -327,11 +332,11 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
  * Create a transmission handle.
  */
 struct GNUNET_PSYC_TransmitHandle *
-GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
+GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq)
 {
   struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
 
-  tmit->client = client;
+  tmit->mq = mq;
   return tmit;
 }
 
@@ -378,16 +383,15 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
     {
       /* End of message or buffer is full, add it to transmission queue
        * and start with empty buffer */
-      tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
       tmit->msg->size = htons (tmit->msg->size);
-      GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
+      GNUNET_MQ_send (tmit->mq, tmit->env);
+      tmit->env = NULL;
       tmit->msg = NULL;
       tmit->acks_pending++;
     }
     else
     {
       /* Message fits in current buffer, append */
-      tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
       GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
       tmit->msg->size += size;
     }
@@ -396,8 +400,13 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
   if (NULL == tmit->msg && NULL != msg)
   {
     /* Empty buffer, copy over message. */
-    tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
+    tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
+                                     GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
+                                     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+    /* store current message size in host byte order
+     * then later switch it to network byte order before sending */
     tmit->msg->size = sizeof (*tmit->msg) + size;
+
     GNUNET_memcpy (&tmit->msg[1], msg, size);
   }
 
@@ -407,9 +416,9 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
   {
     /* End of message or buffer is full, add it to transmission queue. */
-    tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
     tmit->msg->size = htons (tmit->msg->size);
-    GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
+    GNUNET_MQ_send (tmit->mq, tmit->env);
+    tmit->env = NULL;
     tmit->msg = NULL;
     tmit->acks_pending++;
   }
@@ -722,7 +731,12 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
 
   size_t size = strlen (method_name) + 1;
   struct GNUNET_PSYC_MessageMethod *pmeth;
-  tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
+
+  tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
+                                   GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
+                                   GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+  /* store current message size in host byte order
+   * then later switch it to network byte order before sending */
   tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
 
   if (NULL != notify_mod)
@@ -827,6 +841,9 @@ GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
   }
   tmit->acks_pending--;
 
+  if (GNUNET_YES == tmit->paused)
+    return;
+
   switch (tmit->state)
   {
   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER: