implement exponential back-off cool down for ARM process restarts
[oweals/gnunet.git] / src / arm / arm_api.c
index d27b6b780cf9a8303f13476ea56bae6af007a0ea..56af544b162b99a9e45ba9af803107f7d3cc63fc 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2009, 2010, 2012, 2013 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009, 2010, 2012, 2013, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
 
 #define LOG(kind,...) GNUNET_log_from (kind, "arm-api",__VA_ARGS__)
 
+
 /**
- * Handle for interacting with ARM.
+ * Entry in a doubly-linked list of operations awaiting for replies
+ * (in-order) from the ARM service.
  */
-struct GNUNET_ARM_Handle
+struct GNUNET_ARM_Operation
 {
   /**
-   * Our control connection to the ARM service.
-   */
-  struct GNUNET_CLIENT_Connection *client;
-
-  /**
-   * The configuration that we are using.
-   */
-  struct GNUNET_CONFIGURATION_Handle *cfg;
-
-  /**
-   * Handle for our current transmission request.
-   */
-  struct GNUNET_CLIENT_TransmitHandle *cth;
-
-  /**
-   * Head of doubly-linked list of pending requests.
-   */
-  struct ARMControlMessage *control_pending_head;
-
-  /**
-   * Tail of doubly-linked list of pending requests.
-   */
-  struct ARMControlMessage *control_pending_tail;
-
-  /**
-   * Head of doubly-linked list of sent requests.
+   * This is a doubly-linked list.
    */
-  struct ARMControlMessage *control_sent_head;
+  struct GNUNET_ARM_Operation *next;
 
   /**
-   * Tail of doubly-linked list of sent requests.
+   * This is a doubly-linked list.
    */
-  struct ARMControlMessage *control_sent_tail;
+  struct GNUNET_ARM_Operation *prev;
 
   /**
-   * Callback to invoke on connection/disconnection.
+   * ARM handle.
    */
-  GNUNET_ARM_ConnectionStatusCallback conn_status;
+  struct GNUNET_ARM_Handle *h;
 
   /**
-   * Closure for conn_status.
+   * Callback for service state change requests.
    */
-  void *conn_status_cls;
+  GNUNET_ARM_ResultCallback result_cont;
 
   /**
-   * ARM control message for the 'arm_termination_handler'
-   * with the continuation to call once the ARM shutdown is done.
+   * Callback for service list requests.
    */
-  struct ARMControlMessage *thm;
+  GNUNET_ARM_ServiceListCallback list_cont;
 
   /**
-   * ID of the reconnect task (if any).
+   * Closure for @e result_cont or @e list_cont.
    */
-  GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
+  void *cont_cls;
 
   /**
-   * Current delay we use for re-trying to connect to core.
+   * Task for async completion.
    */
-  struct GNUNET_TIME_Relative retry_backoff;
+  struct GNUNET_SCHEDULER_Task *async;
 
   /**
-   * Counter for request identifiers
+   * Unique ID for the request.
    */
-  uint64_t request_id_counter;
+  uint64_t id;
 
   /**
-   * Are we currently disconnected and hence unable to send?
+   * Result of this operation for #notify_starting().
    */
-  unsigned char currently_down;
+  enum GNUNET_ARM_Result starting_ret;
 
   /**
-   * GNUNET_YES if we're running a service test.
+   * Is this an operation to stop the ARM service?
    */
-  unsigned char service_test_is_active;
+  int is_arm_stop;
 };
 
 
 /**
- * Entry in a doubly-linked list of control messages to be transmitted
- * to the arm service.
- *
- * The actual message is allocated at the end of this struct.
+ * Handle for interacting with ARM.
  */
-struct ARMControlMessage
+struct GNUNET_ARM_Handle
 {
   /**
-   * This is a doubly-linked list.
+   * Our connection to the ARM service.
    */
-  struct ARMControlMessage *next;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
-   * This is a doubly-linked list.
+   * The configuration that we are using.
    */
-  struct ARMControlMessage *prev;
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * ARM handle.
+   * Head of doubly-linked list of pending operations.
    */
-  struct GNUNET_ARM_Handle *h;
+  struct GNUNET_ARM_Operation *operation_pending_head;
 
   /**
-   * Message to send.
+   * Tail of doubly-linked list of pending operations.
    */
-  struct GNUNET_ARM_Message *msg;
+  struct GNUNET_ARM_Operation *operation_pending_tail;
 
   /**
-   * Callback for service state change requests.
+   * Callback to invoke on connection/disconnection.
    */
-  GNUNET_ARM_ResultCallback result_cont;
+  GNUNET_ARM_ConnectionStatusCallback conn_status;
 
   /**
-   * Callback for service list requests.
+   * Closure for @e conn_status.
    */
-  GNUNET_ARM_ServiceListCallback list_cont;
+  void *conn_status_cls;
 
   /**
-   * Closure for 'result_cont' or 'list_cont'.
+   * ARM operation where the goal is to wait for ARM shutdown to
+   * complete.  This operation is special in that it waits for an
+   * error on the @e mq.  So we complete it by calling the
+   * continuation in the #mq_error_handler().  Note that the operation
+   * is no longer in the @e operation_pending_head DLL once it is
+   * referenced from this field.
    */
-  void *cont_cls;
+  struct GNUNET_ARM_Operation *thm;
 
   /**
-   * Timeout for the operation.
+   * ID of the reconnect task (if any).
    */
-  struct GNUNET_TIME_Absolute timeout;
+  struct GNUNET_SCHEDULER_Task *reconnect_task;
 
   /**
-   * Task to run when request times out.
+   * Current delay we use for re-trying to connect to core.
    */
-  GNUNET_SCHEDULER_TaskIdentifier timeout_task_id;
+  struct GNUNET_TIME_Relative retry_backoff;
 
   /**
-   * Flags for passing std descriptors to ARM (when starting ARM).
+   * Counter for request identifiers.  They are used to match replies
+   * from ARM to operations in the @e operation_pending_head DLL.
    */
-  enum GNUNET_OS_InheritStdioFlags std_inheritance;
+  uint64_t request_id_counter;
 
   /**
-   * Type of the request expressed as a message type (start, stop or list).
+   * Have we detected that ARM is up?
    */
-  uint16_t type;
+  int currently_up;
+
 };
 
 
@@ -184,36 +164,23 @@ struct ARMControlMessage
  * Connect to arm.
  *
  * @param h arm handle
- * @return GNUNET_OK on success, GNUNET_SYSERR on failure
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  */
 static int
 reconnect_arm (struct GNUNET_ARM_Handle *h);
 
 
-/**
- * Check the list of pending requests, send the next
- * one to the arm.
- *
- * @param h arm handle
- * @param ignore_currently_down transmit message even if not initialized?
- */
-static void
-trigger_next_request (struct GNUNET_ARM_Handle *h, int ignore_currently_down);
-
-
 /**
  * Task scheduled to try to re-connect to arm.
  *
- * @param cls the 'struct GNUNET_ARM_Handle'
- * @param tc task context
+ * @param cls the `struct GNUNET_ARM_Handle`
  */
 static void
