move to MQ api
authorMartin Schanzenbach <mschanzenbach@posteo.de>
Fri, 15 Jul 2016 09:28:14 +0000 (09:28 +0000)
committerMartin Schanzenbach <mschanzenbach@posteo.de>
Fri, 15 Jul 2016 09:28:14 +0000 (09:28 +0000)
src/identity-provider/gnunet-service-identity-provider.c
src/identity-provider/identity_provider.h
src/identity-provider/identity_provider_api.c

index 80c15f85f89a40386850d8c10581e38d74b63010..358017d5b1f01add43e4a4852915fb12be27fedb 100644 (file)
@@ -191,6 +191,11 @@ struct ExchangeHandle
    * Label to return
    */
   char *label;
+
+  /**
+   * request id
+   */
+  uint32_t r_id;
 };
 
 struct IssueHandle
@@ -260,6 +265,11 @@ struct IssueHandle
    * The label the token is stored under
    */
   char *label;
+
+  /**
+   * request id
+   */
+  uint32_t r_id;
 };
 
 /**
@@ -1016,6 +1026,7 @@ store_token_issue_cont (void *cls,
   irm = create_issue_result_message (handle->label,
                                      ticket_str,
                                      token_str);
+  irm->id = handle->r_id;
   GNUNET_SERVER_notification_context_unicast (nc,
                                               handle->client,
                                               &irm->header,
@@ -1250,6 +1261,7 @@ process_lookup_result (void *cls, uint32_t rd_count,
   erm = create_exchange_result_message (token_str,
                                         handle->label,
                                         handle->ticket->payload->nonce);
+  erm->id = handle->r_id;
   GNUNET_SERVER_notification_context_unicast (nc,
                                               handle->client,
                                               &erm->header,
@@ -1298,7 +1310,7 @@ handle_exchange_message (void *cls,
               ticket);
   xchange_handle = GNUNET_malloc (sizeof (struct ExchangeHandle));
   xchange_handle->aud_privkey = em->aud_privkey;
-
+  xchange_handle->r_id = em->id;
   if (GNUNET_SYSERR == ticket_parse (ticket,
                                      &xchange_handle->aud_privkey,
                                      &xchange_handle->ticket))
@@ -1537,7 +1549,7 @@ handle_issue_message (void *cls,
                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
   }
   GNUNET_free (scopes_tmp);
-
+  issue_handle->r_id = im->id;
   issue_handle->aud_key = im->aud_key;
   issue_handle->iss_key = im->iss_key;
   GNUNET_CRYPTO_ecdsa_key_get_public (&im->iss_key,
index 682a207607c40d83077a8a43c7b468c282b7a50b..da7470bf97702ee079b15b323db7a987983841b5 100644 (file)
@@ -65,6 +65,11 @@ struct GNUNET_IDENTITY_PROVIDER_IssueResultMessage
    */
   struct GNUNET_MessageHeader header;
 
+  /**
+   * Unique identifier for this request (for key collisions).
+   */
+  uint32_t id GNUNET_PACKED;
+
   /* followed by 0-terminated label,ticket,token */
 
 };
@@ -80,6 +85,11 @@ struct GNUNET_IDENTITY_PROVIDER_ExchangeResultMessage
    */
   struct GNUNET_MessageHeader header;
 
+  /**
+   * Unique identifier for this request (for key collisions).
+   */
+  uint32_t id GNUNET_PACKED;
+
   /**
    * Nonce found in ticket. NBO
    * 0 on error.
@@ -102,6 +112,12 @@ struct GNUNET_IDENTITY_PROVIDER_IssueMessage
    */
   struct GNUNET_MessageHeader header;
 
+  /**
+   * Unique identifier for this request (for key collisions).
+   */
+  uint32_t id GNUNET_PACKED;
+
+
   /**
    * Issuer identity private key
    */
@@ -137,7 +153,12 @@ struct GNUNET_IDENTITY_PROVIDER_ExchangeMessage
    * Type: #GNUNET_MESSAGE_TYPE_IDENTITY_SET_DEFAULT
    */
   struct GNUNET_MessageHeader header;
-  
+
+  /**
+   * Unique identifier for this request (for key collisions).
+   */
+  uint32_t id GNUNET_PACKED;
+
   /**
    * Audience identity private key
    */
