From a325c3eaa8450d325fe57959eac29da5496cfd6d Mon Sep 17 00:00:00 2001 From: Martin Schanzenbach Date: Sat, 30 May 2020 17:45:38 +0200 Subject: [PATCH] towards UDP backchannels --- src/transport/gnunet-communicator-udp.c | 32 +++++++++++++++++++--- src/transport/test_communicator_basic.c | 35 ++++++++++++++++++++++--- src/transport/transport-testing2.c | 11 ++++++-- src/transport/transport-testing2.h | 1 + 4 files changed, 70 insertions(+), 9 deletions(-) diff --git a/src/transport/gnunet-communicator-udp.c b/src/transport/gnunet-communicator-udp.c index 72e84567a..344ba5180 100644 --- a/src/transport/gnunet-communicator-udp.c +++ b/src/transport/gnunet-communicator-udp.c @@ -1030,6 +1030,8 @@ check_timeouts (void *cls) rt = GNUNET_TIME_absolute_get_remaining (receiver->timeout); if (0 != rt.rel_value_us) break; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Receiver timed out\n"); receiver_destroy (receiver); } st = GNUNET_TIME_UNIT_FOREVER_REL; @@ -1257,23 +1259,35 @@ handle_ack (void *cls, const struct GNUNET_PeerIdentity *pid, void *value) { const struct UDPAck *ack = cls; struct ReceiverAddress *receiver = value; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "in handle ack\n"); (void) pid; for (struct SharedSecret *ss = receiver->ss_head; NULL != ss; ss = ss->next) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Checking shared secrets\n"); if (0 == memcmp (&ack->cmac, &ss->cmac, sizeof(struct GNUNET_HashCode))) { uint32_t allowed; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found matching mac\n"); allowed = ntohl (ack->sequence_max); if (allowed > ss->sequence_allowed) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%u > %u (%u)\n", allowed, ss->sequence_allowed, + receiver->acks_available); + receiver->acks_available += (allowed - ss->sequence_allowed); if ((allowed - ss->sequence_allowed) == receiver->acks_available) { /* we just incremented from zero => MTU change! */ - setup_receiver_mq (receiver); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "we just incremented from zero => MTU change!\n"); + //TODO setup_receiver_mq (receiver); } ss->sequence_allowed = allowed; /* move ss to head to avoid discarding it anytime soon! */ @@ -1361,6 +1375,9 @@ consider_ss_ack (struct SharedSecret *ss) ack.header.size = htons (sizeof(ack)); ack.sequence_max = htonl (ss->sequence_allowed); ack.cmac = ss->cmac; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Notifying transport of UDPAck %s\n", + GNUNET_i2s_full (&ss->sender->target)); GNUNET_TRANSPORT_communicator_notify (ch, &ss->sender->target, COMMUNICATOR_ADDRESS_PREFIX, @@ -2031,11 +2048,12 @@ static void mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct ReceiverAddress *receiver = impl_state; - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "MQ destroyed\n"); if (mq == receiver->mq) { receiver->mq = NULL; - receiver_destroy (receiver); + //receiver_destroy (receiver); } } @@ -2093,7 +2111,7 @@ setup_receiver_mq (struct ReceiverAddress *receiver) GNUNET_TRANSPORT_communicator_mq_del (receiver->qh); receiver->qh = NULL; } - GNUNET_assert (NULL == receiver->mq); + //GNUNET_assert (NULL == receiver->mq); switch (receiver->address->sa_family) { case AF_INET: @@ -2190,6 +2208,9 @@ mq_init (void *cls, const struct GNUNET_PeerIdentity *peer, const char *address) &receiver->target, receiver, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Added %s to receivers\n", + GNUNET_i2s_full (&receiver->target)); receiver->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); receiver->hn = GNUNET_CONTAINER_heap_insert (receivers_heap, @@ -2336,6 +2357,9 @@ enc_notify_cb (void *cls, const struct UDPAck *ack; (void) cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Storing UDPAck received from backchannel from %s\n", + GNUNET_i2s_full (sender)); if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_COMMUNICATOR_UDP_ACK) || (ntohs (msg->size) != sizeof(struct UDPAck))) { diff --git a/src/transport/test_communicator_basic.c b/src/transport/test_communicator_basic.c index e99db7cfb..1dfcf2371 100644 --- a/src/transport/test_communicator_basic.c +++ b/src/transport/test_communicator_basic.c @@ -70,9 +70,9 @@ static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *my_tc; #define LONG_MESSAGE_SIZE 32000 -#define BURST_PACKETS 5000 +#define BURST_PACKETS 50 -#define TOTAL_ITERATIONS 5 +#define TOTAL_ITERATIONS 1 #define PEER_A 0 @@ -288,6 +288,34 @@ short_test (void *cls) timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); } +static int test_prepared = GNUNET_NO; + +/** + * This helps establishing the backchannel + */ +static void +prepare_test (void *cls) +{ + char *payload; + + if (GNUNET_YES == test_prepared) + { + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, + &short_test, + NULL); + return; + } + test_prepared = GNUNET_YES; + payload = make_payload (SHORT_MESSAGE_SIZE); + GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, + &prepare_test, + NULL, + payload, + SHORT_MESSAGE_SIZE); + GNUNET_free (payload); +} + + /** * @brief Handle opening of queue @@ -318,7 +346,7 @@ add_queue_cb (void *cls, to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &latency_timeout, NULL); - short_test (NULL); + prepare_test (NULL); } @@ -501,6 +529,7 @@ run (void *cls) "transport", communicator_binary, cfg_peers_name[i], + &peer_id[i], &communicator_available_cb, &add_address_cb, &queue_create_reply_cb, diff --git a/src/transport/transport-testing2.c b/src/transport/transport-testing2.c index fe2f28f54..fc6d13590 100644 --- a/src/transport/transport-testing2.c +++ b/src/transport/transport-testing2.c @@ -84,6 +84,8 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle */ char *cfg_filename; + struct GNUNET_PeerIdentity peer_id; + /** * @brief Handle to the transport service */ @@ -368,7 +370,8 @@ handle_communicator_backchannel (void *cls, struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi; struct GNUNET_MQ_Envelope *env; - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received backchannel message\n"); if (tc_h->bc_enabled != GNUNET_YES) { GNUNET_SERVICE_client_continue (client->client); @@ -386,7 +389,7 @@ handle_communicator_backchannel (void *cls, cbi, isize, GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING); - cbi->pid = bc_msg->pid; + cbi->pid = tc_h->peer_id; memcpy (&cbi[1], msg, isize); @@ -934,6 +937,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( const char *service_name, const char *binary_name, const char *cfg_filename, + const struct GNUNET_PeerIdentity *peer_id, GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback communicator_available_cb, GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb, @@ -971,6 +975,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( tc_h->add_queue_cb = add_queue_cb; tc_h->incoming_msg_cb = incoming_message_cb; tc_h->bc_cb = bc_cb; + tc_h->peer_id = *peer_id; tc_h->cb_cls = cb_cls; /* Start communicator part of service */ @@ -1069,6 +1074,8 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send struct GNUNET_MQ_Envelope *env; size_t inbox_size; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending message\n"); inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size; env = GNUNET_MQ_msg_extra (msg, inbox_size, diff --git a/src/transport/transport-testing2.h b/src/transport/transport-testing2.h index 96a08a193..7a449f081 100644 --- a/src/transport/transport-testing2.h +++ b/src/transport/transport-testing2.h @@ -171,6 +171,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( const char *service_name, const char *binary_name, const char *cfg_filename, + const struct GNUNET_PeerIdentity *peer_id, GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback communicator_available_cb, GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb, -- 2.25.1