guix-env: some update.
[oweals/gnunet.git] / src / testbed / testbed_api_peers.c
index b5a66cd5da86bf148a3ad26c0927c1bf6bf4b9f3..871e554a944a27e2346d618b1a1b581b9c32ea3f 100644 (file)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      (C) 2008--2012 Christian Grothoff (and other contributing authors)
+      Copyright (C) 2008--2013 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
 
       You should have received a copy of the GNU General Public License
       along with GNUnet; see the file COPYING.  If not, write to the
-      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-      Boston, MA 02111-1307, USA.
+      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+      Boston, MA 02110-1301, USA.
  */
 
 /**
@@ -97,33 +97,38 @@ static void
 opstart_peer_create (void *cls)
 {
   struct OperationContext *opc = cls;
-  struct PeerCreateData *data;
+  struct PeerCreateData *data = opc->data;
   struct GNUNET_TESTBED_PeerCreateMessage *msg;
+  struct GNUNET_MQ_Envelope *env;
   char *config;
   char *xconfig;
   size_t c_size;
   size_t xc_size;
-  uint16_t msize;
 
   GNUNET_assert (OP_PEER_CREATE == opc->type);
-  data = opc->data;
   GNUNET_assert (NULL != data);
   GNUNET_assert (NULL != data->peer);
   opc->state = OPC_STATE_STARTED;
-  config = GNUNET_CONFIGURATION_serialize (data->cfg, &c_size);
-  xc_size = GNUNET_TESTBED_compress_config_ (config, c_size, &xconfig);
+  config = GNUNET_CONFIGURATION_serialize (data->cfg,
+                                           &c_size);
+  xc_size = GNUNET_TESTBED_compress_config_ (config,
+                                             c_size,
+                                             &xconfig);
   GNUNET_free (config);
-  msize = xc_size + sizeof (struct GNUNET_TESTBED_PeerCreateMessage);
-  msg = GNUNET_realloc (xconfig, msize);
-  memmove (&msg[1], msg, xc_size);
-  msg->header.size = htons (msize);
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER);
+  env = GNUNET_MQ_msg_extra (msg,
+                             xc_size,
+                             GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER);
   msg->operation_id = GNUNET_htonll (opc->id);
   msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (data->peer->host));
   msg->peer_id = htonl (data->peer->unique_id);
-  msg->config_size = htonl (c_size);
-  GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc);
-  GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
+  msg->config_size = htons ((uint16_t) c_size);
+  GNUNET_memcpy (&msg[1],
+          xconfig,
+          xc_size);
+  GNUNET_MQ_send (opc->c->mq,
+                  env);
+  GNUNET_free (xconfig);
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
 }
 
 
@@ -140,7 +145,7 @@ oprelease_peer_create (void *cls)
   switch (opc->state)
   {
   case OPC_STATE_STARTED:
-    GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
     /* No break we continue flow */
   case OPC_STATE_INIT:
     GNUNET_free (((struct PeerCreateData *) opc->data)->peer);
@@ -162,20 +167,20 @@ static void
 opstart_peer_destroy (void *cls)
 {
   struct OperationContext *opc = cls;
-  struct GNUNET_TESTBED_Peer *peer;
+  struct GNUNET_TESTBED_Peer *peer = opc->data;
   struct GNUNET_TESTBED_PeerDestroyMessage *msg;
+  struct GNUNET_MQ_Envelope *env;
 
   GNUNET_assert (OP_PEER_DESTROY == opc->type);
-  peer = opc->data;
   GNUNET_assert (NULL != peer);
   opc->state = OPC_STATE_STARTED;
-  msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerDestroyMessage));
-  msg->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerDestroyMessage));
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_DESTROY_PEER);
+  env = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_TESTBED_DESTROY_PEER);
   msg->peer_id = htonl (peer->unique_id);
   msg->operation_id = GNUNET_htonll (opc->id);
-  GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc);
-  GNUNET_TESTBED_queue_message_ (peer->controller, &msg->header);
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
+  GNUNET_MQ_send (peer->controller->mq,
+                  env);
 }
 
 
@@ -189,8 +194,16 @@ oprelease_peer_destroy (void *cls)
 {
   struct OperationContext *opc = cls;
 
-  if (OPC_STATE_FINISHED != opc->state)
-    GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+  switch (opc->state)
+  {
+  case OPC_STATE_STARTED:
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+    /* no break; continue */
+  case OPC_STATE_INIT:
+    break;
+  case OPC_STATE_FINISHED:
+    break;
+  }
   GNUNET_free (opc);
 }
 