index d0413c7488f5912054f1db96ea5e2b58ae80b8cd..6e1b867de685d7a0c4da696f77d0777c06723a54 100644 (file)
@@ -27,6 +27,7 @@
 #include "gnunet_util_lib.h"
 #include "gnunet_constants.h"
 #include "gnunet_protocols.h"
+#include "gnunet_mq_lib.h"
 #include "gnunet_identity_provider_service.h"
 #include "identity_provider.h"
 
@@ -73,6 +74,16 @@ struct GNUNET_IDENTITY_PROVIDER_Operation
    */
   GNUNET_IDENTITY_PROVIDER_IssueCallback iss_cb;
 
+  /**
+   * Envelope with the message for this queue entry.
+   */
+  struct GNUNET_MQ_Envelope *env;
+
+  /**
+   * request id
+   */
+  uint32_t r_id;
+
   /**
    * Closure for @e cont or @e cb.
    */
@@ -124,7 +135,17 @@ struct GNUNET_IDENTITY_PROVIDER_Handle
   /**
    * Time for next connect retry.
    */
-  struct GNUNET_TIME_Relative reconnect_delay;
+  struct GNUNET_TIME_Relative reconnect_backoff;
+
+  /**
+   * Connection to service (if available).
+   */
+  struct GNUNET_MQ_Handle *mq;
+
+  /**
+   * Request Id generator.  Incremented by one for each request.
+   */
+  uint32_t r_id_gen;
 
   /**
    * Are we polling for incoming messages right now?
@@ -140,256 +161,199 @@ struct GNUNET_IDENTITY_PROVIDER_Handle
  * @param cls handle to the service.
  */
 static void
-reconnect (void *cls);
-
+reconnect (struct GNUNET_IDENTITY_PROVIDER_Handle *handle);
 
 /**
- * Reschedule a connect attempt to the service.
+ * Reconnect
  *
- * @param h transport service to reconnect
+ * @param cls the handle
  */
 static void
-reschedule_connect (struct GNUNET_IDENTITY_PROVIDER_Handle *h)
+reconnect_task (void *cls)
 {
-  GNUNET_assert (h->reconnect_task == NULL);
+  struct GNUNET_IDENTITY_PROVIDER_Handle *handle = cls;
 
-  if (NULL != h->th)
-  {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
-    h->th = NULL;
-  }
-  if (NULL != h->client)
-  {
-    GNUNET_CLIENT_disconnect (h->client);
-    h->client = NULL;
-  }
-  h->in_receive = GNUNET_NO;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Scheduling task to reconnect to identity provider service in %s.\n",
-       GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
-  h->reconnect_task =
-      GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
-  h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
+  handle->reconnect_task = NULL;
+  reconnect (handle);
 }
 
 
 /**
- * Type of a function to call when we receive a message
- * from the service.
+ * Disconnect from service and then reconnect.
  *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
+ * @param handle our handle
  */
 static void
