-more datacache integration work
[oweals/gnunet.git] / src / testbed / testbed_api_peers.c
index 39e4b596a99772a6899b296d9acd287c4444502b..9e9028c7823915c121e3382f84f2566654f12939 100644 (file)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      (C) 2008--2012 Christian Grothoff (and other contributing authors)
+      Copyright (C) 2008--2013 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
 #include "testbed_api_hosts.h"
 #include "testbed_api_operations.h"
 
+
+/**
+ * Peer list DLL head
+ */
+static struct GNUNET_TESTBED_Peer *peer_list_head;
+
+/**
+ * Peer list DLL tail
+ */
+static struct GNUNET_TESTBED_Peer *peer_list_tail;
+
+
+/**
+ * Adds a peer to the peer list
+ *
+ * @param peer the peer to add to the peer list
+ */
+void
+GNUNET_TESTBED_peer_register_ (struct GNUNET_TESTBED_Peer *peer)
+{
+  GNUNET_CONTAINER_DLL_insert_tail (peer_list_head, peer_list_tail, peer);
+}
+
+
+/**
+ * Removes a peer from the peer list
+ *
+ * @param peer the peer to remove
+ */
+void
+GNUNET_TESTBED_peer_deregister_ (struct GNUNET_TESTBED_Peer *peer)
+{
+  GNUNET_CONTAINER_DLL_remove (peer_list_head, peer_list_tail, peer);
+}
+
+
+/**
+ * Frees all peers
+ */
+void
+GNUNET_TESTBED_cleanup_peers_ (void)
+{
+  struct GNUNET_TESTBED_Peer *peer;
+
+  while (NULL != (peer = peer_list_head))
+  {
+    GNUNET_TESTBED_peer_deregister_ (peer);
+    GNUNET_free (peer);
+  }
+}
+
+
+
 /**
  * Function to call to start a peer_create type operation once all
  * queues the operation is part of declare that the
@@ -44,7 +97,7 @@ static void
 opstart_peer_create (void *cls)
 {
   struct OperationContext *opc = cls;
-  struct PeerCreateData *data;
+  struct PeerCreateData *data = opc->data;
   struct GNUNET_TESTBED_PeerCreateMessage *msg;
   char *config;
   char *xconfig;
@@ -53,7 +106,6 @@ opstart_peer_create (void *cls)
   uint16_t msize;
 
   GNUNET_assert (OP_PEER_CREATE == opc->type);
-  data = opc->data;
   GNUNET_assert (NULL != data);
   GNUNET_assert (NULL != data->peer);
   opc->state = OPC_STATE_STARTED;
@@ -68,8 +120,8 @@ opstart_peer_create (void *cls)
   msg->operation_id = GNUNET_htonll (opc->id);
   msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (data->peer->host));
   msg->peer_id = htonl (data->peer->unique_id);
-  msg->config_size = htonl (c_size);
-  GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc);
+  msg->config_size = htons ((uint16_t) c_size);
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
   GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
 }
 
@@ -87,7 +139,7 @@ oprelease_peer_create (void *cls)
   switch (opc->state)
   {
   case OPC_STATE_STARTED:
-    GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
     /* No break we continue flow */
   case OPC_STATE_INIT:
     GNUNET_free (((struct PeerCreateData *) opc->data)->peer);
@@ -109,19 +161,18 @@ static void
 opstart_peer_destroy (void *cls)
 {
   struct OperationContext *opc = cls;
-  struct GNUNET_TESTBED_Peer *peer;
+  struct GNUNET_TESTBED_Peer *peer = opc->data;
   struct GNUNET_TESTBED_PeerDestroyMessage *msg;
 
   GNUNET_assert (OP_PEER_DESTROY == opc->type);
-  peer = opc->data;
   GNUNET_assert (NULL != peer);
   opc->state = OPC_STATE_STARTED;
-  msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerDestroyMessage));
+  msg = GNUNET_new (struct GNUNET_TESTBED_PeerDestroyMessage);
   msg->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerDestroyMessage));
   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_DESTROY_PEER);
   msg->peer_id = htonl (peer->unique_id);
   msg->operation_id = GNUNET_htonll (opc->id);
-  GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc);
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
   GNUNET_TESTBED_queue_message_ (peer->controller, &msg->header);
 }
 
