struct GNUNET_CONTAINER_HeapNode *hn;
/**
- * Message queue we are providing for the #ch.
+ * KX message queue we are providing for the #ch.
*/
- struct GNUNET_MQ_Handle *mq;
+ struct GNUNET_MQ_Handle *kx_mq;
+
+ /**
+ * Default message queue we are providing for the #ch.
+ */
+ struct GNUNET_MQ_Handle *d_mq;
+
+ /**
+ * handle for KX queue with the #ch.
+ */
+ struct GNUNET_TRANSPORT_QueueHandle *kx_qh;
/**
- * handle for this queue with the #ch.
+ * handle for default queue with the #ch.
*/
- struct GNUNET_TRANSPORT_QueueHandle *qh;
+ struct GNUNET_TRANSPORT_QueueHandle *d_qh;
/**
* Timeout for this receiver address.
struct GNUNET_TIME_Absolute timeout;
/**
- * MTU we allowed transport for this receiver right now.
+ * MTU we allowed transport for this receiver's KX queue.
*/
- size_t mtu;
+ size_t kx_mtu;
+
+ /**
+ * MTU we allowed transport for this receiver's default queue.
+ */
+ size_t d_mtu;
/**
* Length of the DLL at @a ss_head.
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Disconnecting receiver for peer `%s'\n",
GNUNET_i2s (&receiver->target));
- if (NULL != (mq = receiver->mq))
+ if (NULL != (mq = receiver->kx_mq))
{
- receiver->mq = NULL;
+ receiver->kx_mq = NULL;
GNUNET_MQ_destroy (mq);
}
- if (NULL != receiver->qh)
+ if (NULL != receiver->kx_qh)
{
- GNUNET_TRANSPORT_communicator_mq_del (receiver->qh);
- receiver->qh = NULL;
+ GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh);
+ receiver->kx_qh = NULL;
+ }
+ if (NULL != (mq = receiver->d_mq))
+ {
+ receiver->d_mq = NULL;
+ GNUNET_MQ_destroy (mq);
+ }
+ if (NULL != receiver->d_qh)
+ {
+ GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh);
+ receiver->d_qh = NULL;
}
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (receivers,
(void) pid;
for (struct SharedSecret *ss = receiver->ss_head; NULL != ss; ss = ss->next)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Checking shared secrets\n");
if (0 == memcmp (&ack->cmac, &ss->cmac, sizeof(struct GNUNET_HashCode)))
{
uint32_t allowed;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found matching mac\n");
+ "Found matching mac\n");
allowed = ntohl (ack->sequence_max);
if (allowed > ss->sequence_allowed)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%u > %u (%u)\n", allowed, ss->sequence_allowed,
- receiver->acks_available);
+ "%u > %u (%u)\n", allowed, ss->sequence_allowed,
+ receiver->acks_available);
receiver->acks_available += (allowed - ss->sequence_allowed);
- if ((allowed - ss->sequence_allowed) == receiver->acks_available)
- {
- /* we just incremented from zero => MTU change! */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "we just incremented from zero => MTU change!\n");
- //TODO setup_receiver_mq (receiver);
- }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Tell transport we have more acks!\n");
+ GNUNET_TRANSPORT_communicator_mq_update (ch,
+ receiver->d_qh,
+ (allowed - ss->sequence_allowed),
+ 1);
ss->sequence_allowed = allowed;
/* move ss to head to avoid discarding it anytime soon! */
GNUNET_CONTAINER_DLL_remove (receiver->ss_head, receiver->ss_tail, ss);
* @param impl_state our `struct ReceiverAddress`
*/
static void
-mq_send (struct GNUNET_MQ_Handle *mq,
- const struct GNUNET_MessageHeader *msg,
- void *impl_state)
+mq_send_kx (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg,
+ void *impl_state)
{
struct ReceiverAddress *receiver = impl_state;
uint16_t msize = ntohs (msg->size);
+ struct UdpHandshakeSignature uhs;
+ struct UDPConfirmation uc;
+ struct InitialKX kx;
+ struct GNUNET_CRYPTO_EcdhePrivateKey epriv;
+ char dgram[receiver->kx_mtu + sizeof(uc) + sizeof(kx)];
+ size_t dpos;
+ gcry_cipher_hd_t out_cipher;
+ struct SharedSecret *ss;
+
- GNUNET_assert (mq == receiver->mq);
- if (msize > receiver->mtu)
+ GNUNET_assert (mq == receiver->kx_mq);
+ if (msize > receiver->kx_mtu)
{
GNUNET_break (0);
receiver_destroy (receiver);
}
reschedule_receiver_timeout (receiver);
- if (0 == receiver->acks_available)
+ /* setup key material */
+ GNUNET_CRYPTO_ecdhe_key_create (&epriv);
+
+ ss = setup_shared_secret_enc (&epriv, receiver);
+ setup_cipher (&ss->master, 0, &out_cipher);
+ /* compute 'uc' */
+ uc.sender = my_identity;
+ uc.monotonic_time =
+ GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
+ uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
+ uhs.purpose.size = htonl (sizeof(uhs));
+ uhs.sender = my_identity;
+ uhs.receiver = receiver->target;
+ GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral);
+ uhs.monotonic_time = uc.monotonic_time;
+ GNUNET_CRYPTO_eddsa_sign (my_private_key,
+ &uhs,
+ &uc.sender_sig);
+ /* Leave space for kx */
+ dpos = sizeof(kx);
+ /* Append encrypted uc to dgram */
+ GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher,
+ &dgram[dpos],
+ sizeof(uc),
+ &uc,
+ sizeof(uc)));
+ dpos += sizeof(uc);
+ /* Append encrypted payload to dgram */
+ GNUNET_assert (
+ 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
+ dpos += msize;
+ do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
+ /* Datagram starts with kx */
+ kx.ephemeral = uhs.ephemeral;
+ GNUNET_assert (
+ 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag)));
+ gcry_cipher_close (out_cipher);
+ memcpy (dgram, &kx, sizeof(kx));
+ if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
+ dgram,
+ sizeof(dgram),
+ receiver->address,
+ receiver->address_len))
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending KX to %s\n", GNUNET_a2s (receiver->address,
+ receiver->address_len));
+ GNUNET_MQ_impl_send_continue (mq);
+}
+
+
+/**
+ * Signature of functions implementing the sending functionality of a
+ * message queue.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state our `struct ReceiverAddress`
+ */
+static void
+mq_send_d (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg,
+ void *impl_state)
+{
+ struct ReceiverAddress *receiver = impl_state;
+ uint16_t msize = ntohs (msg->size);
+
+ GNUNET_assert (mq == receiver->d_mq);
+ if ((msize > receiver->d_mtu) ||
+ (0 == receiver->acks_available))
{
- /* use KX encryption method */
- struct UdpHandshakeSignature uhs;
- struct UDPConfirmation uc;
- struct InitialKX kx;
- struct GNUNET_CRYPTO_EcdhePrivateKey epriv;
- char dgram[receiver->mtu + sizeof(uc) + sizeof(kx)];
- size_t dpos;
- gcry_cipher_hd_t out_cipher;
- struct SharedSecret *ss;
+ GNUNET_break (0);
+ receiver_destroy (receiver);
+ return;
+ }
+ reschedule_receiver_timeout (receiver);
- /* setup key material */
- GNUNET_CRYPTO_ecdhe_key_create (&epriv);
+ /* begin "BOX" encryption method, scan for ACKs from tail! */
+ for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev)
+ {
+ if (ss->sequence_used >= ss->sequence_allowed)
+ {
+ continue;
+ }
+ char dgram[sizeof(struct UDPBox) + receiver->d_mtu];
+ struct UDPBox *box;
+ gcry_cipher_hd_t out_cipher;
+ size_t dpos;
- ss = setup_shared_secret_enc (&epriv, receiver);
- setup_cipher (&ss->master, 0, &out_cipher);
- /* compute 'uc' */
- uc.sender = my_identity;
- uc.monotonic_time =
- GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
- uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
- uhs.purpose.size = htonl (sizeof(uhs));
- uhs.sender = my_identity;
- uhs.receiver = receiver->target;
- GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral);
- uhs.monotonic_time = uc.monotonic_time;
- GNUNET_CRYPTO_eddsa_sign (my_private_key,
- &uhs,
- &uc.sender_sig);
- /* Leave space for kx */
- dpos = sizeof(kx);
- /* Append encrypted uc to dgram */
- GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher,
- &dgram[dpos],
- sizeof(uc),
- &uc,
- sizeof(uc)));
- dpos += sizeof(uc);
+ box = (struct UDPBox *) dgram;
+ ss->sequence_used++;
+ get_kid (&ss->master, ss->sequence_used, &box->kid);
+ setup_cipher (&ss->master, ss->sequence_used, &out_cipher);
/* Append encrypted payload to dgram */
+ dpos = sizeof(struct UDPBox);
GNUNET_assert (
0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
dpos += msize;
do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
- /* Datagram starts with kx */
- kx.ephemeral = uhs.ephemeral;
- GNUNET_assert (
- 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag)));
+ GNUNET_assert (0 == gcry_cipher_gettag (out_cipher,
+ box->gcm_tag,
+ sizeof(box->gcm_tag)));
gcry_cipher_close (out_cipher);
- memcpy (dgram, &kx, sizeof(kx));
if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
dgram,
sizeof(dgram),
receiver->address,
receiver->address_len))
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending KX to %s\n", GNUNET_a2s (receiver->address,
- receiver->address_len));
GNUNET_MQ_impl_send_continue (mq);
- return;
- } /* End of KX encryption method */
-
- /* begin "BOX" encryption method, scan for ACKs from tail! */
- for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "In non-kx mode...\n");
- if (ss->sequence_used < ss->sequence_allowed)
+ receiver->acks_available--;
+ if (0 == receiver->acks_available)
{
- char dgram[sizeof(struct UDPBox) + receiver->mtu];
- struct UDPBox *box;
- gcry_cipher_hd_t out_cipher;
- size_t dpos;
-
- box = (struct UDPBox *) dgram;
- ss->sequence_used++;
- get_kid (&ss->master, ss->sequence_used, &box->kid);
- setup_cipher (&ss->master, ss->sequence_used, &out_cipher);
- /* Append encrypted payload to dgram */
- dpos = sizeof(struct UDPBox);
- GNUNET_assert (
- 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
- dpos += msize;
- do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
- GNUNET_assert (0 == gcry_cipher_gettag (out_cipher,
- box->gcm_tag,
- sizeof(box->gcm_tag)));
- gcry_cipher_close (out_cipher);
- if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
- dgram,
- sizeof(dgram),
- receiver->address,
- receiver->address_len))
- GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
+ /* We have no more ACKs */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending data\n");
-
- GNUNET_MQ_impl_send_continue (mq);
- receiver->acks_available--;
- if (0 == receiver->acks_available)
- {
- /* We have no more ACKs => MTU change! */
- setup_receiver_mq (receiver);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No more acks, MTU changed\n");
- }
- return;
+ "No more acks\n");
}
}
- GNUNET_assert (0);
}
* @param impl_state our `struct ReceiverAddress`
*/
static void
-mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
+mq_destroy_d (struct GNUNET_MQ_Handle *mq, void *impl_state)
{
struct ReceiverAddress *receiver = impl_state;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "MQ destroyed\n");
- if (mq == receiver->mq)
+ "Default MQ destroyed\n");
+ if (mq == receiver->d_mq)
{
- receiver->mq = NULL;
- //receiver_destroy (receiver);
+ receiver->d_mq = NULL;
+ receiver_destroy (receiver);
+ }
+}
+
+
+/**
+ * Signature of functions implementing the destruction of a message
+ * queue. Implementations must not free @a mq, but should take care
+ * of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state our `struct ReceiverAddress`
+ */
+static void
+mq_destroy_kx (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+ struct ReceiverAddress *receiver = impl_state;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "KX MQ destroyed\n");
+ if (mq == receiver->kx_mq)
+ {
+ receiver->kx_mq = NULL;
+ receiver_destroy (receiver);
}
}
{
size_t base_mtu;
- if (NULL != receiver->qh)
+ /*if (NULL != receiver->kx_qh)
{
- GNUNET_TRANSPORT_communicator_mq_del (receiver->qh);
- receiver->qh = NULL;
+ GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh);
+ receiver->kx_qh = NULL;
}
- //GNUNET_assert (NULL == receiver->mq);
+ if (NULL != receiver->d_qh)
+ {
+ GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh);
+ receiver->d_qh = NULL;
+ }*/
+ // GNUNET_assert (NULL == receiver->mq);
switch (receiver->address->sa_family)
{
case AF_INET:
GNUNET_assert (0);
break;
}
- if (0 == receiver->acks_available)
- {
- /* MTU based on full KX messages */
- receiver->mtu = base_mtu - sizeof(struct InitialKX) /* 48 */
- - sizeof(struct UDPConfirmation); /* 104 */
- }
- else
- {
- /* MTU based on BOXed messages */
- receiver->mtu = base_mtu - sizeof(struct UDPBox);
- }
+ /* MTU based on full KX messages */
+ receiver->kx_mtu = base_mtu - sizeof(struct InitialKX) /* 48 */
+ - sizeof(struct UDPConfirmation); /* 104 */
+ /* MTU based on BOXed messages */
+ receiver->d_mtu = base_mtu - sizeof(struct UDPBox);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Setting up MQs and QHs\n");
/* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to
1404 (IPv4 + Box) bytes, depending on circumstances... */
- if (NULL == receiver->mq)
- receiver->mq = GNUNET_MQ_queue_for_callbacks (&mq_send,
- &mq_destroy,
- &mq_cancel,
- receiver,
- NULL,
- &mq_error,
- receiver);
- receiver->qh =
+ if (NULL == receiver->kx_mq)
+ receiver->kx_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_kx,
+ &mq_destroy_kx,
+ &mq_cancel,
+ receiver,
+ NULL,
+ &mq_error,
+ receiver);
+ if (NULL == receiver->d_mq)
+ receiver->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d,
+ &mq_destroy_d,
+ &mq_cancel,
+ receiver,
+ NULL,
+ &mq_error,
+ receiver);
+
+ receiver->kx_qh =
GNUNET_TRANSPORT_communicator_mq_add (ch,
&receiver->target,
receiver->foreign_addr,
- receiver->mtu,
+ receiver->kx_mtu,
+ GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
+ 0, /* Priority */
receiver->nt,
GNUNET_TRANSPORT_CS_OUTBOUND,
- receiver->mq);
+ receiver->kx_mq);
+ receiver->d_qh =
+ GNUNET_TRANSPORT_communicator_mq_add (ch,
+ &receiver->target,
+ receiver->foreign_addr,
+ receiver->d_mtu,
+ 0, /* Initialize with 0 acks */
+ 1, /* Priority */
+ receiver->nt,
+ GNUNET_TRANSPORT_CS_OUTBOUND,
+ receiver->d_mq);
+
}
#include "gnunet_hello_lib.h"
#include "gnunet_signatures.h"
#include "transport.h"
-
+#include <inttypes.h>
#define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__)
uint32_t nt;
/**
- * Maximum transmission unit, in NBO. UINT32_MAX for unlimited.
+ * Maximum transmission unit. UINT32_MAX for unlimited.
*/
uint32_t mtu;
+ /**
+ * Queue length. UINT64_MAX for unlimited.
+ */
+ uint64_t q_len;
+
+ /**
+ * Queue prio
+ */
+ uint32_t priority;
+
/**
* An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
*/
struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
struct GNUNET_MQ_Envelope *env;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received backchannel message\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received backchannel message\n");
if (tc_h->bc_enabled != GNUNET_YES)
{
GNUNET_SERVICE_client_continue (client->client);
}
/* Find client providing this communicator */
/* Finally, deliver backchannel message to communicator */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Delivering backchannel message of type %u to %s\n",
- ntohs (msg->type),
- target_communicator);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Delivering backchannel message of type %u to %s\n",
+ ntohs (msg->type),
+ target_communicator);
other_tc_h = tc_h->bc_cb (tc_h, msg, (struct
GNUNET_PeerIdentity*) &bc_msg->pid);
env = GNUNET_MQ_msg_extra (
msg = (struct GNUNET_MessageHeader *) &inc_msg[1];
size_t payload_len = ntohs (msg->size) - sizeof (struct
GNUNET_MessageHeader);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Incoming message from communicator!\n");
-
if (NULL != tc_h->incoming_msg_cb)
{
tc_h->incoming_msg_cb (tc_h->cb_cls,
client->tc;
struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
- tc_queue = tc_h->queue_head;
- if (NULL != tc_queue)
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got queue with ID %u\n", msg->qid);
+ for (tc_queue = tc_h->queue_head; NULL != tc_queue; tc_queue = tc_queue->next)
{
- while (tc_queue->qid != msg->qid)
- {
- tc_queue = tc_queue->next;
- }
+ if (tc_queue->qid == msg->qid)
+ break;
}
- else
+ if (NULL == tc_queue)
{
tc_queue =
GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue);
GNUNET_assert (tc_queue->qid == msg->qid);
GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
tc_queue->nt = msg->nt;
- tc_queue->mtu = msg->mtu;
+ tc_queue->mtu = ntohl (msg->mtu);
tc_queue->cs = msg->cs;
+ tc_queue->priority = ntohl (msg->priority);
+ tc_queue->q_len = GNUNET_ntohll (msg->q_len);
if (NULL != tc_h->add_queue_cb)
{
- tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue);
+ tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue, tc_queue->mtu);
}
GNUNET_SERVICE_client_continue (client->client);
}
+/**
+ * @brief Handle new queue
+ *
+ * Store context and call client callback.
+ *
+ * @param cls Closure - communicator handle
+ * @param msg Message struct
+ */
+static void
+handle_update_queue_message (void *cls,
+ const struct
+ GNUNET_TRANSPORT_UpdateQueueMessage *msg)
+{
+ struct MyClient *client = cls;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+ client->tc;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received queue update message for %u with q_len %"PRIu64"\n",
+ msg->qid, GNUNET_ntohll(msg->q_len));
+ tc_queue = tc_h->queue_head;
+ if (NULL != tc_queue)
+ {
+ while (tc_queue->qid != msg->qid)
+ {
+ tc_queue = tc_queue->next;
+ }
+ }
+ GNUNET_assert (tc_queue->qid == msg->qid);
+ GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
+ tc_queue->nt = msg->nt;
+ tc_queue->mtu = ntohl (msg->mtu);
+ tc_queue->cs = msg->cs;
+ tc_queue->priority = ntohl (msg->priority);
+ tc_queue->q_len += GNUNET_ntohll (msg->q_len);
+ GNUNET_SERVICE_client_continue (client->client);
+}
+
+
/**
* @brief Shut down the service
*
GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
struct GNUNET_TRANSPORT_AddQueueMessage,
tc_h),
+ GNUNET_MQ_hd_fixed_size (update_queue_message,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE,
+ struct GNUNET_TRANSPORT_UpdateQueueMessage,
+ tc_h),
// GNUNET_MQ_hd_fixed_size (del_queue_message,
// GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
// struct GNUNET_TRANSPORT_DelQueueMessage,
*/
void
GNUNET_TRANSPORT_TESTING_transport_communicator_send
- (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue,
+ (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
GNUNET_SCHEDULER_TaskCallback cont,
void *cont_cls,
const void *payload,
struct GNUNET_TRANSPORT_SendMessageTo *msg;
struct GNUNET_MQ_Envelope *env;
size_t inbox_size;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp;
+ tc_queue = NULL;
+ for (tc_queue_tmp = tc_h->queue_head;
+ NULL != tc_queue_tmp;
+ tc_queue_tmp = tc_queue_tmp->next)
+ {
+ if (tc_queue_tmp->q_len <= 0)
+ continue;
+ if (NULL == tc_queue)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
+ tc_queue_tmp->priority,
+ tc_queue_tmp->q_len,
+ tc_queue_tmp->mtu);
+ tc_queue = tc_queue_tmp;
+ continue;
+ }
+ if (tc_queue->priority < tc_queue_tmp->priority)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
+ tc_queue_tmp->priority,
+ tc_queue_tmp->q_len,
+ tc_queue_tmp->mtu);
+ tc_queue = tc_queue_tmp;
+ }
+ }
+ GNUNET_assert (NULL != tc_queue);
+ if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED)
+ tc_queue->q_len--;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending message\n");
inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size;
* Maximum transmission unit for the queue.
*/
uint32_t mtu;
+
+ /**
+ * Queue length.
+ */
+ uint64_t q_len;
+ /**
+ * Queue priority.
+ */
+ uint32_t priority;
};
if (NULL == qh->ch->mq)
return;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending `GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP` message\n");
env = GNUNET_MQ_msg_extra (aqm,
strlen (qh->address) + 1,
GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
aqm->receiver = qh->peer;
aqm->nt = htonl ((uint32_t) qh->nt);
aqm->mtu = htonl (qh->mtu);
+ aqm->q_len = GNUNET_htonll (qh->q_len);
+ aqm->priority = htonl (qh->priority);
aqm->cs = htonl ((uint32_t) qh->cs);
memcpy (&aqm[1], qh->address, strlen (qh->address) + 1);
GNUNET_MQ_send (qh->ch->mq, env);
}
+/**
+ * Send message to the transport service about queue @a qh
+ * updated.
+ *
+ * @param qh queue to add
+ */
+static void
+send_update_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_TRANSPORT_UpdateQueueMessage *uqm;
+
+ if (NULL == qh->ch->mq)
+ return;
+ env = GNUNET_MQ_msg (uqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE);
+ uqm->qid = htonl (qh->queue_id);
+ uqm->receiver = qh->peer;
+ uqm->nt = htonl ((uint32_t) qh->nt);
+ uqm->mtu = htonl (qh->mtu);
+ uqm->q_len = GNUNET_htonll (qh->q_len);
+ uqm->priority = htonl (qh->priority);
+ uqm->cs = htonl ((uint32_t) qh->cs);
+ GNUNET_MQ_send (qh->ch->mq, env);
+}
+
+
/**
* Send message to the transport service about queue @a qh
* @param address address in human-readable format, 0-terminated, UTF-8
* @param mtu maximum message size supported by queue, 0 if
* sending is not supported, SIZE_MAX for no MTU
+ * @param q_len number of messages that can be send through this queue
+ * @param priority queue priority. Queues with highest priority should be
+ * used
* @param nt which network type does the @a address belong to?
* @param cc what characteristics does the communicator have?
* @param cs what is the connection status of the queue?
const struct GNUNET_PeerIdentity *peer,
const char *address,
uint32_t mtu,
+ uint64_t q_len,
+ uint32_t priority,
enum GNUNET_NetworkType nt,
enum GNUNET_TRANSPORT_ConnectionStatus cs,
struct GNUNET_MQ_Handle *mq)
qh->address = GNUNET_strdup (address);
qh->nt = nt;
qh->mtu = mtu;
+ qh->q_len = q_len;
+ qh->priority = priority;
qh->cs = cs;
qh->mq = mq;
qh->queue_id = ch->queue_gen++;
}
+/**
+ * Notify transport service that an MQ was updated
+ *
+ * @param ch connection to transport service
+ * @param qh the queue to update
+ * @param q_len number of messages that can be send through this queue
+ * @param priority queue priority. Queues with highest priority should be
+ * used
+ */
+void
+GNUNET_TRANSPORT_communicator_mq_update (
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
+ const struct GNUNET_TRANSPORT_QueueHandle *u_qh,
+ uint64_t q_len,
+ uint32_t priority)
+{
+ struct GNUNET_TRANSPORT_QueueHandle *qh;
+
+ for (qh = ch->queue_head; NULL != qh; qh = qh->next)
+ {
+ if (u_qh == qh)
+ break;
+ }
+ GNUNET_assert (NULL != qh);
+ qh->q_len = q_len;
+ qh->priority = priority;
+ send_update_queue (qh);
+}
+
+
+
/**
* Notify transport service that an MQ became unavailable due to a
* disconnect or timeout.