-message_handler (void *cls,
-                const struct GNUNET_MessageHeader *msg)
+force_reconnect (struct GNUNET_IDENTITY_PROVIDER_Handle *handle)
 {
-  struct GNUNET_IDENTITY_PROVIDER_Handle *h = cls;
-  struct GNUNET_IDENTITY_PROVIDER_Operation *op;
-  struct GNUNET_IDENTITY_PROVIDER_Token token;
-  struct GNUNET_IDENTITY_PROVIDER_Ticket ticket;
-  const struct GNUNET_IDENTITY_PROVIDER_IssueResultMessage *irm;
-  const struct GNUNET_IDENTITY_PROVIDER_ExchangeResultMessage *erm;
-  char *str;
-  char *ticket_str;
-  char *token_str;
-  char *label_str;
-  uint16_t size;
-  uint64_t ticket_nonce;
-
-  if (NULL == msg)
-  {
-    reschedule_connect (h);
-    return;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received message of type %d from identity provider service\n",
-       ntohs (msg->type));
-  size = ntohs (msg->size);
-  switch (ntohs (msg->type))
-  {
-  case GNUNET_MESSAGE_TYPE_IDENTITY_PROVIDER_ISSUE_RESULT:
-    if (size < sizeof (struct GNUNET_IDENTITY_PROVIDER_IssueResultMessage))
-    {
-      GNUNET_break (0);
-      reschedule_connect (h);
-      return;
-    }
-    irm = (const struct GNUNET_IDENTITY_PROVIDER_IssueResultMessage *) msg;
-    str = GNUNET_strdup ((char *) &irm[1]);
-    if ( (size > sizeof (struct GNUNET_IDENTITY_PROVIDER_IssueResultMessage)) &&
-        ('\0' != str[size - sizeof (struct GNUNET_IDENTITY_PROVIDER_IssueResultMessage) - 1]) )
-    {
-      GNUNET_free (str);
-      GNUNET_break (0);
-      reschedule_connect (h);
-      return;
-    }
-    if (size == sizeof (struct GNUNET_IDENTITY_PROVIDER_IssueResultMessage))
-    {
-      GNUNET_free (str);
-      str = NULL;
-    }
-    label_str = strtok (str, ",");
-
-    if (NULL == label_str)
-    {
-      GNUNET_free (str);
-      GNUNET_break (0);
-      reschedule_connect (h);
-      return;
-    }
-    ticket_str = strtok (NULL, ",");
-    if (NULL == ticket_str)
-    {
-      GNUNET_free (str);
-      GNUNET_break (0);
-      reschedule_connect (h);
-      return;
-    }
-    token_str = strtok (NULL, ",");
-    if (NULL == token_str)
-    {
-      GNUNET_free (str);
-      GNUNET_break (0);
-      reschedule_connect (h);
-      return;
-    }
-    op = h->op_head;
-    GNUNET_CONTAINER_DLL_remove (h->op_head,
-                                h->op_tail,
-                                op);
-    GNUNET_CLIENT_receive (h->client, &message_handler, h,
-                          GNUNET_TIME_UNIT_FOREVER_REL);
-    ticket.data = ticket_str;
-    token.data = token_str;
-    if (NULL != op->iss_cb)
-      op->iss_cb (op->cls, label_str, &ticket, &token);
-    GNUNET_free (str);
-    GNUNET_free (op);
-    break;
-   case GNUNET_MESSAGE_TYPE_IDENTITY_PROVIDER_EXCHANGE_RESULT:
-    if (size < sizeof (struct GNUNET_IDENTITY_PROVIDER_ExchangeResultMessage))
-    {
-      GNUNET_break (0);
-      reschedule_connect (h);
-      return;
-    }
-    erm = (const struct GNUNET_IDENTITY_PROVIDER_ExchangeResultMessage *) msg;
-    str = (char *) &erm[1];
-    if ( (size > sizeof (struct GNUNET_IDENTITY_PROVIDER_ExchangeResultMessage)) &&
-        ('\0' != str[size - sizeof (struct GNUNET_IDENTITY_PROVIDER_ExchangeResultMessage) - 1]) )
-    {
-      GNUNET_break (0);
-      reschedule_connect (h);
-      return;
-    }
-    if (size == sizeof (struct GNUNET_IDENTITY_PROVIDER_ExchangeResultMessage))
-      str = NULL;
-
-    op = h->op_head;
-    GNUNET_CONTAINER_DLL_remove (h->op_head,
-                                h->op_tail,
-                                op);
-    GNUNET_CLIENT_receive (h->client, &message_handler, h,
-                          GNUNET_TIME_UNIT_FOREVER_REL);
-    token.data = str;
-    ticket_nonce = ntohl (erm->ticket_nonce);
-    if (NULL != op->ex_cb)
-      op->ex_cb (op->cls, &token, ticket_nonce);
-    GNUNET_free (op);
-    break;
-
-  default:
-    GNUNET_break (0);
-    reschedule_connect (h);
-    return;
-  }
+  GNUNET_MQ_destroy (handle->mq);
+  handle->mq = NULL;
+  handle->reconnect_backoff
+    = GNUNET_TIME_STD_BACKOFF (handle->reconnect_backoff);
+  handle->reconnect_task
+    = GNUNET_SCHEDULER_add_delayed (handle->reconnect_backoff,
+                                    &reconnect_task,
+                                    handle);
 }
 
-
 /**
- * Schedule transmission of the next message from our queue.
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
  *
- * @param h identity handle
+ * @param cls closure with the `struct GNUNET_GNS_Handle *`
+ * @param error error code
  */
 static void