-reconnect_arm_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+reconnect_arm_task (void *cls)
 {
   struct GNUNET_ARM_Handle *h = cls;
 
-  h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to ARM service after delay\n");
+  h->reconnect_task = NULL;
   reconnect_arm (h);
 }
 
@@ -227,29 +194,37 @@ reconnect_arm_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 static void
 reconnect_arm_later (struct GNUNET_ARM_Handle *h)
 {
-  if (GNUNET_NO != h->currently_down)
-    return;
-  if (NULL != h->cth)
-  {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
-    h->cth = NULL;
-  }
-  if (NULL != h->client)
+  struct GNUNET_ARM_Operation *op;
+
+  if (NULL != h->mq)
   {
-    GNUNET_CLIENT_disconnect (h->client);
-    h->client = NULL;
+    GNUNET_MQ_destroy (h->mq);
+    h->mq = NULL;
   }
-  h->currently_down = GNUNET_YES;
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == h->reconnect_task);
+  h->currently_up = GNUNET_NO;
+  GNUNET_assert (NULL == h->reconnect_task);
   h->reconnect_task =
-      GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_arm_task, h);
-  /* Don't clear pending messages on disconnection, deliver them later
-  clear_pending_messages (h, GNUNET_ARM_REQUEST_DISCONNECTED);
-  GNUNET_assert (NULL == h->control_pending_head);
-  */
+      GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
+                                   &reconnect_arm_task,
+                                   h);
+  while (NULL != (op = h->operation_pending_head))
+  {
+    if (NULL != op->result_cont)
+      op->result_cont (op->cont_cls,
+                       GNUNET_ARM_REQUEST_DISCONNECTED,
+                       0);
+    if (NULL != op->list_cont)
+      op->list_cont (op->cont_cls,
+                     GNUNET_ARM_REQUEST_DISCONNECTED,
+                     0,
+                     NULL);
+    GNUNET_ARM_operation_cancel (op);
+  }
+  GNUNET_assert (NULL == h->operation_pending_head);
   h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
   if (NULL != h->conn_status)
-    h->conn_status (h->conn_status_cls, GNUNET_NO);
+    h->conn_status (h->conn_status_cls,
+                    GNUNET_NO);
 }
 
 
@@ -260,167 +235,50 @@ reconnect_arm_later (struct GNUNET_ARM_Handle *h)
  * @param id unique message ID to use for the lookup
  * @return NULL if not found
  */
-static struct ARMControlMessage *
-find_cm_by_id (struct GNUNET_ARM_Handle *h, uint64_t id)
+static struct GNUNET_ARM_Operation *
+find_op_by_id (struct GNUNET_ARM_Handle *h,
+               uint64_t id)
 {
-  struct ARMControlMessage *result;
-  for (result = h->control_sent_head; result; result = result->next)
-    if (id == result->msg->request_id)
+  struct GNUNET_ARM_Operation *result;
+
+  for (result = h->operation_pending_head; NULL != result; result = result->next)
+    if (id == result->id)
       return result;
   return NULL;
 }
 
 
-/**
- * Handler for ARM 'termination' reply (failure to receive).
- *
- * @param cls our "struct GNUNET_ARM_Handle"
- * @param msg expected to be NULL
- */
-static void
-arm_termination_handler (void *cls, const struct GNUNET_MessageHeader *msg)
-{
-  struct GNUNET_ARM_Handle *h = cls;
-  struct ARMControlMessage *cm;
-
-  if (NULL != msg)
-  {
-    GNUNET_break (0);
-    GNUNET_CLIENT_receive (h->client, &arm_termination_handler, h,
-                          GNUNET_TIME_UNIT_FOREVER_REL);
-    return;
-  }
-  cm = h->thm;
-  h->thm = NULL;
-  h->currently_down = GNUNET_YES;
-  GNUNET_CLIENT_disconnect (h->client);
-  h->client = NULL;
-  if (NULL != cm->result_cont)
-    cm->result_cont (cm->cont_cls,
-                    GNUNET_ARM_REQUEST_SENT_OK,
-                    (const char *) &cm->msg[1],
-                    GNUNET_ARM_RESULT_STOPPED);
-  GNUNET_free (cm->msg);
-  GNUNET_free (cm);
-}
-
-
 /**
  * Handler for ARM replies.
  *
- * @param cls our "struct GNUNET_ARM_Handle"
- * @param msg the message received from the arm service
+ * @param cls our `struct GNUNET_ARM_Handle`
+ * @param res the message received from the arm service
  */
 static void
-client_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+handle_arm_result (void *cls,
+                   const struct GNUNET_ARM_ResultMessage *res)
 {
   struct GNUNET_ARM_Handle *h = cls;
-  const struct GNUNET_ARM_Message *arm_msg;
-  const struct GNUNET_ARM_ResultMessage *res;
-  const struct GNUNET_ARM_ListResultMessage *lres;
-  struct ARMControlMessage *cm;
-  const char **list;
-  const char *pos;
+  struct GNUNET_ARM_Operation *op;
   uint64_t id;
   enum GNUNET_ARM_Result result;
-  uint16_t size_check;
-  uint16_t rcount;
-  uint16_t msize;
-  unsigned char fail;
+  GNUNET_ARM_ResultCallback result_cont;
+  void *result_cont_cls;
 
-  list = NULL;
-  rcount = 0;
-  if (NULL == msg)
-  {
-    LOG (GNUNET_ERROR_TYPE_INFO,
-         _("Client was disconnected from arm service, trying to reconnect.\n"));
-    reconnect_arm_later (h);
-    return;
-  }
-  msize = ntohs (msg->size);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Processing message of type %u and size %u from arm service\n",
-       ntohs (msg->type), msize);
-  if (msize < sizeof (struct GNUNET_ARM_Message))
-  {
-    GNUNET_break (0);
-    reconnect_arm_later (h);
-    return;
-  }
-  arm_msg = (const struct GNUNET_ARM_Message *) msg;
-  GNUNET_break (0 == ntohl (arm_msg->reserved));
-  id = GNUNET_ntohll (arm_msg->request_id);
-  cm = find_cm_by_id (h, id);
-  if (NULL == cm)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Message with unknown id %llu\n", id);
-    return;
-  }
-  fail = GNUNET_NO;
-  switch (ntohs (msg->type))
-  {
-  case GNUNET_MESSAGE_TYPE_ARM_RESULT:
-    if (msize < sizeof (struct GNUNET_ARM_ResultMessage))
-    {
-      GNUNET_assert (0);
-      fail = GNUNET_YES;
-    }
-    break;
-  case GNUNET_MESSAGE_TYPE_ARM_LIST_RESULT:
-    if (msize < sizeof (struct GNUNET_ARM_ListResultMessage))
-    {
-      GNUNET_break (0);
-      fail = GNUNET_YES;
-      break;
-    }
-    size_check = 0;
-    lres = (const struct GNUNET_ARM_ListResultMessage *) msg;
-    rcount = ntohs (lres->count);
-    {
-      unsigned int i;
-
-      list = GNUNET_malloc (sizeof (const char *) * rcount);
-      pos = (const char *)&lres[1];
-      for (i = 0; i < rcount; i++)
-      {
-        const char *end = memchr (pos, 0, msize - size_check);
-        if (NULL == end)
-        {
-          GNUNET_break (0);
-          fail = GNUNET_YES;
-          break;
-        }
-        list[i] = pos;
-        size_check += (end - pos) + 1;
-        pos = end + 1;
-      }
-      if (GNUNET_YES == fail)
-      {
-        GNUNET_free (list);
-        list = NULL;
-      }
-    }
-    break;
-  default:
-    fail = GNUNET_YES;
-    break;
-  }
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != cm->timeout_task_id);
-  GNUNET_SCHEDULER_cancel (cm->timeout_task_id);
-  GNUNET_CONTAINER_DLL_remove (h->control_sent_head,
-                               h->control_sent_tail, cm);
-  if (GNUNET_YES == fail)
+  id = GNUNET_ntohll (res->arm_msg.request_id);
+  op = find_op_by_id (h,
+                      id);
+  if (NULL == op)
   {
-    reconnect_arm_later (h);
-    GNUNET_free (cm->msg);
-    GNUNET_free (cm);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Message with unknown id %llu\n",
+         (unsigned long long) id);
     return;
   }
