From: Martin Schanzenbach Date: Mon, 1 Jun 2020 14:39:35 +0000 (+0200) Subject: tng: more UDP communicator backchannels X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=198c09654354d09a9b33f27cf095e0295f70826c;p=oweals%2Fgnunet.git tng: more UDP communicator backchannels Added a new message for queue updates to indicate queue length. Queues now may also have a priority parameter. --- diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 282bb53d1..a9cd7466a 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -3161,6 +3161,10 @@ extern "C" { */ #define GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL 1221 +/** + * @brief inform transport that a queue was updated + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE 1222 /** * Message sent to indicate to the transport that a monitor diff --git a/src/include/gnunet_transport_communication_service.h b/src/include/gnunet_transport_communication_service.h index 3ead03536..ea6b95e2d 100644 --- a/src/include/gnunet_transport_communication_service.h +++ b/src/include/gnunet_transport_communication_service.h @@ -50,6 +50,10 @@ extern "C" { */ #define GNUNET_TRANSPORT_COMMUNICATION_VERSION 0x00000000 +/** + * Queue length + */ +#define GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED UINT64_MAX /** * Function called by the transport service to initialize a @@ -252,6 +256,9 @@ enum GNUNET_TRANSPORT_ConnectionStatus * @param address address in human-readable format, 0-terminated, UTF-8 * @param mtu maximum message size supported by queue, 0 if * sending is not supported, SIZE_MAX for no MTU + * @param q_len number of messages that can be send through this queue + * @param priority queue priority. Queues with highest priority should be + * used * @param nt which network type does the @a address belong to? * @param cs what is the connection status of the queue? * @param mq message queue of the @a peer @@ -263,10 +270,27 @@ GNUNET_TRANSPORT_communicator_mq_add ( const struct GNUNET_PeerIdentity *peer, const char *address, uint32_t mtu, + uint64_t q_len, + uint32_t priority, enum GNUNET_NetworkType nt, enum GNUNET_TRANSPORT_ConnectionStatus cs, struct GNUNET_MQ_Handle *mq); +/** + * Notify transport service that an MQ was updated + * + * @param ch connection to transport service + * @param qh the queue to update + * @param q_len number of messages that can be send through this queue + * @param priority queue priority. Queues with highest priority should be + * used + */ +void +GNUNET_TRANSPORT_communicator_mq_update ( + struct GNUNET_TRANSPORT_CommunicatorHandle *ch, + const struct GNUNET_TRANSPORT_QueueHandle *u_qh, + uint64_t q_len, + uint32_t priority); /** * Notify transport service that an MQ became unavailable due to a diff --git a/src/transport/gnunet-communicator-tcp.c b/src/transport/gnunet-communicator-tcp.c index bbfacbffd..7f70c55df 100644 --- a/src/transport/gnunet-communicator-tcp.c +++ b/src/transport/gnunet-communicator-tcp.c @@ -1547,6 +1547,8 @@ boot_queue (struct Queue *queue, enum GNUNET_TRANSPORT_ConnectionStatus cs) &queue->target, foreign_addr, 0 /* no MTU */, + GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED, + 0, /* Priority */ queue->nt, cs, queue->mq); diff --git a/src/transport/gnunet-communicator-udp.c b/src/transport/gnunet-communicator-udp.c index 344ba5180..46d9766d0 100644 --- a/src/transport/gnunet-communicator-udp.c +++ b/src/transport/gnunet-communicator-udp.c @@ -549,14 +549,24 @@ struct ReceiverAddress struct GNUNET_CONTAINER_HeapNode *hn; /** - * Message queue we are providing for the #ch. + * KX message queue we are providing for the #ch. */ - struct GNUNET_MQ_Handle *mq; + struct GNUNET_MQ_Handle *kx_mq; + + /** + * Default message queue we are providing for the #ch. + */ + struct GNUNET_MQ_Handle *d_mq; + + /** + * handle for KX queue with the #ch. + */ + struct GNUNET_TRANSPORT_QueueHandle *kx_qh; /** - * handle for this queue with the #ch. + * handle for default queue with the #ch. */ - struct GNUNET_TRANSPORT_QueueHandle *qh; + struct GNUNET_TRANSPORT_QueueHandle *d_qh; /** * Timeout for this receiver address. @@ -564,9 +574,14 @@ struct ReceiverAddress struct GNUNET_TIME_Absolute timeout; /** - * MTU we allowed transport for this receiver right now. + * MTU we allowed transport for this receiver's KX queue. */ - size_t mtu; + size_t kx_mtu; + + /** + * MTU we allowed transport for this receiver's default queue. + */ + size_t d_mtu; /** * Length of the DLL at @a ss_head. @@ -786,15 +801,25 @@ receiver_destroy (struct ReceiverAddress *receiver) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting receiver for peer `%s'\n", GNUNET_i2s (&receiver->target)); - if (NULL != (mq = receiver->mq)) + if (NULL != (mq = receiver->kx_mq)) { - receiver->mq = NULL; + receiver->kx_mq = NULL; GNUNET_MQ_destroy (mq); } - if (NULL != receiver->qh) + if (NULL != receiver->kx_qh) { - GNUNET_TRANSPORT_communicator_mq_del (receiver->qh); - receiver->qh = NULL; + GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh); + receiver->kx_qh = NULL; + } + if (NULL != (mq = receiver->d_mq)) + { + receiver->d_mq = NULL; + GNUNET_MQ_destroy (mq); + } + if (NULL != receiver->d_qh) + { + GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh); + receiver->d_qh = NULL; } GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (receivers, @@ -1265,30 +1290,27 @@ handle_ack (void *cls, const struct GNUNET_PeerIdentity *pid, void *value) (void) pid; for (struct SharedSecret *ss = receiver->ss_head; NULL != ss; ss = ss->next) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Checking shared secrets\n"); if (0 == memcmp (&ack->cmac, &ss->cmac, sizeof(struct GNUNET_HashCode))) { uint32_t allowed; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found matching mac\n"); + "Found matching mac\n"); allowed = ntohl (ack->sequence_max); if (allowed > ss->sequence_allowed) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%u > %u (%u)\n", allowed, ss->sequence_allowed, - receiver->acks_available); + "%u > %u (%u)\n", allowed, ss->sequence_allowed, + receiver->acks_available); receiver->acks_available += (allowed - ss->sequence_allowed); - if ((allowed - ss->sequence_allowed) == receiver->acks_available) - { - /* we just incremented from zero => MTU change! */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "we just incremented from zero => MTU change!\n"); - //TODO setup_receiver_mq (receiver); - } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Tell transport we have more acks!\n"); + GNUNET_TRANSPORT_communicator_mq_update (ch, + receiver->d_qh, + (allowed - ss->sequence_allowed), + 1); ss->sequence_allowed = allowed; /* move ss to head to avoid discarding it anytime soon! */ GNUNET_CONTAINER_DLL_remove (receiver->ss_head, receiver->ss_tail, ss); @@ -1906,15 +1928,24 @@ do_pad (gcry_cipher_hd_t out_cipher, char *dgram, size_t pad_size) * @param impl_state our `struct ReceiverAddress` */ static void -mq_send (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, - void *impl_state) +mq_send_kx (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, + void *impl_state) { struct ReceiverAddress *receiver = impl_state; uint16_t msize = ntohs (msg->size); + struct UdpHandshakeSignature uhs; + struct UDPConfirmation uc; + struct InitialKX kx; + struct GNUNET_CRYPTO_EcdhePrivateKey epriv; + char dgram[receiver->kx_mtu + sizeof(uc) + sizeof(kx)]; + size_t dpos; + gcry_cipher_hd_t out_cipher; + struct SharedSecret *ss; + - GNUNET_assert (mq == receiver->mq); - if (msize > receiver->mtu) + GNUNET_assert (mq == receiver->kx_mq); + if (msize > receiver->kx_mtu) { GNUNET_break (0); receiver_destroy (receiver); @@ -1922,117 +1953,124 @@ mq_send (struct GNUNET_MQ_Handle *mq, } reschedule_receiver_timeout (receiver); - if (0 == receiver->acks_available) + /* setup key material */ + GNUNET_CRYPTO_ecdhe_key_create (&epriv); + + ss = setup_shared_secret_enc (&epriv, receiver); + setup_cipher (&ss->master, 0, &out_cipher); + /* compute 'uc' */ + uc.sender = my_identity; + uc.monotonic_time = + GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg)); + uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE); + uhs.purpose.size = htonl (sizeof(uhs)); + uhs.sender = my_identity; + uhs.receiver = receiver->target; + GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral); + uhs.monotonic_time = uc.monotonic_time; + GNUNET_CRYPTO_eddsa_sign (my_private_key, + &uhs, + &uc.sender_sig); + /* Leave space for kx */ + dpos = sizeof(kx); + /* Append encrypted uc to dgram */ + GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher, + &dgram[dpos], + sizeof(uc), + &uc, + sizeof(uc))); + dpos += sizeof(uc); + /* Append encrypted payload to dgram */ + GNUNET_assert ( + 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize)); + dpos += msize; + do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos); + /* Datagram starts with kx */ + kx.ephemeral = uhs.ephemeral; + GNUNET_assert ( + 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag))); + gcry_cipher_close (out_cipher); + memcpy (dgram, &kx, sizeof(kx)); + if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, + dgram, + sizeof(dgram), + receiver->address, + receiver->address_len)) + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending KX to %s\n", GNUNET_a2s (receiver->address, + receiver->address_len)); + GNUNET_MQ_impl_send_continue (mq); +} + + +/** + * Signature of functions implementing the sending functionality of a + * message queue. + * + * @param mq the message queue + * @param msg the message to send + * @param impl_state our `struct ReceiverAddress` + */ +static void +mq_send_d (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, + void *impl_state) +{ + struct ReceiverAddress *receiver = impl_state; + uint16_t msize = ntohs (msg->size); + + GNUNET_assert (mq == receiver->d_mq); + if ((msize > receiver->d_mtu) || + (0 == receiver->acks_available)) { - /* use KX encryption method */ - struct UdpHandshakeSignature uhs; - struct UDPConfirmation uc; - struct InitialKX kx; - struct GNUNET_CRYPTO_EcdhePrivateKey epriv; - char dgram[receiver->mtu + sizeof(uc) + sizeof(kx)]; - size_t dpos; - gcry_cipher_hd_t out_cipher; - struct SharedSecret *ss; + GNUNET_break (0); + receiver_destroy (receiver); + return; + } + reschedule_receiver_timeout (receiver); - /* setup key material */ - GNUNET_CRYPTO_ecdhe_key_create (&epriv); + /* begin "BOX" encryption method, scan for ACKs from tail! */ + for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev) + { + if (ss->sequence_used >= ss->sequence_allowed) + { + continue; + } + char dgram[sizeof(struct UDPBox) + receiver->d_mtu]; + struct UDPBox *box; + gcry_cipher_hd_t out_cipher; + size_t dpos; - ss = setup_shared_secret_enc (&epriv, receiver); - setup_cipher (&ss->master, 0, &out_cipher); - /* compute 'uc' */ - uc.sender = my_identity; - uc.monotonic_time = - GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg)); - uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE); - uhs.purpose.size = htonl (sizeof(uhs)); - uhs.sender = my_identity; - uhs.receiver = receiver->target; - GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral); - uhs.monotonic_time = uc.monotonic_time; - GNUNET_CRYPTO_eddsa_sign (my_private_key, - &uhs, - &uc.sender_sig); - /* Leave space for kx */ - dpos = sizeof(kx); - /* Append encrypted uc to dgram */ - GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher, - &dgram[dpos], - sizeof(uc), - &uc, - sizeof(uc))); - dpos += sizeof(uc); + box = (struct UDPBox *) dgram; + ss->sequence_used++; + get_kid (&ss->master, ss->sequence_used, &box->kid); + setup_cipher (&ss->master, ss->sequence_used, &out_cipher); /* Append encrypted payload to dgram */ + dpos = sizeof(struct UDPBox); GNUNET_assert ( 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize)); dpos += msize; do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos); - /* Datagram starts with kx */ - kx.ephemeral = uhs.ephemeral; - GNUNET_assert ( - 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag))); + GNUNET_assert (0 == gcry_cipher_gettag (out_cipher, + box->gcm_tag, + sizeof(box->gcm_tag))); gcry_cipher_close (out_cipher); - memcpy (dgram, &kx, sizeof(kx)); if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, dgram, sizeof(dgram), receiver->address, receiver->address_len)) GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending KX to %s\n", GNUNET_a2s (receiver->address, - receiver->address_len)); GNUNET_MQ_impl_send_continue (mq); - return; - } /* End of KX encryption method */ - - /* begin "BOX" encryption method, scan for ACKs from tail! */ - for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "In non-kx mode...\n"); - if (ss->sequence_used < ss->sequence_allowed) + receiver->acks_available--; + if (0 == receiver->acks_available) { - char dgram[sizeof(struct UDPBox) + receiver->mtu]; - struct UDPBox *box; - gcry_cipher_hd_t out_cipher; - size_t dpos; - - box = (struct UDPBox *) dgram; - ss->sequence_used++; - get_kid (&ss->master, ss->sequence_used, &box->kid); - setup_cipher (&ss->master, ss->sequence_used, &out_cipher); - /* Append encrypted payload to dgram */ - dpos = sizeof(struct UDPBox); - GNUNET_assert ( - 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize)); - dpos += msize; - do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos); - GNUNET_assert (0 == gcry_cipher_gettag (out_cipher, - box->gcm_tag, - sizeof(box->gcm_tag))); - gcry_cipher_close (out_cipher); - if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, - dgram, - sizeof(dgram), - receiver->address, - receiver->address_len)) - GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); + /* We have no more ACKs */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending data\n"); - - GNUNET_MQ_impl_send_continue (mq); - receiver->acks_available--; - if (0 == receiver->acks_available) - { - /* We have no more ACKs => MTU change! */ - setup_receiver_mq (receiver); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No more acks, MTU changed\n"); - } - return; + "No more acks\n"); } } - GNUNET_assert (0); } @@ -2045,15 +2083,37 @@ mq_send (struct GNUNET_MQ_Handle *mq, * @param impl_state our `struct ReceiverAddress` */ static void -mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state) +mq_destroy_d (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct ReceiverAddress *receiver = impl_state; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "MQ destroyed\n"); - if (mq == receiver->mq) + "Default MQ destroyed\n"); + if (mq == receiver->d_mq) { - receiver->mq = NULL; - //receiver_destroy (receiver); + receiver->d_mq = NULL; + receiver_destroy (receiver); + } +} + + +/** + * Signature of functions implementing the destruction of a message + * queue. Implementations must not free @a mq, but should take care + * of @a impl_state. + * + * @param mq the message queue to destroy + * @param impl_state our `struct ReceiverAddress` + */ +static void +mq_destroy_kx (struct GNUNET_MQ_Handle *mq, void *impl_state) +{ + struct ReceiverAddress *receiver = impl_state; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "KX MQ destroyed\n"); + if (mq == receiver->kx_mq) + { + receiver->kx_mq = NULL; + receiver_destroy (receiver); } } @@ -2106,12 +2166,17 @@ setup_receiver_mq (struct ReceiverAddress *receiver) { size_t base_mtu; - if (NULL != receiver->qh) + /*if (NULL != receiver->kx_qh) { - GNUNET_TRANSPORT_communicator_mq_del (receiver->qh); - receiver->qh = NULL; + GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh); + receiver->kx_qh = NULL; } - //GNUNET_assert (NULL == receiver->mq); + if (NULL != receiver->d_qh) + { + GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh); + receiver->d_qh = NULL; + }*/ + // GNUNET_assert (NULL == receiver->mq); switch (receiver->address->sa_family) { case AF_INET: @@ -2130,35 +2195,54 @@ setup_receiver_mq (struct ReceiverAddress *receiver) GNUNET_assert (0); break; } - if (0 == receiver->acks_available) - { - /* MTU based on full KX messages */ - receiver->mtu = base_mtu - sizeof(struct InitialKX) /* 48 */ - - sizeof(struct UDPConfirmation); /* 104 */ - } - else - { - /* MTU based on BOXed messages */ - receiver->mtu = base_mtu - sizeof(struct UDPBox); - } + /* MTU based on full KX messages */ + receiver->kx_mtu = base_mtu - sizeof(struct InitialKX) /* 48 */ + - sizeof(struct UDPConfirmation); /* 104 */ + /* MTU based on BOXed messages */ + receiver->d_mtu = base_mtu - sizeof(struct UDPBox); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Setting up MQs and QHs\n"); /* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to 1404 (IPv4 + Box) bytes, depending on circumstances... */ - if (NULL == receiver->mq) - receiver->mq = GNUNET_MQ_queue_for_callbacks (&mq_send, - &mq_destroy, - &mq_cancel, - receiver, - NULL, - &mq_error, - receiver); - receiver->qh = + if (NULL == receiver->kx_mq) + receiver->kx_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_kx, + &mq_destroy_kx, + &mq_cancel, + receiver, + NULL, + &mq_error, + receiver); + if (NULL == receiver->d_mq) + receiver->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d, + &mq_destroy_d, + &mq_cancel, + receiver, + NULL, + &mq_error, + receiver); + + receiver->kx_qh = GNUNET_TRANSPORT_communicator_mq_add (ch, &receiver->target, receiver->foreign_addr, - receiver->mtu, + receiver->kx_mtu, + GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED, + 0, /* Priority */ receiver->nt, GNUNET_TRANSPORT_CS_OUTBOUND, - receiver->mq); + receiver->kx_mq); + receiver->d_qh = + GNUNET_TRANSPORT_communicator_mq_add (ch, + &receiver->target, + receiver->foreign_addr, + receiver->d_mtu, + 0, /* Initialize with 0 acks */ + 1, /* Priority */ + receiver->nt, + GNUNET_TRANSPORT_CS_OUTBOUND, + receiver->d_mq); + } diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c index 31d2e4ed3..27dda7281 100644 --- a/src/transport/gnunet-communicator-unix.c +++ b/src/transport/gnunet-communicator-unix.c @@ -670,6 +670,8 @@ setup_queue (const struct GNUNET_PeerIdentity *target, &queue->target, foreign_addr, UNIX_MTU, + GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED, + 0, GNUNET_NT_LOOPBACK, cs, queue->mq); diff --git a/src/transport/test_communicator_basic.c b/src/transport/test_communicator_basic.c index 1dfcf2371..1ea79fa19 100644 --- a/src/transport/test_communicator_basic.c +++ b/src/transport/test_communicator_basic.c @@ -58,19 +58,21 @@ static char *cfg_peers_name[NUM_PEERS]; static int ret; +static size_t long_message_size; + static struct GNUNET_TIME_Absolute start_short; static struct GNUNET_TIME_Absolute start_long; static struct GNUNET_TIME_Absolute timeout; -static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *my_tc; +static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *my_tc; #define SHORT_MESSAGE_SIZE 128 -#define LONG_MESSAGE_SIZE 32000 +#define LONG_MESSAGE_SIZE 32000 /* FIXME */ -#define BURST_PACKETS 50 +#define BURST_PACKETS 500 #define TOTAL_ITERATIONS 1 @@ -88,6 +90,7 @@ static unsigned int iterations_left = TOTAL_ITERATIONS; enum TestPhase { + TP_INIT, TP_BURST_SHORT, TP_BURST_LONG, TP_SIZE_CHECK @@ -230,15 +233,18 @@ static void size_test (void *cls) { char *payload; + size_t max_size = 64000; GNUNET_assert (TP_SIZE_CHECK == phase); - if (ack >= 64000) + if (LONG_MESSAGE_SIZE != long_message_size) + max_size = long_message_size; + if (ack >= max_size) return; /* Leave some room for our protocol, so not 2^16 exactly */ payload = make_payload (ack); ack += 5; num_sent++; GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, - (ack < 64000) + (ack < max_size) ? &size_test : NULL, NULL, @@ -254,7 +260,7 @@ long_test (void *cls) { char *payload; - payload = make_payload (LONG_MESSAGE_SIZE); + payload = make_payload (long_message_size); num_sent++; GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, (BURST_PACKETS == @@ -263,7 +269,7 @@ long_test (void *cls) : &long_test, NULL, payload, - LONG_MESSAGE_SIZE); + long_message_size); GNUNET_free (payload); timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); } @@ -288,6 +294,7 @@ short_test (void *cls) timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); } + static int test_prepared = GNUNET_NO; /** @@ -316,7 +323,6 @@ prepare_test (void *cls) } - /** * @brief Handle opening of queue * @@ -332,18 +338,25 @@ static void add_queue_cb (void *cls, struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue * - tc_queue) + tc_queue, + size_t mtu) { + if (TP_INIT != phase) + return; if (0 != strcmp ((char*) cls, cfg_peers_name[0])) return; // TODO? LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue established, starting test...\n"); start_short = GNUNET_TIME_absolute_get (); - my_tc = tc_queue; + my_tc = tc_h; + if (0 != mtu) + long_message_size = mtu; + else + long_message_size = LONG_MESSAGE_SIZE; phase = TP_BURST_SHORT; - timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); + timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_MINUTES); GNUNET_assert (NULL == to_task); - to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, + to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &latency_timeout, NULL); prepare_test (NULL); @@ -395,6 +408,9 @@ incoming_message_cb (void *cls, timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); switch (phase) { + case TP_INIT: + GNUNET_break (0); + break; case TP_BURST_SHORT: { GNUNET_assert (SHORT_MESSAGE_SIZE == payload_len); @@ -428,7 +444,7 @@ incoming_message_cb (void *cls, } case TP_BURST_LONG: { - if (LONG_MESSAGE_SIZE != payload_len) + if (long_message_size != payload_len) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Ignoring packet with wrong length\n"); @@ -441,7 +457,7 @@ incoming_message_cb (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Long size packet test done.\n"); - char *goodput = GNUNET_STRINGS_byte_size_fancy ((LONG_MESSAGE_SIZE + char *goodput = GNUNET_STRINGS_byte_size_fancy ((long_message_size * num_received * 1000 * 1000) / duration.rel_value_us); @@ -553,6 +569,7 @@ main (int argc, char *test_name; char *cfg_peer; + phase = TP_INIT; ret = 1; test_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]); communicator_name = strchr (test_name, '-'); diff --git a/src/transport/transport-testing2.c b/src/transport/transport-testing2.c index fc6d13590..8250027f7 100644 --- a/src/transport/transport-testing2.c +++ b/src/transport/transport-testing2.c @@ -33,7 +33,7 @@ #include "gnunet_hello_lib.h" #include "gnunet_signatures.h" #include "transport.h" - +#include #define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__) @@ -227,10 +227,20 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue uint32_t nt; /** - * Maximum transmission unit, in NBO. UINT32_MAX for unlimited. + * Maximum transmission unit. UINT32_MAX for unlimited. */ uint32_t mtu; + /** + * Queue length. UINT64_MAX for unlimited. + */ + uint64_t q_len; + + /** + * Queue prio + */ + uint32_t priority; + /** * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. */ @@ -370,8 +380,8 @@ handle_communicator_backchannel (void *cls, struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi; struct GNUNET_MQ_Envelope *env; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received backchannel message\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received backchannel message\n"); if (tc_h->bc_enabled != GNUNET_YES) { GNUNET_SERVICE_client_continue (client->client); @@ -379,10 +389,10 @@ handle_communicator_backchannel (void *cls, } /* Find client providing this communicator */ /* Finally, deliver backchannel message to communicator */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Delivering backchannel message of type %u to %s\n", - ntohs (msg->type), - target_communicator); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Delivering backchannel message of type %u to %s\n", + ntohs (msg->type), + target_communicator); other_tc_h = tc_h->bc_cb (tc_h, msg, (struct GNUNET_PeerIdentity*) &bc_msg->pid); env = GNUNET_MQ_msg_extra ( @@ -496,9 +506,6 @@ handle_incoming_msg (void *cls, msg = (struct GNUNET_MessageHeader *) &inc_msg[1]; size_t payload_len = ntohs (msg->size) - sizeof (struct GNUNET_MessageHeader); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Incoming message from communicator!\n"); - if (NULL != tc_h->incoming_msg_cb) { tc_h->incoming_msg_cb (tc_h->cb_cls, @@ -608,15 +615,14 @@ handle_add_queue_message (void *cls, client->tc; struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue; - tc_queue = tc_h->queue_head; - if (NULL != tc_queue) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got queue with ID %u\n", msg->qid); + for (tc_queue = tc_h->queue_head; NULL != tc_queue; tc_queue = tc_queue->next) { - while (tc_queue->qid != msg->qid) - { - tc_queue = tc_queue->next; - } + if (tc_queue->qid == msg->qid) + break; } - else + if (NULL == tc_queue) { tc_queue = GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue); @@ -628,16 +634,58 @@ handle_add_queue_message (void *cls, GNUNET_assert (tc_queue->qid == msg->qid); GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver)); tc_queue->nt = msg->nt; - tc_queue->mtu = msg->mtu; + tc_queue->mtu = ntohl (msg->mtu); tc_queue->cs = msg->cs; + tc_queue->priority = ntohl (msg->priority); + tc_queue->q_len = GNUNET_ntohll (msg->q_len); if (NULL != tc_h->add_queue_cb) { - tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue); + tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue, tc_queue->mtu); } GNUNET_SERVICE_client_continue (client->client); } +/** + * @brief Handle new queue + * + * Store context and call client callback. + * + * @param cls Closure - communicator handle + * @param msg Message struct + */ +static void +handle_update_queue_message (void *cls, + const struct + GNUNET_TRANSPORT_UpdateQueueMessage *msg) +{ + struct MyClient *client = cls; + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = + client->tc; + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received queue update message for %u with q_len %"PRIu64"\n", + msg->qid, GNUNET_ntohll(msg->q_len)); + tc_queue = tc_h->queue_head; + if (NULL != tc_queue) + { + while (tc_queue->qid != msg->qid) + { + tc_queue = tc_queue->next; + } + } + GNUNET_assert (tc_queue->qid == msg->qid); + GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver)); + tc_queue->nt = msg->nt; + tc_queue->mtu = ntohl (msg->mtu); + tc_queue->cs = msg->cs; + tc_queue->priority = ntohl (msg->priority); + tc_queue->q_len += GNUNET_ntohll (msg->q_len); + GNUNET_SERVICE_client_continue (client->client); +} + + /** * @brief Shut down the service * @@ -789,6 +837,10 @@ transport_communicator_start ( GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, struct GNUNET_TRANSPORT_AddQueueMessage, tc_h), + GNUNET_MQ_hd_fixed_size (update_queue_message, + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE, + struct GNUNET_TRANSPORT_UpdateQueueMessage, + tc_h), // GNUNET_MQ_hd_fixed_size (del_queue_message, // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN, // struct GNUNET_TRANSPORT_DelQueueMessage, @@ -1063,7 +1115,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue ( */ void GNUNET_TRANSPORT_TESTING_transport_communicator_send - (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue, + (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, GNUNET_SCHEDULER_TaskCallback cont, void *cont_cls, const void *payload, @@ -1073,7 +1125,39 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send struct GNUNET_TRANSPORT_SendMessageTo *msg; struct GNUNET_MQ_Envelope *env; size_t inbox_size; + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue; + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp; + tc_queue = NULL; + for (tc_queue_tmp = tc_h->queue_head; + NULL != tc_queue_tmp; + tc_queue_tmp = tc_queue_tmp->next) + { + if (tc_queue_tmp->q_len <= 0) + continue; + if (NULL == tc_queue) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n", + tc_queue_tmp->priority, + tc_queue_tmp->q_len, + tc_queue_tmp->mtu); + tc_queue = tc_queue_tmp; + continue; + } + if (tc_queue->priority < tc_queue_tmp->priority) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n", + tc_queue_tmp->priority, + tc_queue_tmp->q_len, + tc_queue_tmp->mtu); + tc_queue = tc_queue_tmp; + } + } + GNUNET_assert (NULL != tc_queue); + if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED) + tc_queue->q_len--; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message\n"); inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size; diff --git a/src/transport/transport-testing2.h b/src/transport/transport-testing2.h index 7a449f081..b77125e82 100644 --- a/src/transport/transport-testing2.h +++ b/src/transport/transport-testing2.h @@ -132,7 +132,8 @@ typedef void *tc_h, struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue - *tc_queue); + *tc_queue, + size_t mtu); /** @@ -215,8 +216,8 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (struct */ void GNUNET_TRANSPORT_TESTING_transport_communicator_send (struct - GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue - *tc_queue, + GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle + *tc_h, GNUNET_SCHEDULER_TaskCallback cont, void *cont_cls, diff --git a/src/transport/transport.h b/src/transport/transport.h index 36182d8d7..a64ffd5c6 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h @@ -835,6 +835,17 @@ struct GNUNET_TRANSPORT_AddQueueMessage */ uint32_t mtu; + /** + * Queue length, in NBO. Defines how many messages may be + * send through this queue. UINT64_MAX for unlimited. + */ + uint64_t q_len; + + /** + * Priority of the queue in relation to other queues. + */ + uint32_t priority; + /** * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. */ @@ -844,6 +855,55 @@ struct GNUNET_TRANSPORT_AddQueueMessage }; +/** + * Update queue + */ +struct GNUNET_TRANSPORT_UpdateQueueMessage +{ + /** + * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP. + */ + struct GNUNET_MessageHeader header; + + /** + * Queue identifier (used to identify the queue). + */ + uint32_t qid GNUNET_PACKED; + + /** + * Receiver that can be addressed via the queue. + */ + struct GNUNET_PeerIdentity receiver; + + /** + * An `enum GNUNET_NetworkType` in NBO. + */ + uint32_t nt; + + /** + * Maximum transmission unit, in NBO. UINT32_MAX for unlimited. + */ + uint32_t mtu; + + /** + * Queue length, in NBO. Defines how many messages may be + * send through this queue. UINT64_MAX for unlimited. + */ + uint64_t q_len; + + /** + * Priority of the queue in relation to other queues. + */ + uint32_t priority; + + /** + * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. + */ + uint32_t cs; +}; + + + /** * Remove queue, it is no longer available. */ diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c index e80cd5c03..cfa144415 100644 --- a/src/transport/transport_api2_communication.c +++ b/src/transport/transport_api2_communication.c @@ -280,6 +280,15 @@ struct GNUNET_TRANSPORT_QueueHandle * Maximum transmission unit for the queue. */ uint32_t mtu; + + /** + * Queue length. + */ + uint64_t q_len; + /** + * Queue priority. + */ + uint32_t priority; }; @@ -395,6 +404,8 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) if (NULL == qh->ch->mq) return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending `GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP` message\n"); env = GNUNET_MQ_msg_extra (aqm, strlen (qh->address) + 1, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP); @@ -402,11 +413,39 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) aqm->receiver = qh->peer; aqm->nt = htonl ((uint32_t) qh->nt); aqm->mtu = htonl (qh->mtu); + aqm->q_len = GNUNET_htonll (qh->q_len); + aqm->priority = htonl (qh->priority); aqm->cs = htonl ((uint32_t) qh->cs); memcpy (&aqm[1], qh->address, strlen (qh->address) + 1); GNUNET_MQ_send (qh->ch->mq, env); } +/** + * Send message to the transport service about queue @a qh + * updated. + * + * @param qh queue to add + */ +static void +send_update_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) +{ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TRANSPORT_UpdateQueueMessage *uqm; + + if (NULL == qh->ch->mq) + return; + env = GNUNET_MQ_msg (uqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE); + uqm->qid = htonl (qh->queue_id); + uqm->receiver = qh->peer; + uqm->nt = htonl ((uint32_t) qh->nt); + uqm->mtu = htonl (qh->mtu); + uqm->q_len = GNUNET_htonll (qh->q_len); + uqm->priority = htonl (qh->priority); + uqm->cs = htonl ((uint32_t) qh->cs); + GNUNET_MQ_send (qh->ch->mq, env); +} + + /** * Send message to the transport service about queue @a qh @@ -924,6 +963,9 @@ GNUNET_TRANSPORT_communicator_receive ( * @param address address in human-readable format, 0-terminated, UTF-8 * @param mtu maximum message size supported by queue, 0 if * sending is not supported, SIZE_MAX for no MTU + * @param q_len number of messages that can be send through this queue + * @param priority queue priority. Queues with highest priority should be + * used * @param nt which network type does the @a address belong to? * @param cc what characteristics does the communicator have? * @param cs what is the connection status of the queue? @@ -936,6 +978,8 @@ GNUNET_TRANSPORT_communicator_mq_add ( const struct GNUNET_PeerIdentity *peer, const char *address, uint32_t mtu, + uint64_t q_len, + uint32_t priority, enum GNUNET_NetworkType nt, enum GNUNET_TRANSPORT_ConnectionStatus cs, struct GNUNET_MQ_Handle *mq) @@ -948,6 +992,8 @@ GNUNET_TRANSPORT_communicator_mq_add ( qh->address = GNUNET_strdup (address); qh->nt = nt; qh->mtu = mtu; + qh->q_len = q_len; + qh->priority = priority; qh->cs = cs; qh->mq = mq; qh->queue_id = ch->queue_gen++; @@ -957,6 +1003,37 @@ GNUNET_TRANSPORT_communicator_mq_add ( } +/** + * Notify transport service that an MQ was updated + * + * @param ch connection to transport service + * @param qh the queue to update + * @param q_len number of messages that can be send through this queue + * @param priority queue priority. Queues with highest priority should be + * used + */ +void +GNUNET_TRANSPORT_communicator_mq_update ( + struct GNUNET_TRANSPORT_CommunicatorHandle *ch, + const struct GNUNET_TRANSPORT_QueueHandle *u_qh, + uint64_t q_len, + uint32_t priority) +{ + struct GNUNET_TRANSPORT_QueueHandle *qh; + + for (qh = ch->queue_head; NULL != qh; qh = qh->next) + { + if (u_qh == qh) + break; + } + GNUNET_assert (NULL != qh); + qh->q_len = q_len; + qh->priority = priority; + send_update_queue (qh); +} + + + /** * Notify transport service that an MQ became unavailable due to a * disconnect or timeout.