-transmit_next (struct GNUNET_IDENTITY_PROVIDER_Handle *h);
-
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_IDENTITY_PROVIDER_Handle *handle = cls;
+  force_reconnect (handle);
+}
 
 /**
- * Transmit next message to service.
+ * Check validity of message received from the service
  *
- * @param cls the `struct GNUNET_IDENTITY_PROVIDER_Handle`.
- * @param size number of bytes available in @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to buf
+ * @param cls the `struct GNUNET_IDENTITY_PROVIDER_Handle *`
+ * @param result_msg the incoming message
  */
-static size_t
-send_next_message (void *cls,
-                  size_t size,
-                  void *buf)
+static int
+check_exchange_result (void *cls,
+              const struct GNUNET_IDENTITY_PROVIDER_ExchangeResultMessage *erm)
 {
-  struct GNUNET_IDENTITY_PROVIDER_Handle *h = cls;
-  struct GNUNET_IDENTITY_PROVIDER_Operation *op = h->op_head;
-  size_t ret;
+  char *str;
+  size_t size = ntohs (erm->header.size) - sizeof (*erm);
+  
 
-  h->th = NULL;
-  if (NULL == op)
-    return 0;
-  ret = ntohs (op->msg->size);
-  if (ret > size)
-  {
-    reschedule_connect (h);
-    return 0;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Sending message of type %d to identity provider service\n",
-       ntohs (op->msg->type));
-  GNUNET_memcpy (buf, op->msg, ret);
-  if ( (NULL == op->iss_cb) &&
-       (NULL == op->ex_cb) )
+  str = (char *) &erm[1];
+  if ( (size > sizeof (struct GNUNET_IDENTITY_PROVIDER_ExchangeResultMessage)) &&
+       ('\0' != str[size - sizeof (struct GNUNET_IDENTITY_PROVIDER_ExchangeResultMessage) - 1]) )
   {
-    GNUNET_CONTAINER_DLL_remove (h->op_head,
-                                h->op_tail,
-                                op);
-    GNUNET_free (op);
-    transmit_next (h);
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
-  if (GNUNET_NO == h->in_receive)
+  return GNUNET_OK;
+}
+
+
+/**
+ * Check validity of message received from the service
+ *
+ * @param cls the `struct GNUNET_IDENTITY_PROVIDER_Handle *`
+ * @param result_msg the incoming message
+ */
+static int
+check_result (void *cls,
+              const struct GNUNET_IDENTITY_PROVIDER_IssueResultMessage *irm)
+{
+  char *str;
+  size_t size = ntohs (irm->header.size) - sizeof (*irm);
+  str = (char*) &irm[1];
+  if ( (size > sizeof (struct GNUNET_IDENTITY_PROVIDER_IssueResultMessage)) &&
+       ('\0' != str[size - sizeof (struct GNUNET_IDENTITY_PROVIDER_IssueResultMessage) - 1]) )
   {
-    h->in_receive = GNUNET_YES;
-    GNUNET_CLIENT_receive (h->client,
-                          &message_handler, h,
-                          GNUNET_TIME_UNIT_FOREVER_REL);
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
-  return ret;
+  return GNUNET_OK;
 }
 
+/**
+ * Handler for messages received from the GNS service
+ *
+ * @param cls the `struct GNUNET_GNS_Handle *`
+ * @param loookup_msg the incoming message
+ */
+static void
+handle_exchange_result (void *cls,
+                        const struct GNUNET_IDENTITY_PROVIDER_ExchangeResultMessage *erm)
+{
+  struct GNUNET_IDENTITY_PROVIDER_Handle *handle = cls;
+  struct GNUNET_IDENTITY_PROVIDER_Operation *op;
+  struct GNUNET_IDENTITY_PROVIDER_Token token;
+  uint64_t ticket_nonce;
+  uint32_t r_id = ntohl (erm->id);
+  char *str;
+  
+  for (op = handle->op_head; NULL != op; op = op->next)
+    if (op->r_id == r_id)
+      break;
+  if (NULL == op)
+    return;
+  str = GNUNET_strdup ((char*)&erm[1]);
+  op = handle->op_head;
+  GNUNET_CONTAINER_DLL_remove (handle->op_head,
+                               handle->op_tail,
+                               op);
+  token.data = str;
+  ticket_nonce = ntohl (erm->ticket_nonce);
+  if (NULL != op->ex_cb)
+    op->ex_cb (op->cls, &token, ticket_nonce);
+  GNUNET_free (str);
+  GNUNET_free (op);
+
+}
 
 /**
- * Schedule transmission of the next message from our queue.
+ * Handler for messages received from the GNS service
  *
- * @param h identity provider handle
+ * @param cls the `struct GNUNET_GNS_Handle *`
+ * @param loookup_msg the incoming message
  */
 static void
-transmit_next (struct GNUNET_IDENTITY_PROVIDER_Handle *h)
+handle_result (void *cls,
+               const struct GNUNET_IDENTITY_PROVIDER_IssueResultMessage *irm)
 {
-  struct GNUNET_IDENTITY_PROVIDER_Operation *op = h->op_head;
+  struct GNUNET_IDENTITY_PROVIDER_Handle *handle = cls;
+  struct GNUNET_IDENTITY_PROVIDER_Operation *op;
+  struct GNUNET_IDENTITY_PROVIDER_Token token;
+  struct GNUNET_IDENTITY_PROVIDER_Ticket ticket;
+  uint32_t r_id = ntohl (irm->id);
+  char *str;
+  char *label_str;
+  char *ticket_str;
+  char *token_str;
 
-  GNUNET_assert (NULL == h->th);
+  for (op = handle->op_head; NULL != op; op = op->next)
+    if (op->r_id == r_id)
+      break;
   if (NULL == op)
     return;
-  if (NULL == h->client)
+  str = GNUNET_strdup ((char*)&irm[1]);
+  label_str = strtok (str, ",");
+
+  if (NULL == label_str)
+  {
+    GNUNET_free (str);
+    GNUNET_break (0);
     return;
-  h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                              ntohs (op->msg->size),
-                                              GNUNET_TIME_UNIT_FOREVER_REL,
-                                              GNUNET_NO,
-                                              &send_next_message,
-                                              h);
-}
+  }
+  ticket_str = strtok (NULL, ",");
+  if (NULL == ticket_str)
+  {
+    GNUNET_free (str);
+    GNUNET_break (0);
+    return;
+  }
+  token_str = strtok (NULL, ",");
+  if (NULL == token_str)
+  {
+    GNUNET_free (str);
+    GNUNET_break (0);
+    return;
+  }
+  GNUNET_CONTAINER_DLL_remove (handle->op_head,
+                               handle->op_tail,
+                               op);
+  ticket.data = ticket_str;
+  token.data = token_str;
+  if (NULL != op->iss_cb)
+    op->iss_cb (op->cls, label_str, &ticket, &token);
+  GNUNET_free (str);
+  GNUNET_free (op);
 
+}
 
 /**
  * Try again to connect to the service.
@@ -397,18 +361,35 @@ transmit_next (struct GNUNET_IDENTITY_PROVIDER_Handle *h)
  * @param cls handle to the identity provider service.
  */
 static void