-  if ( (GNUNET_MESSAGE_TYPE_ARM_RESULT == ntohs (msg->type)) &&
-       (0 == strcasecmp ((const char *) &cm->msg[1],
-                        "arm")) &&
-       (NULL != (res = (const struct GNUNET_ARM_ResultMessage *) msg)) &&
-       (GNUNET_ARM_RESULT_STOPPING == ntohl (res->result)) )
+
+  result = (enum GNUNET_ARM_Result) ntohl (res->result);
+  if ( (GNUNET_YES == op->is_arm_stop) &&
+       (GNUNET_ARM_RESULT_STOPPING == result) )
   {
     /* special case: if we are stopping 'gnunet-service-arm', we do not just
        wait for the result message, but also wait for the service to close
@@ -430,165 +288,159 @@ client_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
     if (NULL != h->thm)
     {
       GNUNET_break (0);
-      cm->result_cont (h->thm->cont_cls,
+      op->result_cont (h->thm->cont_cls,
                       GNUNET_ARM_REQUEST_SENT_OK,
-                       (const char *) &h->thm->msg[1],
                       GNUNET_ARM_RESULT_IS_NOT_KNOWN);
-      GNUNET_free (h->thm->msg);
       GNUNET_free (h->thm);
     }
-    h->thm = cm;
-    GNUNET_CLIENT_receive (h->client, &arm_termination_handler, h,
-                          GNUNET_TIME_UNIT_FOREVER_REL);
+    GNUNET_CONTAINER_DLL_remove (h->operation_pending_head,
+                                 h->operation_pending_tail,
+                                 op);
+    h->thm = op;
     return;
   }
-  GNUNET_CLIENT_receive (h->client, &client_notify_handler, h,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
-  switch (ntohs (msg->type))
-  {
-  case GNUNET_MESSAGE_TYPE_ARM_RESULT:
-    res = (const struct GNUNET_ARM_ResultMessage *) msg;
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Received response from ARM for service `%s': %u\n",
-         (const char *) &cm->msg[1], ntohs (msg->type));
-    result = (enum GNUNET_ARM_Result) ntohl (res->result);
-    if (NULL != cm->result_cont)
-      cm->result_cont (cm->cont_cls, GNUNET_ARM_REQUEST_SENT_OK,
-                       (const char *) &cm->msg[1], result);
-    break;
-  case GNUNET_MESSAGE_TYPE_ARM_LIST_RESULT:
-    if (NULL != cm->list_cont)
-        cm->list_cont (cm->cont_cls, GNUNET_ARM_REQUEST_SENT_OK, rcount,
-                       list);
-    GNUNET_free (list);
-    break;
-  }
-  GNUNET_free (cm->msg);
-  GNUNET_free (cm);
+  result_cont = op->result_cont;
+  result_cont_cls = op->cont_cls;
+  GNUNET_ARM_operation_cancel (op);
+  if (NULL != result_cont)
+    result_cont (result_cont_cls,
+                 GNUNET_ARM_REQUEST_SENT_OK,
+                 result);
 }
 
 
 /**
- * Transmit the next message to the arm service.
+ * Checked that list result message is well-formed.
  *
- * @param cls closure with the 'struct GNUNET_ARM_Handle'
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param cls our `struct GNUNET_ARM_Handle`
+ * @param lres the message received from the arm service
+ * @return #GNUNET_OK if message is well-formed
  */
-static size_t
-transmit_arm_message (void *cls, size_t size, void *buf)
+static int
+check_arm_list_result (void *cls,
+                       const struct GNUNET_ARM_ListResultMessage *lres)
 {
-  struct GNUNET_ARM_Handle *h = cls;
-  struct ARMControlMessage *cm;
-  struct GNUNET_ARM_Message *arm_msg;
-  uint64_t request_id;
-  int notify_connection;
-  uint16_t msize;
+  const char *pos = (const char *) &lres[1];
+  uint16_t rcount = ntohs (lres->count);
+  uint16_t msize = ntohs (lres->arm_msg.header.size) - sizeof (*lres);
+  uint16_t size_check;
 
-  notify_connection = GNUNET_NO;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-      "transmit_arm_message is running with %p buffer of size %lu. ARM is known to be %s\n",
-      buf, size, h->currently_down ? "unconnected" : "connected");
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == h->reconnect_task);
-  h->cth = NULL;
-  if ((GNUNET_YES == h->currently_down) && (NULL != buf))
+  size_check = 0;
+  for (unsigned int i = 0; i < rcount; i++)
   {
-    h->currently_down = GNUNET_NO;
-    notify_connection = GNUNET_YES;
-    h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-    GNUNET_CLIENT_receive (h->client, &client_notify_handler, h,
-                           GNUNET_TIME_UNIT_FOREVER_REL);
-  }
-  if (NULL == buf)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Transmission failed, initiating reconnect\n");
-    reconnect_arm_later (h);
-    return 0;
-  }
-  if (NULL == (cm = h->control_pending_head))
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue is empty, not sending anything\n");
-    msize = 0;
-    goto end;
-  }
-  GNUNET_assert (NULL != cm->msg);
-  msize = ntohs (cm->msg->header.size);
-  if (size < msize)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "Request is too big (%u < %u), not sending it\n", size, msize);
-    trigger_next_request (h, GNUNET_NO);
-    msize = 0;
-    goto end;
+    const char *end = memchr (pos, 0, msize - size_check);
+    if (NULL == end)
+    {
+      GNUNET_break (0);
+      return GNUNET_SYSERR;
+    }
+    size_check += (end - pos) + 1;
+    pos = end + 1;
   }
