From 3d9f1acfb62cd53fc7d1441eb33e1341c9ff3790 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 2 Jun 2019 21:58:37 +0200 Subject: [PATCH] generate and handle TRANSPORT_FLOW_CONTROL messages (TNG) --- src/include/gnunet_protocols.h | 5 + src/transport/gnunet-service-tng.c | 410 +++++++++++++++++++---------- 2 files changed, 272 insertions(+), 143 deletions(-) diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index a00ddacca..d93e12bfb 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -3149,6 +3149,11 @@ extern "C" { */ #define GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING 1220 +/** + * Transport signalling incoming backchannel message to a communicator. + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL 1221 + /** * Message sent to indicate to the transport that a monitor diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 471ded644..7cc9f193c 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -28,10 +28,7 @@ * communicators do not offer flow control). * We do transmit FC window sizes now. Left: * for SENDING) - * - Throttle sending if "outbound_fc_window_size_used" reaches limit - * - Send *new* challenge when we get close to the limit (including - * at the beginning when the limit is zero!) - * - Retransmit challenge if it goes unanswered! + * - need to call consider_sending_fc() periodically if it goes unanswered! * * for DV) * - send challenges via DV (when DVH is confirmed *and* we care about @@ -78,10 +75,6 @@ * and high-latency links *if* we have the RAM [GOODPUT / utilization / stalls] * - Set last_window_consum_limit promise properly based on * latency and bandwidth of the respective connection [GOODPUT / utilization / stalls] - * - re-sending challenge response without a challenge when we have - * significantly increased the FC window (upon CORE being done with messages) - * so as to avoid the sender having to give us a fresh challenge [BANDWIDTH] - * Also can re-use signature in this case [CPU]. Marked with "TODO-M1" * * Design realizations / discussion: * - communicators do flow control by calling MQ "notify sent" @@ -1325,6 +1318,13 @@ struct VirtualLink */ struct GNUNET_TIME_Absolute n_challenge_time; + /** + * When did we last send a + * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message? + * Used to determine whether it is time to re-transmit the message. + */ + struct GNUNET_TIME_Absolute last_fc_transmission; + /** * Sender timestamp of the last * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message we have @@ -1388,8 +1388,7 @@ struct VirtualLink /** * Our current flow control window size in bytes. We - * are allowed to transmit this many bytes to @a n as per - * our @e my_challenge "account". + * are allowed to transmit this many bytes to @a n. */ uint64_t outbound_fc_window_size; @@ -3966,138 +3965,6 @@ pick_random_dv_hops (const struct DistanceVector *dv, } -/** - * There is a message at the head of the pending messages for @a vl - * which may be ready for transmission. Check if a queue is ready to - * take it. - * - * This function must (1) check for flow control to ensure that we can - * right now send to @a vl, (2) check that the pending message in the - * queue is actually eligible, (3) determine if any applicable queue - * (direct neighbour or DVH path) is ready to accept messages, and - * (4) prioritize based on the preferences associated with the - * pending message. - * - * So yeah, easy. - * - * @param vl virtual link where we should check for transmission - */ -static void -check_vl_transmission (struct VirtualLink *vl) -{ - struct Neighbour *n = vl->n; - struct DistanceVector *dv = vl->dv; - struct GNUNET_TIME_Absolute now; - int elig; - - /* FIXME-FC: need to implement virtual link flow control! */ - - /* Check that we have an eligible pending message! - (cheaper than having #transmit_on_queue() find out!) */ - elig = GNUNET_NO; - for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm; - pm = pm->next_vl) - { - if (NULL != pm->qe) - continue; /* not eligible, is in a queue! */ - elig = GNUNET_YES; - break; - } - if (GNUNET_NO == elig) - return; - - /* Notify queues at direct neighbours that we are interested */ - now = GNUNET_TIME_absolute_get (); - if (NULL != n) - { - for (struct Queue *queue = n->queue_head; NULL != queue; - queue = queue->next_neighbour) - if ((GNUNET_YES == queue->idle) && - (queue->validated_until.abs_value_us > now.abs_value_us)) - schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); - } - /* Notify queues via DV that we are interested */ - if (NULL != dv) - { - /* Do DV with lower scheduler priority, which effectively means that - IF a neighbour exists and is available, we prefer it. */ - for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; - pos = pos->next_dv) - { - struct Neighbour *nh = pos->next_hop; - - if (pos->path_valid_until.abs_value_us <= now.abs_value_us) - continue; /* skip this one: path not validated */ - for (struct Queue *queue = nh->queue_head; NULL != queue; - queue = queue->next_neighbour) - if ((GNUNET_YES == queue->idle) && - (queue->validated_until.abs_value_us > now.abs_value_us)) - schedule_transmit_on_queue (queue, - GNUNET_SCHEDULER_PRIORITY_BACKGROUND); - } - } -} - - -/** - * Client asked for transmission to a peer. Process the request. - * - * @param cls the client - * @param obm the send message that was sent - */ -static void -handle_client_send (void *cls, const struct OutboundMessage *obm) -{ - struct TransportClient *tc = cls; - struct PendingMessage *pm; - const struct GNUNET_MessageHeader *obmm; - uint32_t bytes_msg; - struct VirtualLink *vl; - enum GNUNET_MQ_PriorityPreferences pp; - - GNUNET_assert (CT_CORE == tc->type); - obmm = (const struct GNUNET_MessageHeader *) &obm[1]; - bytes_msg = ntohs (obmm->size); - pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority); - vl = lookup_virtual_link (&obm->peer); - if (NULL == vl) - { - /* Failure: don't have this peer as a neighbour (anymore). - Might have gone down asynchronously, so this is NOT - a protocol violation by CORE. Still count the event, - as this should be rare. */ - GNUNET_SERVICE_client_continue (tc->client); - GNUNET_STATISTICS_update (GST_stats, - "# messages dropped (neighbour unknown)", - 1, - GNUNET_NO); - return; - } - - pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg); - pm->logging_uuid = logging_uuid_gen++; - pm->prefs = pp; - pm->client = tc; - pm->vl = vl; - pm->bytes_msg = bytes_msg; - memcpy (&pm[1], obmm, bytes_msg); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending %u bytes as <%llu> to %s\n", - bytes_msg, - pm->logging_uuid, - GNUNET_i2s (&obm->peer)); - GNUNET_CONTAINER_MDLL_insert (client, - tc->details.core.pending_msg_head, - tc->details.core.pending_msg_tail, - pm); - GNUNET_CONTAINER_MDLL_insert (vl, - vl->pending_msg_head, - vl->pending_msg_tail, - pm); - check_vl_transmission (vl); -} - - /** * Communicator started. Test message is well-formed. * @@ -4853,6 +4720,187 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target, } +/** + * Something changed on the virtual link with respect to flow + * control. Consider retransmitting the FC window size. + * + * @param vl virtual link to work with + */ +static void +consider_sending_fc (struct VirtualLink *vl) +{ + struct GNUNET_TIME_Absolute monotime; + struct TransportFlowControlMessage fc; + struct GNUNET_TIME_Relative duration; + + duration = GNUNET_TIME_absolute_get_duration (vl->last_fc_transmission); + /* FIXME: decide sane criteria on when to do this, instead of doing + it always! */ + /* For example, we should probably ONLY do this if a bit more than + an RTT has passed, or if the window changed "significantly" since + then. */ + (void) duration; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending FC seq %u to %s with new window %llu\n", + (unsigned int) vl->fc_seq_gen, + GNUNET_i2s (&vl->target), + (unsigned long long) vl->incoming_fc_window_size); + monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg); + vl->last_fc_transmission = monotime; + fc.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL); + fc.header.size = htons (sizeof (fc)); + fc.seq = htonl (vl->fc_seq_gen++); + fc.inbound_window_size = GNUNET_htonll (vl->incoming_fc_window_size); + fc.outbound_sent = GNUNET_htonll (vl->outbound_fc_window_size_used); + fc.outbound_window_size = GNUNET_htonll (vl->outbound_fc_window_size); + fc.sender_time = GNUNET_TIME_absolute_hton (monotime); + route_control_message_without_fc (&vl->target, &fc.header, RMO_NONE); +} + + +/** + * There is a message at the head of the pending messages for @a vl + * which may be ready for transmission. Check if a queue is ready to + * take it. + * + * This function must (1) check for flow control to ensure that we can + * right now send to @a vl, (2) check that the pending message in the + * queue is actually eligible, (3) determine if any applicable queue + * (direct neighbour or DVH path) is ready to accept messages, and + * (4) prioritize based on the preferences associated with the + * pending message. + * + * So yeah, easy. + * + * @param vl virtual link where we should check for transmission + */ +static void +check_vl_transmission (struct VirtualLink *vl) +{ + struct Neighbour *n = vl->n; + struct DistanceVector *dv = vl->dv; + struct GNUNET_TIME_Absolute now; + int elig; + + /* Check that we have an eligible pending message! + (cheaper than having #transmit_on_queue() find out!) */ + elig = GNUNET_NO; + for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm; + pm = pm->next_vl) + { + if (NULL != pm->qe) + continue; /* not eligible, is in a queue! */ + if (pm->bytes_msg + vl->outbound_fc_window_size_used > + vl->outbound_fc_window_size) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Stalled transmision on VL %s due to flow control: %llu < %llu\n", + GNUNET_i2s (&vl->target), + (unsigned long long) vl->outbound_fc_window_size, + (unsigned long long) (pm->bytes_msg + + vl->outbound_fc_window_size_used)); + consider_sending_fc (vl); + return; /* We have a message, but flow control says "nope" */ + } + elig = GNUNET_YES; + break; + } + if (GNUNET_NO == elig) + return; + + /* Notify queues at direct neighbours that we are interested */ + now = GNUNET_TIME_absolute_get (); + if (NULL != n) + { + for (struct Queue *queue = n->queue_head; NULL != queue; + queue = queue->next_neighbour) + if ((GNUNET_YES == queue->idle) && + (queue->validated_until.abs_value_us > now.abs_value_us)) + schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); + } + /* Notify queues via DV that we are interested */ + if (NULL != dv) + { + /* Do DV with lower scheduler priority, which effectively means that + IF a neighbour exists and is available, we prefer it. */ + for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; + pos = pos->next_dv) + { + struct Neighbour *nh = pos->next_hop; + + if (pos->path_valid_until.abs_value_us <= now.abs_value_us) + continue; /* skip this one: path not validated */ + for (struct Queue *queue = nh->queue_head; NULL != queue; + queue = queue->next_neighbour) + if ((GNUNET_YES == queue->idle) && + (queue->validated_until.abs_value_us > now.abs_value_us)) + schedule_transmit_on_queue (queue, + GNUNET_SCHEDULER_PRIORITY_BACKGROUND); + } + } +} + + +/** + * Client asked for transmission to a peer. Process the request. + * + * @param cls the client + * @param obm the send message that was sent + */ +static void +handle_client_send (void *cls, const struct OutboundMessage *obm) +{ + struct TransportClient *tc = cls; + struct PendingMessage *pm; + const struct GNUNET_MessageHeader *obmm; + uint32_t bytes_msg; + struct VirtualLink *vl; + enum GNUNET_MQ_PriorityPreferences pp; + + GNUNET_assert (CT_CORE == tc->type); + obmm = (const struct GNUNET_MessageHeader *) &obm[1]; + bytes_msg = ntohs (obmm->size); + pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority); + vl = lookup_virtual_link (&obm->peer); + if (NULL == vl) + { + /* Failure: don't have this peer as a neighbour (anymore). + Might have gone down asynchronously, so this is NOT + a protocol violation by CORE. Still count the event, + as this should be rare. */ + GNUNET_SERVICE_client_continue (tc->client); + GNUNET_STATISTICS_update (GST_stats, + "# messages dropped (neighbour unknown)", + 1, + GNUNET_NO); + return; + } + + pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg); + pm->logging_uuid = logging_uuid_gen++; + pm->prefs = pp; + pm->client = tc; + pm->vl = vl; + pm->bytes_msg = bytes_msg; + memcpy (&pm[1], obmm, bytes_msg); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending %u bytes as <%llu> to %s\n", + bytes_msg, + pm->logging_uuid, + GNUNET_i2s (&obm->peer)); + GNUNET_CONTAINER_MDLL_insert (client, + tc->details.core.pending_msg_head, + tc->details.core.pending_msg_tail, + pm); + GNUNET_CONTAINER_MDLL_insert (vl, + vl->pending_msg_head, + vl->pending_msg_tail, + pm); + check_vl_transmission (vl); +} + + /** * Communicator requests backchannel transmission. Process the request. * Just repacks it into our `struct TransportBackchannelEncapsulationMessage *` @@ -5113,7 +5161,7 @@ core_env_sent_cb (void *cls) GNUNET_assert (vl->incoming_fc_window_size_ram >= ctx->size); vl->incoming_fc_window_size_ram -= ctx->size; vl->incoming_fc_window_size_used += ctx->isize; - /* TODO-M1 */ + consider_sending_fc (vl); GNUNET_free (ctx); } @@ -6046,6 +6094,7 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop) &vl->target, vl, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + consider_sending_fc (vl); /* We lacked a confirmed connection to the target before, so tell CORE about it (finally!) */ cores_send_connect_info (&dv->target); @@ -8031,6 +8080,7 @@ handle_validation_response ( &vl->target, vl, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + consider_sending_fc (vl); /* We lacked a confirmed connection to the target before, so tell CORE about it (finally!) */ cores_send_connect_info (&n->pid); @@ -8059,6 +8109,76 @@ handle_incoming_msg (void *cls, } +/** + * Communicator gave us a transport address validation response. Process the + * request. + * + * @param cls a `struct CommunicatorMessageContext` (must call + * #finish_cmc_handling() when done) + * @param fc the message that was received + */ +static void +handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc) +{ + struct CommunicatorMessageContext *cmc = cls; + struct VirtualLink *vl; + uint32_t seq; + struct GNUNET_TIME_Absolute st; + uint64_t os; + uint64_t wnd; + + vl = lookup_virtual_link (&cmc->im.sender); + if (NULL == vl) + { + GNUNET_STATISTICS_update (GST_stats, + "# FC dropped: virtual link unknown", + 1, + GNUNET_NO); + finish_cmc_handling (cmc); + return; + } + st = GNUNET_TIME_absolute_ntoh (fc->sender_time); + if (st.abs_value_us < vl->last_fc_timestamp.abs_value_us) + { + /* out of order, drop */ + GNUNET_STATISTICS_update (GST_stats, + "# FC dropped: message out of order", + 1, + GNUNET_NO); + finish_cmc_handling (cmc); + return; + } + seq = ntohl (fc->seq); + if (seq < vl->last_fc_seq) + { + /* Wrap-around/reset of other peer; start all counters from zero */ + vl->outbound_fc_window_size_used = 0; + } + vl->last_fc_seq = seq; + vl->last_fc_timestamp = st; + vl->outbound_fc_window_size = GNUNET_ntohll (fc->inbound_window_size); + os = GNUNET_ntohll (fc->outbound_sent); + vl->incoming_fc_window_size_loss = + (int64_t) (os - vl->incoming_fc_window_size_used); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received FC from %s, seq %u, new window %llu (loss at %lld)\n", + GNUNET_i2s (&vl->target), + (unsigned int) seq, + (unsigned long long) vl->outbound_fc_window_size, + (long long) vl->incoming_fc_window_size_loss); + wnd = GNUNET_ntohll (fc->outbound_window_size); + if (wnd < vl->incoming_fc_window_size) + { + /* Consider re-sending our FC message, as clearly the + other peer's idea of the window is not up-to-date */ + consider_sending_fc (vl); + } + /* FC window likely increased, check transmission possibilities! */ + check_vl_transmission (vl); + finish_cmc_handling (cmc); +} + + /** * Given an inbound message @a msg from a communicator @a cmc, * demultiplex it based on the type calling the right handler. @@ -8100,6 +8220,10 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc, GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE, struct TransportValidationChallengeMessage, &cmc), + GNUNET_MQ_hd_fixed_size (flow_control, + GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL, + struct TransportFlowControlMessage, + &cmc), GNUNET_MQ_hd_fixed_size ( validation_response, GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE, -- 2.25.1