-reconnect (void *cls)
+reconnect (struct GNUNET_IDENTITY_PROVIDER_Handle *h)
 {
-  struct GNUNET_IDENTITY_PROVIDER_Handle *h = cls;
+  GNUNET_MQ_hd_var_size (result,
+                         GNUNET_MESSAGE_TYPE_IDENTITY_PROVIDER_ISSUE_RESULT,
+                         struct GNUNET_IDENTITY_PROVIDER_IssueResultMessage);
+  GNUNET_MQ_hd_var_size (exchange_result,
+                         GNUNET_MESSAGE_TYPE_IDENTITY_PROVIDER_EXCHANGE_RESULT,
+                         struct GNUNET_IDENTITY_PROVIDER_ExchangeResultMessage);
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    make_result_handler (h),
+    make_exchange_result_handler (h),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_IDENTITY_PROVIDER_Operation *op;
 
-  h->reconnect_task = NULL;
+  GNUNET_assert (NULL == h->mq);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Connecting to identity provider service.\n");
-  GNUNET_assert (NULL == h->client);
-  h->client = GNUNET_CLIENT_connect ("identity-provider", h->cfg);
-  GNUNET_assert (NULL != h->client);
-  transmit_next (h);
-  GNUNET_assert (NULL != h->th);
+
+  h->mq = GNUNET_CLIENT_connecT (h->cfg,
+                                 "identity-provider",
+                                 handlers,
+                                 &mq_error_handler,
+                                 h);
+  if (NULL == h->mq)
+    return;
+  for (op = h->op_head; NULL != op; op = op->next)
+    GNUNET_MQ_send_copy (h->mq,
+                         op->env);
 }
 
 