-  arm_msg = cm->msg;
-  if (0 == h->request_id_counter)
-    h->request_id_counter++;
-  request_id = h->request_id_counter++;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Transmitting control message with %u bytes of type %u to arm with id %llu\n",
-       (unsigned int) msize, (unsigned int) ntohs (cm->msg->header.type), request_id);
-  arm_msg->reserved = htonl (0);
-  arm_msg->request_id = GNUNET_htonll (request_id);
-  memcpy (buf, cm->msg, msize);
-  /* Otherwise we won't be able to find it later! */
-  arm_msg->request_id = request_id;
-  GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
-                               h->control_pending_tail, cm);
-  GNUNET_CONTAINER_DLL_insert_tail (h->control_sent_head,
-                                    h->control_sent_tail, cm);
-  /* Don't free msg, keep it around (kind of wasteful, but then we don't
-   * really have many messages to handle, and it'll be freed when it times
-   * out anyway.
-   */
-  trigger_next_request (h, GNUNET_NO);
-
- end:
-  if ((GNUNET_YES == notify_connection) && (NULL != h->conn_status))
-    h->conn_status (h->conn_status_cls, GNUNET_YES);
-  return msize;
+  return GNUNET_OK;
 }
 
 
 /**
- * Check the list of pending requests, send the next
- * one to the arm.
+ * Handler for ARM list replies.
  *
- * @param h arm handle
- * @param ignore_currently_down transmit message even if not initialized?
+ * @param cls our `struct GNUNET_ARM_Handle`
+ * @param lres the message received from the arm service
  */
 static void
-trigger_next_request (struct GNUNET_ARM_Handle *h, int ignore_currently_down)
+handle_arm_list_result (void *cls,
+                        const struct GNUNET_ARM_ListResultMessage *lres)
 {
-  uint16_t msize;
+  struct GNUNET_ARM_Handle *h = cls;
+  uint16_t rcount = ntohs (lres->count);
+  const char *list[rcount];
+  const char *pos = (const char *) &lres[1];
+  uint16_t msize = ntohs (lres->arm_msg.header.size) - sizeof (*lres);
+  struct GNUNET_ARM_Operation *op;
+  uint16_t size_check;
+  uint64_t id;
 
-  msize = sizeof (struct GNUNET_MessageHeader);
-  if ((GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO))
+  id = GNUNET_ntohll (lres->arm_msg.request_id);
+  op = find_op_by_id (h,
+                      id);
+  if (NULL == op)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "ARM connection down, not processing queue\n");
+         "Message with unknown id %llu\n",
+         (unsigned long long) id);
     return;
   }
-  if (NULL != h->cth)
+  size_check = 0;
+  for (unsigned int i = 0; i < rcount; i++)
+  {
+    const char *end = memchr (pos,
+                              0,
+                              msize - size_check);
+
+    /* Assert, as this was already checked in #check_arm_list_result() */
+    GNUNET_assert (NULL != end);
+    list[i] = pos;
+    size_check += (end - pos) + 1;
+    pos = end + 1;
+  }
+  if (NULL != op->list_cont)
+    op->list_cont (op->cont_cls,
+                   GNUNET_ARM_REQUEST_SENT_OK,
+                   rcount,
+                   list);
+  GNUNET_ARM_operation_cancel (op);
+}
+
+
+/**
+ * Receive confirmation from test, ARM service is up.
+ *
+ * @param cls closure with the `struct GNUNET_ARM_Handle`
+ * @param msg message received
+ */
+static void
+handle_confirm (void *cls,
+                const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_ARM_Handle *h = cls;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got confirmation from ARM that we are up!\n");
+  if (GNUNET_NO == h->currently_up)
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Request pending, not processing queue\n");
-    return;
+    h->currently_up = GNUNET_YES;
+    if (NULL != h->conn_status)
+      h->conn_status (h->conn_status_cls,
+                      GNUNET_YES);
   }
-  if (NULL != h->control_pending_head)
-    msize =
-        ntohs (h->control_pending_head->msg->header.size);
-  else if (GNUNET_NO == ignore_currently_down)
+}
+
+
+/**
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_ARM_Handle *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_ARM_Handle *h = cls;
+  struct GNUNET_ARM_Operation *op;
+
+  h->currently_up = GNUNET_NO;
+  if (NULL != (op = h->thm))
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Request queue empty, not processing queue\n");
-    return;                     /* no pending message */
+    h->thm = NULL;
+    op->result_cont (op->cont_cls,
+                    GNUNET_ARM_REQUEST_SENT_OK,
+                    GNUNET_ARM_RESULT_STOPPED);
+    GNUNET_free (op);
   }
-  h->cth =
-      GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
-                                           GNUNET_TIME_UNIT_FOREVER_REL,
-                                           GNUNET_NO, &transmit_arm_message, h);
+  reconnect_arm_later (h);
 }
 
 
@@ -596,25 +448,52 @@ trigger_next_request (struct GNUNET_ARM_Handle *h, int ignore_currently_down)
  * Connect to arm.
  *
  * @param h arm handle
- * @return GNUNET_OK on success, GNUNET_SYSERR on failure
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  */
 static int
 reconnect_arm (struct GNUNET_ARM_Handle *h)
 {
-  GNUNET_assert (NULL == h->client);
-  GNUNET_assert (GNUNET_YES == h->currently_down);
-  h->client = GNUNET_CLIENT_connect ("arm", h->cfg);
-  if (NULL == h->client)
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_fixed_size (arm_result,
+                             GNUNET_MESSAGE_TYPE_ARM_RESULT,
+                             struct GNUNET_ARM_ResultMessage,
+                             h),
+    GNUNET_MQ_hd_var_size (arm_list_result,
+                           GNUNET_MESSAGE_TYPE_ARM_LIST_RESULT,
+                           struct GNUNET_ARM_ListResultMessage,
+                           h),
+    GNUNET_MQ_hd_fixed_size (confirm,
+                             GNUNET_MESSAGE_TYPE_ARM_TEST,
+                             struct GNUNET_MessageHeader,
+                             h),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_MessageHeader *test;
+  struct GNUNET_MQ_Envelope *env;
+
+  if (NULL != h->mq)
+    return GNUNET_OK;
+  GNUNET_assert (GNUNET_NO == h->currently_up);
+  h->mq = GNUNET_CLIENT_connect (h->cfg,
+                                 "arm",
+                                 handlers,
+                                 &mq_error_handler,
+                                 h);
+  if (NULL == h->mq)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-          "arm_api, GNUNET_CLIENT_connect returned NULL\n");
+         "GNUNET_CLIENT_connect returned NULL\n");
     if (NULL != h->conn_status)
-      h->conn_status (h->conn_status_cls, GNUNET_SYSERR);
+      h->conn_status (h->conn_status_cls,
+                      GNUNET_SYSERR);
     return GNUNET_SYSERR;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "arm_api, GNUNET_CLIENT_connect returned non-NULL\n");
-  trigger_next_request (h, GNUNET_YES);
+       "Sending TEST message to ARM\n");
+  env = GNUNET_MQ_msg (test,
+                       GNUNET_MESSAGE_TYPE_ARM_TEST);
+  GNUNET_MQ_send (h->mq,
+                  env);
   return GNUNET_OK;
 }
 
@@ -627,21 +506,20 @@ reconnect_arm (struct GNUNET_ARM_Handle *h)
  *        the ARM service may internally use a different
  *        configuration to determine how to start the service).
  * @param conn_status will be called when connecting/disconnecting
- * @param cls closure for conn_status
+ * @param conn_status_cls closure for @a conn_status
  * @return context to use for further ARM operations, NULL on error.
  */
 struct GNUNET_ARM_Handle *
 GNUNET_ARM_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
