From 0c5090e7d76fd7c85b92f0c4f918cf5420bd68f8 Mon Sep 17 00:00:00 2001 From: "Schanzenbach, Martin" Date: Wed, 1 Jan 2020 13:35:28 +0900 Subject: [PATCH] fix backchannel --- src/transport/gnunet-communicator-udp.c | 27 +++++++------- src/transport/test_communicator_basic.c | 37 +++++++++++++++----- src/transport/transport-testing2.c | 20 +++++++++-- src/transport/transport-testing2.h | 16 ++++++++- src/transport/transport_api2_communication.c | 1 - 5 files changed, 75 insertions(+), 26 deletions(-) diff --git a/src/transport/gnunet-communicator-udp.c b/src/transport/gnunet-communicator-udp.c index 5abf42588..e931bd2e7 100644 --- a/src/transport/gnunet-communicator-udp.c +++ b/src/transport/gnunet-communicator-udp.c @@ -1094,21 +1094,21 @@ pass_plaintext_to_core (struct SenderAddress *sender, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Giving %u bytes to TNG\n", ntohs (hdr->size)); GNUNET_assert (GNUNET_SYSERR != - GNUNET_TRANSPORT_communicator_receive (ch, - &sender->target, - hdr, - ADDRESS_VALIDITY_PERIOD, - NULL /* no flow control possible */ - , - NULL)); + GNUNET_TRANSPORT_communicator_receive (ch, + &sender->target, + hdr, + ADDRESS_VALIDITY_PERIOD, + NULL /* no flow control possible */ + , + NULL)); /* move on to next message, if any */ plaintext_len -= ntohs (hdr->size); if (plaintext_len < sizeof(*hdr)) break; pos += ntohs (hdr->size); - hdr = (const struct GNUNET_MessageHeader *)pos; - //TODO for now..., we do not actually sen >1msg or have a way of telling - //if we are done + hdr = (const struct GNUNET_MessageHeader *) pos; + // TODO for now..., we do not actually sen >1msg or have a way of telling + // if we are done break; } GNUNET_STATISTICS_update (stats, @@ -1958,7 +1958,8 @@ mq_send (struct GNUNET_MQ_Handle *mq, receiver->address_len)) GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending KX to %s\n", GNUNET_a2s (receiver->address, receiver->address_len)); + "Sending KX to %s\n", GNUNET_a2s (receiver->address, + receiver->address_len)); GNUNET_MQ_impl_send_continue (mq); return; } /* End of KX encryption method */ @@ -1966,6 +1967,8 @@ mq_send (struct GNUNET_MQ_Handle *mq, /* begin "BOX" encryption method, scan for ACKs from tail! */ for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev) { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "In non-kx mode...\n"); if (ss->sequence_used < ss->sequence_allowed) { char dgram[sizeof(struct UDPBox) + receiver->mtu]; @@ -1994,7 +1997,7 @@ mq_send (struct GNUNET_MQ_Handle *mq, receiver->address_len)) GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending data\n"); + "Sending data\n"); GNUNET_MQ_impl_send_continue (mq); receiver->acks_available--; diff --git a/src/transport/test_communicator_basic.c b/src/transport/test_communicator_basic.c index 4699b8dd1..c469a55a1 100644 --- a/src/transport/test_communicator_basic.c +++ b/src/transport/test_communicator_basic.c @@ -76,6 +76,10 @@ static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *my_tc; #define TOTAL_ITERATIONS 5 +#define PEER_A 0 + +#define PEER_B 1 + static unsigned int iterations_left = TOTAL_ITERATIONS; #define SHORT_BURST_WINDOW \ @@ -136,14 +140,12 @@ add_address_cb (void *cls, aid, nt); // addresses[1] = GNUNET_strdup (address); - if ((0 == strcmp ((char*) cls, cfg_peers_name[NUM_PEERS - 1])) && + if ((0 == strcmp ((char*) cls, cfg_peers_name[PEER_B])) && (GNUNET_NO == queue_est)) { queue_est = GNUNET_YES; - GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (tc_hs[0], - &peer_id[ - NUM_PEERS - - 1], + GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (tc_hs[PEER_A], + &peer_id[PEER_B], address); } } @@ -176,6 +178,20 @@ queue_create_reply_cb (void *cls, } +static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle * +handle_backchannel_cb (void *cls, + struct GNUNET_MessageHeader *msg, + struct GNUNET_PeerIdentity *pid) +{ + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Handling BC message...\n"); + if (0 == memcmp (&peer_id[PEER_A], pid, sizeof (*pid))) + return tc_hs[PEER_A]; + else + return tc_hs[PEER_B]; +} + + static char* make_payload (size_t payload_size) { @@ -366,8 +382,9 @@ incoming_message_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Short size packet test done.\n"); char *goodput = GNUNET_STRINGS_byte_size_fancy ((SHORT_MESSAGE_SIZE - * num_received * 1000 * 1000) - / duration.rel_value_us); + * num_received * 1000 + * 1000) + / duration.rel_value_us); GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "%lu/%lu packets in %llu us (%s/s) -- avg latency: %llu us\n", (unsigned long) num_received, @@ -402,8 +419,9 @@ incoming_message_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Long size packet test done.\n"); char *goodput = GNUNET_STRINGS_byte_size_fancy ((LONG_MESSAGE_SIZE - * num_received * 1000 * 1000) - / duration.rel_value_us); + * num_received * 1000 + * 1000) + / duration.rel_value_us); GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "%lu/%lu packets in %llu us (%s/s) -- avg latency: %llu us\n", @@ -496,6 +514,7 @@ run (void *cls) &queue_create_reply_cb, &add_queue_cb, &incoming_message_cb, + &handle_backchannel_cb, cfg_peers_name[i]); /* cls */ } GNUNET_SCHEDULER_add_shutdown (&do_shutdown, diff --git a/src/transport/transport-testing2.c b/src/transport/transport-testing2.c index 893579bc5..75864294b 100644 --- a/src/transport/transport-testing2.c +++ b/src/transport/transport-testing2.c @@ -141,6 +141,11 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle */ GNUNET_TRANSPORT_TESTING_IncomingMessageCallback incoming_msg_cb; + /** + * @brief Backchannel callback + */ + GNUNET_TRANSPORT_TESTING_BackchannelCallback bc_cb; + /** * Our service handle */ @@ -323,12 +328,15 @@ handle_communicator_backchannel (void *cls, bc_msg) { struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls; + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *other_tc_h; struct GNUNET_MessageHeader *msg; msg = (struct GNUNET_MessageHeader *) &bc_msg[1]; - struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi; - struct GNUNET_MQ_Envelope *env; uint16_t isize = ntohs (msg->size); const char *target_communicator = ((const char *) msg) + isize; + struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi; + struct GNUNET_MQ_Envelope *env; + + if (tc_h->bc_enabled != GNUNET_YES) { GNUNET_SERVICE_client_continue (tc_h->client); @@ -340,13 +348,17 @@ handle_communicator_backchannel (void *cls, "Delivering backchannel message of type %u to %s\n", ntohs (msg->type), target_communicator); + other_tc_h = tc_h->bc_cb (tc_h, msg, (struct + GNUNET_PeerIdentity*) &bc_msg->pid); env = GNUNET_MQ_msg_extra ( cbi, isize, GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING); cbi->pid = bc_msg->pid; memcpy (&cbi[1], msg, isize); - GNUNET_MQ_send (tc_h->c_mq, env); + + + GNUNET_MQ_send (other_tc_h->c_mq, env); GNUNET_SERVICE_client_continue (tc_h->client); } @@ -879,6 +891,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback queue_create_reply_cb, GNUNET_TRANSPORT_TESTING_AddQueueCallback add_queue_cb, GNUNET_TRANSPORT_TESTING_IncomingMessageCallback incoming_message_cb, + GNUNET_TRANSPORT_TESTING_BackchannelCallback bc_cb, void *cb_cls) { struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h; @@ -905,6 +918,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( tc_h->queue_create_reply_cb = queue_create_reply_cb; tc_h->add_queue_cb = add_queue_cb; tc_h->incoming_msg_cb = incoming_message_cb; + tc_h->bc_cb = bc_cb; tc_h->cb_cls = cb_cls; /* Start communicator part of service */ diff --git a/src/transport/transport-testing2.h b/src/transport/transport-testing2.h index 4e047828e..e7602e3e2 100644 --- a/src/transport/transport-testing2.h +++ b/src/transport/transport-testing2.h @@ -47,6 +47,19 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue; */ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorTransmission; +/** + * @brief Function signature for callbacks that are called when new + * backchannel message arrived + * + * @param cls Closure + * @param msg Backchannel message + * @param pid Target peer + */ +typedef struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle * +(*GNUNET_TRANSPORT_TESTING_BackchannelCallback)(void *cls, + struct GNUNET_MessageHeader *msg, + struct GNUNET_PeerIdentity *pid); + /** * @brief Function signature for callbacks that are called when new @@ -134,7 +147,7 @@ typedef void struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, - const char* payload, + const char*payload, size_t payload_len); @@ -164,6 +177,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback queue_create_reply_cb, GNUNET_TRANSPORT_TESTING_AddQueueCallback add_queue_cb, GNUNET_TRANSPORT_TESTING_IncomingMessageCallback incoming_message_cb, + GNUNET_TRANSPORT_TESTING_BackchannelCallback bc_cb, void *cb_cls); diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c index 01a2447fa..e80cd5c03 100644 --- a/src/transport/transport_api2_communication.c +++ b/src/transport/transport_api2_communication.c @@ -694,7 +694,6 @@ handle_backchannel_incoming ( const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi) { struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; - if (NULL != ch->notify_cb) ch->notify_cb (ch->notify_cb_cls, &bi->pid, -- 2.25.1