@@ -205,23 +218,22 @@ opstart_peer_start (void *cls)
 {
   struct OperationContext *opc = cls;
   struct GNUNET_TESTBED_PeerStartMessage *msg;
+  struct GNUNET_MQ_Envelope *env;
   struct PeerEventData *data;
   struct GNUNET_TESTBED_Peer *peer;
 
   GNUNET_assert (OP_PEER_START == opc->type);
-  GNUNET_assert (NULL != opc->data);
-  data = opc->data;
-  GNUNET_assert (NULL != data->peer);
-  peer = data->peer;
-  GNUNET_assert ((PS_CREATED == peer->state) || (PS_STOPPED == peer->state));
+  GNUNET_assert (NULL != (data = opc->data));
+  GNUNET_assert (NULL != (peer = data->peer));
+  GNUNET_assert ((TESTBED_PS_CREATED == peer->state) || (TESTBED_PS_STOPPED == peer->state));
   opc->state = OPC_STATE_STARTED;
-  msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerStartMessage));
-  msg->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerStartMessage));
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_START_PEER);
+  env = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_TESTBED_START_PEER);
   msg->peer_id = htonl (peer->unique_id);
   msg->operation_id = GNUNET_htonll (opc->id);
-  GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc);
-  GNUNET_TESTBED_queue_message_ (peer->controller, &msg->header);
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
+  GNUNET_MQ_send (peer->controller->mq,
+                  env);
 }
 
 
@@ -235,10 +247,16 @@ oprelease_peer_start (void *cls)
 {
   struct OperationContext *opc = cls;
 
-  if (OPC_STATE_FINISHED != opc->state)
+  switch (opc->state)
   {
+  case OPC_STATE_STARTED:
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+    /* no break; continue */
+  case OPC_STATE_INIT:
     GNUNET_free (opc->data);
-    GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+    break;
+  case OPC_STATE_FINISHED:
+    break;
   }
   GNUNET_free (opc);
 }
@@ -256,20 +274,19 @@ opstart_peer_stop (void *cls)
   struct GNUNET_TESTBED_PeerStopMessage *msg;
   struct PeerEventData *data;
   struct GNUNET_TESTBED_Peer *peer;
+  struct GNUNET_MQ_Envelope *env;
 
-  GNUNET_assert (NULL != opc->data);
-  data = opc->data;
-  GNUNET_assert (NULL != data->peer);
-  peer = data->peer;
-  GNUNET_assert (PS_STARTED == peer->state);
+  GNUNET_assert (NULL != (data = opc->data));
+  GNUNET_assert (NULL != (peer = data->peer));
+  GNUNET_assert (TESTBED_PS_STARTED == peer->state);
   opc->state = OPC_STATE_STARTED;
-  msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerStopMessage));
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_STOP_PEER);
-  msg->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerStopMessage));
+  env = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_TESTBED_STOP_PEER);
   msg->peer_id = htonl (peer->unique_id);
   msg->operation_id = GNUNET_htonll (opc->id);
-  GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc);
-  GNUNET_TESTBED_queue_message_ (peer->controller, &msg->header);
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
+  GNUNET_MQ_send (peer->controller->mq,
+                  env);
 }
 
 
@@ -283,10 +300,16 @@ oprelease_peer_stop (void *cls)
 {
   struct OperationContext *opc = cls;
 
-  if (OPC_STATE_FINISHED != opc->state)
+  switch (opc->state)
   {
+  case OPC_STATE_STARTED:
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+    /* no break; continue */
+  case OPC_STATE_INIT:
     GNUNET_free (opc->data);
-    GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+    break;
+  case OPC_STATE_FINISHED:
+    break;
   }
   GNUNET_free (opc);
 }
@@ -311,7 +334,7 @@ GNUNET_TESTBED_generate_peergetconfig_msg_ (uint32_t peer_id,
                      (struct GNUNET_TESTBED_PeerGetConfigurationMessage));
   msg->header.size =
       htons (sizeof (struct GNUNET_TESTBED_PeerGetConfigurationMessage));
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_GET_PEER_CONFIGURATION);
+  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_GET_PEER_INFORMATION);
   msg->peer_id = htonl (peer_id);
   msg->operation_id = GNUNET_htonll (operation_id);
   return msg;
