fix build issues
[oweals/gnunet.git] / src / psycutil / psyc_message.c
index 8c214d2b642f4f90a607ccd503461e878d577e85..3421fd0ba8c384dbf2bc31610bc483034fb3053e 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * This file is part of GNUnet
- * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
+ * Copyright (C) 2013 GNUnet e.V.
  *
  * GNUnet is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published
@@ -19,7 +19,7 @@
  */
 
 /**
- * @file psycstore/psyc_util_lib.c
+ * @file psycutil/psyc_message.c
  * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
  * @author Gabor X Toth
  */
@@ -39,13 +39,18 @@ struct GNUNET_PSYC_TransmitHandle
   /**
    * Client connection to service.
    */
-  struct GNUNET_CLIENT_MANAGER_Connection *client;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Message currently being received from the client.
    */
   struct GNUNET_MessageHeader *msg;
 
+  /**
+   * Envelope for @a msg
+   */
+  struct GNUNET_MQ_Envelope *env;
+
   /**
    * Callback to request next modifier from client.
    */
@@ -135,7 +140,7 @@ struct GNUNET_PSYC_ReceiveHandle
   /**
    * Public key of the slave from which a message is being received.
    */
-  struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
+  struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
 
   /**
    * State of the currently being received message from the PSYC service.
@@ -215,7 +220,7 @@ GNUNET_PSYC_message_create (const char *method_name,
   pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
   pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
-  memcpy (&pmeth[1], method_name, method_name_size);
+  GNUNET_memcpy (&pmeth[1], method_name, method_name_size);
 
   uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
   if (NULL != env)
@@ -234,9 +239,9 @@ GNUNET_PSYC_message_create (const char *method_name,
       pmod->name_size = htons (mod_name_size);
       pmod->value_size = htonl (mod->value_size);
 
-      memcpy (&pmod[1], mod->name, mod_name_size);
+      GNUNET_memcpy (&pmod[1], mod->name, mod_name_size);
       if (0 < mod->value_size)
-        memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
+        GNUNET_memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
 
       mod = mod->next;
     }
@@ -249,7 +254,7 @@ GNUNET_PSYC_message_create (const char *method_name,
     p += pmsg->size;
     pmsg->size = htons (pmsg->size);
     pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
-    memcpy (&pmsg[1], data, data_size);
+    GNUNET_memcpy (&pmsg[1], data, data_size);
   }
 
   pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
@@ -267,38 +272,54 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
 {
   uint16_t size = ntohs (msg->size);
   uint16_t type = ntohs (msg->type);
-  GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
+
+  GNUNET_log (kind,
+              "Message of type %d and size %u:\n",
+              type,
+              size);
   switch (type)
   {
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
   {
-    struct GNUNET_PSYC_MessageHeader *pmsg
-      = (struct GNUNET_PSYC_MessageHeader *) msg;
-    GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
-                GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
+    const struct GNUNET_PSYC_MessageHeader *pmsg
+      = (const struct GNUNET_PSYC_MessageHeader *) msg;
+    GNUNET_log (kind,
+                "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
+                GNUNET_ntohll (pmsg->message_id),
+                ntohl (pmsg->flags));
     break;
   }
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
   {
-    struct GNUNET_PSYC_MessageMethod *meth
-      = (struct GNUNET_PSYC_MessageMethod *) msg;
-    GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
+    const struct GNUNET_PSYC_MessageMethod *meth
+      = (const struct GNUNET_PSYC_MessageMethod *) msg;
+    GNUNET_log (kind,
+                "\t%.*s\n",
+                (int) (size - sizeof (*meth)),
+                (const char *) &meth[1]);
     break;
   }
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
   {
-    struct GNUNET_PSYC_MessageModifier *mod
-      = (struct GNUNET_PSYC_MessageModifier *) msg;
+    const struct GNUNET_PSYC_MessageModifier *mod
+      = (const struct GNUNET_PSYC_MessageModifier *) msg;
     uint16_t name_size = ntohs (mod->name_size);
     char oper = ' ' < mod->oper ? mod->oper : ' ';
-    GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
-                size - sizeof (*mod) - name_size,
-                ((char *) &mod[1]) + name_size);
+    GNUNET_log (kind,
+                "\t%c%.*s\t%.*s\n",
+                oper,
+                (int) name_size,
+                (const char *) &mod[1],
+                (int) (size - sizeof (*mod) - name_size),
+                ((const char *) &mod[1]) + name_size);
     break;
   }
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
-    GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
+    GNUNET_log (kind,
+                "\t%.*s\n",
+                (int) (size - sizeof (*msg)),
+                (const char *) &msg[1]);
     break;
   }
 }
@@ -311,10 +332,11 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
  * Create a transmission handle.
  */
 struct GNUNET_PSYC_TransmitHandle *
-GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
+GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq)
 {
-  struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
-  tmit->client = client;
+  struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
+
+  tmit->mq = mq;
   return tmit;
 }
 
@@ -361,17 +383,16 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
     {
       /* End of message or buffer is full, add it to transmission queue
        * and start with empty buffer */
-      tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
       tmit->msg->size = htons (tmit->msg->size);
-      GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
+      GNUNET_MQ_send (tmit->mq, tmit->env);
+      tmit->env = NULL;
       tmit->msg = NULL;
       tmit->acks_pending++;
     }
     else
     {
       /* Message fits in current buffer, append */
-      tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
-      memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
+      GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
       tmit->msg->size += size;
     }
   }
@@ -379,9 +400,14 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
   if (NULL == tmit->msg && NULL != msg)
   {
     /* Empty buffer, copy over message. */
-    tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
+    tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
+                                     GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
+                                     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+    /* store current message size in host byte order
+     * then later switch it to network byte order before sending */
     tmit->msg->size = sizeof (*tmit->msg) + size;
-    memcpy (&tmit->msg[1], msg, size);
+
+    GNUNET_memcpy (&tmit->msg[1], msg, size);
   }
 
   if (NULL != tmit->msg
@@ -390,9 +416,9 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
   {
     /* End of message or buffer is full, add it to transmission queue. */
-    tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
     tmit->msg->size = htons (tmit->msg->size);
-    GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
+    GNUNET_MQ_send (tmit->mq, tmit->env);
+    tmit->env = NULL;
     tmit->msg = NULL;
     tmit->acks_pending++;
   }
@@ -630,8 +656,8 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
       tmit->mod_value = tmit->mod->value + value_size;
     }
 
-    memcpy (data, tmit->mod->name, name_size);
-    memcpy ((char *)data + name_size, tmit->mod->value, value_size);
+    GNUNET_memcpy (data, tmit->mod->name, name_size);
+    GNUNET_memcpy ((char *)data + name_size, tmit->mod->value, value_size);
     return GNUNET_NO;
   }
   else
@@ -659,7 +685,7 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
     }
 
     *data_size = value_size;
-    memcpy (data, value, value_size);
+    GNUNET_memcpy (data, value, value_size);
     return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
   }
 }
@@ -705,7 +731,12 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
 
   size_t size = strlen (method_name) + 1;
   struct GNUNET_PSYC_MessageMethod *pmeth;
-  tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
+
+  tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
+                                   GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
+                                   GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+  /* store current message size in host byte order
+   * then later switch it to network byte order before sending */
   tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
 
   if (NULL != notify_mod)
@@ -740,7 +771,7 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
   pmeth->header.size = htons (sizeof (*pmeth) + size);
   pmeth->flags = htonl (flags);
-  memcpy (&pmeth[1], method_name, size);
+  GNUNET_memcpy (&pmeth[1], method_name, size);
 
   tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
   tmit->notify_data = notify_data;
@@ -810,6 +841,9 @@ GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
   }
   tmit->acks_pending--;
 
