From 80d6555ec30182b9a8a59778339f5cbe7929ce60 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 25 Jan 2017 19:29:45 +0100 Subject: [PATCH] towards proper DATA_ACK handling --- src/cadet/gnunet-service-cadet-new_channel.c | 263 +++++++++++-------- 1 file changed, 150 insertions(+), 113 deletions(-) diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c index dc3d4352c..e561f1992 100644 --- a/src/cadet/gnunet-service-cadet-new_channel.c +++ b/src/cadet/gnunet-service-cadet-new_channel.c @@ -25,6 +25,7 @@ * @author Christian Grothoff * * TODO: + * - Optimize ACKs by using 'mid_futures' properly! * - introduce shutdown so we can have half-closed channels, modify * destroy to include MID to have FIN-ACK equivalents, etc. * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! @@ -287,6 +288,8 @@ struct CadetChannel /** * Bitfield of already-received messages past @e mid_recv. + * + * FIXME: not yet properly used (bits here are never set!) */ uint64_t mid_futures; @@ -769,6 +772,11 @@ send_channel_data_ack (struct CadetChannel *ch) msg.futures = GNUNET_htonll (ch->mid_futures); if (NULL != ch->last_control_qe) GCT_send_cancel (ch->last_control_qe); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending DATA_ACK %u:%llX via %s\n", + (unsigned int) ntohl (msg.mid.mid), + (unsigned long long) ch->mid_futures, + GCCH_2s (ch)); ch->last_control_qe = GCT_send (ch->t, &msg.header, &send_ack_cb, @@ -1076,23 +1084,11 @@ is_before (void *cls, if (delta > (uint32_t) INT_MAX) { /* in overflow range, we can safely assume we wrapped around */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%u > %u => %p > %p\n", - (unsigned int) v1, - (unsigned int) v2, - m1, - m2); return GNUNET_NO; } else { /* result is small, thus v2 > v1, thus e1 < e2 */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%u < %u => %p < %p\n", - (unsigned int) v1, - (unsigned int) v2, - m1, - m2); return GNUNET_YES; } } @@ -1113,6 +1109,11 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, struct GNUNET_CADET_LocalData *ld; struct CadetChannelClient *ccc; size_t payload_size; + struct CadetOutOfOrderMessage *com; + int duplicate; + uint32_t mid_min; + uint32_t mid_max; + uint32_t mid_msg; GNUNET_assert (GNUNET_NO == ch->is_loopback); if ( (GNUNET_YES == ch->destroy) && @@ -1153,72 +1154,75 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, env); ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); ch->mid_futures >>= 1; + if (GNUNET_YES == ch->reliable) + send_channel_data_ack (ch); + return; } - else + + /* check if message ought to be dropped because it is anicent/too distant/duplicate */ + mid_min = ntohl (ch->mid_recv.mid); + mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE; + mid_msg = ntohl (msg->mid.mid); + if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) || + ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) ) { - struct CadetOutOfOrderMessage *com; - int duplicate; - uint32_t mid_min; - uint32_t mid_max; - uint32_t mid_msg; - - mid_min = ntohl (ch->mid_recv.mid); - mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE; - mid_msg = ntohl (msg->mid.mid); - if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) || - ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) ) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n", - (unsigned int) payload_size, - GCCH_2s (ch), - ntohl (msg->mid.mid)); - GNUNET_STATISTICS_update (stats, - "# duplicate DATA (ancient or future)", - 1, - GNUNET_NO); - GNUNET_MQ_discard (env); - return; - } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n", + (unsigned int) payload_size, + GCCH_2s (ch), + ntohl (msg->mid.mid)); + GNUNET_STATISTICS_update (stats, + "# duplicate DATA (ancient or future)", + 1, + GNUNET_NO); + GNUNET_MQ_discard (env); + return; + } - com = GNUNET_new (struct CadetOutOfOrderMessage); - com->mid = msg->mid; - com->env = env; - duplicate = GNUNET_NO; - GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage, - is_before, - &duplicate, - ccc->head_recv, - ccc->tail_recv, - com); - if (GNUNET_YES == duplicate) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Duplicate payload of %u bytes on %s (mid %u) dropped\n", - (unsigned int) payload_size, - GCCH_2s (ch), - ntohl (msg->mid.mid)); - GNUNET_STATISTICS_update (stats, - "# duplicate DATA", - 1, - GNUNET_NO); - GNUNET_CONTAINER_DLL_remove (ccc->head_recv, - ccc->tail_recv, - com); - GNUNET_MQ_discard (com->env); - GNUNET_free (com); - return; - } + /* Insert message into sorted out-of-order queue */ + com = GNUNET_new (struct CadetOutOfOrderMessage); + com->mid = msg->mid; + com->env = env; + duplicate = GNUNET_NO; + GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage, + is_before, + &duplicate, + ccc->head_recv, + ccc->tail_recv, + com); + if (GNUNET_YES == duplicate) + { + /* Duplicate within the queue, drop also */ LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queued %s payload of %u bytes on %s (mid %u, need %u first)\n", - (GNUNET_YES == ccc->client_ready) - ? "out-of-order" - : "client-not-ready", + "Duplicate payload of %u bytes on %s (mid %u) dropped\n", (unsigned int) payload_size, GCCH_2s (ch), - ntohl (msg->mid.mid), - ntohl (ch->mid_recv.mid)); + ntohl (msg->mid.mid)); + GNUNET_STATISTICS_update (stats, + "# duplicate DATA", + 1, + GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, + com); + GNUNET_MQ_discard (com->env); + GNUNET_free (com); + if (GNUNET_YES == ch->reliable) + send_channel_data_ack (ch); + return; } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n", + (GNUNET_YES == ccc->client_ready) + ? "out-of-order" + : "client-not-ready", + (unsigned int) payload_size, + GCCH_2s (ch), + ntohl (ccc->ccn.channel_of_client), + ccc, + ntohl (msg->mid.mid), + ntohl (ch->mid_recv.mid)); + send_channel_data_ack (ch); } @@ -1260,6 +1264,36 @@ retry_transmission (void *cls) } +/** + * We got an ACK for a message in our queue, remove it from + * the queue and tell our client that it can send more. + * + * @param ch the channel that got the ACK + * @param crm the message that got acknowledged + */ +static void +handle_matching_ack (struct CadetChannel *ch, + struct CadetReliableMessage *crm) +{ + GNUNET_CONTAINER_DLL_remove (ch->head_sent, + ch->tail_sent, + crm); + ch->pending_messages--; + GNUNET_assert (ch->pending_messages < ch->max_pending_messages); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received DATA_ACK on %s for message %u (%u ACKs pending)\n", + GCCH_2s (ch), + (unsigned int) ntohl (crm->data_message->mid.mid), + ch->pending_messages); + GNUNET_free (crm->data_message); + GNUNET_free (crm); + send_ack_to_client (ch, + (NULL == ch->owner) + ? GNUNET_NO + : GNUNET_YES); +} + + /** * We got an acknowledgement for payload data for a channel. * Possibly resume transmissions. @@ -1272,7 +1306,11 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, const struct GNUNET_CADET_ChannelDataAckMessage *ack) { struct CadetReliableMessage *crm; - int was_head; + struct CadetReliableMessage *crmn; + int found; + uint32_t mid_base; + uint64_t mid_mask; + unsigned int delta; GNUNET_break (GNUNET_NO == ch->is_loopback); if (GNUNET_NO == ch->reliable) @@ -1281,12 +1319,28 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, GNUNET_break_op (0); return; } + mid_base = ntohl (ack->mid.mid); + mid_mask = GNUNET_htonll (ack->futures); + found = GNUNET_NO; for (crm = ch->head_sent; NULL != crm; - crm = crm->next) + crm = crmn) + { + crmn = crm->next; if (ack->mid.mid == crm->data_message->mid.mid) - break; - if (NULL == crm) + { + handle_matching_ack (ch, + crm); + continue; + } + delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base) - 1; + if (delta >= 64) + continue; + if (0 != (mid_mask & (1LLU << delta))) + handle_matching_ack (ch, + crm); + } + if (GNUNET_NO == found) { /* ACK for message we already dropped, might have been a duplicate ACK? Ignore. */ @@ -1299,36 +1353,16 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, GNUNET_NO); return; } - was_head = (crm == ch->head_sent); - GNUNET_CONTAINER_DLL_remove (ch->head_sent, - ch->tail_sent, - crm); - GNUNET_free (crm->data_message); - GNUNET_free (crm); - ch->pending_messages--; - GNUNET_assert (ch->pending_messages < ch->max_pending_messages); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received DATA_ACK on %s for message %u (%u ACKs pending)\n", - GCCH_2s (ch), - (unsigned int) ntohl (ack->mid.mid), - ch->pending_messages); - send_ack_to_client (ch, - (NULL == ch->owner) - ? GNUNET_NO - : GNUNET_YES); - if (was_head) + if (NULL != ch->retry_data_task) { - if (NULL != ch->retry_data_task) - { - GNUNET_SCHEDULER_cancel (ch->retry_data_task); - ch->retry_data_task = NULL; - } - if (NULL != ch->head_sent) - ch->retry_data_task - = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry, - &retry_transmission, - ch); + GNUNET_SCHEDULER_cancel (ch->retry_data_task); + ch->retry_data_task = NULL; } + if (NULL != ch->head_sent) + ch->retry_data_task + = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry, + &retry_transmission, + ch); } @@ -1591,10 +1625,12 @@ GCCH_handle_local_ack (struct CadetChannel *ch, if (NULL == com) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got LOCAL_ACK, %s-%X ready to receive more data (but none pending on %s)!\n", + "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n", GSC_2s (ccc->c), + ntohl (client_ccn.channel_of_client), + GCCH_2s (ch), ntohl (ccc->ccn.channel_of_client), - GCCH_2s (ch)); + ccc); return; /* none pending */ } if (GNUNET_YES == ch->is_loopback) @@ -1637,11 +1673,12 @@ GCCH_handle_local_ack (struct CadetChannel *ch, return; /* missing next one in-order */ } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Got LOCAL ACK, passing payload message to %s-%X on %s\n", - GSC_2s (ccc->c), - ntohl (ccc->ccn.channel_of_client), - GCCH_2s (ch)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got LOCAL ACK, passing payload message %u to %s-%X on %s\n", + ntohl (com->mid.mid), + GSC_2s (ccc->c), + ntohl (ccc->ccn.channel_of_client), + GCCH_2s (ch)); /* all good, pass next message to client */ GNUNET_CONTAINER_DLL_remove (ccc->head_recv, @@ -1663,9 +1700,9 @@ GCCH_handle_local_ack (struct CadetChannel *ch, urgently waiting for an ACK from us. (As we have an inherent maximum of 64 bits, and 15 is getting too close for comfort.) So we should send one now. */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sender on %s likely blocked on flow-control, sending ACK now.\n", - GCCH_2s (ch)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sender on %s likely blocked on flow-control, sending ACK now.\n", + GCCH_2s (ch)); if (GNUNET_YES == ch->reliable) send_channel_data_ack (ch); } -- 2.25.1