@@ -136,8 +187,16 @@ oprelease_peer_destroy (void *cls)
 {
   struct OperationContext *opc = cls;
 
-  if (OPC_STATE_FINISHED != opc->state)
-    GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+  switch (opc->state)
+  {
+  case OPC_STATE_STARTED:
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+    /* no break; continue */
+  case OPC_STATE_INIT:
+    break;
+  case OPC_STATE_FINISHED:
+    break;
+  }
   GNUNET_free (opc);
 }
 
@@ -156,18 +215,16 @@ opstart_peer_start (void *cls)
   struct GNUNET_TESTBED_Peer *peer;
 
   GNUNET_assert (OP_PEER_START == opc->type);
-  GNUNET_assert (NULL != opc->data);
-  data = opc->data;
-  GNUNET_assert (NULL != data->peer);
-  peer = data->peer;
-  GNUNET_assert ((PS_CREATED == peer->state) || (PS_STOPPED == peer->state));
+  GNUNET_assert (NULL != (data = opc->data));
+  GNUNET_assert (NULL != (peer = data->peer));
+  GNUNET_assert ((TESTBED_PS_CREATED == peer->state) || (TESTBED_PS_STOPPED == peer->state));
   opc->state = OPC_STATE_STARTED;
-  msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerStartMessage));
+  msg = GNUNET_new (struct GNUNET_TESTBED_PeerStartMessage);
   msg->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerStartMessage));
   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_START_PEER);
   msg->peer_id = htonl (peer->unique_id);
   msg->operation_id = GNUNET_htonll (opc->id);
-  GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc);
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
   GNUNET_TESTBED_queue_message_ (peer->controller, &msg->header);
 }
 
@@ -182,10 +239,16 @@ oprelease_peer_start (void *cls)
 {
   struct OperationContext *opc = cls;
 
-  if (OPC_STATE_FINISHED != opc->state)
+  switch (opc->state)
   {
+  case OPC_STATE_STARTED:
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+    /* no break; continue */
+  case OPC_STATE_INIT:
     GNUNET_free (opc->data);
-    GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+    break;
+  case OPC_STATE_FINISHED:
+    break;
   }
   GNUNET_free (opc);
 }
@@ -204,18 +267,16 @@ opstart_peer_stop (void *cls)
   struct PeerEventData *data;
   struct GNUNET_TESTBED_Peer *peer;
 
-  GNUNET_assert (NULL != opc->data);
-  data = opc->data;
-  GNUNET_assert (NULL != data->peer);
-  peer = data->peer;
-  GNUNET_assert (PS_STARTED == peer->state);
+  GNUNET_assert (NULL != (data = opc->data));
+  GNUNET_assert (NULL != (peer = data->peer));
+  GNUNET_assert (TESTBED_PS_STARTED == peer->state);
   opc->state = OPC_STATE_STARTED;
-  msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerStopMessage));
+  msg = GNUNET_new (struct GNUNET_TESTBED_PeerStopMessage);
   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_STOP_PEER);
   msg->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerStopMessage));
   msg->peer_id = htonl (peer->unique_id);
   msg->operation_id = GNUNET_htonll (opc->id);
-  GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc);
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
   GNUNET_TESTBED_queue_message_ (peer->controller, &msg->header);
 }
 
@@ -230,10 +291,16 @@ oprelease_peer_stop (void *cls)
 {
   struct OperationContext *opc = cls;
 
-  if (OPC_STATE_FINISHED != opc->state)
+  switch (opc->state)
   {
+  case OPC_STATE_STARTED:
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+    /* no break; continue */
+  case OPC_STATE_INIT:
     GNUNET_free (opc->data);
-    GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+    break;
+  case OPC_STATE_FINISHED:
+    break;
   }
   GNUNET_free (opc);
 }
@@ -258,7 +325,7 @@ GNUNET_TESTBED_generate_peergetconfig_msg_ (uint32_t peer_id,
                      (struct GNUNET_TESTBED_PeerGetConfigurationMessage));
   msg->header.size =
       htons (sizeof (struct GNUNET_TESTBED_PeerGetConfigurationMessage));
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_GET_PEER_CONFIGURATION);
+  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_GET_PEER_INFORMATION);
   msg->peer_id = htonl (peer_id);
   msg->operation_id = GNUNET_htonll (operation_id);
   return msg;
@@ -274,16 +341,15 @@ static void
 opstart_peer_getinfo (void *cls)
 {
   struct OperationContext *opc = cls;
-  struct PeerInfoData *data;
+  struct PeerInfoData *data = opc->data;
   struct GNUNET_TESTBED_PeerGetConfigurationMessage *msg;
 
-  data = opc->data;
   GNUNET_assert (NULL != data);
   opc->state = OPC_STATE_STARTED;
   msg =
       GNUNET_TESTBED_generate_peergetconfig_msg_ (data->peer->unique_id,
                                                   opc->id);
-  GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc);
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
   GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
 }
 
