-ensure stats queues do not grow too big
[oweals/gnunet.git] / src / testbed / testbed_api.c
index aad5055ef26bb56defba79b67adf5757863b8242..7c0ed1f02924c60992a50c954b065837152ad27d 100644 (file)
@@ -1288,7 +1288,6 @@ handle_barrier_status (void *cls,
  *
  * @param controller the handle to the controller
  * @param msg the message to queue
- * @deprecated
  */
 void
 GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller,
@@ -1328,17 +1327,27 @@ GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller,
  *           operation
  */
 struct OperationContext *
-GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller
-                                       *controller, uint64_t operation_id,
+GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller *controller,
+                                       uint64_t operation_id,
                                        const struct GNUNET_MessageHeader *msg,
                                        GNUNET_CLIENT_MessageHandler cc,
                                        void *cc_cls)
 {
   struct OperationContext *opc;
   struct ForwardedOperationData *data;
-  struct GNUNET_MessageHeader *dup_msg;
-  uint16_t msize;
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_MessageHeader *m2;
+  uint16_t type = ntohs (msg->type);
+  uint16_t size = ntohs (msg->size);
 
+  env = GNUNET_MQ_msg_extra (m2,
+                             size - sizeof (*m2),
+                             type);
+  memcpy (m2,
+          msg,
+          size);
+  GNUNET_MQ_send (controller->mq,
+                  env);
   data = GNUNET_new (struct ForwardedOperationData);
   data->cc = cc;
   data->cc_cls = cc_cls;
@@ -1347,11 +1356,8 @@ GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller
   opc->type = OP_FORWARDED;
   opc->data = data;
   opc->id = operation_id;
-  msize = ntohs (msg->size);
-  dup_msg = GNUNET_malloc (msize);
-  (void) memcpy (dup_msg, msg, msize);
-  GNUNET_TESTBED_queue_message_ (opc->c, dup_msg);
-  GNUNET_TESTBED_insert_opc_ (controller, opc);
+  GNUNET_TESTBED_insert_opc_ (controller,
+                              opc);
   return opc;
 }
 
@@ -1365,7 +1371,8 @@ GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller
 void
 GNUNET_TESTBED_forward_operation_msg_cancel_ (struct OperationContext *opc)
 {
-  GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+  GNUNET_TESTBED_remove_opc_ (opc->c,
+                              opc);
   GNUNET_free (opc->data);
   GNUNET_free (opc);
 }
@@ -1545,27 +1552,29 @@ GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host,
   GNUNET_MQ_hd_var_size (barrier_status,
                          GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS,
                          const struct GNUNET_TESTBED_BarrierStatusMsg);
-   struct GNUNET_TESTBED_Controller *controller
-     = GNUNET_new (struct GNUNET_TESTBED_Controller);
-   struct GNUNET_MQ_MessageHandler handlers[] = {
-     make_add_host_confirm_handler (controller),
-     make_peer_conevent_handler (controller),
-     make_opsuccess_handler (controller),
-     make_op_fail_event_handler (controller),
-     make_peer_create_success_handler (controller),
-     make_peer_event_handler (controller),
-     make_peer_config_handler (controller),
-     make_slave_config_handler (controller),
-     make_link_controllers_result_handler (controller),
-     make_barrier_status_handler (controller),
-     GNUNET_MQ_handler_end ()
+  struct GNUNET_TESTBED_Controller *controller
+    = GNUNET_new (struct GNUNET_TESTBED_Controller);
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    make_add_host_confirm_handler (controller),
+    make_peer_conevent_handler (controller),
+    make_opsuccess_handler (controller),
+    make_op_fail_event_handler (controller),
+    make_peer_create_success_handler (controller),
+    make_peer_event_handler (controller),
+    make_peer_config_handler (controller),
+    make_slave_config_handler (controller),
+    make_link_controllers_result_handler (controller),
+    make_barrier_status_handler (controller),
+    GNUNET_MQ_handler_end ()
   };
   struct GNUNET_TESTBED_InitMessage *msg;
+  struct GNUNET_MQ_Envelope *env;
   const struct GNUNET_CONFIGURATION_Handle *cfg;
   const char *controller_hostname;
   unsigned long long max_parallel_operations;
   unsigned long long max_parallel_service_connections;
   unsigned long long max_parallel_topology_config_operations;