-                    GNUNET_ARM_ConnectionStatusCallback conn_status, void *cls)
+                    GNUNET_ARM_ConnectionStatusCallback conn_status,
+                   void *conn_status_cls)
 {
   struct GNUNET_ARM_Handle *h;
 
-  h = GNUNET_malloc (sizeof (struct GNUNET_ARM_Handle));
-  h->cfg = GNUNET_CONFIGURATION_dup (cfg);
-  h->currently_down = GNUNET_YES;
-  h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+  h = GNUNET_new (struct GNUNET_ARM_Handle);
+  h->cfg = cfg;
   h->conn_status = conn_status;
-  h->conn_status_cls = cls;
+  h->conn_status_cls = conn_status_cls;
   if (GNUNET_OK != reconnect_arm (h))
   {
     GNUNET_free (h);
@@ -657,106 +535,60 @@ GNUNET_ARM_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
  * @param h the handle that was being used
  */
 void
-GNUNET_ARM_disconnect_and_free (struct GNUNET_ARM_Handle *h)
+GNUNET_ARM_disconnect (struct GNUNET_ARM_Handle *h)
 {
-  struct ARMControlMessage *cm;
+  struct GNUNET_ARM_Operation *op;
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from ARM service\n");
-  if (NULL != h->cth)
-  {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
-    h->cth = NULL;
-  }
-  while ((NULL != (cm = h->control_pending_head))
-         || (NULL != (cm = h->control_sent_head)) )
-  {
-    if (NULL != h->control_pending_head)
-      GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
-                                   h->control_pending_tail, cm);
-    else
-      GNUNET_CONTAINER_DLL_remove (h->control_sent_head,
-                                   h->control_sent_tail, cm);
-    GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != cm->timeout_task_id);
-    GNUNET_SCHEDULER_cancel (cm->timeout_task_id);
-    if (NULL != cm->result_cont)
-      cm->result_cont (cm->cont_cls, GNUNET_ARM_REQUEST_DISCONNECTED,
-                       NULL, 0);
-    /* FIXME: What about list callback? */
-    GNUNET_free_non_null (cm->msg);
-    GNUNET_free (cm);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Disconnecting from ARM service\n");
+  while (NULL != (op = h->operation_pending_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (h->operation_pending_head,
+                                 h->operation_pending_tail,
+                                 op);
+    if (NULL != op->result_cont)
+      op->result_cont (op->cont_cls,
+                       GNUNET_ARM_REQUEST_DISCONNECTED,
+                       0);
+    if (NULL != op->list_cont)
+      op->list_cont (op->cont_cls,
+                     GNUNET_ARM_REQUEST_DISCONNECTED,
+                     0,
+                     NULL);
+    if (NULL != op->async)
+    {
+      GNUNET_SCHEDULER_cancel (op->async);
+      op->async = NULL;
+    }
+    GNUNET_free (op);
   }
-  if (NULL != h->client)
+  if (NULL != h->mq)
   {
-    GNUNET_CLIENT_disconnect (h->client);
-    h->client = NULL;
+    GNUNET_MQ_destroy (h->mq);
+    h->mq = NULL;
   }
-  if (GNUNET_SCHEDULER_NO_TASK != h->reconnect_task)
+  if (NULL != h->reconnect_task)
   {
     GNUNET_SCHEDULER_cancel (h->reconnect_task);
-    h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  }
-  if (GNUNET_NO == h->service_test_is_active)
-  {
-    GNUNET_CONFIGURATION_destroy (h->cfg);
-    GNUNET_free (h);
+    h->reconnect_task = NULL;
   }
+  GNUNET_free (h);
 }
 
 
-/**
- * Message timed out. Remove it from the queue.
- *
- * @param cls the message (struct ARMControlMessage *)
- * @param tc task context
- */
-static void
-control_message_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct ARMControlMessage *cm = cls;
-  struct GNUNET_ARM_Message *arm_msg;
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Control message timed out\n");
-  arm_msg = cm->msg;
-  if ((NULL == arm_msg) || (0 == arm_msg->request_id))
-  {
-    GNUNET_CONTAINER_DLL_remove (cm->h->control_pending_head,
-                                 cm->h->control_pending_tail, cm);
-  }
-  else
-  {
-    GNUNET_CONTAINER_DLL_remove (cm->h->control_sent_head,
-                                 cm->h->control_sent_tail, cm);
-  }
-  if (NULL != cm->result_cont)
-    cm->result_cont (cm->cont_cls, GNUNET_ARM_REQUEST_TIMEOUT, NULL, 0);
-  else if (NULL != cm->list_cont)
-    cm->list_cont (cm->cont_cls, GNUNET_ARM_REQUEST_TIMEOUT, 0, NULL);
-  GNUNET_free_non_null (cm->msg);
-  GNUNET_free (cm);
-}
-
-
-#include "do_start_process.c"
-
-
 /**
  * A client specifically requested starting of ARM itself.
- * This function is called with information about whether
- * or not ARM is running; if it is, report success.  If
- * it is not, start the ARM process.
+ * Starts the ARM service.
  *
- * @param cls the context for the request that we will report on (struct ARMControlMessage *)
- * @param result GNUNET_YES if ARM is running
+ * @param h the handle with configuration details
+ * @param std_inheritance inheritance of std streams
+ * @return operation status code
  */
-static void
-arm_service_report (void *cls,
-                   int result)
+static enum GNUNET_ARM_Result
+start_arm_service (struct GNUNET_ARM_Handle *h,
+                   enum GNUNET_OS_InheritStdioFlags std_inheritance)
 {
-  struct ARMControlMessage *cm = cls;
-  struct GNUNET_ARM_Handle *h;
   struct GNUNET_OS_Process *proc;
-  unsigned char test_is_active;
   char *cbinary;
   char *binary;
   char *quotedbinary;
@@ -764,94 +596,103 @@ arm_service_report (void *cls,
   char *loprefix;
   char *lopostfix;
 
-  test_is_active = cm->h->service_test_is_active;
-  if ((GNUNET_YES == test_is_active) &&
-      (GNUNET_YES == result))
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "Looks like `%s' is already running.\n",
-        "gnunet-service-arm");
-    /* arm is running! */
-    if (cm->result_cont)
-      cm->result_cont (cm->cont_cls,
-                      GNUNET_ARM_REQUEST_SENT_OK, "arm",
-                      GNUNET_ARM_RESULT_IS_STARTED_ALREADY);
-  }
-  if (GNUNET_NO == test_is_active)
-  {
-    /* User disconnected & destroyed ARM handle in the middle of
-     * the service test, so we kept the handle around until now.
-     */
-    GNUNET_CONFIGURATION_destroy (cm->h->cfg);
-    GNUNET_free (cm->h);
-  }
-  if ((GNUNET_YES == result) ||
-      (GNUNET_NO == test_is_active))
-  {
-    GNUNET_free (cm);
-    return;
-  }
-  cm->h->service_test_is_active = GNUNET_NO;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Looks like `%s' is not running, will start it.\n",
-       "gnunet-service-arm");
-  if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (
-      cm->h->cfg, "arm", "PREFIX", &loprefix))
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_string (h->cfg,
+                                             "arm",
+                                             "PREFIX",
+                                             &loprefix))
     loprefix = GNUNET_strdup ("");
-  if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (
-      cm->h->cfg, "arm", "OPTIONS", &lopostfix))
+  else
+    loprefix = GNUNET_CONFIGURATION_expand_dollar (h->cfg,
+                                                   loprefix);
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_string (h->cfg,
+                                             "arm",
+                                             "OPTIONS",
+                                             &lopostfix))
     lopostfix = GNUNET_strdup ("");
-  if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (
-      cm->h->cfg, "arm", "BINARY", &cbinary))
-  {
-    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_WARNING, "arm", "BINARY");
-    if (cm->result_cont)
-      cm->result_cont (cm->cont_cls,
-                      GNUNET_ARM_REQUEST_SENT_OK, "arm",
-                      GNUNET_ARM_RESULT_IS_NOT_KNOWN);
-    GNUNET_free (cm);
+  else
+    lopostfix = GNUNET_CONFIGURATION_expand_dollar (h->cfg,
+                                                    lopostfix);
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_string (h->cfg,
+                                             "arm",
+                                             "BINARY",
+                                             &cbinary))
+  {
+    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_WARNING,
+                               "arm",
+                               "BINARY");
     GNUNET_free (loprefix);
     GNUNET_free (lopostfix);
-    return;
+    return GNUNET_ARM_RESULT_IS_NOT_KNOWN;
   }
-  if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_filename (
-      cm->h->cfg, "arm", "CONFIG", &config))
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_filename (h->cfg,
+                                               "arm",
+                                               "CONFIG",
+                                               &config))
     config = NULL;
   binary = GNUNET_OS_get_libexec_binary_path (cbinary);
   GNUNET_asprintf (&quotedbinary,
                   "\"%s\"",
                   binary);
   GNUNET_free (cbinary);
-  if ((GNUNET_YES == GNUNET_CONFIGURATION_have_value (
-          cm->h->cfg, "TESTING", "WEAKRANDOM")) &&
-      (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno (
-          cm->h->cfg, "TESTING", "WEAKRANDOM")) &&
-      (GNUNET_NO == GNUNET_CONFIGURATION_have_value (
-          cm->h->cfg, "TESTING", "HOSTFILE")))
+  if ( (GNUNET_YES ==
+        GNUNET_CONFIGURATION_have_value (h->cfg,
+                                         "TESTING",
+                                         "WEAKRANDOM")) &&
+       (GNUNET_YES ==
+        GNUNET_CONFIGURATION_get_value_yesno (h->cfg,
+                                              "TESTING",
+                                              "WEAKRANDOM")) &&
+       (GNUNET_NO ==
+        GNUNET_CONFIGURATION_have_value (h->cfg,
+                                         "TESTING",
+                                         "HOSTFILE")))
   {
     /* Means we are ONLY running locally */
     /* we're clearly running a test, don't daemonize */
     if (NULL == config)
-      proc = do_start_process (GNUNET_NO, cm->std_inheritance,
-                              NULL, loprefix, quotedbinary,
-                              /* no daemonization! */
-                              lopostfix, NULL);
+      proc = GNUNET_OS_start_process_s (GNUNET_NO,
+                                        std_inheritance,
+                                        NULL,
+                                        loprefix,
+                                        quotedbinary,
+                                        /* no daemonization! */
+                                        lopostfix,
+                                        NULL);
     else
-      proc = do_start_process (GNUNET_NO, cm->std_inheritance,
-                              NULL, loprefix, quotedbinary, "-c", config,
-                              /* no daemonization! */
-                              lopostfix, NULL);
+      proc = GNUNET_OS_start_process_s (GNUNET_NO,
+                                        std_inheritance,
+                                        NULL,
+                                        loprefix,
+                                        quotedbinary,
+                                        "-c", config,
+                                        /* no daemonization! */
+                                        lopostfix,
+                                        NULL);
   }
   else
   {
     if (NULL == config)
-      proc = do_start_process (GNUNET_NO, cm->std_inheritance,
-                              NULL, loprefix, quotedbinary,
-                              "-d", lopostfix, NULL);
+      proc = GNUNET_OS_start_process_s (GNUNET_NO,
+                                        std_inheritance,
+                                        NULL,
+                                        loprefix,
+                                        quotedbinary,
+                                        "-d", /* do daemonize */
+                                        lopostfix, NULL);
     else
-      proc = do_start_process (GNUNET_NO, cm->std_inheritance,
-                              NULL, loprefix, quotedbinary, "-c", config,
-                              "-d", lopostfix, NULL);
+      proc = GNUNET_OS_start_process_s (GNUNET_NO,
+                                        std_inheritance,
+                                        NULL,
+                                        loprefix,
+                                        quotedbinary,
+                                        "-c", config,
+                                        "-d", /* do daemonize */
+                                        lopostfix,
+                                        NULL);
   }
   GNUNET_free (binary);
   GNUNET_free (quotedbinary);
@@ -859,20 +700,32 @@ arm_service_report (void *cls,
   GNUNET_free (loprefix);
   GNUNET_free (lopostfix);
   if (NULL == proc)
+    return GNUNET_ARM_RESULT_START_FAILED;
+  GNUNET_OS_process_destroy (proc);
+  return GNUNET_ARM_RESULT_STARTING;
+}
+
+
+/**
+ * Abort an operation.  Only prevents the callback from being
+ * called, the operation may still complete.
+ *
+ * @param op operation to cancel
+ */
+void
+GNUNET_ARM_operation_cancel (struct GNUNET_ARM_Operation *op)
+{
+  struct GNUNET_ARM_Handle *h = op->h;
+
+  if (h->thm == op)
   {
-    if (cm->result_cont)
-      cm->result_cont (cm->cont_cls, GNUNET_ARM_REQUEST_SENT_OK, "arm",
-          GNUNET_ARM_RESULT_START_FAILED);
-    GNUNET_free (cm);
+    op->result_cont = NULL;
     return;
   }
-  if (cm->result_cont)
-    cm->result_cont (cm->cont_cls, GNUNET_ARM_REQUEST_SENT_OK, "arm",
-        GNUNET_ARM_RESULT_STARTING);
-  GNUNET_OS_process_destroy (proc);
-  h = cm->h;
-  GNUNET_free (cm);
-  reconnect_arm (h);
+  GNUNET_CONTAINER_DLL_remove (h->operation_pending_head,
+                               h->operation_pending_tail,
+                               op);
+  GNUNET_free (op);
 }
 
 
@@ -881,54 +734,103 @@ arm_service_report (void *cls,
  *
  * @param h handle to ARM
  * @param service_name name of the service
- * @param timeout how long to wait before failing for good
  * @param cb callback to invoke when service is ready
- * @param cb_cls closure for callback
+ * @param cb_cls closure for @a cb
  * @param type type of the request
+ * @return handle to queue, NULL on error
  */
-static void
-change_service (struct GNUNET_ARM_Handle *h, const char *service_name,
-               struct GNUNET_TIME_Relative timeout, GNUNET_ARM_ResultCallback cb,
-               void *cb_cls, uint16_t type)
+static struct GNUNET_ARM_Operation *
+change_service (struct GNUNET_ARM_Handle *h,
+                const char *service_name,
+                GNUNET_ARM_ResultCallback cb,
+               void *cb_cls,
+                uint16_t type)
 {
-  struct ARMControlMessage *cm;
+  struct GNUNET_ARM_Operation *op;
   size_t slen;
+  struct GNUNET_MQ_Envelope *env;
   struct GNUNET_ARM_Message *msg;
 
   slen = strlen (service_name) + 1;
   if (slen + sizeof (struct GNUNET_ARM_Message) >=
-      GNUNET_SERVER_MAX_MESSAGE_SIZE)
+      GNUNET_MAX_MESSAGE_SIZE)
   {
     GNUNET_break (0);
-    if (cb != NULL)
-      cb (cb_cls, GNUNET_ARM_REQUEST_TOO_LONG, NULL, 0);
-    return;
+    return NULL;
   }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting %s of service `%s'.\n",
-       (GNUNET_MESSAGE_TYPE_ARM_START == type) ? "start" : "termination",
-       service_name);
-  cm = GNUNET_malloc (sizeof (struct ARMControlMessage) + slen);
-  cm->h = h;
-  cm->result_cont = cb;
-  cm->cont_cls = cb_cls;
-  cm->timeout = GNUNET_TIME_relative_to_absolute (timeout);
-  memcpy (&cm[1], service_name, slen);
-  msg = GNUNET_malloc (sizeof (struct GNUNET_ARM_Message) + slen);
-  msg->header.size = htons (sizeof (struct GNUNET_ARM_Message) + slen);
-  msg->header.type = htons (type);
+  if (0 == h->request_id_counter)
+    h->request_id_counter++;
+  op = GNUNET_new (struct GNUNET_ARM_Operation);
+  op->h = h;
+  op->result_cont = cb;
+  op->cont_cls = cb_cls;
+  op->id = h->request_id_counter++;
+  GNUNET_CONTAINER_DLL_insert_tail (h->operation_pending_head,
+                                    h->operation_pending_tail,
+                                    op);
+  env = GNUNET_MQ_msg_extra (msg,
+                             slen,
+                             type);
   msg->reserved = htonl (0);