@@ -299,19 +365,22 @@ oprelease_peer_getinfo (void *cls)
   struct OperationContext *opc = cls;
   struct GNUNET_TESTBED_PeerInformation *data;
 
-  if (OPC_STATE_FINISHED != opc->state)
-  {
-    GNUNET_free_non_null (opc->data);
-    GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
-  }
-  else
+  switch (opc->state)
   {
+  case OPC_STATE_STARTED:
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+    /* no break; continue */
+  case OPC_STATE_INIT:
+    GNUNET_free (opc->data);
+    break;
+  case OPC_STATE_FINISHED:
     data = opc->data;
     GNUNET_assert (NULL != data);
     switch (data->pit)
     {
     case GNUNET_TESTBED_PIT_CONFIGURATION:
-      GNUNET_CONFIGURATION_destroy (data->result.cfg);
+      if (NULL != data->result.cfg)
+        GNUNET_CONFIGURATION_destroy (data->result.cfg);
       break;
     case GNUNET_TESTBED_PIT_IDENTITY:
       GNUNET_free (data->result.id);
@@ -320,6 +389,7 @@ oprelease_peer_getinfo (void *cls)
       GNUNET_assert (0);        /* We should never reach here */
     }
     GNUNET_free (data);
+    break;
   }
   GNUNET_free (opc);
 }
@@ -340,9 +410,7 @@ opstart_overlay_connect (void *cls)
   opc->state = OPC_STATE_STARTED;
   data = opc->data;
   GNUNET_assert (NULL != data);
-  data->tslot_index = GNUNET_TESTBED_get_tslot_ (data->p1->host, data);
-  data->tstart = GNUNET_TIME_absolute_get ();
-  msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_OverlayConnectMessage));
+  msg = GNUNET_new (struct GNUNET_TESTBED_OverlayConnectMessage);
   msg->header.size =
       htons (sizeof (struct GNUNET_TESTBED_OverlayConnectMessage));
   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_OVERLAY_CONNECT);
@@ -350,7 +418,7 @@ opstart_overlay_connect (void *cls)
   msg->peer2 = htonl (data->p2->unique_id);
   msg->operation_id = GNUNET_htonll (opc->id);
   msg->peer2_host_id = htonl (GNUNET_TESTBED_host_get_id_ (data->p2->host));
-  GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc);
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
   GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
 }
 
@@ -364,7 +432,6 @@ static void
 oprelease_overlay_connect (void *cls)
 {
   struct OperationContext *opc = cls;
-  struct GNUNET_TIME_Relative duration;
   struct OverlayConnectData *data;
 
   data = opc->data;
@@ -373,20 +440,81 @@ oprelease_overlay_connect (void *cls)
   case OPC_STATE_INIT:
     break;
   case OPC_STATE_STARTED:
-    (void) GNUNET_TESTBED_release_time_slot_ (data->p1->host, data->tslot_index,
-                                              data);
-    GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
     break;
   case OPC_STATE_FINISHED:
-    duration = GNUNET_TIME_absolute_get_duration (data->tstart);
-    GNUNET_TESTBED_update_time_slot_ (data->p1->host, data->tslot_index, data,
-                                      duration, data->failed);
+    break;
   }
   GNUNET_free (data);
   GNUNET_free (opc);
 }
 
 
