tng: more UDP communicator backchannels
authorMartin Schanzenbach <mschanzenbach@posteo.de>
Mon, 1 Jun 2020 14:39:35 +0000 (16:39 +0200)
committerMartin Schanzenbach <mschanzenbach@posteo.de>
Mon, 1 Jun 2020 14:39:35 +0000 (16:39 +0200)
Added a new message for queue updates to indicate queue length.
Queues now may also have a priority parameter.

src/include/gnunet_protocols.h
src/include/gnunet_transport_communication_service.h
src/transport/gnunet-communicator-tcp.c
src/transport/gnunet-communicator-udp.c
src/transport/gnunet-communicator-unix.c
src/transport/test_communicator_basic.c
src/transport/transport-testing2.c
src/transport/transport-testing2.h
src/transport/transport.h
src/transport/transport_api2_communication.c

index 282bb53d1bbfc1cf9c6d8b658ea690bba27c0bc2..a9cd7466a99166ccfba55d33d92f9a9d5ffd4309 100644 (file)
@@ -3161,6 +3161,10 @@ extern "C" {
  */
 #define GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL 1221
 
+/**
+ * @brief inform transport that a queue was updated
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE 1222
 
 /**
  * Message sent to indicate to the transport that a monitor
index 3ead03536d0a0ee1af1d269fd6da91a34a363c78..ea6b95e2ddaedf4e33b9750ddcb39f4abe74df3d 100644 (file)
@@ -50,6 +50,10 @@ extern "C" {
  */
 #define GNUNET_TRANSPORT_COMMUNICATION_VERSION 0x00000000
 
+/**
+ * Queue length
+ */
+#define GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED UINT64_MAX
 
 /**
  * Function called by the transport service to initialize a
@@ -252,6 +256,9 @@ enum GNUNET_TRANSPORT_ConnectionStatus
  * @param address address in human-readable format, 0-terminated, UTF-8
  * @param mtu maximum message size supported by queue, 0 if
  *            sending is not supported, SIZE_MAX for no MTU
+ * @param q_len number of messages that can be send through this queue
+ * @param priority queue priority. Queues with highest priority should be
+ *                 used
  * @param nt which network type does the @a address belong to?
  * @param cs what is the connection status of the queue?
  * @param mq message queue of the @a peer
@@ -263,10 +270,27 @@ GNUNET_TRANSPORT_communicator_mq_add (
   const struct GNUNET_PeerIdentity *peer,
   const char *address,
   uint32_t mtu,
+  uint64_t q_len,
+  uint32_t priority,
   enum GNUNET_NetworkType nt,
   enum GNUNET_TRANSPORT_ConnectionStatus cs,
   struct GNUNET_MQ_Handle *mq);
 
+/**
+ * Notify transport service that an MQ was updated
+ *
+ * @param ch connection to transport service
+ * @param qh the queue to update
+ * @param q_len number of messages that can be send through this queue
+ * @param priority queue priority. Queues with highest priority should be
+ *                 used
+ */
+void
+GNUNET_TRANSPORT_communicator_mq_update (
+  struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
+  const struct GNUNET_TRANSPORT_QueueHandle *u_qh,
+  uint64_t q_len,
+  uint32_t priority);
 
 /**
  * Notify transport service that an MQ became unavailable due to a
index bbfacbffda9af893f42d0f5fe9e74e10afe3c79a..7f70c55df731098e880f6db2165d61cfd42f2bdb 100644 (file)
@@ -1547,6 +1547,8 @@ boot_queue (struct Queue *queue, enum GNUNET_TRANSPORT_ConnectionStatus cs)
                                                       &queue->target,
                                                       foreign_addr,
                                                       0 /* no MTU */,
+                                                      GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
+                                                      0, /* Priority */
                                                       queue->nt,
                                                       cs,
                                                       queue->mq);
index 344ba51809f8da8767bb6cdaa277cdd0154b7d64..46d9766d02727c59f8fcf8a5fa2609b79513383e 100644 (file)
@@ -549,14 +549,24 @@ struct ReceiverAddress
   struct GNUNET_CONTAINER_HeapNode *hn;
 
   /**
-   * Message queue we are providing for the #ch.
+   * KX message queue we are providing for the #ch.
    */
-  struct GNUNET_MQ_Handle *mq;
+  struct GNUNET_MQ_Handle *kx_mq;
+
+  /**
+   * Default message queue we are providing for the #ch.
+   */
+  struct GNUNET_MQ_Handle *d_mq;
+
+  /**
+   * handle for KX queue with the #ch.
+   */
+  struct GNUNET_TRANSPORT_QueueHandle *kx_qh;
 
   /**
-   * handle for this queue with the #ch.
+   * handle for default queue with the #ch.
    */
-  struct GNUNET_TRANSPORT_QueueHandle *qh;
+  struct GNUNET_TRANSPORT_QueueHandle *d_qh;
 
   /**
    * Timeout for this receiver address.
@@ -564,9 +574,14 @@ struct ReceiverAddress
   struct GNUNET_TIME_Absolute timeout;
 
   /**
-   * MTU we allowed transport for this receiver right now.
+   * MTU we allowed transport for this receiver's KX queue.
    */
