From a563820a60326473e74ac2431431e86e1963fd31 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 30 Apr 2019 10:58:56 +0200 Subject: [PATCH] complete CORE flow control loop --- src/transport/gnunet-service-tng.c | 234 +++++++++++++++++------------ 1 file changed, 135 insertions(+), 99 deletions(-) diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 825d45522..a8f70986b 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -24,40 +24,31 @@ * * TODO: * Implement next: - * - complete flow control push back from CORE via TRANSPORT to communicators: - * + resume communicators in handle_client_recv_ok (see FIXME) - * + count transmissions to CORE and suspend communicators if window is full - * - check flow control push back from TRANSPROT to CORE: - * + check when to send ACKs - * - change transport-core API to provide proper flow control in both - * directions, allow multiple messages per peer simultaneously (tag - * confirmations with unique message ID), and replace quota-out with - * proper flow control; specify transmission preferences (latency, + * - add (more) logging + * - change transport-core API to specify transmission preferences (latency, * reliability, etc.) per message! - * - add logging - * - * Later: * - review retransmission logic, right now there is no smartness there! - * => congestion control, flow control, etc + * => congestion control, flow control, etc [PERFORMANCE-BASICS] * * Optimizations: * - AcknowledgementUUIDPs are overkill with 256 bits (128 would do) - * => Need 128 bit hash map though! + * => Need 128 bit hash map though! [BANDWIDTH, MEMORY] * - queue_send_msg and route_message both by API design have to make copies * of the payload, and route_message on top of that requires a malloc/free. - * Change design to approximate "zero" copy better... + * Change design to approximate "zero" copy better... [CPU] * - could avoid copying body of message into each fragment and keep * fragments as just pointers into the original message and only * fully build fragments just before transmission (optimization, should - * reduce CPU and memory use) + * reduce CPU and memory use) [CPU, MEMORY] * - if messages are below MTU, consider adding ACKs and other stuff - * (requires planning at receiver, and additional MST-style demultiplex - * at receiver!) + * to the same transmission to avoid tiny messages (requires planning at + * receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT] * - When we passively learned DV (with unconfirmed freshness), we * right now add the path to our list but with a zero path_valid_until * time and only use it for unconfirmed routes. However, we could consider * triggering an explicit validation mechansim ourselves, specifically routing - * a challenge-response message over the path (OPTIMIZATION-FIXME). + * a challenge-response message over the path [ROUTING] + * - Track ACK losses based on ACK-counter [ROUTING] * * Design realizations / discussion: * - communicators do flow control by calling MQ "notify sent" @@ -1115,6 +1106,44 @@ struct PendingMessage; */ struct DistanceVectorHop; + +/** + * Context from #handle_incoming_msg(). Closure for many + * message handlers below. + */ +struct CommunicatorMessageContext +{ + + /** + * Kept in a DLL of `struct VirtualLink` if waiting for CORE + * flow control to unchoke. + */ + struct CommunicatorMessageContext *next; + + /** + * Kept in a DLL of `struct VirtualLink` if waiting for CORE + * flow control to unchoke. + */ + struct CommunicatorMessageContext *prev; + + /** + * Which communicator provided us with the message. + */ + struct TransportClient *tc; + + /** + * Additional information for flow control and about the sender. + */ + struct GNUNET_TRANSPORT_IncomingMessage im; + + /** + * Number of hops the message has travelled (if DV-routed). + * FIXME: make use of this in ACK handling! + */ + uint16_t total_hops; +}; + + /** * A virtual link is another reachable peer that is known to CORE. It * can be either a `struct Neighbour` with at least one confirmed @@ -1130,6 +1159,18 @@ struct VirtualLink */ struct GNUNET_PeerIdentity target; + /** + * Communicators blocked for receiving on @e target as we are waiting + * on the @e core_recv_window to increase. + */ + struct CommunicatorMessageContext *cmc_head; + + /** + * Communicators blocked for receiving on @e target as we are waiting + * on the @e core_recv_window to increase. + */ + struct CommunicatorMessageContext *cmc_tail; + /** * Task scheduled to possibly notfiy core that this peer is no * longer counting as confirmed. Runs the #core_visibility_check(), @@ -1152,9 +1193,11 @@ struct VirtualLink * How many more messages can we send to core before we exhaust * the receive window of CORE for this peer? If this hits zero, * we must tell communicators to stop providing us more messages - * for this peer. + * for this peer. In fact, the window can go negative as we + * have multiple communicators, so per communicator we can go + * down by one into the negative range. */ - unsigned int core_recv_window; + int core_recv_window; }; @@ -3497,21 +3540,14 @@ free_pending_message (struct PendingMessage *pm) /** - * Send a response to the @a pm that we have processed a - * "send" request with status @a success. We - * transmitted @a bytes_physical on the actual wire. - * Sends a confirmation to the "core" client responsible - * for the original request and free's @a pm. + * Send a response to the @a pm that we have processed a "send" + * request. Sends a confirmation to the "core" client responsible for + * the original request and free's @a pm. * * @param pm handle to the original pending message - * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR - * for transmission failure - * @param bytes_physical amount of bandwidth consumed */ static void -client_send_response (struct PendingMessage *pm, - int success, - uint32_t bytes_physical) +client_send_response (struct PendingMessage *pm) { struct TransportClient *tc = pm->client; struct Neighbour *target = pm->target; @@ -3523,10 +3559,7 @@ client_send_response (struct PendingMessage *pm, env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); som->peer = target->pid; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Confirming %s transmission of %u/%u bytes to %s\n", - (GNUNET_OK == success) ? "successful" : "failed", - (unsigned int) pm->bytes_msg, - (unsigned int) bytes_physical, + "Confirming transmission to %s\n", GNUNET_i2s (&pm->target->pid)); GNUNET_MQ_send (tc->mq, env); } @@ -3826,6 +3859,31 @@ check_communicator_available ( } +/** + * Send ACK to communicator (if requested) and free @a cmc. + * + * @param cmc context for which we are done handling the message + */ +static void +finish_cmc_handling (struct CommunicatorMessageContext *cmc) +{ + if (0 != ntohl (cmc->im.fc_on)) + { + /* send ACK when done to communicator for flow control! */ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TRANSPORT_IncomingMessageAck *ack; + + env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK); + ack->reserved = htonl (0); + ack->fc_id = cmc->im.fc_id; + ack->sender = cmc->im.sender; + GNUNET_MQ_send (cmc->tc->mq, env); + } + GNUNET_SERVICE_client_continue (cmc->tc->client); + GNUNET_free (cmc); +} + + /** * Client confirms that it is done handling message(s) to a particular * peer. We may now provide more messages to CORE for this peer. @@ -3841,6 +3899,7 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom) struct TransportClient *tc = cls; struct VirtualLink *vl; uint32_t delta; + struct CommunicatorMessageContext *cmc; if (CT_CORE != tc->type) { @@ -3860,9 +3919,13 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom) } delta = ntohl (rom->increase_window_delta); vl->core_recv_window += delta; - if (delta == vl->core_recv_window) + if (vl->core_recv_window <= 0) + return; + /* resume communicators */ + while (NULL != (cmc = vl->cmc_tail)) { - // FIXME: resume communicators! + GNUNET_CONTAINER_DLL_remove (vl->cmc_head, vl->cmc_tail, cmc); + finish_cmc_handling (cmc); } } @@ -4683,30 +4746,6 @@ handle_del_address (void *cls, } -/** - * Context from #handle_incoming_msg(). Closure for many - * message handlers below. - */ -struct CommunicatorMessageContext -{ - /** - * Which communicator provided us with the message. - */ - struct TransportClient *tc; - - /** - * Additional information for flow control and about the sender. - */ - struct GNUNET_TRANSPORT_IncomingMessage im; - - /** - * Number of hops the message has travelled (if DV-routed). - * FIXME: make use of this in ACK handling! - */ - uint16_t total_hops; -}; - - /** * Given an inbound message @a msg from a communicator @a cmc, * demultiplex it based on the type calling the right handler. @@ -4719,31 +4758,6 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc, const struct GNUNET_MessageHeader *msg); -/** - * Send ACK to communicator (if requested) and free @a cmc. - * - * @param cmc context for which we are done handling the message - */ -static void -finish_cmc_handling (struct CommunicatorMessageContext *cmc) -{ - if (0 != ntohl (cmc->im.fc_on)) - { - /* send ACK when done to communicator for flow control! */ - struct GNUNET_MQ_Envelope *env; - struct GNUNET_TRANSPORT_IncomingMessageAck *ack; - - env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK); - ack->reserved = htonl (0); - ack->fc_id = cmc->im.fc_id; - ack->sender = cmc->im.sender; - GNUNET_MQ_send (cmc->tc->mq, env); - } - GNUNET_SERVICE_client_continue (cmc->tc->client); - GNUNET_free (cmc); -} - - /** * Communicator gave us an unencapsulated message to pass as-is to * CORE. Process the request. @@ -4756,6 +4770,7 @@ static void handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) { struct CommunicatorMessageContext *cmc = cls; + struct VirtualLink *vl; uint16_t size = ntohs (mh->size); if ((size > UINT16_MAX - sizeof (struct InboundMessage)) || @@ -4768,6 +4783,25 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) GNUNET_SERVICE_client_drop (client); return; } + vl = GNUNET_CONTAINER_multipeermap_get (links, &cmc->im.sender); + if (NULL == vl) + { + /* FIXME: sender is giving us messages for CORE but we don't have + the link up yet! I *suspect* this can happen right now (i.e. + sender has verified us, but we didn't verify sender), but if + we pass this on, CORE would be confused (link down, messages + arrive). We should investigate more if this happens often, + or in a persistent manner, and possibly do "something" about + it. Thus logging as error for now. */ + GNUNET_break_op (0); + GNUNET_STATISTICS_update (GST_stats, + "# CORE messages droped (virtual link still down)", + 1, + GNUNET_NO); + + finish_cmc_handling (cmc); + return; + } /* Forward to all CORE clients */ for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) { @@ -4781,11 +4815,15 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) memcpy (&im[1], mh, size); GNUNET_MQ_send (tc->mq, env); } - /* FIXME: consider doing this _only_ once the message - was drained from the CORE MQs to extend flow control to CORE! - (basically, increment counter in cmc, decrement on MQ send continuation! - */ - finish_cmc_handling (cmc); + vl->core_recv_window--; + if (vl->core_recv_window > 0) + { + finish_cmc_handling (cmc); + return; + } + /* Wait with calling #finish_cmc_handling(cmc) until the message + was processed by CORE MQs (for CORE flow control)! */ + GNUNET_CONTAINER_DLL_insert (vl->cmc_head, vl->cmc_tail, cmc); } @@ -5345,7 +5383,8 @@ handle_reliability_ack (void *cls, } ack_counter = htonl (ra->ack_counter); - // FIXME: track ACK losses based on ack_counter somewhere! + (void) ack_counter; /* silence compiler warning for now */ + // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere! // (DV and/or Neighbour?) finish_cmc_handling (cmc); } @@ -7380,7 +7419,7 @@ reliability_box_message (struct Queue *queue, { /* failed hard */ GNUNET_break (0); - client_send_response (pm, GNUNET_NO, 0); + client_send_response (pm); return NULL; } pa = prepare_pending_acknowledgement (queue, dvh, pm); @@ -7531,7 +7570,7 @@ transmit_on_queue (void *cls) (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)) { /* Full message sent, and over reliabile channel */ - client_send_response (pm, GNUNET_YES, pm->bytes_msg); + client_send_response (pm); } else if ((GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) && @@ -7556,10 +7595,7 @@ transmit_on_queue (void *cls) /* Was this the last applicable fragmment? */ if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg)) - client_send_response ( - pm, - GNUNET_YES, - pm->bytes_msg /* FIXME: calculate and add overheads! */); + client_send_response (pm); } else if (PMT_CORE != pm->pmt) { -- 2.25.1