+/**
+ * Function called when a peer reconfigure operation is ready
+ *
+ * @param cls the closure from GNUNET_TESTBED_operation_create_()
+ */
+static void
+opstart_peer_reconfigure (void *cls)
+{
+  struct OperationContext *opc = cls;
+  struct PeerReconfigureData *data = opc->data;
+  struct GNUNET_TESTBED_PeerReconfigureMessage *msg;
+  char *xconfig;
+  size_t xc_size;
+  uint16_t msize;
+
+  opc->state = OPC_STATE_STARTED;
+  GNUNET_assert (NULL != data);
+  xc_size = GNUNET_TESTBED_compress_config_ (data->config, data->cfg_size,
+                                             &xconfig);
+  GNUNET_free (data->config);
+  data->config = NULL;
+  GNUNET_assert (xc_size <= UINT16_MAX);
+  msize = (uint16_t) xc_size +
+      sizeof (struct GNUNET_TESTBED_PeerReconfigureMessage);
+  msg = GNUNET_realloc (xconfig, msize);
+  (void) memmove (&msg[1], msg, xc_size);
+  msg->header.size = htons (msize);
+  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_RECONFIGURE_PEER);
+  msg->peer_id = htonl (data->peer->unique_id);
+  msg->operation_id = GNUNET_htonll (opc->id);
+  msg->config_size = htons (data->cfg_size);
+  GNUNET_free (data);
+  opc->data = NULL;
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
+  GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
+}
+
+
+/**
+ * Callback which will be called when a peer reconfigure operation is released
+ *
+ * @param cls the closure from GNUNET_TESTBED_operation_create_()
+ */
+static void
+oprelease_peer_reconfigure (void *cls)
+{
+  struct OperationContext *opc = cls;
+  struct PeerReconfigureData *data = opc->data;
+
+  switch (opc->state)
+  {
+  case OPC_STATE_INIT:
+    GNUNET_free (data->config);
+    GNUNET_free (data);
+    break;
+  case OPC_STATE_STARTED:
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+    break;
+  case OPC_STATE_FINISHED:
+    break;
+  }
+  GNUNET_free (opc);
+}
+
+
 /**
  * Lookup a peer by ID.
  *
@@ -442,18 +570,18 @@ GNUNET_TESTBED_peer_create (struct GNUNET_TESTBED_Controller *controller,
   struct OperationContext *opc;
   static uint32_t id_gen;
 
-  peer = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Peer));
+  peer = GNUNET_new (struct GNUNET_TESTBED_Peer);
   peer->controller = controller;
   peer->host = host;
   peer->unique_id = id_gen++;
-  peer->state = PS_INVALID;
-  data = GNUNET_malloc (sizeof (struct PeerCreateData));
+  peer->state = TESTBED_PS_INVALID;
+  data = GNUNET_new (struct PeerCreateData);
   data->host = host;
   data->cfg = cfg;
   data->cb = cb;
   data->cls = cls;
   data->peer = peer;
-  opc = GNUNET_malloc (sizeof (struct OperationContext));
+  opc = GNUNET_new (struct OperationContext);
   opc->c = controller;
   opc->data = data;
   opc->id = GNUNET_TESTBED_get_next_op_id (controller);
@@ -485,11 +613,11 @@ GNUNET_TESTBED_peer_start (void *op_cls, struct GNUNET_TESTBED_Peer *peer,
   struct OperationContext *opc;
   struct PeerEventData *data;
 
-  data = GNUNET_malloc (sizeof (struct PeerEventData));
+  data = GNUNET_new (struct PeerEventData);
   data->peer = peer;
   data->pcc = pcc;
   data->pcc_cls = pcc_cls;
-  opc = GNUNET_malloc (sizeof (struct OperationContext));
+  opc = GNUNET_new (struct OperationContext);
   opc->c = peer->controller;
   opc->data = data;
   opc->op_cls = op_cls;
@@ -510,25 +638,29 @@ GNUNET_TESTBED_peer_start (void *op_cls, struct GNUNET_TESTBED_Peer *peer,
  * "GNUNET_TESTBED_peer_destroy" to fully clean up the
  * state of the peer).
  *
+ * @param op_cls the closure for this operation; will be set in the event
+ *          information
  * @param peer peer to stop
  * @param pcc function to call upon completion
  * @param pcc_cls closure for 'pcc'
  * @return handle to the operation
  */
 struct GNUNET_TESTBED_Operation *
