From adf28e389d663529be51e41a96a867fe58f251c0 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 26 Jan 2017 21:01:23 +0100 Subject: [PATCH] use 'futures' bitfield in ACKs properly, revisit unbuffered/out-of-order transmission --- src/cadet/cadet_protocol.h | 12 +- src/cadet/gnunet-service-cadet-new_channel.c | 215 +++++++++++------- .../gnunet-service-cadet-new_connection.c | 3 +- src/cadet/gnunet-service-cadet-new_paths.c | 2 +- src/cadet/gnunet-service-cadet-new_peer.c | 1 + src/cadet/gnunet-service-cadet-new_tunnels.c | 18 +- 6 files changed, 154 insertions(+), 97 deletions(-) diff --git a/src/cadet/cadet_protocol.h b/src/cadet/cadet_protocol.h index 5ec34f7d7..8fb260dfd 100644 --- a/src/cadet/cadet_protocol.h +++ b/src/cadet/cadet_protocol.h @@ -462,7 +462,7 @@ struct GNUNET_CADET_ChannelDataAckMessage struct GNUNET_CADET_ChannelTunnelNumber ctn; /** - * Bitfield of already-received newer messages + * Bitfield of already-received messages past @e mid. * pid + 1 @ LSB * pid + 64 @ MSB */ @@ -532,14 +532,16 @@ struct GNUNET_CADET_ChannelDataAckMessage struct GNUNET_CADET_ChannelTunnelNumber ctn; /** - * Bitfield of already-received messages past @e mid. - * pid + 1 @ LSB - * pid + 64 @ MSB + * Bitfield of already-received newer messages. Note that bit 0 + * corresponds to @e mid + 1. + * + * pid + 0 @ LSB + * pid + 63 @ MSB */ uint64_t futures GNUNET_PACKED; /** - * Last message ID received. + * Next message ID expected. */ struct ChannelMessageIdentifier mid; }; diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c index a923f19dc..f5e310cfc 100644 --- a/src/cadet/gnunet-service-cadet-new_channel.c +++ b/src/cadet/gnunet-service-cadet-new_channel.c @@ -25,21 +25,16 @@ * @author Christian Grothoff * * TODO: - * - Optimize ACKs by using 'mid_futures' properly! 
- * - calculate current RTT if possible, use that for initial retransmissions - * (NOTE: needs us to learn which connection the tunnel uses for the message!) - * - introduce shutdown so we can have half-closed channels, modify - * destroy to include MID to have FIN-ACK equivalents, etc. - * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! - * (and figure out how/where to use this!) - * - check that '0xFFULL' really is sufficient for flow control! - * (this is right now a big HACK!) - * - revisit handling of 'unreliable' traffic! - * (has not seen enough review) + * - Congestion/flow control: + * + calculate current RTT if possible, use that for initial retransmissions + * (NOTE: needs us to learn which connection the tunnel uses for the message!) + * + estimate max bandwidth using bursts and use to for CONGESTION CONTROL! + * (and figure out how/where to use this!) + * + figure out flow control without ACKs (unreliable traffic!) * - revisit handling of 'unbuffered' traffic! - * (has not seen enough review) - * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'. - * - figure out flow control without ACKs (unreliable traffic!) + * (need to push down through tunnel into connection selection) + * - revisit handling of 'buffered' traffic: 4 is a rather small buffer; maybe + * reserve more bits in 'options' to allow for buffer size control? */ #include "platform.h" #include "gnunet_util_lib.h" @@ -215,6 +210,11 @@ struct CadetChannelClient */ struct GNUNET_CADET_ClientChannelNumber ccn; + /** + * Number of entries currently in @a head_recv DLL. + */ + unsigned int num_recv; + /** * Can we send data to the client? */ @@ -295,8 +295,6 @@ struct CadetChannel /** * Bitfield of already-received messages past @e mid_recv. - * - * FIXME: not yet properly used (bits here are never set!) 
*/ uint64_t mid_futures; @@ -331,6 +329,12 @@ struct CadetChannel */ enum CadetChannelState state; + /** + * Count how many ACKs we skipped, used to prevent long + * sequences of ACK skipping. + */ + unsigned int skip_ack_series; + /** * Is the tunnel bufferless (minimum latency)? */ @@ -416,6 +420,7 @@ free_channel_client (struct CadetChannelClient *ccc) GNUNET_CONTAINER_DLL_remove (ccc->head_recv, ccc->tail_recv, com); + ccc->num_recv--; GNUNET_MQ_discard (com->env); GNUNET_free (com); } @@ -772,10 +777,12 @@ send_channel_data_ack (struct CadetChannel *ch) { struct GNUNET_CADET_ChannelDataAckMessage msg; + if (GNUNET_NO == ch->reliable) + return; /* no ACKs */ msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK); msg.header.size = htons (sizeof (msg)); msg.ctn = ch->ctn; - msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1); + msg.mid.mid = htonl (ntohl (ch->mid_recv.mid)); msg.futures = GNUNET_htonll (ch->mid_futures); if (NULL != ch->last_control_qe) GCT_send_cancel (ch->last_control_qe); @@ -1128,6 +1135,7 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, uint32_t mid_min; uint32_t mid_max; uint32_t mid_msg; + uint32_t delta; GNUNET_assert (GNUNET_NO == ch->is_loopback); if ( (GNUNET_YES == ch->destroy) && @@ -1140,8 +1148,9 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping incoming payload on %s as this end is already closed\n", GCCH_2s (ch)); - /* FIXME: send back ACK/NACK/Closed notification - to stop retransmissions! */ + /* send back DESTROY notification to stop further retransmissions! 
*/ + GCT_send_channel_destroy (ch->t, + ch->ctn); return; } payload_size = ntohs (msg->header.size) - sizeof (*msg); @@ -1168,31 +1177,80 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, env); ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); ch->mid_futures >>= 1; - if (GNUNET_YES == ch->reliable) - send_channel_data_ack (ch); + send_channel_data_ack (ch); return; } - /* check if message ought to be dropped because it is anicent/too distant/duplicate */ - mid_min = ntohl (ch->mid_recv.mid); - mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE; - mid_msg = ntohl (msg->mid.mid); - if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) || - ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) ) + if (GNUNET_YES == ch->reliable) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n", - (unsigned int) payload_size, - GCCH_2s (ch), - ntohl (msg->mid.mid)); - GNUNET_STATISTICS_update (stats, - "# duplicate DATA (ancient or future)", - 1, - GNUNET_NO); - GNUNET_MQ_discard (env); - if (GNUNET_YES == ch->reliable) + /* check if message ought to be dropped because it is ancient/too distant/duplicate */ + mid_min = ntohl (ch->mid_recv.mid); + mid_max = mid_min + ch->max_pending_messages; + mid_msg = ntohl (msg->mid.mid); + if ( ( (uint32_t) (mid_msg - mid_min) > ch->max_pending_messages) || + ( (uint32_t) (mid_max - mid_msg) > ch->max_pending_messages) ) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s at %u drops ancient or far-future message %u\n", + GCCH_2s (ch), + (unsigned int) mid_min, + ntohl (msg->mid.mid)); + + GNUNET_STATISTICS_update (stats, + "# duplicate DATA (ancient or future)", + 1, + GNUNET_NO); + GNUNET_MQ_discard (env); send_channel_data_ack (ch); - return; + return; + } + /* mark bit for future ACKs */ + delta = mid_msg - mid_min - 1; /* overflow/underflow are OK here */ + if (delta < 64) + { + if (0 != (ch->mid_futures & (1LLU << delta))) + { + /* Duplicate within the 
queue, drop also */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Duplicate payload of %u bytes on %s (mid %u) dropped\n", + (unsigned int) payload_size, + GCCH_2s (ch), + ntohl (msg->mid.mid)); + GNUNET_STATISTICS_update (stats, + "# duplicate DATA", + 1, + GNUNET_NO); + GNUNET_MQ_discard (env); + send_channel_data_ack (ch); + return; + } + ch->mid_futures |= (1LLU << delta); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Marked bit %llX for mid %u (base: %u); now: %llX\n", + (1LLU << delta), + mid_msg, + mid_min, + ch->mid_futures); + } + } + else /* ! ch->reliable */ + { + /* Channel is unreliable, so we do not ACK. But we also cannot + allow buffering everything, so check if we have space... */ + if (ccc->num_recv >= ch->max_pending_messages) + { + struct CadetOutOfOrderMessage *drop; + + /* Yep, need to drop. Drop the oldest message in + the buffer. */ + drop = ccc->head_recv; + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, + drop); + ccc->num_recv--; + GNUNET_MQ_discard (drop->env); + GNUNET_free (drop); + } } /* Insert message into sorted out-of-order queue */ @@ -1206,9 +1264,14 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, ccc->head_recv, ccc->tail_recv, com); + ccc->num_recv++; if (GNUNET_YES == duplicate) { - /* Duplicate within the queue, drop also */ + /* Duplicate within the queue, drop also (this is not covered by + the case above if "delta" >= 64, which could be the case if + max_pending_messages is also >= 64 or if our client is unready + and we are seeing retransmissions of the message our client is + blocked on. 
*/ LOG (GNUNET_ERROR_TYPE_DEBUG, "Duplicate payload of %u bytes on %s (mid %u) dropped\n", (unsigned int) payload_size, @@ -1221,10 +1284,10 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, GNUNET_CONTAINER_DLL_remove (ccc->head_recv, ccc->tail_recv, com); + ccc->num_recv--; GNUNET_MQ_discard (com->env); GNUNET_free (com); - if (GNUNET_YES == ch->reliable) - send_channel_data_ack (ch); + send_channel_data_ack (ch); return; } LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1238,6 +1301,10 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, ccc, ntohl (msg->mid.mid), ntohl (ch->mid_recv.mid)); + /* NOTE: this ACK we _could_ skip, as the packet is out-of-order and + the sender may already be transmitting the previous one. Needs + experimental evaluation to see if/when this ACK helps or + hurts. (We might even want another option.) */ send_channel_data_ack (ch); } @@ -1340,6 +1407,9 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, GNUNET_break_op (0); return; } + /* mid_base is the MID of the next message that the + other peer expects (i.e. that is missing!), everything + LOWER (but excluding mid_base itself) was received. */ mid_base = ntohl (ack->mid.mid); mid_mask = GNUNET_htonll (ack->futures); found = GNUNET_NO; @@ -1348,24 +1418,12 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, crm = crmn) { crmn = crm->next; - if (ack->mid.mid == crm->data_message->mid.mid) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got DATA_ACK with base %u matching message %u on %s\n", - (unsigned int) mid_base, - ntohl (crm->data_message->mid.mid), - GCCH_2s (ch)); - handle_matching_ack (ch, - crm); - found = GNUNET_YES; - continue; - } - delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base) - 1; + delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base); if (delta >= UINT_MAX - ch->max_pending_messages) { - /* overflow, means crm was way in the past, so this ACK counts for it. 
*/ + /* overflow, means crm was a bit in the past, so this ACK counts for it. */ LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got DATA_ACK with base %u past %u on %s\n", + "Got DATA_ACK with base %u satisfying past message %u on %s\n", (unsigned int) mid_base, ntohl (crm->data_message->mid.mid), GCCH_2s (ch)); @@ -1374,8 +1432,14 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, found = GNUNET_YES; continue; } + delta--; if (delta >= 64) continue; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Testing bit %llX for mid %u (base: %u)\n", + (1LLU << delta), + ntohl (crm->data_message->mid.mid), + mid_base); if (0 != (mid_mask & (1LLU << delta))) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1405,7 +1469,8 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, GNUNET_SCHEDULER_cancel (ch->retry_data_task); ch->retry_data_task = NULL; } - if (NULL != ch->head_sent) + if ( (NULL != ch->head_sent) && + (NULL == ch->head_sent->qe) ) ch->retry_data_task = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry, &retry_transmission, @@ -1606,6 +1671,7 @@ GCCH_handle_local_data (struct CadetChannel *ch, GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv, receiver->tail_recv, oom); + receiver->num_recv++; } return GNUNET_OK; } @@ -1623,14 +1689,14 @@ GCCH_handle_local_data (struct CadetChannel *ch, GNUNET_memcpy (&crm->data_message[1], buf, buf_len); - GNUNET_CONTAINER_DLL_insert (ch->head_sent, - ch->tail_sent, - crm); + GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent, + ch->tail_sent, + crm); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending %u bytes from local client to %s with MID %u\n", - buf_len, + "Sending message %u from local client to %s with %u bytes\n", + ntohl (crm->data_message->mid.mid), GCCH_2s (ch), - ntohl (crm->data_message->mid.mid)); + buf_len); if (NULL != ch->retry_data_task) { GNUNET_SCHEDULER_cancel (ch->retry_data_task); @@ -1688,6 +1754,7 @@ GCCH_handle_local_ack (struct CadetChannel *ch, GNUNET_CONTAINER_DLL_remove (ccc->head_recv, ccc->tail_recv, com); + 
ccc->num_recv--; GSC_send_to_client (ccc->c, com->env); /* Notify sender that we can receive more */ @@ -1721,7 +1788,7 @@ GCCH_handle_local_ack (struct CadetChannel *ch, } LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got LOCAL_ACK, passing payload message %u to %s-%X on %s\n", + "Got LOCAL_ACK, giving payload message %u to %s-%X on %s\n", ntohl (com->mid.mid), GSC_2s (ccc->c), ntohl (ccc->ccn.channel_of_client), @@ -1731,29 +1798,17 @@ GCCH_handle_local_ack (struct CadetChannel *ch, GNUNET_CONTAINER_DLL_remove (ccc->head_recv, ccc->tail_recv, com); + ccc->num_recv--; /* FIXME: if unreliable, this is not aggressive enough, as it would be OK to have lost some! */ + ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid)); ch->mid_futures >>= 1; /* equivalent to division by 2 */ ccc->client_ready = GNUNET_NO; GSC_send_to_client (ccc->c, com->env); GNUNET_free (com); - if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) && - (GNUNET_YES == ch->reliable) ) - { - /* The next 15 messages were also already received (0xFF), this - suggests that the sender may be blocked on flow control - urgently waiting for an ACK from us. (As we have an inherent - maximum of 64 bits, and 15 is getting too close for comfort.) - So we should send one now. */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sender on %s likely blocked on flow-control, sending ACK now.\n", - GCCH_2s (ch)); - if (GNUNET_YES == ch->reliable) - send_channel_data_ack (ch); - } - + send_channel_data_ack (ch); if (NULL != ccc->head_recv) return; if (GNUNET_NO == ch->destroy) diff --git a/src/cadet/gnunet-service-cadet-new_connection.c b/src/cadet/gnunet-service-cadet-new_connection.c index 58922bc1e..5f1ff61e1 100644 --- a/src/cadet/gnunet-service-cadet-new_connection.c +++ b/src/cadet/gnunet-service-cadet-new_connection.c @@ -27,8 +27,7 @@ * @author Christian Grothoff * * TODO: - * - Implement: keepalive messages / timeout (timeout to be done @ peer level!) - * - Optimization: keep performance metrics (?) 
+ * - Optimization: keep per-connection performance metrics (?) */ #include "platform.h" #include "gnunet-service-cadet-new.h" diff --git a/src/cadet/gnunet-service-cadet-new_paths.c b/src/cadet/gnunet-service-cadet-new_paths.c index 8a4d7bbf8..a5d201e0b 100644 --- a/src/cadet/gnunet-service-cadet-new_paths.c +++ b/src/cadet/gnunet-service-cadet-new_paths.c @@ -24,7 +24,7 @@ * @author Christian Grothoff * * TODO: - * - currently only allowing one unique connection per path, + * - BUG: currently only allowing one unique connection per path, * but need to allow 2 in case WE are establishing one from A to B * while at the same time B establishes one to A. * Also, must not ASSERT if B establishes a 2nd one to us. diff --git a/src/cadet/gnunet-service-cadet-new_peer.c b/src/cadet/gnunet-service-cadet-new_peer.c index 97bb1378e..180fdab54 100644 --- a/src/cadet/gnunet-service-cadet-new_peer.c +++ b/src/cadet/gnunet-service-cadet-new_peer.c @@ -25,6 +25,7 @@ * @author Christian Grothoff * * TODO: + * - timeout for routes * - optimize stopping/restarting DHT search to situations * where we actually need it (i.e. not if we have a direct connection, * or if we already have plenty of good short ones, or maybe even diff --git a/src/cadet/gnunet-service-cadet-new_tunnels.c b/src/cadet/gnunet-service-cadet-new_tunnels.c index 592a8c683..e677a7436 100644 --- a/src/cadet/gnunet-service-cadet-new_tunnels.c +++ b/src/cadet/gnunet-service-cadet-new_tunnels.c @@ -24,15 +24,15 @@ * @author Christian Grothoff * * FIXME: - * - implement keepalive - * - implement rekeying - * - check KX estate machine -- make sure it is never stuck! 
- * - clean up KX logic, including adding sender authentication - * - implement connection management (evaluate, kill old ones, - * search for new ones) - * - when managing connections, distinguish those that - * have (recently) had traffic from those that were - * never ready (or not recently) + * - KX: + * + implement rekeying + * + check KX state machine -- make sure it is never stuck! + * + clean up KX logic, including adding sender authentication + * - connection management + * + implement properly (evaluate, kill old ones, search for new ones) + * + when managing connections, distinguish those that + * have (recently) had traffic from those that were + * never ready (or not recently) */ #include "platform.h" #include "gnunet_util_lib.h" -- 2.25.1