-  size_t mtu;
+  size_t kx_mtu;
+
+  /**
+   * MTU we allowed transport for this receiver's default queue.
+   */
+  size_t d_mtu;
 
   /**
    * Length of the DLL at @a ss_head.
@@ -786,15 +801,25 @@ receiver_destroy (struct ReceiverAddress *receiver)
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Disconnecting receiver for peer `%s'\n",
               GNUNET_i2s (&receiver->target));
-  if (NULL != (mq = receiver->mq))
+  if (NULL != (mq = receiver->kx_mq))
   {
-    receiver->mq = NULL;
+    receiver->kx_mq = NULL;
     GNUNET_MQ_destroy (mq);
   }
-  if (NULL != receiver->qh)
+  if (NULL != receiver->kx_qh)
   {
-    GNUNET_TRANSPORT_communicator_mq_del (receiver->qh);
-    receiver->qh = NULL;
+    GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh);
+    receiver->kx_qh = NULL;
+  }
+  if (NULL != (mq = receiver->d_mq))
+  {
+    receiver->d_mq = NULL;
+    GNUNET_MQ_destroy (mq);
+  }
+  if (NULL != receiver->d_qh)
+  {
+    GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh);
+    receiver->d_qh = NULL;
   }
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multipeermap_remove (receivers,
@@ -1265,30 +1290,27 @@ handle_ack (void *cls, const struct GNUNET_PeerIdentity *pid, void *value)
   (void) pid;
   for (struct SharedSecret *ss = receiver->ss_head; NULL != ss; ss = ss->next)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Checking shared secrets\n");
     if (0 == memcmp (&ack->cmac, &ss->cmac, sizeof(struct GNUNET_HashCode)))
     {
       uint32_t allowed;
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Found matching mac\n");
+                  "Found matching mac\n");
 
       allowed = ntohl (ack->sequence_max);
 
       if (allowed > ss->sequence_allowed)
       {
         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%u > %u (%u)\n", allowed, ss->sequence_allowed,
-                receiver->acks_available);
+                    "%u > %u (%u)\n", allowed, ss->sequence_allowed,
+                    receiver->acks_available);
 
         receiver->acks_available += (allowed - ss->sequence_allowed);
-        if ((allowed - ss->sequence_allowed) == receiver->acks_available)
-        {
-          /* we just incremented from zero => MTU change! */
-          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "we just incremented from zero => MTU change!\n");
-          //TODO setup_receiver_mq (receiver);
-        }
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "Tell transport we have more acks!\n");
+        GNUNET_TRANSPORT_communicator_mq_update (ch,
+                                                 receiver->d_qh,
+                                                 (allowed - ss->sequence_allowed),
+                                                 1);
         ss->sequence_allowed = allowed;
         /* move ss to head to avoid discarding it anytime soon! */
         GNUNET_CONTAINER_DLL_remove (receiver->ss_head, receiver->ss_tail, ss);
@@ -1906,15 +1928,24 @@ do_pad (gcry_cipher_hd_t out_cipher, char *dgram, size_t pad_size)
  * @param impl_state our `struct ReceiverAddress`
  */
 static void