-  memcpy (&msg[1], service_name, slen);
-  cm->msg = msg;
+  msg->request_id = GNUNET_htonll (op->id);
+  GNUNET_memcpy (&msg[1],
+          service_name,
+          slen);
+  GNUNET_MQ_send (h->mq,
+                  env);
+  return op;
+}
+
+
+/**
+ * Task run to notify application that ARM is already up.
+ *
+ * @param cls the operation that asked ARM to be started
+ */
+static void
+notify_running (void *cls)
+{
+  struct GNUNET_ARM_Operation *op = cls;
+  struct GNUNET_ARM_Handle *h = op->h;
+
+  op->async = NULL;
+  GNUNET_CONTAINER_DLL_remove (h->operation_pending_head,
+                               h->operation_pending_tail,
+                               op);
+  if (NULL != op->result_cont)
+    op->result_cont (op->cont_cls,
+                     GNUNET_ARM_REQUEST_SENT_OK,
+                     GNUNET_ARM_RESULT_IS_STARTED_ALREADY);
+  if ( (GNUNET_YES == h->currently_up) &&
+       (NULL != h->conn_status) )
+    h->conn_status (h->conn_status_cls,
+                    GNUNET_YES);
+  GNUNET_free (op);
+}
+
+
+/**
+ * Task run to notify application that ARM is being started.
+ *
+ * @param cls the operation that asked ARM to be started
+ */
+static void
+notify_starting (void *cls)
+{
+  struct GNUNET_ARM_Operation *op = cls;
+  struct GNUNET_ARM_Handle *h = op->h;
+
+  op->async = NULL;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-      "Inserting a control message into the queue. Timeout is %s\n",
-       GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (cm->timeout),
-                                              GNUNET_NO));
-  GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
-                                    h->control_pending_tail, cm);
-  cm->timeout_task_id =
-      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
-                                    (cm->timeout), &control_message_timeout, cm);
-  trigger_next_request (h, GNUNET_NO);
+       "Notifying client that we started the ARM service\n");
+  GNUNET_CONTAINER_DLL_remove (h->operation_pending_head,
+                               h->operation_pending_tail,
+                               op);
+  if (NULL != op->result_cont)
+    op->result_cont (op->cont_cls,
+                     GNUNET_ARM_REQUEST_SENT_OK,
+                     op->starting_ret);
+  GNUNET_free (op);
 }
 
 