+  if (GNUNET_YES == tmit->paused)
+    return;
+
   switch (tmit->state)
   {
   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
@@ -879,11 +913,10 @@ static void
 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
 {
   if (NULL != recv->message_part_cb)
-    recv->message_part_cb (recv->cb_cls, NULL, recv->message_id, recv->flags,
-                           0, NULL);
+    recv->message_part_cb (recv->cb_cls, NULL, NULL);
 
   if (NULL != recv->message_cb)
-    recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
+    recv->message_cb (recv->cb_cls, NULL);
 
   GNUNET_PSYC_receive_reset (recv);
 }
@@ -904,7 +937,6 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
 {
   uint16_t size = ntohs (msg->header.size);
   uint32_t flags = ntohl (msg->flags);
-  uint64_t message_id;
 
   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
                            (struct GNUNET_MessageHeader *) msg);
@@ -913,7 +945,7 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
   {
     recv->message_id = GNUNET_ntohll (msg->message_id);
     recv->flags = flags;
-    recv->slave_key = msg->slave_key;
+    recv->slave_pub_key = msg->slave_pub_key;
     recv->mod_value_size = 0;
     recv->mod_value_size_expected = 0;
   }
@@ -936,7 +968,6 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
     recv_error (recv);
     return GNUNET_SYSERR;
   }
-  message_id = recv->message_id;
 
   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
 
@@ -1099,10 +1130,7 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
     }
 
     if (NULL != recv->message_part_cb)
-      recv->message_part_cb (recv->cb_cls, &recv->slave_key,
-                             recv->message_id, recv->flags,
-                             0, // FIXME: data_offset
-                             pmsg);
+      recv->message_part_cb (recv->cb_cls, msg, pmsg);
 
     switch (ptype)
     {
@@ -1114,7 +1142,7 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
   }
 
   if (NULL != recv->message_cb)
-    recv->message_cb (recv->cb_cls, message_id, flags, msg);
+    recv->message_cb (recv->cb_cls, msg);
   return GNUNET_OK;
 }
 
@@ -1180,23 +1208,22 @@ struct ParseMessageClosure
 
 static void
 parse_message_part_cb (void *cls,
-                       const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
-                       uint64_t message_id, uint32_t flags, uint64_t data_offset,
-                       const struct GNUNET_MessageHeader *msg)
+                       const struct GNUNET_PSYC_MessageHeader *msg,
+                       const struct GNUNET_MessageHeader *pmsg)
 {
   struct ParseMessageClosure *pmc = cls;
-  if (NULL == msg)
+  if (NULL == pmsg)
   {
     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
     return;
   }
 
-  switch (ntohs (msg->type))
+  switch (ntohs (pmsg->type))
   {
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
   {
     struct GNUNET_PSYC_MessageMethod *
-      pmeth = (struct GNUNET_PSYC_MessageMethod *) msg;
+      pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
     *pmc->method_name = (const char *) &pmeth[1];
     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
     break;
@@ -1205,7 +1232,7 @@ parse_message_part_cb (void *cls,
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
   {
     struct GNUNET_PSYC_MessageModifier *
-      pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
+      pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg;
 
     const char *name = (const char *) &pmod[1];
     const void *value = name + ntohs (pmod->name_size);
@@ -1216,8 +1243,8 @@ parse_message_part_cb (void *cls,
   }
 
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
-    *pmc->data = &msg[1];
-    *pmc->data_size = ntohs (msg->size) - sizeof (*msg);
+    *pmc->data = &pmsg[1];
+    *pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg);
     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
     break;
 
@@ -1241,7 +1268,7 @@ parse_message_part_cb (void *cls,
  * @param env
  *        The environment for the message with a list of modifiers.
  * @param[out] data
- *        Pointer to data inside @a pmsg.
+ *        Pointer to data inside @a msg.
  * @param[out] data_size
  *        Size of @data is written here.
  *
@@ -1292,7 +1319,7 @@ GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
   pmsg->fragment_offset = mmsg->fragment_offset;
   pmsg->flags = htonl (flags);
 
-  memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+  GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
 }
 
 
@@ -1324,6 +1351,6 @@ GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *m
     pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
   pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
-  memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
+  GNUNET_memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
   return pmsg;
 }