-mq_send (struct GNUNET_MQ_Handle *mq,
-         const struct GNUNET_MessageHeader *msg,
-         void *impl_state)
+mq_send_kx (struct GNUNET_MQ_Handle *mq,
+            const struct GNUNET_MessageHeader *msg,
+            void *impl_state)
 {
   struct ReceiverAddress *receiver = impl_state;
   uint16_t msize = ntohs (msg->size);
+  struct UdpHandshakeSignature uhs;
+  struct UDPConfirmation uc;
+  struct InitialKX kx;
+  struct GNUNET_CRYPTO_EcdhePrivateKey epriv;
+  char dgram[receiver->kx_mtu + sizeof(uc) + sizeof(kx)];
+  size_t dpos;
+  gcry_cipher_hd_t out_cipher;
+  struct SharedSecret *ss;
+
 
-  GNUNET_assert (mq == receiver->mq);
-  if (msize > receiver->mtu)
+  GNUNET_assert (mq == receiver->kx_mq);
+  if (msize > receiver->kx_mtu)
   {
     GNUNET_break (0);
     receiver_destroy (receiver);
@@ -1922,117 +1953,124 @@ mq_send (struct GNUNET_MQ_Handle *mq,
   }
   reschedule_receiver_timeout (receiver);
 
-  if (0 == receiver->acks_available)
+  /* setup key material */
+  GNUNET_CRYPTO_ecdhe_key_create (&epriv);
+
+  ss = setup_shared_secret_enc (&epriv, receiver);
+  setup_cipher (&ss->master, 0, &out_cipher);
+  /* compute 'uc' */
+  uc.sender = my_identity;
+  uc.monotonic_time =
+    GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
+  uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
+  uhs.purpose.size = htonl (sizeof(uhs));
+  uhs.sender = my_identity;
+  uhs.receiver = receiver->target;
+  GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral);
+  uhs.monotonic_time = uc.monotonic_time;
+  GNUNET_CRYPTO_eddsa_sign (my_private_key,
+                            &uhs,
+                            &uc.sender_sig);
+  /* Leave space for kx */
+  dpos = sizeof(kx);
+  /* Append encrypted uc to dgram */
+  GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher,
+                                           &dgram[dpos],
+                                           sizeof(uc),
+                                           &uc,
+                                           sizeof(uc)));
+  dpos += sizeof(uc);
+  /* Append encrypted payload to dgram */
+  GNUNET_assert (
+    0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
+  dpos += msize;
+  do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
+  /* Datagram starts with kx */
+  kx.ephemeral = uhs.ephemeral;
+  GNUNET_assert (
+    0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag)));
+  gcry_cipher_close (out_cipher);
+  memcpy (dgram, &kx, sizeof(kx));
+  if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
+                                          dgram,
+                                          sizeof(dgram),
+                                          receiver->address,
+                                          receiver->address_len))
+    GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending KX to %s\n", GNUNET_a2s (receiver->address,
+                                                receiver->address_len));
+  GNUNET_MQ_impl_send_continue (mq);
+}
+
+
+/**
+ * Signature of functions implementing the sending functionality of a
+ * message queue.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state our `struct ReceiverAddress`
+ */
+static void
+mq_send_d (struct GNUNET_MQ_Handle *mq,
+           const struct GNUNET_MessageHeader *msg,
+           void *impl_state)
+{
+  struct ReceiverAddress *receiver = impl_state;
+  uint16_t msize = ntohs (msg->size);
+
+  GNUNET_assert (mq == receiver->d_mq);
+  if ((msize > receiver->d_mtu) ||
+      (0 == receiver->acks_available))
   {
-    /* use KX encryption method */
-    struct UdpHandshakeSignature uhs;
-    struct UDPConfirmation uc;
-    struct InitialKX kx;
-    struct GNUNET_CRYPTO_EcdhePrivateKey epriv;
-    char dgram[receiver->mtu + sizeof(uc) + sizeof(kx)];
-    size_t dpos;
-    gcry_cipher_hd_t out_cipher;
-    struct SharedSecret *ss;
+    GNUNET_break (0);
+    receiver_destroy (receiver);
+    return;
+  }
+  reschedule_receiver_timeout (receiver);
 
-    /* setup key material */
-    GNUNET_CRYPTO_ecdhe_key_create (&epriv);
+  /* begin "BOX" encryption method, scan for ACKs from tail! */
+  for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev)
+  {
+    if (ss->sequence_used >= ss->sequence_allowed)
+    {
+      continue;
+    }
+    char dgram[sizeof(struct UDPBox) + receiver->d_mtu];
+    struct UDPBox *box;
+    gcry_cipher_hd_t out_cipher;
+    size_t dpos;
 
-    ss = setup_shared_secret_enc (&epriv, receiver);
-    setup_cipher (&ss->master, 0, &out_cipher);
-    /* compute 'uc' */
-    uc.sender = my_identity;
-    uc.monotonic_time =
-      GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
-    uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
-    uhs.purpose.size = htonl (sizeof(uhs));
-    uhs.sender = my_identity;
-    uhs.receiver = receiver->target;
-    GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral);
-    uhs.monotonic_time = uc.monotonic_time;
-    GNUNET_CRYPTO_eddsa_sign (my_private_key,
-                              &uhs,
-                              &uc.sender_sig);
-    /* Leave space for kx */
-    dpos = sizeof(kx);
-    /* Append encrypted uc to dgram */
-    GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher,
-                                             &dgram[dpos],
-                                             sizeof(uc),
-                                             &uc,
-                                             sizeof(uc)));
-    dpos += sizeof(uc);
+    box = (struct UDPBox *) dgram;
+    ss->sequence_used++;
+    get_kid (&ss->master, ss->sequence_used, &box->kid);
+    setup_cipher (&ss->master, ss->sequence_used, &out_cipher);
     /* Append encrypted payload to dgram */
+    dpos = sizeof(struct UDPBox);
     GNUNET_assert (
       0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
     dpos += msize;
     do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
-    /* Datagram starts with kx */
-    kx.ephemeral = uhs.ephemeral;
-    GNUNET_assert (
-      0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag)));
+    GNUNET_assert (0 == gcry_cipher_gettag (out_cipher,
+                                            box->gcm_tag,
+                                            sizeof(box->gcm_tag)));
     gcry_cipher_close (out_cipher);
-    memcpy (dgram, &kx, sizeof(kx));
     if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
                                             dgram,
                                             sizeof(dgram),
                                             receiver->address,
                                             receiver->address_len))
       GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Sending KX to %s\n", GNUNET_a2s (receiver->address,
-                                                  receiver->address_len));
     GNUNET_MQ_impl_send_continue (mq);