@@ -938,114 +840,116 @@ change_service (struct GNUNET_ARM_Handle *h, const char *service_name,
  * @param h handle to ARM
  * @param service_name name of the service
  * @param std_inheritance inheritance of std streams
- * @param timeout how long to wait before failing for good
  * @param cont callback to invoke after request is sent or not sent
- * @param cont_cls closure for callback
+ * @param cont_cls closure for @a cont
+ * @return handle for the operation, NULL on error
  */
-void
+struct GNUNET_ARM_Operation *
 GNUNET_ARM_request_service_start (struct GNUNET_ARM_Handle *h,
                                  const char *service_name,
                                  enum GNUNET_OS_InheritStdioFlags std_inheritance,
-                                 struct GNUNET_TIME_Relative timeout,
                                  GNUNET_ARM_ResultCallback cont,
                                  void *cont_cls)
 {
-  struct ARMControlMessage *cm;
-  size_t slen;
+  struct GNUNET_ARM_Operation *op;
+  enum GNUNET_ARM_Result ret;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Asked to start service `%s' within %s\n", service_name,
-       GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_NO));
-  if (0 == strcasecmp ("arm", service_name))
+       "Starting service `%s'\n",
+       service_name);
+  if (0 != strcasecmp ("arm",
+                       service_name))
+    return change_service (h,
+                           service_name,
+                           cont,
+                           cont_cls,
+                           GNUNET_MESSAGE_TYPE_ARM_START);
+
+  /* Possible cases:
+   * 1) We're connected to ARM already. Invoke the callback immediately.
+   * 2) We're not connected to ARM.
+   *    Cancel any reconnection attempts temporarily, then perform
+   *    a service test.
+   */
+  if (GNUNET_YES == h->currently_up)
   {
-    /* Possible cases:
-     * 1) We're connected to ARM already. Invoke the callback immediately.
-     * 2) We're not connected to ARM.
-     *    Cancel any reconnection attempts temporarily, then perform
-     *    a service test.
-     */
-    if (GNUNET_NO == h->currently_down)
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "ARM is already running\n");
-      if (NULL != cont)
-        cont (cont_cls, GNUNET_ARM_REQUEST_SENT_OK, "arm", GNUNET_ARM_RESULT_IS_STARTED_ALREADY);
-    }
-    else if (GNUNET_NO == h->service_test_is_active)
-    {
-      if (NULL != h->cth)
-      {
-        GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
-        h->cth = NULL;
-      }
-      if (NULL != h->client)
-      {
-        GNUNET_CLIENT_disconnect (h->client);
-        h->client = NULL;
-      }
-      if (GNUNET_SCHEDULER_NO_TASK != h->reconnect_task)
-      {
-        GNUNET_SCHEDULER_cancel (h->reconnect_task);
-        h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-      }
-
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-          "Not connected to ARM, will do a service test\n");
-
-      slen = strlen ("arm") + 1;
-      cm = GNUNET_malloc (sizeof (struct ARMControlMessage) + slen);
-      cm->h = h;
-      cm->result_cont = cont;
-      cm->cont_cls = cont_cls;
-      cm->timeout = GNUNET_TIME_relative_to_absolute (timeout);
-      cm->std_inheritance = std_inheritance;
-      memcpy (&cm[1], service_name, slen);
-      h->service_test_is_active = GNUNET_YES;
-      GNUNET_CLIENT_service_test ("arm", h->cfg, timeout, &arm_service_report,
-                                 cm);
-    }
-    else
-    {
-      /* Service test is already running - tell user to chill out and try
-       * again later.
-       */
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Service test is already in progress, we're busy\n");
-      if (NULL != cont)
-        cont (cont_cls, GNUNET_ARM_REQUEST_BUSY, NULL, 0);
-    }
-    return;
-  }
-  change_service (h, service_name, timeout, cont, cont_cls,
-                 GNUNET_MESSAGE_TYPE_ARM_START);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "ARM is already running\n");
+    op = GNUNET_new (struct GNUNET_ARM_Operation);
+    op->h = h;
+    op->result_cont = cont;
+    op->cont_cls = cont_cls;
+    GNUNET_CONTAINER_DLL_insert_tail (h->operation_pending_head,
+                                      h->operation_pending_tail,
+                                      op);
+    op->async = GNUNET_SCHEDULER_add_now (&notify_running,
+                                          op);
+    return op;
+  }
+  /* This is an inherently uncertain choice, as it is of course
+     theoretically possible that ARM is up and we just did not
+     yet complete the MQ handshake.  However, given that users
+     are unlikely to hammer 'gnunet-arm -s' on a busy system,
+     the above check should catch 99.99% of the cases where ARM
+     is already running. */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Starting ARM service\n");
+  ret = start_arm_service (h,
+                           std_inheritance);
+  if (GNUNET_ARM_RESULT_STARTING == ret)
+    reconnect_arm (h);
+  op = GNUNET_new (struct GNUNET_ARM_Operation);
+  op->h = h;
+  op->result_cont = cont;
+  op->cont_cls = cont_cls;
+  GNUNET_CONTAINER_DLL_insert_tail (h->operation_pending_head,
+                                    h->operation_pending_tail,
+                                    op);
+  op->starting_ret = ret;
+  op->async = GNUNET_SCHEDULER_add_now (&notify_starting,
+                                        op);
+  return op;
 }
 
 
 /**
- * Request a service to be stopped.
- * Stopping arm itself will not invalidate its handle, and
- * ARM API will try to restore connection to the ARM service,
- * even if ARM connection was lost because you asked for ARM to be stopped.
- * Call GNUNET_ARM_disconnect_and_free () to free the handle and prevent
+ * Request a service to be stopped.  Stopping arm itself will not
+ * invalidate its handle, and ARM API will try to restore connection
+ * to the ARM service, even if ARM connection was lost because you
+ * asked for ARM to be stopped.  Call
+ * #GNUNET_ARM_disconnect() to free the handle and prevent
  * further connection attempts.
  *
  * @param h handle to ARM
  * @param service_name name of the service
- * @param timeout how long to wait before failing for good
  * @param cont callback to invoke after request is sent or is not sent
- * @param cont_cls closure for callback
+ * @param cont_cls closure for @a cont
+ * @return handle for the operation, NULL on error
  */