@@ -425,8 +406,12 @@ GNUNET_IDENTITY_PROVIDER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
 
   h = GNUNET_new (struct GNUNET_IDENTITY_PROVIDER_Handle);
   h->cfg = cfg;
-  h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
-  h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h);
+  reconnect (h);
+  if (NULL == h->mq)
+  {
+    GNUNET_free (h);
+    return NULL;
+  }
   return h;
 }
 
@@ -442,13 +427,13 @@ GNUNET_IDENTITY_PROVIDER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
  */
 struct GNUNET_IDENTITY_PROVIDER_Operation *
 GNUNET_IDENTITY_PROVIDER_issue_token (struct GNUNET_IDENTITY_PROVIDER_Handle *id,
-                    const struct GNUNET_CRYPTO_EcdsaPrivateKey *iss_key,
-         const struct GNUNET_CRYPTO_EcdsaPublicKey *aud_key,
-         const char* scopes,
-         struct GNUNET_TIME_Absolute expiration,
-         uint64_t nonce,
-                    GNUNET_IDENTITY_PROVIDER_IssueCallback cb,
-                    void *cb_cls)
+                                      const struct GNUNET_CRYPTO_EcdsaPrivateKey *iss_key,
+                                      const struct GNUNET_CRYPTO_EcdsaPublicKey *aud_key,
+                                      const char* scopes,
+                                      struct GNUNET_TIME_Absolute expiration,
+                                      uint64_t nonce,
+                                      GNUNET_IDENTITY_PROVIDER_IssueCallback cb,
+                                      void *cb_cls)
 {
   struct GNUNET_IDENTITY_PROVIDER_Operation *op;
   struct GNUNET_IDENTITY_PROVIDER_IssueMessage *im;
@@ -460,27 +445,26 @@ GNUNET_IDENTITY_PROVIDER_issue_token (struct GNUNET_IDENTITY_PROVIDER_Handle *id
     GNUNET_break (0);
     return NULL;
   }
-  op = GNUNET_malloc (sizeof (struct GNUNET_IDENTITY_PROVIDER_Operation) +
-                     sizeof (struct GNUNET_IDENTITY_PROVIDER_IssueMessage) +
-                     slen);
+  op = GNUNET_new (struct GNUNET_IDENTITY_PROVIDER_Operation);
   op->h = id;
   op->iss_cb = cb;
   op->cls = cb_cls;
-  im = (struct GNUNET_IDENTITY_PROVIDER_IssueMessage *) &op[1];
-  im->header.type = htons (GNUNET_MESSAGE_TYPE_IDENTITY_PROVIDER_ISSUE);
-  im->header.size = htons (sizeof (struct GNUNET_IDENTITY_PROVIDER_IssueMessage) +
-                           slen);
+  op->r_id = id->r_id_gen++;
+  op->env = GNUNET_MQ_msg_extra (im,
+                                 slen,
+                                 GNUNET_MESSAGE_TYPE_IDENTITY_PROVIDER_ISSUE);
+  im->id = op->r_id;
   im->iss_key = *iss_key;
   im->aud_key = *aud_key;
   im->nonce = htonl (nonce);
   im->expiration = GNUNET_TIME_absolute_hton (expiration);
   GNUNET_memcpy (&im[1], scopes, slen);
-  op->msg = &im->header;
   GNUNET_CONTAINER_DLL_insert_tail (id->op_head,
-                                   id->op_tail,
-                                   op);
-  if (NULL == id->th)
-    transmit_next (id);
+                                    id->op_tail,
+                                    op);
+  if (NULL != id->mq)
+    GNUNET_MQ_send_copy (id->mq,
+                         op->env);
   return op;
 }
 
@@ -515,25 +499,24 @@ GNUNET_IDENTITY_PROVIDER_exchange_ticket (struct GNUNET_IDENTITY_PROVIDER_Handle
     GNUNET_break (0);
     return NULL;
   }