+  size_t slen;
 
   GNUNET_assert (NULL != (cfg = GNUNET_TESTBED_host_get_cfg_ (host)));
   if (GNUNET_OK !=
@@ -1626,18 +1635,17 @@ GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host,
   controller_hostname = GNUNET_TESTBED_host_get_hostname (host);
   if (NULL == controller_hostname)
     controller_hostname = "127.0.0.1";
-  msg =
-      GNUNET_malloc (sizeof (struct GNUNET_TESTBED_InitMessage) +
-                     strlen (controller_hostname) + 1);
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_INIT);
-  msg->header.size =
-      htons (sizeof (struct GNUNET_TESTBED_InitMessage) +
-             strlen (controller_hostname) + 1);
+  slen = strlen (controller_hostname) + 1;
+  env = GNUNET_MQ_msg_extra (msg,
+                             slen,
+                             GNUNET_MESSAGE_TYPE_TESTBED_INIT);
   msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (host));
   msg->event_mask = GNUNET_htonll (controller->event_mask);
-  strcpy ((char *) &msg[1], controller_hostname);
-  GNUNET_TESTBED_queue_message_ (controller,
-                                 (struct GNUNET_MessageHeader *) msg);
+  memcpy (&msg[1],
+          controller_hostname,
+          slen);
+  GNUNET_MQ_send (controller->mq,
+                  env);
   return controller;
 }
 
@@ -2182,16 +2190,17 @@ static void
 opstart_shutdown_peers (void *cls)
 {
   struct OperationContext *opc = cls;
+  struct GNUNET_MQ_Envelope *env;
   struct GNUNET_TESTBED_ShutdownPeersMessage *msg;
 
   opc->state = OPC_STATE_STARTED;
-  msg = GNUNET_new (struct GNUNET_TESTBED_ShutdownPeersMessage);
-  msg->header.size =
-      htons (sizeof (struct GNUNET_TESTBED_ShutdownPeersMessage));
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS);
+  env = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS);
   msg->operation_id = GNUNET_htonll (opc->id);
-  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
-  GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
+  GNUNET_TESTBED_insert_opc_ (opc->c,
+                              opc);
+  GNUNET_MQ_send (opc->c->mq,
+                  env);
 }
 
 
@@ -2330,10 +2339,10 @@ GNUNET_TESTBED_barrier_init_ (struct GNUNET_TESTBED_Controller *controller,
                               int echo)
 {
   struct GNUNET_TESTBED_BarrierInit *msg;
+  struct GNUNET_MQ_Envelope *env;
   struct GNUNET_TESTBED_Barrier *barrier;
   struct GNUNET_HashCode key;
   size_t name_len;
-  uint16_t msize;
 
   GNUNET_assert (quorum <= 100);
   GNUNET_assert (NULL != cb);
@@ -2362,13 +2371,16 @@ GNUNET_TESTBED_barrier_init_ (struct GNUNET_TESTBED_Controller *controller,
                                                     &barrier->key,
                                                     barrier,
                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
-  msize = name_len + sizeof (struct GNUNET_TESTBED_BarrierInit);
-  msg = GNUNET_malloc (msize);
-  msg->header.size = htons (msize);
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT);
+
+  env = GNUNET_MQ_msg_extra (msg,
+                             name_len,
+                             GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT);
   msg->quorum = (uint8_t) quorum;
-  (void) memcpy (msg->name, barrier->name, name_len);
-  GNUNET_TESTBED_queue_message_ (barrier->c, &msg->header);
+  memcpy (msg->name,
+          barrier->name,
+          name_len);
+  GNUNET_MQ_send (barrier->c->mq,
+                  env);
   return barrier;
 }
 
@@ -2406,15 +2418,19 @@ GNUNET_TESTBED_barrier_init (struct GNUNET_TESTBED_Controller *controller,
 void
 GNUNET_TESTBED_barrier_cancel (struct GNUNET_TESTBED_Barrier *barrier)
 {
+  struct GNUNET_MQ_Envelope *env;
   struct GNUNET_TESTBED_BarrierCancel *msg;
-  uint16_t msize;
-
-  msize = sizeof (struct GNUNET_TESTBED_BarrierCancel) + strlen (barrier->name);
-  msg = GNUNET_malloc (msize);
-  msg->header.size = htons (msize);
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL);
-  (void) memcpy (msg->name, barrier->name, strlen (barrier->name));
-  GNUNET_TESTBED_queue_message_ (barrier->c, &msg->header);
+  size_t slen;
+
+  slen = strlen (barrier->name);
+  env = GNUNET_MQ_msg_extra (msg,
+                             slen,
+                             GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL);
+  memcpy (msg->name,
+          barrier->name,
+          slen);
+  GNUNET_MQ_send (barrier->c->mq,
+                  env);
   GNUNET_TESTBED_barrier_remove_ (barrier);
 }