-void
+struct GNUNET_ARM_Operation *
 GNUNET_ARM_request_service_stop (struct GNUNET_ARM_Handle *h,
                                 const char *service_name,
-                                struct GNUNET_TIME_Relative timeout,
                                 GNUNET_ARM_ResultCallback cont,
                                 void *cont_cls)
 {
+  struct GNUNET_ARM_Operation *op;
+
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Stopping service `%s' within %s\n",
-       service_name,
-       GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_NO));
-  change_service (h, service_name, timeout, cont, cont_cls,
-                 GNUNET_MESSAGE_TYPE_ARM_STOP);
+       "Stopping service `%s'\n",
+       service_name);
+  op = change_service (h,
+                       service_name,
+                       cont,
+                       cont_cls,
+                       GNUNET_MESSAGE_TYPE_ARM_STOP);
+  if (NULL == op)
+    return NULL;
+  /* If the service is ARM, set a flag as we will use MQ errors
+     to detect that the process is really gone. */
+  if (0 == strcasecmp (service_name,
+                       "arm"))
+    op->is_arm_stop = GNUNET_YES;
+  return op;
 }
 
 
@@ -1053,38 +957,38 @@ GNUNET_ARM_request_service_stop (struct GNUNET_ARM_Handle *h,
  * Request a list of running services.
  *
  * @param h handle to ARM
- * @param timeout how long to wait before failing for good
  * @param cont callback to invoke after request is sent or is not sent
- * @param cont_cls closure for callback
+ * @param cont_cls closure for @a cont
+ * @return handle for the operation, NULL on error
  */
-void
+struct GNUNET_ARM_Operation *
 GNUNET_ARM_request_service_list (struct GNUNET_ARM_Handle *h,
-                                 struct GNUNET_TIME_Relative timeout,
                                  GNUNET_ARM_ServiceListCallback cont,
                                  void *cont_cls)
 {
-  struct ARMControlMessage *cm;
+  struct GNUNET_ARM_Operation *op;
+  struct GNUNET_MQ_Envelope *env;
   struct GNUNET_ARM_Message *msg;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Requesting LIST from ARM service with timeout: %s\n",
-       GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
-  cm = GNUNET_malloc (sizeof (struct ARMControlMessage));
-  cm->h = h;
-  cm->list_cont = cont;
-  cm->cont_cls = cont_cls;
-  cm->timeout = GNUNET_TIME_relative_to_absolute (timeout);
-  msg = GNUNET_malloc (sizeof (struct GNUNET_ARM_Message));
-  msg->header.size = htons (sizeof (struct GNUNET_ARM_Message));
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_ARM_LIST);
+       "Requesting LIST from ARM service\n");
+  if (0 == h->request_id_counter)
+    h->request_id_counter++;
+  op = GNUNET_new (struct GNUNET_ARM_Operation);
+  op->h = h;
+  op->list_cont = cont;
+  op->cont_cls = cont_cls;
+  op->id = h->request_id_counter++;
+  GNUNET_CONTAINER_DLL_insert_tail (h->operation_pending_head,
+                                    h->operation_pending_tail,
+                                    op);
+  env = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_ARM_LIST);
   msg->reserved = htonl (0);
-  cm->msg = msg;
-  GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
-                                    h->control_pending_tail, cm);
-  cm->timeout_task_id =
-      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
-                                    (cm->timeout), &control_message_timeout, cm);
-  trigger_next_request (h, GNUNET_NO);
+  msg->request_id = GNUNET_htonll (op->id);
+  GNUNET_MQ_send (h->mq,
+                  env);
+  return op;
 }