@@ -327,16 +350,15 @@ static void
 opstart_peer_getinfo (void *cls)
 {
   struct OperationContext *opc = cls;
-  struct PeerInfoData *data;
+  struct PeerInfoData *data = opc->data;
   struct GNUNET_TESTBED_PeerGetConfigurationMessage *msg;
 
-  data = opc->data;
   GNUNET_assert (NULL != data);
   opc->state = OPC_STATE_STARTED;
   msg =
       GNUNET_TESTBED_generate_peergetconfig_msg_ (data->peer->unique_id,
                                                   opc->id);
-  GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc);
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
   GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
 }
 
@@ -352,19 +374,22 @@ oprelease_peer_getinfo (void *cls)
   struct OperationContext *opc = cls;
   struct GNUNET_TESTBED_PeerInformation *data;
 
-  if (OPC_STATE_FINISHED != opc->state)
-  {
-    GNUNET_free_non_null (opc->data);
-    GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
-  }
-  else
+  switch (opc->state)
   {
+  case OPC_STATE_STARTED:
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+    /* no break; continue */
+  case OPC_STATE_INIT:
+    GNUNET_free (opc->data);
+    break;
+  case OPC_STATE_FINISHED:
     data = opc->data;
     GNUNET_assert (NULL != data);
     switch (data->pit)
     {
     case GNUNET_TESTBED_PIT_CONFIGURATION:
-      GNUNET_CONFIGURATION_destroy (data->result.cfg);
+      if (NULL != data->result.cfg)
+        GNUNET_CONFIGURATION_destroy (data->result.cfg);
       break;
     case GNUNET_TESTBED_PIT_IDENTITY:
       GNUNET_free (data->result.id);
@@ -373,6 +398,7 @@ oprelease_peer_getinfo (void *cls)
       GNUNET_assert (0);        /* We should never reach here */
     }
     GNUNET_free (data);
+    break;
   }
   GNUNET_free (opc);
 }
@@ -387,24 +413,23 @@ static void
 opstart_overlay_connect (void *cls)
 {
   struct OperationContext *opc = cls;
+  struct GNUNET_MQ_Envelope *env;
   struct GNUNET_TESTBED_OverlayConnectMessage *msg;
   struct OverlayConnectData *data;
 
   opc->state = OPC_STATE_STARTED;
   data = opc->data;
   GNUNET_assert (NULL != data);
-  data->tslot_index = GNUNET_TESTBED_get_tslot_ (data->p1->host, data);
-  data->tstart = GNUNET_TIME_absolute_get ();
-  msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_OverlayConnectMessage));
-  msg->header.size =
-      htons (sizeof (struct GNUNET_TESTBED_OverlayConnectMessage));
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_OVERLAY_CONNECT);
+  env = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_TESTBED_OVERLAY_CONNECT);
   msg->peer1 = htonl (data->p1->unique_id);
   msg->peer2 = htonl (data->p2->unique_id);
   msg->operation_id = GNUNET_htonll (opc->id);
   msg->peer2_host_id = htonl (GNUNET_TESTBED_host_get_id_ (data->p2->host));
-  GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc);
-  GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
+  GNUNET_TESTBED_insert_opc_ (opc->c,
+                              opc);
+  GNUNET_MQ_send (opc->c->mq,
+                  env);
 }
 
 
@@ -417,7 +442,6 @@ static void
 oprelease_overlay_connect (void *cls)
 {
   struct OperationContext *opc = cls;
-  struct GNUNET_TIME_Relative duration;
   struct OverlayConnectData *data;
 
   data = opc->data;
@@ -426,20 +450,84 @@ oprelease_overlay_connect (void *cls)
   case OPC_STATE_INIT:
     break;
   case OPC_STATE_STARTED:
-    (void) GNUNET_TESTBED_release_time_slot_ (data->p1->host, data->tslot_index,
-                                              data);
-    GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
     break;
   case OPC_STATE_FINISHED:
-    duration = GNUNET_TIME_absolute_get_duration (data->tstart);
-    GNUNET_TESTBED_update_time_slot_ (data->p1->host, data->tslot_index, data,
-                                      duration, data->failed);
+    break;
   }
   GNUNET_free (data);
   GNUNET_free (opc);
 }
 
 