-    return;
-  }   /* End of KX encryption method */
-
-  /* begin "BOX" encryption method, scan for ACKs from tail! */
-  for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "In non-kx mode...\n");
-    if (ss->sequence_used < ss->sequence_allowed)
+    receiver->acks_available--;
+    if (0 == receiver->acks_available)
     {
-      char dgram[sizeof(struct UDPBox) + receiver->mtu];
-      struct UDPBox *box;
-      gcry_cipher_hd_t out_cipher;
-      size_t dpos;
-
-      box = (struct UDPBox *) dgram;
-      ss->sequence_used++;
-      get_kid (&ss->master, ss->sequence_used, &box->kid);
-      setup_cipher (&ss->master, ss->sequence_used, &out_cipher);
-      /* Append encrypted payload to dgram */
-      dpos = sizeof(struct UDPBox);
-      GNUNET_assert (
-        0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
-      dpos += msize;
-      do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
-      GNUNET_assert (0 == gcry_cipher_gettag (out_cipher,
-                                              box->gcm_tag,
-                                              sizeof(box->gcm_tag)));
-      gcry_cipher_close (out_cipher);
-      if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
-                                              dgram,
-                                              sizeof(dgram),
-                                              receiver->address,
-                                              receiver->address_len))
-        GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
+      /* We have no more ACKs */
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Sending data\n");
-
-      GNUNET_MQ_impl_send_continue (mq);
-      receiver->acks_available--;
-      if (0 == receiver->acks_available)
-      {
-        /* We have no more ACKs => MTU change! */
-        setup_receiver_mq (receiver);
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "No more acks, MTU changed\n");
-      }
-      return;
+                  "No more acks\n");
     }
   }
-  GNUNET_assert (0);
 }
 
 
@@ -2045,15 +2083,37 @@ mq_send (struct GNUNET_MQ_Handle *mq,
  * @param impl_state our `struct ReceiverAddress`
  */
 static void
-mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
+mq_destroy_d (struct GNUNET_MQ_Handle *mq, void *impl_state)
 {
   struct ReceiverAddress *receiver = impl_state;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "MQ destroyed\n");
-  if (mq == receiver->mq)
+              "Default MQ destroyed\n");
+  if (mq == receiver->d_mq)
   {
-    receiver->mq = NULL;
-    //receiver_destroy (receiver);
+    receiver->d_mq = NULL;
+    receiver_destroy (receiver);
+  }
+}
+
+
+/**
+ * Signature of functions implementing the destruction of a message
+ * queue.  Implementations must not free @a mq, but should take care
+ * of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state our `struct ReceiverAddress`
+ */
+static void
+mq_destroy_kx (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+  struct ReceiverAddress *receiver = impl_state;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "KX MQ destroyed\n");
+  if (mq == receiver->kx_mq)
+  {
+    receiver->kx_mq = NULL;
+    receiver_destroy (receiver);
   }
 }
 
@@ -2106,12 +2166,17 @@ setup_receiver_mq (struct ReceiverAddress *receiver)
 {
   size_t base_mtu;
 
-  if (NULL != receiver->qh)
+  /*if (NULL != receiver->kx_qh)
   {
-    GNUNET_TRANSPORT_communicator_mq_del (receiver->qh);
-    receiver->qh = NULL;
+    GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh);
+    receiver->kx_qh = NULL;
   }
-  //GNUNET_assert (NULL == receiver->mq);
+  if (NULL != receiver->d_qh)
+  {
+    GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh);
+    receiver->d_qh = NULL;
+  }*/
+  // GNUNET_assert (NULL == receiver->mq);
   switch (receiver->address->sa_family)
   {
   case AF_INET:
@@ -2130,35 +2195,54 @@ setup_receiver_mq (struct ReceiverAddress *receiver)
     GNUNET_assert (0);
     break;
   }
-  if (0 == receiver->acks_available)
-  {
-    /* MTU based on full KX messages */
-    receiver->mtu = base_mtu - sizeof(struct InitialKX)   /* 48 */
-                    - sizeof(struct UDPConfirmation);   /* 104 */
-  }
-  else
-  {
-    /* MTU based on BOXed messages */
-    receiver->mtu = base_mtu - sizeof(struct UDPBox);
-  }
+  /* MTU based on full KX messages */
+  receiver->kx_mtu = base_mtu - sizeof(struct InitialKX)   /* 48 */
+                     - sizeof(struct UDPConfirmation); /* 104 */
+  /* MTU based on BOXed messages */
+  receiver->d_mtu = base_mtu - sizeof(struct UDPBox);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Setting up MQs and QHs\n");
   /* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to
      1404 (IPv4 + Box) bytes, depending on circumstances... */
-  if (NULL == receiver->mq)
-    receiver->mq = GNUNET_MQ_queue_for_callbacks (&mq_send,
-                                                  &mq_destroy,
-                                                  &mq_cancel,
-                                                  receiver,
-                                                  NULL,
-                                                  &mq_error,
-                                                  receiver);
-  receiver->qh =
+  if (NULL == receiver->kx_mq)
+    receiver->kx_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_kx,
+                                                     &mq_destroy_kx,
+                                                     &mq_cancel,
+                                                     receiver,
+                                                     NULL,
+                                                     &mq_error,
+                                                     receiver);
+  if (NULL == receiver->d_mq)
+    receiver->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d,
+                                                    &mq_destroy_d,
+                                                    &mq_cancel,
+                                                    receiver,
+                                                    NULL,
+                                                    &mq_error,
+                                                    receiver);
+
+  receiver->kx_qh =
     GNUNET_TRANSPORT_communicator_mq_add (ch,
                                           &receiver->target,
                                           receiver->foreign_addr,
-                                          receiver->mtu,
+                                          receiver->kx_mtu,
+                                          GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
+                                          0, /* Priority */
                                           receiver->nt,
                                           GNUNET_TRANSPORT_CS_OUTBOUND,
-                                          receiver->mq);
+                                          receiver->kx_mq);
+  receiver->d_qh =
+    GNUNET_TRANSPORT_communicator_mq_add (ch,
+                                          &receiver->target,
+                                          receiver->foreign_addr,
+                                          receiver->d_mtu,
+                                          0, /* Initialize with 0 acks */
+                                          1, /* Priority */
+                                          receiver->nt,
+                                          GNUNET_TRANSPORT_CS_OUTBOUND,
+                                          receiver->d_mq);
+
 }
 
 
