From 5c0d8339072c343e7f5d5578acecdd99d0fe842b Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 9 Jun 2019 18:38:21 +0200 Subject: [PATCH] add FC retransmission logic --- src/transport/gnunet-service-tng.c | 138 +++++++++++++++++++++++------ 1 file changed, 113 insertions(+), 25 deletions(-) diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 3e4d750a2..bb477fc1e 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -26,9 +26,7 @@ * Implement next: * - FIXME-FC: realize transport-to-transport flow control (needed in case * communicators do not offer flow control). - * We do transmit FC window sizes now. Left: - * for SENDING) - * - need to call consider_sending_fc() periodically if it goes unanswered! + * We do transmit FC window sizes now. * * for DV) * - send challenges via DV (when DVH is confirmed *and* we care about @@ -117,6 +115,20 @@ */ #define MAX_CUMMULATIVE_ACKS 64 +/** + * What is the 1:n chance that we send a Flow control response when + * receiving a flow control message that did not change anything for + * us? Basically, this is used in the case where both peers are stuck + * on flow control (no window changes), but one might continue sending + * flow control messages to the other peer as the first FC message + * when things stalled got lost, and then subsequently the other peer + * does *usually* not respond as nothing changed. So to ensure that + * eventually the FC messages stop, we do send with 1/8th probability + * an FC message even if nothing changed. That prevents one peer + * being stuck in sending (useless) FC messages "forever". + */ +#define FC_NO_CHANGE_REPLY_PROBABILITY 8 + /** * What is the size we assume for a read operation in the * absence of an MTU for the purpose of flow control? @@ -1289,6 +1301,12 @@ struct VirtualLink */ struct GNUNET_SCHEDULER_Task *visibility_task; + /** + * Task scheduled to periodically retransmit FC messages (in + * case one got lost). + */ + struct GNUNET_SCHEDULER_Task *fc_retransmit_task; + /** * Neighbour used by this virtual link, NULL if @e dv is used. */ @@ -1334,6 +1352,12 @@ struct VirtualLink */ struct GNUNET_TIME_Absolute last_fc_timestamp; + /** + * Expected RTT from the last FC transmission. (Zero if the last + * attempt failed, but could theoretically be zero even on success.) + */ + struct GNUNET_TIME_Relative last_fc_rtt; + /** * Used to generate unique UUIDs for messages that are being * fragmented. @@ -1399,6 +1423,17 @@ struct VirtualLink */ uint64_t outbound_fc_window_size_used; + /** + * What is the most recent FC window the other peer sent us + * in `outbound_window_size`? This is basically the window + * size value the other peer has definitively received from + * us. If it matches @e incoming_fc_window_size, we should + * not send a FC message to increase the FC window. However, + * we may still send an FC message to notify the other peer + * that we received the other peer's FC message. + */ + uint64_t last_outbound_window_size_received; + /** * Generator for the sequence numbers of * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL messages we send. @@ -2987,6 +3022,11 @@ free_virtual_link (struct VirtualLink *vl) GNUNET_SCHEDULER_cancel (vl->visibility_task); vl->visibility_task = NULL; } + if (NULL != vl->fc_retransmit_task) + { + GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task); + vl->fc_retransmit_task = NULL; + } while (NULL != (csc = vl->csc_head)) { GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, csc); @@ -4237,8 +4277,9 @@ queue_send_msg (struct Queue *queue, * @param hdr message to send as payload * @param options whether queues must be confirmed or not, * and whether we may pick multiple (2) queues + * @return expected RTT for transmission, #GNUNET_TIME_UNIT_FOREVER_REL if sending failed */ -static void +static struct GNUNET_TIME_Relative route_via_neighbour (const struct Neighbour *n, const struct GNUNET_MessageHeader *hdr, enum RouteMessageOptions options) @@ -4247,6 +4288,7 @@ route_via_neighbour (const struct Neighbour *n, unsigned int candidates; unsigned int sel1; unsigned int sel2; + struct GNUNET_TIME_Relative rtt; /* Pick one or two 'random' queues from n (under constraints of options) */ now = GNUNET_TIME_absolute_get (); @@ -4274,9 +4316,10 @@ route_via_neighbour (const struct Neighbour *n, "# route selection failed (all no valid queue)", 1, GNUNET_NO); - return; + return GNUNET_TIME_UNIT_FOREVER_REL; } + rtt = GNUNET_TIME_UNIT_FOREVER_REL; sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates); if (0 == (options & RMO_REDUNDANT)) sel2 = candidates; /* picks none! */ @@ -4297,11 +4340,13 @@ route_via_neighbour (const struct Neighbour *n, GNUNET_i2s (&n->pid), pos->address, (sel1 == candidates) ? 1 : 2); + rtt = GNUNET_TIME_relative_min (rtt, pos->pd.aged_rtt); queue_send_msg (pos, NULL, hdr, ntohs (hdr->size)); } candidates++; } } + return rtt; } @@ -4523,8 +4568,9 @@ typedef void (*DVMessageHandler) (void *cls, * @param use function to call with the encapsulated message * @param use_cls closure for @a use * @param options whether path must be confirmed or not, to be passed to @a use + * @return expected RTT for transmission, #GNUNET_TIME_UNIT_FOREVER_REL if sending failed */ -static void +static struct GNUNET_TIME_Relative encapsulate_for_dv (struct DistanceVector *dv, unsigned int num_dvhs, struct DistanceVectorHop **dvhs, @@ -4540,6 +4586,7 @@ encapsulate_for_dv (struct DistanceVector *dv, struct TransportDVBoxPayloadP *enc_payload_hdr = (struct TransportDVBoxPayloadP *) enc; struct DVKeyState key; + struct GNUNET_TIME_Relative rtt; /* Encrypt payload */ box_hdr.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX); @@ -4560,7 +4607,7 @@ encapsulate_for_dv (struct DistanceVector *dv, enc_body_size); dv_hmac (&key, &box_hdr.hmac, enc, sizeof (enc)); dv_key_clean (&key); - + rtt = GNUNET_TIME_UNIT_FOREVER_REL; /* For each selected path, take the pre-computed header and body and add the path in the middle of the message; then send it. */ for (unsigned int i = 0; i < num_dvhs; i++) @@ -4603,13 +4650,14 @@ encapsulate_for_dv (struct DistanceVector *dv, path); GNUNET_free (path); } - + rtt = GNUNET_TIME_relative_min (rtt, dvh->pd.aged_rtt); memcpy (&dhops[num_hops], enc, sizeof (enc)); use (use_cls, dvh->next_hop, (const struct GNUNET_MessageHeader *) buf, options); } + return rtt; } @@ -4629,7 +4677,7 @@ send_dv_to_neighbour (void *cls, enum RouteMessageOptions options) { (void) cls; - route_via_neighbour (next_hop, hdr, options); + (void) route_via_neighbour (next_hop, hdr, options); } @@ -4642,8 +4690,9 @@ send_dv_to_neighbour (void *cls, * @param target peer to receive @a hdr * @param hdr header of the message to route and #GNUNET_free() * @param options which transmission channels are allowed + * @return expected RTT for transmission, #GNUNET_TIME_UNIT_FOREVER_REL if sending failed */ -static void +static struct GNUNET_TIME_Relative route_control_message_without_fc (const struct GNUNET_PeerIdentity *target, const struct GNUNET_MessageHeader *hdr, enum RouteMessageOptions options) @@ -4651,6 +4700,8 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target, struct VirtualLink *vl; struct Neighbour *n; struct DistanceVector *dv; + struct GNUNET_TIME_Relative rtt1; + struct GNUNET_TIME_Relative rtt2; vl = lookup_virtual_link (target); GNUNET_assert (NULL != vl); @@ -4675,7 +4726,7 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target, "# Messages dropped in routing: no acceptable method", 1, GNUNET_NO); - return; + return GNUNET_TIME_UNIT_FOREVER_REL; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing message of type %u to %s with options %X\n", @@ -4694,9 +4745,11 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target, if ((NULL != n) && (NULL != dv)) options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's enough for redunancy, so clear the flag. */ + rtt1 = GNUNET_TIME_UNIT_FOREVER_REL; + rtt2 = GNUNET_TIME_UNIT_FOREVER_REL; if (NULL != n) { - route_via_neighbour (n, hdr, options); + rtt1 = route_via_neighbour (n, hdr, options); } if (NULL != dv) { @@ -4711,16 +4764,17 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target, { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Failed to route message, could not determine DV path\n"); - return; + return rtt1; } - encapsulate_for_dv (dv, - res, - hops, - hdr, - &send_dv_to_neighbour, - NULL, - options & (~RMO_REDUNDANT)); + rtt2 = encapsulate_for_dv (dv, + res, + hops, + hdr, + &send_dv_to_neighbour, + NULL, + options & (~RMO_REDUNDANT)); } + return GNUNET_TIME_relative_min (rtt1, rtt2); } @@ -4728,21 +4782,23 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target, * Something changed on the virtual link with respect to flow * control. Consider retransmitting the FC window size. * - * @param vl virtual link to work with + * @param cls a `struct VirtualLink` to work with */ static void -consider_sending_fc (struct VirtualLink *vl) +consider_sending_fc (void *cls) { + struct VirtualLink *vl = cls; struct GNUNET_TIME_Absolute monotime; struct TransportFlowControlMessage fc; struct GNUNET_TIME_Relative duration; + struct GNUNET_TIME_Relative rtt; duration = GNUNET_TIME_absolute_get_duration (vl->last_fc_transmission); /* FIXME: decide sane criteria on when to do this, instead of doing it always! */ /* For example, we should probably ONLY do this if a bit more than an RTT has passed, or if the window changed "significantly" since - then. */ + then. See vl->last_fc_rtt! */ (void) duration; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -4759,7 +4815,24 @@ consider_sending_fc (struct VirtualLink *vl) fc.outbound_sent = GNUNET_htonll (vl->outbound_fc_window_size_used); fc.outbound_window_size = GNUNET_htonll (vl->outbound_fc_window_size); fc.sender_time = GNUNET_TIME_absolute_hton (monotime); - route_control_message_without_fc (&vl->target, &fc.header, RMO_NONE); + rtt = route_control_message_without_fc (&vl->target, &fc.header, RMO_NONE); + if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us) + { + rtt = GNUNET_TIME_UNIT_SECONDS; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "FC retransmission to %s failed, will retry in %s\n", + GNUNET_i2s (&vl->target), + GNUNET_STRINGS_relative_time_to_string (rtt, GNUNET_YES)); + vl->last_fc_rtt = GNUNET_TIME_UNIT_ZERO; + } + else + { + vl->last_fc_rtt = rtt; + } + if (NULL != vl->fc_retransmit_task) + GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task); + vl->fc_retransmit_task = + GNUNET_SCHEDULER_add_delayed (rtt, &consider_sending_fc, vl); } @@ -8181,12 +8254,27 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc) (unsigned long long) vl->outbound_fc_window_size, (long long) vl->incoming_fc_window_size_loss); wnd = GNUNET_ntohll (fc->outbound_window_size); - if (wnd < vl->incoming_fc_window_size) + if ((wnd < vl->incoming_fc_window_size) || + (vl->last_outbound_window_size_received != wnd) || + (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX) % + FC_NO_CHANGE_REPLY_PROBABILITY)) { /* Consider re-sending our FC message, as clearly the other peer's idea of the window is not up-to-date */ consider_sending_fc (vl); } + if ((wnd == vl->incoming_fc_window_size) && + (vl->last_outbound_window_size_received == wnd) && + (NULL != vl->fc_retransmit_task)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Stopping FC retransmission to %s: peer is current at window %llu\n", + GNUNET_i2s (&vl->target), + (unsigned long long) wnd); + GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task); + vl->fc_retransmit_task = NULL; + } + vl->last_outbound_window_size_received = wnd; /* FC window likely increased, check transmission possibilities! */ check_vl_transmission (vl); finish_cmc_handling (cmc); -- 2.25.1