+/**
+ * Function called when a peer reconfigure operation is ready
+ *
+ * @param cls the closure from GNUNET_TESTBED_operation_create_()
+ */
+static void
+opstart_peer_reconfigure (void *cls)
+{
+  struct OperationContext *opc = cls;
+  struct PeerReconfigureData *data = opc->data;
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_TESTBED_PeerReconfigureMessage *msg;
+  char *xconfig;
+  size_t xc_size;
+
+  opc->state = OPC_STATE_STARTED;
+  GNUNET_assert (NULL != data);
+  xc_size = GNUNET_TESTBED_compress_config_ (data->config,
+                                             data->cfg_size,
+                                             &xconfig);
+  GNUNET_free (data->config);
+  data->config = NULL;
+  GNUNET_assert (xc_size < UINT16_MAX - sizeof (*msg));
+  env = GNUNET_MQ_msg_extra (msg,
+                             xc_size,
+                             GNUNET_MESSAGE_TYPE_TESTBED_RECONFIGURE_PEER);
+  msg->peer_id = htonl (data->peer->unique_id);
+  msg->operation_id = GNUNET_htonll (opc->id);
+  msg->config_size = htons (data->cfg_size);
+  GNUNET_memcpy (&msg[1],
+          xconfig,
+          xc_size);
+  GNUNET_free (xconfig);
+  GNUNET_free (data);
+  opc->data = NULL;
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
+  GNUNET_MQ_send (opc->c->mq,
+                  env);
+}
+
+
+/**
+ * Callback which will be called when a peer reconfigure operation is released
+ *
+ * @param cls the closure from GNUNET_TESTBED_operation_create_()
+ */
+static void
+oprelease_peer_reconfigure (void *cls)
+{
+  struct OperationContext *opc = cls;
+  struct PeerReconfigureData *data = opc->data;
+
+  switch (opc->state)
+  {
+  case OPC_STATE_INIT:
+    GNUNET_free (data->config);
+    GNUNET_free (data);
+    break;
+  case OPC_STATE_STARTED:
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+    break;
+  case OPC_STATE_FINISHED:
+    break;
+  }
+  GNUNET_free (opc);
+}
+
+
 /**
  * Lookup a peer by ID.
  *
@@ -495,18 +583,18 @@ GNUNET_TESTBED_peer_create (struct GNUNET_TESTBED_Controller *controller,
   struct OperationContext *opc;
   static uint32_t id_gen;
 
-  peer = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Peer));
+  peer = GNUNET_new (struct GNUNET_TESTBED_Peer);
   peer->controller = controller;
   peer->host = host;
   peer->unique_id = id_gen++;
-  peer->state = PS_INVALID;
-  data = GNUNET_malloc (sizeof (struct PeerCreateData));
+  peer->state = TESTBED_PS_INVALID;
+  data = GNUNET_new (struct PeerCreateData);
   data->host = host;
   data->cfg = cfg;
   data->cb = cb;
   data->cls = cls;
   data->peer = peer;
-  opc = GNUNET_malloc (sizeof (struct OperationContext));
+  opc = GNUNET_new (struct OperationContext);
   opc->c = controller;
   opc->data = data;
   opc->id = GNUNET_TESTBED_get_next_op_id (controller);
@@ -538,11 +626,11 @@ GNUNET_TESTBED_peer_start (void *op_cls, struct GNUNET_TESTBED_Peer *peer,
   struct OperationContext *opc;
   struct PeerEventData *data;
 
-  data = GNUNET_malloc (sizeof (struct PeerEventData));
+  data = GNUNET_new (struct PeerEventData);
   data->peer = peer;
   data->pcc = pcc;
   data->pcc_cls = pcc_cls;
-  opc = GNUNET_malloc (sizeof (struct OperationContext));
+  opc = GNUNET_new (struct OperationContext);
   opc->c = peer->controller;
   opc->data = data;
   opc->op_cls = op_cls;
@@ -578,11 +666,11 @@ GNUNET_TESTBED_peer_stop (void *op_cls,
   struct OperationContext *opc;
   struct PeerEventData *data;
 
-  data = GNUNET_malloc (sizeof (struct PeerEventData));
+  data = GNUNET_new (struct PeerEventData);
   data->peer = peer;
   data->pcc = pcc;
   data->pcc_cls = pcc_cls;
-  opc = GNUNET_malloc (sizeof (struct OperationContext));
+  opc = GNUNET_new (struct OperationContext);
   opc->c = peer->controller;
   opc->data = data;
   opc->op_cls = op_cls;
@@ -603,6 +691,7 @@ GNUNET_TESTBED_peer_stop (void *op_cls,
  * with event type GNUNET_TESTBED_ET_OPERATION_FINISHED when result for this
  * operation is available. Instead, the GNUNET_TESTBED_PeerInfoCallback() will
  * be called.
+ * The peer information in the callback is valid until the operation is canceled.
  *
  * @param peer peer to request information about
  * @param pit desired information
@@ -621,12 +710,13 @@ GNUNET_TESTBED_peer_get_information (struct GNUNET_TESTBED_Peer *peer,
   struct PeerInfoData *data;
 
   GNUNET_assert (GNUNET_TESTBED_PIT_GENERIC != pit);
-  data = GNUNET_malloc (sizeof (struct PeerInfoData));
+  GNUNET_assert (NULL != cb);
+  data = GNUNET_new (struct PeerInfoData);
   data->peer = peer;
   data->pit = pit;
   data->cb = cb;
   data->cb_cls = cb_cls;
-  opc = GNUNET_malloc (sizeof (struct OperationContext));
+  opc = GNUNET_new (struct OperationContext);
   opc->c = peer->controller;
   opc->data = data;
   opc->type = OP_PEER_INFO;
@@ -656,9 +746,38 @@ GNUNET_TESTBED_peer_update_configuration (struct GNUNET_TESTBED_Peer *peer,
                                           const struct
                                           GNUNET_CONFIGURATION_Handle *cfg)
 {
-  // FIXME: handle locally or delegate...
-  GNUNET_break (0);
-  return NULL;
+  struct OperationContext *opc;
+  struct PeerReconfigureData *data;
+  size_t csize;
+
+  data = GNUNET_new (struct PeerReconfigureData);
+  data->peer = peer;
+  data->config = GNUNET_CONFIGURATION_serialize (cfg, &csize);
+  if (NULL == data->config)
+  {
+    GNUNET_free (data);
+    return NULL;
+  }
+  if (csize > UINT16_MAX)
+  {
+    GNUNET_break (0);
+    GNUNET_free (data->config);
+    GNUNET_free (data);
+    return NULL;
+  }
+  data->cfg_size = (uint16_t) csize;
+  opc = GNUNET_new (struct OperationContext);
+  opc->c = peer->controller;
+  opc->data = data;
+  opc->type = OP_PEER_RECONFIGURE;
+  opc->id = GNUNET_TESTBED_get_next_op_id (opc->c);
+  opc->op =
+      GNUNET_TESTBED_operation_create_ (opc, &opstart_peer_reconfigure,
+                                        &oprelease_peer_reconfigure);
+  GNUNET_TESTBED_operation_queue_insert_ (opc->c->opq_parallel_operations,
+                                          opc->op);
+  GNUNET_TESTBED_operation_begin_wait_ (opc->op);
+  return opc->op;
 }
 
 
@@ -674,7 +793,7 @@ GNUNET_TESTBED_peer_destroy (struct GNUNET_TESTBED_Peer *peer)
 {
   struct OperationContext *opc;
 
-  opc = GNUNET_malloc (sizeof (struct OperationContext));
+  opc = GNUNET_new (struct OperationContext);
   opc->data = peer;
   opc->c = peer->controller;
   opc->id = GNUNET_TESTBED_get_next_op_id (peer->controller);
@@ -736,13 +855,13 @@ GNUNET_TESTBED_overlay_connect (void *op_cls,
   struct OperationContext *opc;
   struct OverlayConnectData *data;
 
-  GNUNET_assert ((PS_STARTED == p1->state) && (PS_STARTED == p2->state));
-  data = GNUNET_malloc (sizeof (struct OverlayConnectData));
+  GNUNET_assert ((TESTBED_PS_STARTED == p1->state) && (TESTBED_PS_STARTED == p2->state));
+  data = GNUNET_new (struct OverlayConnectData);
   data->p1 = p1;
   data->p2 = p2;
   data->cb = cb;
   data->cb_cls = cb_cls;
-  opc = GNUNET_malloc (sizeof (struct OperationContext));
+  opc = GNUNET_new (struct OperationContext);
   opc->data = data;
   opc->c = p1->controller;
   opc->id = GNUNET_TESTBED_get_next_op_id (opc->c);
@@ -757,5 +876,123 @@ GNUNET_TESTBED_overlay_connect (void *op_cls,
 }
 
 
+/**
+ * Function called when a peer manage service operation is ready
+ *
+ * @param cls the closure from GNUNET_TESTBED_operation_create_()
+ */
+static void
+opstart_manage_service (void *cls)
+{
+  struct OperationContext *opc = cls;
+  struct ManageServiceData *data = opc->data;
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_TESTBED_ManagePeerServiceMessage *msg;
+  size_t xlen;
+
+  GNUNET_assert (NULL != data);
+  xlen = data->msize - sizeof (struct GNUNET_TESTBED_ManagePeerServiceMessage);
+  env = GNUNET_MQ_msg_extra (msg,
+                             xlen,
+                             GNUNET_MESSAGE_TYPE_TESTBED_MANAGE_PEER_SERVICE);
+  msg->peer_id = htonl (data->peer->unique_id);
+  msg->operation_id = GNUNET_htonll (opc->id);
+  msg->start = (uint8_t) data->start;
+  GNUNET_memcpy (&msg[1],
+          data->service_name,
+          xlen);
+  GNUNET_free (data->service_name);
+  data->service_name = NULL;
+  opc->state = OPC_STATE_STARTED;
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
+  GNUNET_MQ_send (opc->c->mq,
+                  env);
+}
+
+
+/**
+ * Callback which will be called when peer manage server operation is released
+ *
+ * @param cls the closure from GNUNET_TESTBED_operation_create_()
+ */
+static void
+oprelease_manage_service (void *cls)
+{
+  struct OperationContext *opc = cls;
+  struct ManageServiceData *data;
+
+  data = opc->data;
+  switch (opc->state)
+  {
+  case OPC_STATE_STARTED:
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+    break;
+  case OPC_STATE_INIT:
+    GNUNET_assert (NULL != data);
+    GNUNET_free (data->service_name);
+    break;
+  case OPC_STATE_FINISHED:
+    break;
+  }
+  GNUNET_free_non_null (data);
+  GNUNET_free (opc);
+}
+
+
+/**
+ * Start or stop given service at a peer.  This should not be called to
+ * start/stop the peer's ARM service.  Use GNUNET_TESTBED_peer_start(),
+ * GNUNET_TESTBED_peer_stop() for starting/stopping peer's ARM service.  Success
+ * or failure of the generated operation is signalled through the controller
+ * event callback and/or operation completion callback.
+ *
+ * @param op_cls the closure for the operation
+ * @param peer the peer whose service is to be started/stopped
+ * @param service_name the name of the service
+ * @param cb the operation completion callback
+ * @param cb_cls the closure for the operation completion callback
+ * @param start 1 to start the service; 0 to stop the service
+ * @return an operation handle; NULL upon error (peer not running)
+ */
+struct GNUNET_TESTBED_Operation *
+GNUNET_TESTBED_peer_manage_service (void *op_cls,
+                                    struct GNUNET_TESTBED_Peer *peer,
+                                    const char *service_name,
+                                    GNUNET_TESTBED_OperationCompletionCallback cb,
+                                    void *cb_cls,
+                                    unsigned int start)
+{
+  struct ManageServiceData *data;
+  struct OperationContext *opc;
+  size_t msize;
+
+  GNUNET_assert (TESTBED_PS_STARTED == peer->state); /* peer is not running? */
+  msize = strlen (service_name) + 1;
+  msize += sizeof (struct GNUNET_TESTBED_ManagePeerServiceMessage);
+  if (GNUNET_MAX_MESSAGE_SIZE < msize)
+    return NULL;
+  data = GNUNET_new (struct ManageServiceData);
+  data->cb = cb;
+  data->cb_cls = cb_cls;
+  data->peer = peer;
+  data->service_name = GNUNET_strdup (service_name);
+  data->start = start;
+  data->msize = (uint16_t) msize;
+  opc = GNUNET_new (struct OperationContext);
+  opc->data = data;
+  opc->c = peer->controller;
+  opc->id = GNUNET_TESTBED_get_next_op_id (opc->c);
+  opc->type = OP_MANAGE_SERVICE;
+  opc->op_cls = op_cls;
+  opc->op =
+      GNUNET_TESTBED_operation_create_ (opc, &opstart_manage_service,
+                                        &oprelease_manage_service);
+  GNUNET_TESTBED_operation_queue_insert_ (opc->c->opq_parallel_operations,
+                                          opc->op);
+  GNUNET_TESTBED_operation_begin_wait_ (opc->op);
+  return opc->op;
+}
+
+
 
 /* end of testbed_api_peers.c */