index 31d2e4ed3940e493b25c23828facc85d52f0ce2e..27dda72814eebd090881e51b64bd74335793143a 100644 (file)
@@ -670,6 +670,8 @@ setup_queue (const struct GNUNET_PeerIdentity *target,
                                                       &queue->target,
                                                       foreign_addr,
                                                       UNIX_MTU,
+                                                      GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
+                                                      0,
                                                       GNUNET_NT_LOOPBACK,
                                                       cs,
                                                       queue->mq);
index 1dfcf2371a1b2a6e6a447284d2972702c10c1f26..1ea79fa19d65bfa86b9e3d6f8aed7a5a60dc547b 100644 (file)
@@ -58,19 +58,21 @@ static char *cfg_peers_name[NUM_PEERS];
 
 static int ret;
 
+static size_t long_message_size;
+
 static struct GNUNET_TIME_Absolute start_short;
 
 static struct GNUNET_TIME_Absolute start_long;
 
 static struct GNUNET_TIME_Absolute timeout;
 
-static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *my_tc;
+static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *my_tc;
 
 #define SHORT_MESSAGE_SIZE 128
 
-#define LONG_MESSAGE_SIZE 32000
+#define LONG_MESSAGE_SIZE 32000 /* FIXME */
 
-#define BURST_PACKETS 50
+#define BURST_PACKETS 500
 
 #define TOTAL_ITERATIONS 1
 
@@ -88,6 +90,7 @@ static unsigned int iterations_left = TOTAL_ITERATIONS;
 
 enum TestPhase
 {
+  TP_INIT,
   TP_BURST_SHORT,
   TP_BURST_LONG,
   TP_SIZE_CHECK
@@ -230,15 +233,18 @@ static void
 size_test (void *cls)
 {
   char *payload;
+  size_t max_size = 64000;
 
   GNUNET_assert (TP_SIZE_CHECK == phase);
-  if (ack >= 64000)
+  if (LONG_MESSAGE_SIZE != long_message_size)
+    max_size = long_message_size;
+  if (ack >= max_size)
     return; /* Leave some room for our protocol, so not 2^16 exactly */
   payload = make_payload (ack);
   ack += 5;
   num_sent++;
   GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
-                                                        (ack < 64000)
+                                                        (ack < max_size)
                                                         ? &size_test
                                                         : NULL,
                                                         NULL,
@@ -254,7 +260,7 @@ long_test (void *cls)
 {
   char *payload;
 
-  payload = make_payload (LONG_MESSAGE_SIZE);
+  payload = make_payload (long_message_size);
   num_sent++;
   GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
                                                         (BURST_PACKETS ==
@@ -263,7 +269,7 @@ long_test (void *cls)
                                                         : &long_test,
                                                         NULL,
                                                         payload,
-                                                        LONG_MESSAGE_SIZE);
+                                                        long_message_size);
   GNUNET_free (payload);
   timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS);
 }
@@ -288,6 +294,7 @@ short_test (void *cls)
   timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS);
 }
 
