X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fcadet%2Fgnunet-service-cadet-new_channel.c;h=9d9edc28dae8b5db3303288c9845b7533e3d9180;hb=dd5a385e16168f9d4fe43dde53e49b77a15afa6e;hp=3bcf5ad0b2a8a23e45c5bd893c6fd6283477b4c3;hpb=212addcc5d177059cd11183034dcd5cf26a413ee;p=oweals%2Fgnunet.git diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c index 3bcf5ad0b..9d9edc28d 100644 --- a/src/cadet/gnunet-service-cadet-new_channel.c +++ b/src/cadet/gnunet-service-cadet-new_channel.c @@ -25,8 +25,6 @@ * * TODO: * - Congestion/flow control: - * + calculate current RTT if possible, use that for initial retransmissions - * (NOTE: needs us to learn which connection the tunnel uses for the message!) * + estimate max bandwidth using bursts and use to for CONGESTION CONTROL! * (and figure out how/where to use this!) * + figure out flow control without ACKs (unreliable traffic!) @@ -128,6 +126,11 @@ struct CadetReliableMessage */ struct CadetTunnelQueueEntry *qe; + /** + * Data message we are trying to send. + */ + struct GNUNET_CADET_ChannelAppDataMessage *data_message; + /** * How soon should we retry if we fail to get an ACK? * Messages in the queue are sorted by this value. @@ -141,9 +144,23 @@ struct CadetReliableMessage struct GNUNET_TIME_Relative retry_delay; /** - * Data message we are trying to send. + * Time when we first successfully transmitted the message + * (that is, set @e num_transmissions to 1). */ - struct GNUNET_CADET_ChannelAppDataMessage *data_message; + struct GNUNET_TIME_Absolute first_transmission_time; + + /** + * Identifier of the connection that this message took when it + * was first transmitted. Only useful if @e num_transmissions is 1. + */ + struct GNUNET_CADET_ConnectionTunnelIdentifier connection_taken; + + /** + * How often was this message transmitted? #GNUNET_SYSERR if there + * was an error transmitting the message, #GNUNET_NO if it was not + * yet transmitted ever, otherwise the number of (re) transmissions. + */ + int num_transmissions; }; @@ -287,11 +304,6 @@ struct CadetChannel */ struct GNUNET_TIME_Relative retry_time; - /** - * How long does it usually take to get an ACK. - */ - struct GNUNET_TIME_Relative expected_delay; - /** * Bitfield of already-received messages past @e mid_recv. */ @@ -501,9 +513,12 @@ send_channel_open (void *cls); * create message. Delays for a bit until we retry. * * @param cls our `struct CadetChannel`. + * @param cid identifier of the connection within the tunnel, NULL + * if transmission failed */ static void -channel_open_sent_cb (void *cls) +channel_open_sent_cb (void *cls, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) { struct CadetChannel *ch = cls; @@ -551,6 +566,8 @@ send_channel_open (void *cls) msgcc.port = ch->port; msgcc.ctn = ch->ctn; ch->state = CADET_CHANNEL_OPEN_SENT; + if (NULL != ch->last_control_qe) + GCT_send_cancel (ch->last_control_qe); ch->last_control_qe = GCT_send (ch->t, &msgcc.header, &channel_open_sent_cb, @@ -755,9 +772,12 @@ GCCH_channel_incoming_new (struct CadetTunnel *t, * ACKs for ACKs ;-). * * @param cls our `struct CadetChannel`. + * @param cid identifier of the connection within the tunnel, NULL + * if transmission failed */ static void -send_ack_cb (void *cls) +send_ack_cb (void *cls, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) { struct CadetChannel *ch = cls; @@ -783,13 +803,13 @@ send_channel_data_ack (struct CadetChannel *ch) msg.ctn = ch->ctn; msg.mid.mid = htonl (ntohl (ch->mid_recv.mid)); msg.futures = GNUNET_htonll (ch->mid_futures); - if (NULL != ch->last_control_qe) - GCT_send_cancel (ch->last_control_qe); LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending DATA_ACK %u:%llX via %s\n", (unsigned int) ntohl (msg.mid.mid), (unsigned long long) ch->mid_futures, GCCH_2s (ch)); + if (NULL != ch->last_control_qe) + GCT_send_cancel (ch->last_control_qe); ch->last_control_qe = GCT_send (ch->t, &msg.header, &send_ack_cb, @@ -832,26 +852,28 @@ send_open_ack (void *cls) * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK. * * @param ch channel that got the duplicate open + * @param cti identifier of the connection that delivered the message */ void -GCCH_handle_duplicate_open (struct CadetChannel *ch) +GCCH_handle_duplicate_open (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti) { if (NULL == ch->dest) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring duplicate channel OPEN on %s: port is closed\n", + "Ignoring duplicate CHANNEL_OPEN on %s: port is closed\n", GCCH_2s (ch)); return; } if (NULL != ch->retry_control_task) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring duplicate channel OPEN on %s: control message is pending\n", + "Ignoring duplicate CHANNEL_OPEN on %s: control message is pending\n", GCCH_2s (ch)); return; } LOG (GNUNET_ERROR_TYPE_DEBUG, - "Retransmitting OPEN_ACK on %s\n", + "Retransmitting CHANNEL_OPEN_ACK on %s\n", GCCH_2s (ch)); ch->retry_control_task = GNUNET_SCHEDULER_add_now (&send_open_ack, @@ -949,7 +971,8 @@ GCCH_bind (struct CadetChannel *ch, if (GNUNET_YES == ch->is_loopback) { ch->state = CADET_CHANNEL_OPEN_SENT; - GCCH_handle_channel_open_ack (ch); + GCCH_handle_channel_open_ack (ch, + NULL); } else { @@ -1021,7 +1044,10 @@ GCCH_channel_local_destroy (struct CadetChannel *ch, return; } /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */ - if (CADET_CHANNEL_NEW != ch->state) + if (CADET_CHANNEL_NEW == ch->state) + GSC_drop_loose_channel (&ch->port, + ch); + else GCT_send_channel_destroy (ch->t, ch->ctn); /* Nothing left to do, just finish destruction */ @@ -1034,9 +1060,11 @@ GCCH_channel_local_destroy (struct CadetChannel *ch, * (the port is open on the other side). Begin transmissions. * * @param ch channel to destroy + * @param cti identifier of the connection that delivered the message */ void -GCCH_handle_channel_open_ack (struct CadetChannel *ch) +GCCH_handle_channel_open_ack (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti) { switch (ch->state) { @@ -1119,10 +1147,12 @@ is_before (void *cls, * and send an ACK to the other end (once flow control allows it!) * * @param ch channel that got data + * @param cti identifier of the connection that delivered the message * @param msg message that was received */ void GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti, const struct GNUNET_CADET_ChannelAppDataMessage *msg) { struct GNUNET_MQ_Envelope *env; @@ -1242,6 +1272,13 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, /* Yep, need to drop. Drop the oldest message in the buffer. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queue full due slow client on %s, dropping oldest message\n", + GCCH_2s (ch)); + GNUNET_STATISTICS_update (stats, + "# messages dropped due to slow client", + 1, + GNUNET_NO); drop = ccc->head_recv; GNUNET_CONTAINER_DLL_remove (ccc->head_recv, ccc->tail_recv, @@ -1315,9 +1352,12 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, * wait for ACK (or retransmit). * * @param cls the `struct CadetReliableMessage` that was sent + * @param cid identifier of the connection within the tunnel, NULL + * if transmission failed */ static void -data_sent_cb (void *cls); +data_sent_cb (void *cls, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid); /** @@ -1351,10 +1391,12 @@ retry_transmission (void *cls) * the queue and tell our client that it can send more. * * @param ch the channel that got the PLAINTEXT_DATA_ACK + * @param cti identifier of the connection that delivered the message * @param crm the message that got acknowledged */ static void handle_matching_ack (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti, struct CadetReliableMessage *crm) { GNUNET_CONTAINER_DLL_remove (ch->head_sent, @@ -1372,6 +1414,18 @@ handle_matching_ack (struct CadetChannel *ch, GCT_send_cancel (crm->qe); crm->qe = NULL; } + if ( (1 == crm->num_transmissions) && + (NULL != cti) ) + { + GCC_ack_observed (cti); + if (0 == memcmp (cti, + &crm->connection_taken, + sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier))) + { + GCC_latency_observed (cti, + GNUNET_TIME_absolute_get_duration (crm->first_transmission_time)); + } + } GNUNET_free (crm->data_message); GNUNET_free (crm); send_ack_to_client (ch, @@ -1386,10 +1440,12 @@ handle_matching_ack (struct CadetChannel *ch, * Possibly resume transmissions. * * @param ch channel that got the ack + * @param cti identifier of the connection that delivered the message * @param ack details about what was received */ void GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti, const struct GNUNET_CADET_ChannelDataAckMessage *ack) { struct CadetReliableMessage *crm; @@ -1427,6 +1483,7 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, ntohl (crm->data_message->mid.mid), GCCH_2s (ch)); handle_matching_ack (ch, + cti, crm); found = GNUNET_YES; continue; @@ -1446,6 +1503,7 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, ntohl (crm->data_message->mid.mid), GCCH_2s (ch)); handle_matching_ack (ch, + cti, crm); found = GNUNET_YES; } @@ -1483,9 +1541,12 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, * the tunnel. * * @param ch channel to destroy + * @param cti identifier of the connection that delivered the message, + * NULL if we are simulating receiving a destroy due to shutdown */ void -GCCH_handle_remote_destroy (struct CadetChannel *ch) +GCCH_handle_remote_destroy (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti) { struct CadetChannelClient *ccc; @@ -1542,9 +1603,12 @@ cmp_crm_by_next_retry (void *cls, * wait for ACK (or retransmit). * * @param cls the `struct CadetReliableMessage` that was sent + * @param cid identifier of the connection within the tunnel, NULL + * if transmission failed */ static void -data_sent_cb (void *cls) +data_sent_cb (void *cls, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) { struct CadetReliableMessage *crm = cls; struct CadetChannel *ch = crm->ch; @@ -1566,10 +1630,34 @@ data_sent_cb (void *cls) : GNUNET_YES); return; } - if (0 == crm->retry_delay.rel_value_us) - crm->retry_delay = ch->expected_delay; - else - crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay); + if (NULL == cid) + { + /* There was an error sending. */ + crm->num_transmissions = GNUNET_SYSERR; + } + else if (GNUNET_SYSERR != crm->num_transmissions) + { + /* Increment transmission counter, and possibly store @a cid + if this was the first transmission. */ + crm->num_transmissions++; + if (1 == crm->num_transmissions) + { + crm->first_transmission_time = GNUNET_TIME_absolute_get (); + crm->connection_taken = *cid; + GCC_ack_expected (cid); + } + } + if ( (0 == crm->retry_delay.rel_value_us) && + (NULL != cid) ) + { + struct CadetConnection *cc = GCC_lookup (cid); + + if (NULL != cc) + crm->retry_delay = GCC_get_metrics (cc)->aged_latency; + else + crm->retry_delay = ch->retry_time; + } + crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay); crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay, MIN_RTT_DELAY); crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay); @@ -1625,6 +1713,11 @@ GCCH_handle_local_data (struct CadetChannel *ch, GNUNET_break (0); return GNUNET_SYSERR; } + if (GNUNET_YES == ch->destroy) + { + /* we are going down, drop messages */ + return GNUNET_OK; + } ch->pending_messages++; if (GNUNET_YES == ch->is_loopback) @@ -1637,19 +1730,25 @@ GCCH_handle_local_data (struct CadetChannel *ch, env = GNUNET_MQ_msg_extra (ld, buf_len, GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); - if (sender_ccn.channel_of_client == - ch->owner->ccn.channel_of_client) + if ( (NULL != ch->owner) && + (sender_ccn.channel_of_client == + ch->owner->ccn.channel_of_client) ) { receiver = ch->dest; to_owner = GNUNET_NO; } - else + else if ( (NULL != ch->dest) && + (sender_ccn.channel_of_client == + ch->dest->ccn.channel_of_client) ) { - GNUNET_assert (sender_ccn.channel_of_client == - ch->dest->ccn.channel_of_client); receiver = ch->owner; to_owner = GNUNET_YES; } + else + { + GNUNET_break (0); + return GNUNET_SYSERR; + } ld->ccn = receiver->ccn; GNUNET_memcpy (&ld[1], buf, @@ -1775,7 +1874,8 @@ GCCH_handle_local_ack (struct CadetChannel *ch, } if ( (com->mid.mid != ch->mid_recv.mid) && - (GNUNET_NO == ch->out_of_order) ) + (GNUNET_NO == ch->out_of_order) && + (GNUNET_YES == ch->reliable) ) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",