-  op = GNUNET_malloc (sizeof (struct GNUNET_IDENTITY_PROVIDER_Operation) +
-                      sizeof (struct GNUNET_IDENTITY_PROVIDER_ExchangeMessage) +
-                      slen);
+  op = GNUNET_new (struct GNUNET_IDENTITY_PROVIDER_Operation);
   op->h = id;
   op->ex_cb = cont;
   op->cls = cont_cls;
-  em = (struct GNUNET_IDENTITY_PROVIDER_ExchangeMessage *) &op[1];
-  em->header.type = htons (GNUNET_MESSAGE_TYPE_IDENTITY_PROVIDER_EXCHANGE);
-  em->header.size = htons (sizeof (struct GNUNET_IDENTITY_PROVIDER_ExchangeMessage) +
-                           slen);
+  op->r_id = id->r_id_gen++;
+  op->env = GNUNET_MQ_msg_extra (em,
+                                 slen,
+                                 GNUNET_MESSAGE_TYPE_IDENTITY_PROVIDER_EXCHANGE);
   em->aud_privkey = *aud_privkey;
+  em->id = htonl (op->r_id);
   GNUNET_memcpy (&em[1], ticket_str, slen);
   GNUNET_free (ticket_str);
-  op->msg = &em->header;
   GNUNET_CONTAINER_DLL_insert_tail (id->op_head,
                                     id->op_tail,
                                     op);
-  if (NULL == id->th)
-    transmit_next (id);
+  if (NULL != id->mq)
+    GNUNET_MQ_send_copy (id->mq,
+                         op->env);
   return op;
 }
 
@@ -551,37 +534,11 @@ GNUNET_IDENTITY_PROVIDER_cancel (struct GNUNET_IDENTITY_PROVIDER_Operation *op)
 {
   struct GNUNET_IDENTITY_PROVIDER_Handle *h = op->h;
 
-  if ( (h->op_head != op) ||
-       (NULL == h->client) )
-  {
-    /* request not active, can simply remove */
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Client aborted non-head operation, simply removing it\n");
-    GNUNET_CONTAINER_DLL_remove (h->op_head,
-                                 h->op_tail,
-                                 op);
-    GNUNET_free (op);
-    return;
-  }
-  if (NULL != h->th)
-  {
-    /* request active but not yet with service, can still abort */
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Client aborted head operation prior to transmission, aborting it\n");
-    GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
-    h->th = NULL;
-    GNUNET_CONTAINER_DLL_remove (h->op_head,
-                                 h->op_tail,
-                                 op);
-    GNUNET_free (op);
-    transmit_next (h);
-    return;
-  }
-  /* request active with service, simply ensure continuations are not called */
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Client aborted active request, NULLing continuation\n");
-  op->ex_cb = NULL;
-  op->iss_cb = NULL;
+  GNUNET_CONTAINER_DLL_remove (h->op_head,
+                               h->op_tail,
+                               op);
+  GNUNET_MQ_discard (op->env);
+  GNUNET_free (op);
 }
 
 
@@ -593,31 +550,18 @@ GNUNET_IDENTITY_PROVIDER_cancel (struct GNUNET_IDENTITY_PROVIDER_Operation *op)
 void
 GNUNET_IDENTITY_PROVIDER_disconnect (struct GNUNET_IDENTITY_PROVIDER_Handle *h)
 {
-  struct GNUNET_IDENTITY_PROVIDER_Operation *op;
-
   GNUNET_assert (NULL != h);
-  if (h->reconnect_task != NULL)
-  {
-    GNUNET_SCHEDULER_cancel (h->reconnect_task);
-    h->reconnect_task = NULL;
-  }
-  if (NULL != h->th)
+  if (NULL != h->mq)
   {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
-    h->th = NULL;
+    GNUNET_MQ_destroy (h->mq);
+    h->mq = NULL;
   }
-  while (NULL != (op = h->op_head))
+  if (NULL != h->reconnect_task)
   {
-    GNUNET_CONTAINER_DLL_remove (h->op_head,
-                                 h->op_tail,
-                                 op);
-    GNUNET_free (op);
-  }
-  if (NULL != h->client)
-  {
-    GNUNET_CLIENT_disconnect (h->client);
-    h->client = NULL;
+    GNUNET_SCHEDULER_cancel (h->reconnect_task);
+    h->reconnect_task = NULL;
   }
+  GNUNET_assert (NULL == h->op_head);
   GNUNET_free (h);
 }