+
 static int test_prepared = GNUNET_NO;
 
 /**
@@ -316,7 +323,6 @@ prepare_test (void *cls)
 }
 
 
-
 /**
  * @brief Handle opening of queue
  *
@@ -332,18 +338,25 @@ static void
 add_queue_cb (void *cls,
               struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
               struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *
-              tc_queue)
+              tc_queue,
+              size_t mtu)
 {
+  if (TP_INIT != phase)
+    return;
   if (0 != strcmp ((char*) cls, cfg_peers_name[0]))
     return; // TODO?
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Queue established, starting test...\n");
   start_short = GNUNET_TIME_absolute_get ();
-  my_tc = tc_queue;
+  my_tc = tc_h;
+  if (0 != mtu)
+    long_message_size = mtu;
+  else
+    long_message_size = LONG_MESSAGE_SIZE;
   phase = TP_BURST_SHORT;
-  timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS);
+  timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_MINUTES);
   GNUNET_assert (NULL == to_task);
-  to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+  to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
                                           &latency_timeout,
                                           NULL);
   prepare_test (NULL);
@@ -395,6 +408,9 @@ incoming_message_cb (void *cls,
   timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS);
   switch (phase)
   {
+  case TP_INIT:
+    GNUNET_break (0);
+    break;
   case TP_BURST_SHORT:
     {
       GNUNET_assert (SHORT_MESSAGE_SIZE == payload_len);
@@ -428,7 +444,7 @@ incoming_message_cb (void *cls,
     }
   case TP_BURST_LONG:
     {
-      if (LONG_MESSAGE_SIZE != payload_len)
+      if (long_message_size != payload_len)
       {
         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                     "Ignoring packet with wrong length\n");
@@ -441,7 +457,7 @@ incoming_message_cb (void *cls,
       {
         GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
                     "Long size packet test done.\n");
-        char *goodput = GNUNET_STRINGS_byte_size_fancy ((LONG_MESSAGE_SIZE
+        char *goodput = GNUNET_STRINGS_byte_size_fancy ((long_message_size
                                                          * num_received * 1000
                                                          * 1000)
                                                         / duration.rel_value_us);
@@ -553,6 +569,7 @@ main (int argc,
   char *test_name;
   char *cfg_peer;
 
+  phase = TP_INIT;
   ret = 1;
   test_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]);
   communicator_name = strchr (test_name, '-');
index fc6d13590b88606fddd7374ef43abae8713c5726..8250027f7c065eddaceabdc991e2b2232fc7059f 100644 (file)
@@ -33,7 +33,7 @@
 #include "gnunet_hello_lib.h"
 #include "gnunet_signatures.h"
 #include "transport.h"
-
+#include <inttypes.h>
 
 #define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__)
 
@@ -227,10 +227,20 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
   uint32_t nt;
 
   /**
-   * Maximum transmission unit, in NBO.  UINT32_MAX for unlimited.
+   * Maximum transmission unit.  UINT32_MAX for unlimited.
    */
   uint32_t mtu;
 
+  /**
+   * Queue length.  UINT64_MAX for unlimited.
+   */
+  uint64_t q_len;
+
+  /**
+   * Queue prio
+   */
+  uint32_t priority;
+
   /**
    * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
    */