-GNUNET_TESTBED_peer_stop (struct GNUNET_TESTBED_Peer *peer,
+GNUNET_TESTBED_peer_stop (void *op_cls,
+                          struct GNUNET_TESTBED_Peer *peer,
                           GNUNET_TESTBED_PeerChurnCallback pcc, void *pcc_cls)
 {
   struct OperationContext *opc;
   struct PeerEventData *data;
 
-  data = GNUNET_malloc (sizeof (struct PeerEventData));
+  data = GNUNET_new (struct PeerEventData);
   data->peer = peer;
   data->pcc = pcc;
   data->pcc_cls = pcc_cls;
-  opc = GNUNET_malloc (sizeof (struct OperationContext));
+  opc = GNUNET_new (struct OperationContext);
   opc->c = peer->controller;
   opc->data = data;
+  opc->op_cls = op_cls;
   opc->id = GNUNET_TESTBED_get_next_op_id (opc->c);
   opc->type = OP_PEER_STOP;
   opc->op =
@@ -546,6 +678,7 @@ GNUNET_TESTBED_peer_stop (struct GNUNET_TESTBED_Peer *peer,
  * with event type GNUNET_TESTBED_ET_OPERATION_FINISHED when result for this
  * operation is available. Instead, the GNUNET_TESTBED_PeerInfoCallback() will
  * be called.
+ * The peer information in the callback is valid until the operation is canceled.
  *
  * @param peer peer to request information about
  * @param pit desired information
@@ -564,12 +697,13 @@ GNUNET_TESTBED_peer_get_information (struct GNUNET_TESTBED_Peer *peer,
   struct PeerInfoData *data;
 
   GNUNET_assert (GNUNET_TESTBED_PIT_GENERIC != pit);
-  data = GNUNET_malloc (sizeof (struct PeerInfoData));
+  GNUNET_assert (NULL != cb);
+  data = GNUNET_new (struct PeerInfoData);
   data->peer = peer;
   data->pit = pit;
   data->cb = cb;
   data->cb_cls = cb_cls;
-  opc = GNUNET_malloc (sizeof (struct OperationContext));
+  opc = GNUNET_new (struct OperationContext);
   opc->c = peer->controller;
   opc->data = data;
   opc->type = OP_PEER_INFO;
@@ -599,9 +733,38 @@ GNUNET_TESTBED_peer_update_configuration (struct GNUNET_TESTBED_Peer *peer,
                                           const struct
                                           GNUNET_CONFIGURATION_Handle *cfg)
 {
-  // FIXME: handle locally or delegate...
-  GNUNET_break (0);
-  return NULL;
+  struct OperationContext *opc;
+  struct PeerReconfigureData *data;
+  size_t csize;
+
+  data = GNUNET_new (struct PeerReconfigureData);
+  data->peer = peer;
+  data->config = GNUNET_CONFIGURATION_serialize (cfg, &csize);
+  if (NULL == data->config)
+  {
+    GNUNET_free (data);
+    return NULL;
+  }
+  if (csize > UINT16_MAX)
+  {
+    GNUNET_break (0);
+    GNUNET_free (data->config);
+    GNUNET_free (data);
+    return NULL;
+  }
+  data->cfg_size = (uint16_t) csize;
+  opc = GNUNET_new (struct OperationContext);
+  opc->c = peer->controller;
+  opc->data = data;
+  opc->type = OP_PEER_RECONFIGURE;
+  opc->id = GNUNET_TESTBED_get_next_op_id (opc->c);
+  opc->op =
+      GNUNET_TESTBED_operation_create_ (opc, &opstart_peer_reconfigure,
+                                        &oprelease_peer_reconfigure);
+  GNUNET_TESTBED_operation_queue_insert_ (opc->c->opq_parallel_operations,
+                                          opc->op);
+  GNUNET_TESTBED_operation_begin_wait_ (opc->op);
+  return opc->op;
 }
 
 
@@ -617,7 +780,7 @@ GNUNET_TESTBED_peer_destroy (struct GNUNET_TESTBED_Peer *peer)
 {
   struct OperationContext *opc;
 
-  opc = GNUNET_malloc (sizeof (struct OperationContext));
+  opc = GNUNET_new (struct OperationContext);
   opc->data = peer;
   opc->c = peer->controller;
   opc->id = GNUNET_TESTBED_get_next_op_id (peer->controller);
@@ -679,13 +842,13 @@ GNUNET_TESTBED_overlay_connect (void *op_cls,
   struct OperationContext *opc;
   struct OverlayConnectData *data;
 
-  GNUNET_assert ((PS_STARTED == p1->state) && (PS_STARTED == p2->state));
-  data = GNUNET_malloc (sizeof (struct OverlayConnectData));
+  GNUNET_assert ((TESTBED_PS_STARTED == p1->state) && (TESTBED_PS_STARTED == p2->state));
+  data = GNUNET_new (struct OverlayConnectData);
   data->p1 = p1;
   data->p2 = p2;
   data->cb = cb;
   data->cb_cls = cb_cls;
-  opc = GNUNET_malloc (sizeof (struct OperationContext));
+  opc = GNUNET_new (struct OperationContext);
   opc->data = data;
   opc->c = p1->controller;
   opc->id = GNUNET_TESTBED_get_next_op_id (opc->c);
@@ -700,5 +863,118 @@ GNUNET_TESTBED_overlay_connect (void *op_cls,
 }
 
 
+/**
+ * Function called when a peer manage service operation is ready
+ *
+ * @param cls the closure from GNUNET_TESTBED_operation_create_()
+ */
+static void
+opstart_manage_service (void *cls)
+{
+  struct OperationContext *opc = cls;
+  struct ManageServiceData *data = opc->data;
+  struct GNUNET_TESTBED_ManagePeerServiceMessage *msg;
+
+  GNUNET_assert (NULL != data);
+  msg = GNUNET_malloc (data->msize);
+  msg->header.size = htons (data->msize);
+  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_MANAGE_PEER_SERVICE);
+  msg->peer_id = htonl (data->peer->unique_id);
+  msg->operation_id = GNUNET_htonll (opc->id);
+  msg->start = (uint8_t) data->start;
+  (void) memcpy (&msg[1], data->service_name, data->msize
+                 - sizeof (struct GNUNET_TESTBED_ManagePeerServiceMessage));
+  GNUNET_free (data->service_name);
+  data->service_name = NULL;
+  opc->state = OPC_STATE_STARTED;
+  GNUNET_TESTBED_insert_opc_ (opc->c, opc);
+  GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
+}
+
+
+/**
+ * Callback which will be called when peer manage server operation is released
+ *
+ * @param cls the closure from GNUNET_TESTBED_operation_create_()
+ */
+static void
+oprelease_manage_service (void *cls)
+{
+  struct OperationContext *opc = cls;
+  struct ManageServiceData *data;
+
+  data = opc->data;
+  switch (opc->state)
+  {
+  case OPC_STATE_STARTED:
+    GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+    break;
+  case OPC_STATE_INIT:
+    GNUNET_assert (NULL != data);
+    GNUNET_free (data->service_name);
+    break;
+  case OPC_STATE_FINISHED:
+    break;
+  }
+  GNUNET_free_non_null (data);
+  GNUNET_free (opc);
+}
+
+
+/**
+ * Start or stop given service at a peer.  This should not be called to
+ * start/stop the peer's ARM service.  Use GNUNET_TESTBED_peer_start(),
+ * GNUNET_TESTBED_peer_stop() for starting/stopping peer's ARM service.  Success
+ * or failure of the generated operation is signalled through the controller
+ * event callback and/or operation completion callback.
+ *
+ * @param op_cls the closure for the operation
+ * @param peer the peer whose service is to be started/stopped
+ * @param service_name the name of the service
+ * @param cb the operation completion callback
+ * @param cb_cls the closure for the operation completion callback
+ * @param start 1 to start the service; 0 to stop the service
+ * @return an operation handle; NULL upon error (peer not running)
+ */
+struct GNUNET_TESTBED_Operation *
+GNUNET_TESTBED_peer_manage_service (void *op_cls,
+                                    struct GNUNET_TESTBED_Peer *peer,
+                                    const char *service_name,
+                                    GNUNET_TESTBED_OperationCompletionCallback cb,
+                                    void *cb_cls,
+                                    unsigned int start)
+{
+  struct ManageServiceData *data;
+  struct OperationContext *opc;
+  size_t msize;
+
+  GNUNET_assert (TESTBED_PS_STARTED == peer->state); /* peer is not running? */
+  msize = strlen (service_name) + 1;
+  msize += sizeof (struct GNUNET_TESTBED_ManagePeerServiceMessage);
+  if (GNUNET_SERVER_MAX_MESSAGE_SIZE < msize)
+    return NULL;
+  data = GNUNET_new (struct ManageServiceData);
+  data->cb = cb;
+  data->cb_cls = cb_cls;
+  data->peer = peer;
+  data->service_name = GNUNET_strdup (service_name);
+  data->start = start;
+  data->msize = (uint16_t) msize;
+  opc = GNUNET_new (struct OperationContext);
+  opc->data = data;
+  opc->c = peer->controller;
+  opc->id = GNUNET_TESTBED_get_next_op_id (opc->c);
+  opc->type = OP_MANAGE_SERVICE;
+  opc->op_cls = op_cls;
+  opc->op =
+      GNUNET_TESTBED_operation_create_ (opc, &opstart_manage_service,
+                                        &oprelease_manage_service);
+  GNUNET_TESTBED_operation_queue_insert_ (opc->c->opq_parallel_operations,
+                                          opc->op);
+  GNUNET_TESTBED_operation_begin_wait_ (opc->op);
+  return opc->op;
+}
+
+
 
 /* end of testbed_api_peers.c */