@@ -370,8 +380,8 @@ handle_communicator_backchannel (void *cls,
   struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
   struct GNUNET_MQ_Envelope *env;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received backchannel message\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received backchannel message\n");
   if (tc_h->bc_enabled != GNUNET_YES)
   {
     GNUNET_SERVICE_client_continue (client->client);
@@ -379,10 +389,10 @@ handle_communicator_backchannel (void *cls,
   }
   /* Find client providing this communicator */
   /* Finally, deliver backchannel message to communicator */
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Delivering backchannel message of type %u to %s\n",
-              ntohs (msg->type),
-              target_communicator);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Delivering backchannel message of type %u to %s\n",
+       ntohs (msg->type),
+       target_communicator);
   other_tc_h = tc_h->bc_cb (tc_h, msg, (struct
                                         GNUNET_PeerIdentity*) &bc_msg->pid);
   env = GNUNET_MQ_msg_extra (
@@ -496,9 +506,6 @@ handle_incoming_msg (void *cls,
   msg = (struct GNUNET_MessageHeader *) &inc_msg[1];
   size_t payload_len = ntohs (msg->size) - sizeof (struct
                                                    GNUNET_MessageHeader);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Incoming message from communicator!\n");
-
   if (NULL != tc_h->incoming_msg_cb)
   {
     tc_h->incoming_msg_cb (tc_h->cb_cls,
@@ -608,15 +615,14 @@ handle_add_queue_message (void *cls,
     client->tc;
   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
 
-  tc_queue = tc_h->queue_head;
-  if (NULL != tc_queue)
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got queue with ID %u\n", msg->qid);
+  for (tc_queue = tc_h->queue_head; NULL != tc_queue; tc_queue = tc_queue->next)
   {
-    while (tc_queue->qid != msg->qid)
-    {
-      tc_queue = tc_queue->next;
-    }
+    if (tc_queue->qid == msg->qid)
+      break;
   }
-  else
+  if (NULL == tc_queue)
   {
     tc_queue =
       GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue);
@@ -628,16 +634,58 @@ handle_add_queue_message (void *cls,
   GNUNET_assert (tc_queue->qid == msg->qid);
   GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
   tc_queue->nt = msg->nt;
-  tc_queue->mtu = msg->mtu;
+  tc_queue->mtu = ntohl (msg->mtu);
   tc_queue->cs = msg->cs;
+  tc_queue->priority = ntohl (msg->priority);
+  tc_queue->q_len = GNUNET_ntohll (msg->q_len);
   if (NULL != tc_h->add_queue_cb)
   {
-    tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue);
+    tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue, tc_queue->mtu);
   }
   GNUNET_SERVICE_client_continue (client->client);
 }
 
 
+/**
+ * @brief Handle new queue
+ *
+ * Store context and call client callback.
+ *
+ * @param cls Closure - communicator handle
+ * @param msg Message struct
+ */
+static void
+handle_update_queue_message (void *cls,
+                             const struct
+                             GNUNET_TRANSPORT_UpdateQueueMessage *msg)
+{
+  struct MyClient *client = cls;
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+    client->tc;
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received queue update message for %u with q_len %"PRIu64"\n",
+       msg->qid, GNUNET_ntohll(msg->q_len));
+  tc_queue = tc_h->queue_head;
+  if (NULL != tc_queue)
+  {
+    while (tc_queue->qid != msg->qid)
+    {
+      tc_queue = tc_queue->next;
+    }
+  }
+  GNUNET_assert (tc_queue->qid == msg->qid);
+  GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
+  tc_queue->nt = msg->nt;
+  tc_queue->mtu = ntohl (msg->mtu);
+  tc_queue->cs = msg->cs;
+  tc_queue->priority = ntohl (msg->priority);
+  tc_queue->q_len += GNUNET_ntohll (msg->q_len);
+  GNUNET_SERVICE_client_continue (client->client);
+}
+
+
 /**
  * @brief Shut down the service
  *
@@ -789,6 +837,10 @@ transport_communicator_start (
                            GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
                            struct GNUNET_TRANSPORT_AddQueueMessage,
                            tc_h),
+    GNUNET_MQ_hd_fixed_size (update_queue_message,
+                             GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE,
+                             struct GNUNET_TRANSPORT_UpdateQueueMessage,
+                             tc_h),
     // GNUNET_MQ_hd_fixed_size (del_queue_message,
     //                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
     //                         struct GNUNET_TRANSPORT_DelQueueMessage,
@@ -1063,7 +1115,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (
  */
 void
 GNUNET_TRANSPORT_TESTING_transport_communicator_send
-  (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue,
+  (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
   GNUNET_SCHEDULER_TaskCallback cont,
   void *cont_cls,
   const void *payload,
@@ -1073,7 +1125,39 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send
   struct GNUNET_TRANSPORT_SendMessageTo *msg;
   struct GNUNET_MQ_Envelope *env;
   size_t inbox_size;
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp;
 
+  tc_queue = NULL;
+  for (tc_queue_tmp = tc_h->queue_head;
+       NULL != tc_queue_tmp;
+       tc_queue_tmp = tc_queue_tmp->next)
+  {
+    if (tc_queue_tmp->q_len <= 0)
+      continue;
+    if (NULL == tc_queue)
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
+           tc_queue_tmp->priority,
+           tc_queue_tmp->q_len,
+           tc_queue_tmp->mtu);
+      tc_queue = tc_queue_tmp;
+      continue;
+    }
+    if (tc_queue->priority < tc_queue_tmp->priority)
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
+           tc_queue_tmp->priority,
+           tc_queue_tmp->q_len,
+           tc_queue_tmp->mtu);
+      tc_queue = tc_queue_tmp;
+    }
+  }
+  GNUNET_assert (NULL != tc_queue);
+  if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED)
+    tc_queue->q_len--;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Sending message\n");
   inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size;
index 7a449f0812eb49447328897b16ca8c6f68354e3a..b77125e826f541ecbb62d9eaeeac6965cf9e443f 100644 (file)
@@ -132,7 +132,8 @@ typedef void
                                              *tc_h,
                                              struct
                                              GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
-                                             *tc_queue);
+                                             *tc_queue,
+                                             size_t mtu);
 
 
 /**
@@ -215,8 +216,8 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (struct
  */
 void
 GNUNET_TRANSPORT_TESTING_transport_communicator_send (struct
-                                                      GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
-                                                      *tc_queue,
+                                                      GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle
+                                                      *tc_h,
                                                       GNUNET_SCHEDULER_TaskCallback
                                                       cont,
                                                       void *cont_cls,
index 36182d8d760e520630256238f65d696cd3fa4aeb..a64ffd5c6f93ab31a6c6f99d8e18b4e637d26a67 100644 (file)
@@ -835,6 +835,17 @@ struct GNUNET_TRANSPORT_AddQueueMessage
    */
   uint32_t mtu;
 
+  /**
+   * Queue length, in NBO. Defines how many messages may be
+   * send through this queue. UINT64_MAX for unlimited.
+   */
+  uint64_t q_len;
+
+  /**
+   * Priority of the queue in relation to other queues.
+   */
+  uint32_t priority;
+
   /**
    * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
    */
@@ -844,6 +855,55 @@ struct GNUNET_TRANSPORT_AddQueueMessage
 };
 
 
+/**
+ * Update queue
+ */
+struct GNUNET_TRANSPORT_UpdateQueueMessage
+{
+  /**
+   * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Queue identifier (used to identify the queue).
+   */
+  uint32_t qid GNUNET_PACKED;
+
+  /**
+   * Receiver that can be addressed via the queue.
+   */
+  struct GNUNET_PeerIdentity receiver;
+
+  /**
+   * An `enum GNUNET_NetworkType` in NBO.
+   */
+  uint32_t nt;
+
+  /**
+   * Maximum transmission unit, in NBO.  UINT32_MAX for unlimited.
+   */
+  uint32_t mtu;
+
+  /**
+   * Queue length, in NBO. Defines how many messages may be
+   * send through this queue. UINT64_MAX for unlimited.
+   */
+  uint64_t q_len;
+
+  /**
+   * Priority of the queue in relation to other queues.
+   */
+  uint32_t priority;
+
+  /**
+   * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
+   */
+  uint32_t cs;
+};
+
+
+
 /**
  * Remove queue, it is no longer available.
  */
index e80cd5c03389b8268ddd4eaa416eff49dbd6a234..cfa144415cb55c5ebaefcae3cd72c65cf62efde2 100644 (file)
@@ -280,6 +280,15 @@ struct GNUNET_TRANSPORT_QueueHandle
    * Maximum transmission unit for the queue.
    */
   uint32_t mtu;
+
+  /**
+   * Queue length.
+   */
+  uint64_t q_len;
+  /**
+   * Queue priority.
+   */
+  uint32_t priority;
 };
 
 
@@ -395,6 +404,8 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
 
   if (NULL == qh->ch->mq)
     return;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending `GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP` message\n");
   env = GNUNET_MQ_msg_extra (aqm,
                              strlen (qh->address) + 1,
                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
@@ -402,11 +413,39 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
   aqm->receiver = qh->peer;
   aqm->nt = htonl ((uint32_t) qh->nt);
   aqm->mtu = htonl (qh->mtu);
+  aqm->q_len = GNUNET_htonll (qh->q_len);
+  aqm->priority = htonl (qh->priority);
   aqm->cs = htonl ((uint32_t) qh->cs);
   memcpy (&aqm[1], qh->address, strlen (qh->address) + 1);
   GNUNET_MQ_send (qh->ch->mq, env);
 }
 
+/**
+ * Send message to the transport service about queue @a qh
+ * updated.
+ *
+ * @param qh queue to add
+ */
+static void
+send_update_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_TRANSPORT_UpdateQueueMessage *uqm;
+
+  if (NULL == qh->ch->mq)
+    return;
+  env = GNUNET_MQ_msg (uqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE);
+  uqm->qid = htonl (qh->queue_id);
+  uqm->receiver = qh->peer;
+  uqm->nt = htonl ((uint32_t) qh->nt);
+  uqm->mtu = htonl (qh->mtu);
+  uqm->q_len = GNUNET_htonll (qh->q_len);
+  uqm->priority = htonl (qh->priority);
+  uqm->cs = htonl ((uint32_t) qh->cs);
+  GNUNET_MQ_send (qh->ch->mq, env);
+}
+
+
 
 /**
  * Send message to the transport service about queue @a qh
@@ -924,6 +963,9 @@ GNUNET_TRANSPORT_communicator_receive (
  * @param address address in human-readable format, 0-terminated, UTF-8
  * @param mtu maximum message size supported by queue, 0 if
  *            sending is not supported, SIZE_MAX for no MTU
+ * @param q_len number of messages that can be send through this queue
+ * @param priority queue priority. Queues with highest priority should be
+ *                 used
  * @param nt which network type does the @a address belong to?
  * @param cc what characteristics does the communicator have?
  * @param cs what is the connection status of the queue?
@@ -936,6 +978,8 @@ GNUNET_TRANSPORT_communicator_mq_add (
   const struct GNUNET_PeerIdentity *peer,
   const char *address,
   uint32_t mtu,
+  uint64_t q_len,
+  uint32_t priority,
   enum GNUNET_NetworkType nt,
   enum GNUNET_TRANSPORT_ConnectionStatus cs,
   struct GNUNET_MQ_Handle *mq)
@@ -948,6 +992,8 @@ GNUNET_TRANSPORT_communicator_mq_add (
   qh->address = GNUNET_strdup (address);
   qh->nt = nt;
   qh->mtu = mtu;
+  qh->q_len = q_len;
+  qh->priority = priority;
   qh->cs = cs;
   qh->mq = mq;
   qh->queue_id = ch->queue_gen++;
@@ -957,6 +1003,37 @@ GNUNET_TRANSPORT_communicator_mq_add (
 }
 
 
+/**
+ * Notify transport service that an MQ was updated
+ *
+ * @param ch connection to transport service
+ * @param qh the queue to update
+ * @param q_len number of messages that can be send through this queue
+ * @param priority queue priority. Queues with highest priority should be
+ *                 used
+ */
+void
+GNUNET_TRANSPORT_communicator_mq_update (
+  struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
+  const struct GNUNET_TRANSPORT_QueueHandle *u_qh,
+  uint64_t q_len,
+  uint32_t priority)
+{
+  struct GNUNET_TRANSPORT_QueueHandle *qh;
+
+  for (qh = ch->queue_head; NULL != qh; qh = qh->next)
+  {
+    if (u_qh == qh)
+      break;
+  }
+  GNUNET_assert (NULL != qh);
+  qh->q_len = q_len;
+  qh->priority = priority;
+  send_update_queue (qh);
+}
+
+
+
 /**
  * Notify transport service that an MQ became unavailable due to a
  * disconnect or timeout.