X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fcadet%2Fgnunet-service-cadet-new_channel.c;h=9d9edc28dae8b5db3303288c9845b7533e3d9180;hb=dd5a385e16168f9d4fe43dde53e49b77a15afa6e;hp=6f57538acdcc1f18bd1521788188c3ba517fcacc;hpb=f8f51150a4f28fecd7f0997227c5c15edfe9dd02;p=oweals%2Fgnunet.git diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c index 6f57538ac..9d9edc28d 100644 --- a/src/cadet/gnunet-service-cadet-new_channel.c +++ b/src/cadet/gnunet-service-cadet-new_channel.c @@ -1,4 +1,3 @@ - /* This file is part of GNUnet. Copyright (C) 2001-2017 GNUnet e.V. @@ -25,13 +24,14 @@ * @author Christian Grothoff * * TODO: - * - introduce shutdown so we can have half-closed channels, modify - * destroy to include MID to have FIN-ACK equivalents, etc. - * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! - * - check that '0xFFULL' really is sufficient for flow control! - * - revisit handling of 'unreliable' traffic! - * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'. - * - figure out flow control without ACKs (unreliable traffic!) + * - Congestion/flow control: + * + estimate max bandwidth using bursts and use to for CONGESTION CONTROL! + * (and figure out how/where to use this!) + * + figure out flow control without ACKs (unreliable traffic!) + * - revisit handling of 'unbuffered' traffic! + * (need to push down through tunnel into connection selection) + * - revisit handling of 'buffered' traffic: 4 is a rather small buffer; maybe + * reserve more bits in 'options' to allow for buffer size control? */ #include "platform.h" #include "gnunet_util_lib.h" @@ -57,6 +57,23 @@ */ #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30) +/** + * How long do we wait at least before retransmitting ever? + */ +#define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75) + +/** + * Maximum message ID into the future we accept for out-of-order messages. + * If the message is more than this into the future, we drop it. This is + * important both to detect values that are actually in the past, as well + * as to limit adversarially triggerable memory consumption. + * + * Note that right now we have "max_pending_messages = 4" hard-coded in + * the logic below, so a value of 4 would suffice here. But we plan to + * allow larger windows in the future... + */ +#define MAX_OUT_OF_ORDER_DISTANCE 1024 + /** * All the states a connection can be in. @@ -71,7 +88,7 @@ enum CadetChannelState /** * Connection create message sent, waiting for ACK. */ - CADET_CHANNEL_CREATE_SENT, + CADET_CHANNEL_OPEN_SENT, /** * Connection confirmed, ready to carry traffic. @@ -109,6 +126,11 @@ struct CadetReliableMessage */ struct CadetTunnelQueueEntry *qe; + /** + * Data message we are trying to send. + */ + struct GNUNET_CADET_ChannelAppDataMessage *data_message; + /** * How soon should we retry if we fail to get an ACK? * Messages in the queue are sorted by this value. @@ -122,11 +144,24 @@ struct CadetReliableMessage struct GNUNET_TIME_Relative retry_delay; /** - * Data message we are trying to send. + * Time when we first successfully transmitted the message + * (that is, set @e num_transmissions to 1). + */ + struct GNUNET_TIME_Absolute first_transmission_time; + + /** + * Identifier of the connection that this message took when it + * was first transmitted. Only useful if @e num_transmissions is 1. */ - struct GNUNET_CADET_ChannelAppDataMessage data_message; + struct GNUNET_CADET_ConnectionTunnelIdentifier connection_taken; + + /** + * How often was this message transmitted? #GNUNET_SYSERR if there + * was an error transmitting the message, #GNUNET_NO if it was not + * yet transmitted ever, otherwise the number of (re) transmissions. + */ + int num_transmissions; - /* followed by variable-size payload */ }; @@ -159,6 +194,51 @@ struct CadetOutOfOrderMessage }; +/** + * Client endpoint of a `struct CadetChannel`. A channel may be a + * loopback channel, in which case it has two of these endpoints. + * Note that flow control also is required in both directions. + */ +struct CadetChannelClient +{ + /** + * Client handle. Not by itself sufficient to designate + * the client endpoint, as the same client handle may + * be used for both the owner and the destination, and + * we thus also need the channel ID to identify the client. + */ + struct CadetClient *c; + + /** + * Head of DLL of messages received out of order or while client was unready. + */ + struct CadetOutOfOrderMessage *head_recv; + + /** + * Tail DLL of messages received out of order or while client was unready. + */ + struct CadetOutOfOrderMessage *tail_recv; + + /** + * Local tunnel number for this client. + * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI, + * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) + */ + struct GNUNET_CADET_ClientChannelNumber ccn; + + /** + * Number of entries currently in @a head_recv DLL. + */ + unsigned int num_recv; + + /** + * Can we send data to the client? + */ + int client_ready; + +}; + + /** * Struct containing all information regarding a channel to a remote client. */ @@ -169,25 +249,25 @@ struct CadetChannel */ struct CadetTunnel *t; - /** - * Last entry in the tunnel's queue relating to control messages - * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or - * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel - * transmission in case we receive updated information. - */ - struct CadetTunnelQueueEntry *last_control_qe; - /** * Client owner of the tunnel, if any. * (Used if this channel represends the initiating end of the tunnel.) */ - struct CadetClient *owner; + struct CadetChannelClient *owner; /** * Client destination of the tunnel, if any. * (Used if this channel represents the listening end of the tunnel.) */ - struct CadetClient *dest; + struct CadetChannelClient *dest; + + /** + * Last entry in the tunnel's queue relating to control messages + * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or + * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel + * transmission in case we receive updated information. + */ + struct CadetTunnelQueueEntry *last_control_qe; /** * Head of DLL of messages sent and not yet ACK'd. @@ -200,19 +280,14 @@ struct CadetChannel struct CadetReliableMessage *tail_sent; /** - * Head of DLL of messages received out of order or while client was unready. - */ - struct CadetOutOfOrderMessage *head_recv; - - /** - * Tail DLL of messages received out of order or while client was unready. + * Task to resend/poll in case no ACK is received. */ - struct CadetOutOfOrderMessage *tail_recv; + struct GNUNET_SCHEDULER_Task *retry_control_task; /** * Task to resend/poll in case no ACK is received. */ - struct GNUNET_SCHEDULER_Task *retry_task; + struct GNUNET_SCHEDULER_Task *retry_data_task; /** * Last time the channel was used @@ -229,11 +304,6 @@ struct CadetChannel */ struct GNUNET_TIME_Relative retry_time; - /** - * How long does it usually take to get an ACK. - */ - struct GNUNET_TIME_Relative expected_delay; - /** * Bitfield of already-received messages past @e mid_recv. */ @@ -265,26 +335,16 @@ struct CadetChannel */ struct GNUNET_CADET_ChannelTunnelNumber ctn; - /** - * Local tunnel number for local client owning the channel. - * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 ) - */ - struct GNUNET_CADET_ClientChannelNumber ccn; - /** * Channel state. */ enum CadetChannelState state; /** - * Can we send data to the client? + * Count how many ACKs we skipped, used to prevent long + * sequences of ACK skipping. */ - int client_ready; - - /** - * Can the client send data to us? - */ - int client_allowed; + unsigned int skip_ack_series; /** * Is the tunnel bufferless (minimum latency)? @@ -301,6 +361,11 @@ struct CadetChannel */ int out_of_order; + /** + * Is this channel a loopback channel, where the destination is us again? + */ + int is_loopback; + /** * Flag to signal the destruction of the channel. If this is set to * #GNUNET_YES the channel will be destroyed once the queue is @@ -323,15 +388,16 @@ GCCH_2s (const struct CadetChannel *ch) { static char buf[128]; - if (NULL == ch) - return "(NULL Channel)"; GNUNET_snprintf (buf, sizeof (buf), - "%s:%s ctn:%X (%X)", - GCT_2s (ch->t), + "Channel %s:%s ctn:%X(%X/%X)", + (GNUNET_YES == ch->is_loopback) + ? "loopback" + : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))), GNUNET_h2s (&ch->port), ch->ctn, - ntohl (ch->ccn.channel_of_client)); + (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client), + (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client)); return buf; } @@ -350,6 +416,29 @@ GCCH_get_id (const struct CadetChannel *ch) } +/** + * Release memory associated with @a ccc + * + * @param ccc data structure to clean up + */ +static void +free_channel_client (struct CadetChannelClient *ccc) +{ + struct CadetOutOfOrderMessage *com; + + while (NULL != (com = ccc->head_recv)) + { + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, + com); + ccc->num_recv--; + GNUNET_MQ_discard (com->env); + GNUNET_free (com); + } + GNUNET_free (ccc); +} + + /** * Destroy the given channel. * @@ -359,7 +448,6 @@ static void channel_destroy (struct CadetChannel *ch) { struct CadetReliableMessage *crm; - struct CadetOutOfOrderMessage *com; while (NULL != (crm = ch->head_sent)) { @@ -372,29 +460,41 @@ channel_destroy (struct CadetChannel *ch) GNUNET_CONTAINER_DLL_remove (ch->head_sent, ch->tail_sent, crm); + GNUNET_free (crm->data_message); GNUNET_free (crm); } - while (NULL != (com = ch->head_recv)) + if (NULL != ch->owner) { - GNUNET_CONTAINER_DLL_remove (ch->head_recv, - ch->tail_recv, - com); - GNUNET_MQ_discard (com->env); - GNUNET_free (com); + free_channel_client (ch->owner); + ch->owner = NULL; + } + if (NULL != ch->dest) + { + free_channel_client (ch->dest); + ch->dest = NULL; } if (NULL != ch->last_control_qe) { GCT_send_cancel (ch->last_control_qe); ch->last_control_qe = NULL; } - if (NULL != ch->retry_task) + if (NULL != ch->retry_data_task) { - GNUNET_SCHEDULER_cancel (ch->retry_task); - ch->retry_task = NULL; + GNUNET_SCHEDULER_cancel (ch->retry_data_task); + ch->retry_data_task = NULL; + } + if (NULL != ch->retry_control_task) + { + GNUNET_SCHEDULER_cancel (ch->retry_control_task); + ch->retry_control_task = NULL; + } + if (GNUNET_NO == ch->is_loopback) + { + GCT_remove_channel (ch->t, + ch, + ch->ctn); + ch->t = NULL; } - GCT_remove_channel (ch->t, - ch, - ch->ctn); GNUNET_free (ch); } @@ -413,17 +513,27 @@ send_channel_open (void *cls); * create message. Delays for a bit until we retry. * * @param cls our `struct CadetChannel`. + * @param cid identifier of the connection within the tunnel, NULL + * if transmission failed */ static void -channel_open_sent_cb (void *cls) +channel_open_sent_cb (void *cls, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) { struct CadetChannel *ch = cls; + GNUNET_assert (NULL != ch->last_control_qe); ch->last_control_qe = NULL; ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time); - ch->retry_task = GNUNET_SCHEDULER_add_delayed (ch->retry_time, - &send_channel_open, - ch); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n", + GCCH_2s (ch), + GNUNET_STRINGS_relative_time_to_string (ch->retry_time, + GNUNET_YES)); + ch->retry_control_task + = GNUNET_SCHEDULER_add_delayed (ch->retry_time, + &send_channel_open, + ch); } @@ -439,7 +549,10 @@ send_channel_open (void *cls) struct GNUNET_CADET_ChannelOpenMessage msgcc; uint32_t options; - ch->retry_task = NULL; + ch->retry_control_task = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending CHANNEL_OPEN message for %s\n", + GCCH_2s (ch)); options = 0; if (ch->nobuffer) options |= GNUNET_CADET_OPTION_NOBUFFER; @@ -452,14 +565,37 @@ send_channel_open (void *cls) msgcc.opt = htonl (options); msgcc.port = ch->port; msgcc.ctn = ch->ctn; - ch->state = CADET_CHANNEL_CREATE_SENT; + ch->state = CADET_CHANNEL_OPEN_SENT; + if (NULL != ch->last_control_qe) + GCT_send_cancel (ch->last_control_qe); ch->last_control_qe = GCT_send (ch->t, &msgcc.header, &channel_open_sent_cb, ch); + GNUNET_assert (NULL == ch->retry_control_task); +} + + +/** + * Function called once and only once after a channel was bound + * to its tunnel via #GCT_add_channel() is ready for transmission. + * Note that this is only the case for channels that this peer + * initiates, as for incoming channels we assume that they are + * ready for transmission immediately upon receiving the open + * message. Used to bootstrap the #GCT_send() process. + * + * @param ch the channel for which the tunnel is now ready + */ +void +GCCH_tunnel_up (struct CadetChannel *ch) +{ + GNUNET_assert (NULL == ch->retry_control_task); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending CHANNEL_OPEN message for channel %s\n", + "Tunnel up, sending CHANNEL_OPEN on %s now\n", GCCH_2s (ch)); + ch->retry_control_task + = GNUNET_SCHEDULER_add_now (&send_channel_open, + ch); } @@ -481,32 +617,68 @@ GCCH_channel_local_new (struct CadetClient *owner, uint32_t options) { struct CadetChannel *ch; + struct CadetChannelClient *ccco; + + ccco = GNUNET_new (struct CadetChannelClient); + ccco->c = owner; + ccco->ccn = ccn; + ccco->client_ready = GNUNET_YES; ch = GNUNET_new (struct CadetChannel); + ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */ ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); - ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */ - ch->owner = owner; - ch->ccn = ccn; + ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */ + ch->owner = ccco; ch->port = *port; - ch->t = GCP_get_tunnel (destination, - GNUNET_YES); - ch->ctn = GCT_add_channel (ch->t, - ch); - ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME; - ch->retry_task = GNUNET_SCHEDULER_add_now (&send_channel_open, - ch); + if (0 == memcmp (&my_full_id, + GCP_get_id (destination), + sizeof (struct GNUNET_PeerIdentity))) + { + struct CadetClient *c; + + ch->is_loopback = GNUNET_YES; + c = GNUNET_CONTAINER_multihashmap_get (open_ports, + port); + if (NULL == c) + { + /* port closed, wait for it to possibly open */ + (void) GNUNET_CONTAINER_multihashmap_put (loose_channels, + port, + ch, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Created loose incoming loopback channel to port %s\n", + GNUNET_h2s (&ch->port)); + } + else + { + ch->dest = GNUNET_new (struct CadetChannelClient); + ch->dest->c = c; + ch->dest->client_ready = GNUNET_YES; + GCCH_bind (ch, + ch->dest->c); + } + } + else + { + ch->t = GCP_get_tunnel (destination, + GNUNET_YES); + ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME; + ch->ctn = GCT_add_channel (ch->t, + ch); + } GNUNET_STATISTICS_update (stats, "# channels", 1, GNUNET_NO); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Created channel to port %s at peer %s for client %s using tunnel %s\n", + "Created channel to port %s at peer %s for %s using %s\n", GNUNET_h2s (port), GCP_2s (destination), GSC_2s (owner), - GCT_2s (ch->t)); + (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t)); return ch; } @@ -522,7 +694,7 @@ timeout_closed_cb (void *cls) { struct CadetChannel *ch = cls; - ch->retry_task = NULL; + ch->retry_control_task = NULL; LOG (GNUNET_ERROR_TYPE_DEBUG, "Closing incoming channel to port %s from peer %s due to timeout\n", GNUNET_h2s (&ch->port), @@ -557,7 +729,7 @@ GCCH_channel_incoming_new (struct CadetTunnel *t, ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); - ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */ + ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */ GNUNET_STATISTICS_update (stats, "# channels", 1, @@ -572,9 +744,10 @@ GCCH_channel_incoming_new (struct CadetTunnel *t, port, ch, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - ch->retry_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT, - &timeout_closed_cb, - ch); + ch->retry_control_task + = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT, + &timeout_closed_cb, + ch); LOG (GNUNET_ERROR_TYPE_DEBUG, "Created loose incoming channel to port %s from peer %s\n", GNUNET_h2s (&ch->port), @@ -599,31 +772,42 @@ GCCH_channel_incoming_new (struct CadetTunnel *t, * ACKs for ACKs ;-). * * @param cls our `struct CadetChannel`. + * @param cid identifier of the connection within the tunnel, NULL + * if transmission failed */ static void -send_ack_cb (void *cls) +send_ack_cb (void *cls, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) { struct CadetChannel *ch = cls; + GNUNET_assert (NULL != ch->last_control_qe); ch->last_control_qe = NULL; } /** - * Compute and send the current ACK to the other peer. + * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer. * - * @param ch channel to send the ACK for + * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for */ static void send_channel_data_ack (struct CadetChannel *ch) { struct GNUNET_CADET_ChannelDataAckMessage msg; + if (GNUNET_NO == ch->reliable) + return; /* no ACKs */ msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK); msg.header.size = htons (sizeof (msg)); msg.ctn = ch->ctn; - msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1); + msg.mid.mid = htonl (ntohl (ch->mid_recv.mid)); msg.futures = GNUNET_htonll (ch->mid_futures); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending DATA_ACK %u:%llX via %s\n", + (unsigned int) ntohl (msg.mid.mid), + (unsigned long long) ch->mid_futures, + GCCH_2s (ch)); if (NULL != ch->last_control_qe) GCT_send_cancel (ch->last_control_qe); ch->last_control_qe = GCT_send (ch->t, @@ -634,38 +818,103 @@ send_channel_data_ack (struct CadetChannel *ch) /** - * Send our initial ACK to the client confirming that the + * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the * connection is up. * * @param cls the `struct CadetChannel` */ static void -send_connect_ack (void *cls) +send_open_ack (void *cls) { struct CadetChannel *ch = cls; + struct GNUNET_CADET_ChannelManageMessage msg; - ch->retry_task = NULL; - send_channel_data_ack (ch); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending CHANNEL_OPEN_ACK on %s\n", + GCCH_2s (ch)); + ch->retry_control_task = NULL; + msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK); + msg.header.size = htons (sizeof (msg)); + msg.reserved = htonl (0); + msg.ctn = ch->ctn; + if (NULL != ch->last_control_qe) + GCT_send_cancel (ch->last_control_qe); + ch->last_control_qe = GCT_send (ch->t, + &msg.header, + &send_ack_cb, + ch); +} + + +/** + * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for + * this channel. If the binding was successful, (re)transmit the + * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK. + * + * @param ch channel that got the duplicate open + * @param cti identifier of the connection that delivered the message + */ +void +GCCH_handle_duplicate_open (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti) +{ + if (NULL == ch->dest) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring duplicate CHANNEL_OPEN on %s: port is closed\n", + GCCH_2s (ch)); + return; + } + if (NULL != ch->retry_control_task) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring duplicate CHANNEL_OPEN on %s: control message is pending\n", + GCCH_2s (ch)); + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Retransmitting CHANNEL_OPEN_ACK on %s\n", + GCCH_2s (ch)); + ch->retry_control_task + = GNUNET_SCHEDULER_add_now (&send_open_ack, + ch); } /** - * Send a LOCAL ACK to the client to solicit more messages. + * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages. * * @param ch channel the ack is for - * @param c client to send the ACK to + * @param to_owner #GNUNET_YES to send to owner, + * #GNUNET_NO to send to dest */ static void send_ack_to_client (struct CadetChannel *ch, - struct CadetClient *c) + int to_owner) { struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_LocalAck *ack; + struct CadetChannelClient *ccc; + ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest; + if (NULL == ccc) + { + /* This can happen if we are just getting ACKs after + our local client already disconnected. */ + GNUNET_assert (GNUNET_YES == ch->destroy); + return; + } env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); - ack->ccn = ch->ccn; - GSC_send_to_client (c, + ack->ccn = ccc->ccn; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n", + GSC_2s (ccc->c), + (GNUNET_YES == to_owner) ? "owner" : "dest", + ntohl (ack->ccn.channel_of_client), + ch->pending_messages, + ch->max_pending_messages); + GSC_send_to_client (ccc->c, env); } @@ -682,21 +931,20 @@ void GCCH_bind (struct CadetChannel *ch, struct CadetClient *c) { - struct GNUNET_MQ_Envelope *env; - struct GNUNET_CADET_LocalChannelCreateMessage *tcm; uint32_t options; + struct CadetChannelClient *cccd; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Binding channel %s from tunnel %s to port %s of client %s\n", + "Binding %s from %s to port %s of %s\n", GCCH_2s (ch), GCT_2s (ch->t), GNUNET_h2s (&ch->port), GSC_2s (c)); - if (NULL != ch->retry_task) + if (NULL != ch->retry_control_task) { /* there might be a timeout task here */ - GNUNET_SCHEDULER_cancel (ch->retry_task); - ch->retry_task = NULL; + GNUNET_SCHEDULER_cancel (ch->retry_control_task); + ch->retry_control_task = NULL; } options = 0; if (ch->nobuffer) @@ -705,86 +953,104 @@ GCCH_bind (struct CadetChannel *ch, options |= GNUNET_CADET_OPTION_RELIABLE; if (ch->out_of_order) options |= GNUNET_CADET_OPTION_OUT_OF_ORDER; - ch->dest = c; - ch->ccn = GSC_bind (c, - ch, - GCT_get_destination (ch->t), - &ch->port, - options); - ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */ - - /* notify other peer that we accepted the connection */ - ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack, - ch); + cccd = GNUNET_new (struct CadetChannelClient); + ch->dest = cccd; + cccd->c = c; + cccd->client_ready = GNUNET_YES; + cccd->ccn = GSC_bind (c, + ch, + (GNUNET_YES == ch->is_loopback) + ? GCP_get (&my_full_id, + GNUNET_YES) + : GCT_get_destination (ch->t), + &ch->port, + options); + GNUNET_assert (ntohl (cccd->ccn.channel_of_client) < + GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); + ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */ + if (GNUNET_YES == ch->is_loopback) + { + ch->state = CADET_CHANNEL_OPEN_SENT; + GCCH_handle_channel_open_ack (ch, + NULL); + } + else + { + /* notify other peer that we accepted the connection */ + ch->retry_control_task + = GNUNET_SCHEDULER_add_now (&send_open_ack, + ch); + } /* give client it's initial supply of ACKs */ - env = GNUNET_MQ_msg (tcm, - GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE); - tcm->ccn = ch->ccn; - tcm->peer = *GCP_get_id (GCT_get_destination (ch->t)); - tcm->port = ch->port; - tcm->opt = htonl (options); - GSC_send_to_client (ch->dest, - env); + GNUNET_assert (ntohl (cccd->ccn.channel_of_client) < + GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); for (unsigned int i=0;imax_pending_messages;i++) send_ack_to_client (ch, - ch->owner); + GNUNET_NO); } /** - * Destroy locally created channel. Called by the - * local client, so no need to tell the client. + * Destroy locally created channel. Called by the local client, so no + * need to tell the client. * * @param ch channel to destroy + * @param c client that caused the destruction + * @param ccn client number of the client @a c */ void -GCCH_channel_local_destroy (struct CadetChannel *ch) +GCCH_channel_local_destroy (struct CadetChannel *ch, + struct CadetClient *c, + struct GNUNET_CADET_ClientChannelNumber ccn) { - if (GNUNET_YES == ch->destroy) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s asks for destruction of %s\n", + GSC_2s (c), + GCCH_2s (ch)); + GNUNET_assert (NULL != c); + if ( (NULL != ch->owner) && + (c == ch->owner->c) && + (ccn.channel_of_client == ch->owner->ccn.channel_of_client) ) { - /* other end already destroyed, with the local client gone, no need - to finish transmissions, just destroy immediately. */ - channel_destroy (ch); - return; + free_channel_client (ch->owner); + ch->owner = NULL; } - if (NULL != ch->head_sent) + else if ( (NULL != ch->dest) && + (c == ch->dest->c) && + (ccn.channel_of_client == ch->dest->ccn.channel_of_client) ) { - /* allow send queue to train first */ - ch->destroy = GNUNET_YES; - return; + free_channel_client (ch->dest); + ch->dest = NULL; + } + else + { + GNUNET_assert (0); } - /* Nothing left to do, just finish destruction */ - GCT_send_channel_destroy (ch->t, - ch->ctn); - channel_destroy (ch); -} - -/** - * Destroy channel that was incoming. Called by the - * local client, so no need to tell the client. - * - * @param ch channel to destroy - */ -void -GCCH_channel_incoming_destroy (struct CadetChannel *ch) -{ if (GNUNET_YES == ch->destroy) { - /* other end already destroyed, with the remote client gone, no need + /* other end already destroyed, with the local client gone, no need to finish transmissions, just destroy immediately. */ channel_destroy (ch); return; } - if (NULL != ch->head_recv) + if ( (NULL != ch->head_sent) || + (NULL != ch->owner) || + (NULL != ch->dest) ) { - /* allow local client to see all data first */ + /* Wait for other end to destroy us as well, + and otherwise allow send queue to be transmitted first */ ch->destroy = GNUNET_YES; return; } + /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */ + if (CADET_CHANNEL_NEW == ch->state) + GSC_drop_loose_channel (&ch->port, + ch); + else + GCT_send_channel_destroy (ch->t, + ch->ctn); /* Nothing left to do, just finish destruction */ - GCT_send_channel_destroy (ch->t, - ch->ctn); channel_destroy (ch); } @@ -794,9 +1060,11 @@ GCCH_channel_incoming_destroy (struct CadetChannel *ch) * (the port is open on the other side). Begin transmissions. * * @param ch channel to destroy + * @param cti identifier of the connection that delivered the message */ void -GCCH_handle_channel_open_ack (struct CadetChannel *ch) +GCCH_handle_channel_open_ack (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti) { switch (ch->state) { @@ -804,7 +1072,7 @@ GCCH_handle_channel_open_ack (struct CadetChannel *ch) /* this should be impossible */ GNUNET_break (0); break; - case CADET_CHANNEL_CREATE_SENT: + case CADET_CHANNEL_OPEN_SENT: if (NULL == ch->owner) { /* We're not the owner, wrong direction! */ @@ -812,19 +1080,24 @@ GCCH_handle_channel_open_ack (struct CadetChannel *ch) return; } LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received channel OPEN_ACK for waiting channel %s, entering READY state\n", + "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n", GCCH_2s (ch)); + if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */ + { + GNUNET_SCHEDULER_cancel (ch->retry_control_task); + ch->retry_control_task = NULL; + } ch->state = CADET_CHANNEL_READY; /* On first connect, send client as many ACKs as we allow messages to be buffered! */ for (unsigned int i=0;imax_pending_messages;i++) send_ack_to_client (ch, - ch->owner); + GNUNET_YES); break; case CADET_CHANNEL_READY: /* duplicate ACK, maybe we retried the CREATE. Ignore. */ LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received duplicate channel OPEN_ACK for channel %s\n", + "Received duplicate channel OPEN_ACK for %s\n", GCCH_2s (ch)); GNUNET_STATISTICS_update (stats, "# duplicate CREATE_ACKs", @@ -838,26 +1111,24 @@ GCCH_handle_channel_open_ack (struct CadetChannel *ch) /** * Test if element @a e1 comes before element @a e2. * - * TODO: use opportunity to create generic list insertion sort - * logic in container! - * - * @param cls closure, our `struct CadetChannel` - * @param e1 an element of to sort - * @param e2 another element to sort + * @param cls closure, to a flag where we indicate duplicate packets + * @param m1 a message of to sort + * @param m2 another message to sort * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO */ static int is_before (void *cls, - void *e1, - void *e2) + struct CadetOutOfOrderMessage *m1, + struct CadetOutOfOrderMessage *m2) { - struct CadetOutOfOrderMessage *m1 = e1; - struct CadetOutOfOrderMessage *m2 = e2; + int *duplicate = cls; uint32_t v1 = ntohl (m1->mid.mid); uint32_t v2 = ntohl (m2->mid.mid); uint32_t delta; - delta = v1 - v2; + delta = v2 - v1; + if (0 == delta) + *duplicate = GNUNET_YES; if (delta > (uint32_t) INT_MAX) { /* in overflow range, we can safely assume we wrapped around */ @@ -865,6 +1136,7 @@ is_before (void *cls, } else { + /* result is small, thus v2 > v1, thus m1 < m2 */ return GNUNET_YES; } } @@ -875,150 +1147,201 @@ is_before (void *cls, * and send an ACK to the other end (once flow control allows it!) * * @param ch channel that got data + * @param cti identifier of the connection that delivered the message + * @param msg message that was received */ void GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti, const struct GNUNET_CADET_ChannelAppDataMessage *msg) { struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_LocalData *ld; - struct CadetOutOfOrderMessage *com; + struct CadetChannelClient *ccc; size_t payload_size; + struct CadetOutOfOrderMessage *com; + int duplicate; + uint32_t mid_min; + uint32_t mid_max; + uint32_t mid_msg; + uint32_t delta; + GNUNET_assert (GNUNET_NO == ch->is_loopback); + if ( (GNUNET_YES == ch->destroy) && + (NULL == ch->owner) && + (NULL == ch->dest) ) + { + /* This client is gone, but we still have messages to send to + the other end (which is why @a ch is not yet dead). However, + we cannot pass messages to our client anymore. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Dropping incoming payload on %s as this end is already closed\n", + GCCH_2s (ch)); + /* send back DESTROY notification to stop further retransmissions! */ + GCT_send_channel_destroy (ch->t, + ch->ctn); + return; + } payload_size = ntohs (msg->header.size) - sizeof (*msg); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receicved %u bytes of application data on channel %s\n", - (unsigned int) payload_size, - GCCH_2s (ch)); env = GNUNET_MQ_msg_extra (ld, payload_size, GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); - ld->ccn = ch->ccn; + ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn; GNUNET_memcpy (&ld[1], &msg[1], payload_size); - if ( (GNUNET_YES == ch->client_ready) && + ccc = (NULL != ch->owner) ? ch->owner : ch->dest; + if ( (GNUNET_YES == ccc->client_ready) && ( (GNUNET_YES == ch->out_of_order) || (msg->mid.mid == ch->mid_recv.mid) ) ) { - GSC_send_to_client (ch->owner ? ch->owner : ch->dest, + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Giving %u bytes of payload with MID %u from %s to client %s\n", + (unsigned int) payload_size, + ntohl (msg->mid.mid), + GCCH_2s (ch), + GSC_2s (ccc->c)); + ccc->client_ready = GNUNET_NO; + GSC_send_to_client (ccc->c, env); ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); ch->mid_futures >>= 1; + send_channel_data_ack (ch); + return; } - else + + if (GNUNET_YES == ch->reliable) { - /* FIXME-SECURITY: if the element is WAY too far ahead, - drop it (can't buffer too much!) */ - com = GNUNET_new (struct CadetOutOfOrderMessage); - com->mid = msg->mid; - com->env = env; - /* sort into list ordered by "is_before" */ - if ( (NULL == ch->head_recv) || - (GNUNET_YES == is_before (ch, - com, - ch->head_recv)) ) + /* check if message ought to be dropped because it is ancient/too distant/duplicate */ + mid_min = ntohl (ch->mid_recv.mid); + mid_max = mid_min + ch->max_pending_messages; + mid_msg = ntohl (msg->mid.mid); + if ( ( (uint32_t) (mid_msg - mid_min) > ch->max_pending_messages) || + ( (uint32_t) (mid_max - mid_msg) > ch->max_pending_messages) ) { - GNUNET_CONTAINER_DLL_insert (ch->head_recv, - ch->tail_recv, - com); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s at %u drops ancient or far-future message %u\n", + GCCH_2s (ch), + (unsigned int) mid_min, + ntohl (msg->mid.mid)); + + GNUNET_STATISTICS_update (stats, + "# duplicate DATA (ancient or future)", + 1, + GNUNET_NO); + GNUNET_MQ_discard (env); + send_channel_data_ack (ch); + return; } - else + /* mark bit for future ACKs */ + delta = mid_msg - mid_min - 1; /* overflow/underflow are OK here */ + if (delta < 64) { - struct CadetOutOfOrderMessage *pos; - - for (pos = ch->head_recv; - NULL != pos; - pos = pos->next) + if (0 != (ch->mid_futures & (1LLU << delta))) { - if (GNUNET_YES != - is_before (ch, - pos, - com)) - break; + /* Duplicate within the queue, drop also */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Duplicate payload of %u bytes on %s (mid %u) dropped\n", + (unsigned int) payload_size, + GCCH_2s (ch), + ntohl (msg->mid.mid)); + GNUNET_STATISTICS_update (stats, + "# duplicate DATA", + 1, + GNUNET_NO); + GNUNET_MQ_discard (env); + send_channel_data_ack (ch); + return; } - if (NULL == pos) - GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv, - ch->tail_recv, - com); - else - GNUNET_CONTAINER_DLL_insert_after (ch->head_recv, - ch->tail_recv, - com, - pos->prev); + ch->mid_futures |= (1LLU << delta); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Marked bit %llX for mid %u (base: %u); now: %llX\n", + (1LLU << delta), + mid_msg, + mid_min, + ch->mid_futures); } } -} - - -/** - * We got an acknowledgement for payload data for a channel. - * Possibly resume transmissions. - * - * @param ch channel that got the ack - * @param ack details about what was received - */ -void -GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, - const struct GNUNET_CADET_ChannelDataAckMessage *ack) -{ - struct CadetReliableMessage *crm; - - if (GNUNET_NO == ch->reliable) + else /* ! ch->reliable */ { - /* not expecting ACKs on unreliable channel, odd */ - GNUNET_break_op (0); - return; + /* Channel is unreliable, so we do not ACK. But we also cannot + allow buffering everything, so check if we have space... */ + if (ccc->num_recv >= ch->max_pending_messages) + { + struct CadetOutOfOrderMessage *drop; + + /* Yep, need to drop. Drop the oldest message in + the buffer. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queue full due slow client on %s, dropping oldest message\n", + GCCH_2s (ch)); + GNUNET_STATISTICS_update (stats, + "# messages dropped due to slow client", + 1, + GNUNET_NO); + drop = ccc->head_recv; + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, + drop); + ccc->num_recv--; + GNUNET_MQ_discard (drop->env); + GNUNET_free (drop); + } } - for (crm = ch->head_sent; - NULL != crm; - crm = crm->next) - if (ack->mid.mid == crm->data_message.mid.mid) - break; - if (NULL == crm) + + /* Insert message into sorted out-of-order queue */ + com = GNUNET_new (struct CadetOutOfOrderMessage); + com->mid = msg->mid; + com->env = env; + duplicate = GNUNET_NO; + GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage, + is_before, + &duplicate, + ccc->head_recv, + ccc->tail_recv, + com); + ccc->num_recv++; + if (GNUNET_YES == duplicate) { - /* ACK for message we already dropped, might have been a - duplicate ACK? Ignore. */ + /* Duplicate within the queue, drop also (this is not covered by + the case above if "delta" >= 64, which could be the case if + max_pending_messages is also >= 64 or if our client is unready + and we are seeing retransmissions of the message our client is + blocked on. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Duplicate payload of %u bytes on %s (mid %u) dropped\n", + (unsigned int) payload_size, + GCCH_2s (ch), + ntohl (msg->mid.mid)); GNUNET_STATISTICS_update (stats, - "# duplicate DATA_ACKs", + "# duplicate DATA", 1, GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, + com); + ccc->num_recv--; + GNUNET_MQ_discard (com->env); + GNUNET_free (com); + send_channel_data_ack (ch); return; } - GNUNET_CONTAINER_DLL_remove (ch->head_sent, - ch->tail_sent, - crm); - ch->pending_messages--; - GNUNET_free (crm); - GNUNET_assert (ch->pending_messages < ch->max_pending_messages); - send_ack_to_client (ch, - (NULL == ch->owner) ? ch->dest : ch->owner); -} - - -/** - * Destroy channel, based on the other peer closing the - * connection. Also needs to remove this channel from - * the tunnel. - * - * @param ch channel to destroy - */ -void -GCCH_handle_remote_destroy (struct CadetChannel *ch) -{ - struct GNUNET_MQ_Envelope *env; - struct GNUNET_CADET_LocalChannelDestroyMessage *tdm; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received remote channel DESTROY for channel %s\n", - GCCH_2s (ch)); - ch->destroy = GNUNET_YES; - env = GNUNET_MQ_msg (tdm, - GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY); - tdm->ccn = ch->ccn; - GSC_send_to_client ((NULL != ch->owner) ? ch->owner : ch->dest, - env); - channel_destroy (ch); + "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n", + (GNUNET_YES == ccc->client_ready) + ? "out-of-order" + : "client-not-ready", + (unsigned int) payload_size, + GCCH_2s (ch), + ntohl (ccc->ccn.channel_of_client), + ccc, + ntohl (msg->mid.mid), + ntohl (ch->mid_recv.mid)); + /* NOTE: this ACK we _could_ skip, as the packet is out-of-order and + the sender may already be transmitting the previous one. Needs + experimental evaluation to see if/when this ACK helps or + hurts. (We might even want another option.) */ + send_channel_data_ack (ch); } @@ -1029,9 +1352,12 @@ GCCH_handle_remote_destroy (struct CadetChannel *ch) * wait for ACK (or retransmit). * * @param cls the `struct CadetReliableMessage` that was sent + * @param cid identifier of the connection within the tunnel, NULL + * if transmission failed */ static void -data_sent_cb (void *cls); +data_sent_cb (void *cls, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid); /** @@ -1046,64 +1372,227 @@ retry_transmission (void *cls) struct CadetChannel *ch = cls; struct CadetReliableMessage *crm = ch->head_sent; - ch->retry_task = NULL; + ch->retry_data_task = NULL; GNUNET_assert (NULL == crm->qe); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Retrying transmission on %s of message %u\n", + GCCH_2s (ch), + (unsigned int) ntohl (crm->data_message->mid.mid)); crm->qe = GCT_send (ch->t, - &crm->data_message.header, + &crm->data_message->header, &data_sent_cb, crm); + GNUNET_assert (NULL == ch->retry_data_task); } /** - * Check if we can now allow the client to transmit, and if so, - * let the client know about it. + * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from + * the queue and tell our client that it can send more. * - * @param ch channel to check + * @param ch the channel that got the PLAINTEXT_DATA_ACK + * @param cti identifier of the connection that delivered the message + * @param crm the message that got acknowledged */ static void -GCCH_check_allow_client (struct CadetChannel *ch) +handle_matching_ack (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti, + struct CadetReliableMessage *crm) { - struct GNUNET_MQ_Envelope *env; - struct GNUNET_CADET_LocalAck *msg; + GNUNET_CONTAINER_DLL_remove (ch->head_sent, + ch->tail_sent, + crm); + ch->pending_messages--; + GNUNET_assert (ch->pending_messages < ch->max_pending_messages); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received DATA_ACK on %s for message %u (%u ACKs pending)\n", + GCCH_2s (ch), + (unsigned int) ntohl (crm->data_message->mid.mid), + ch->pending_messages); + if (NULL != crm->qe) + { + GCT_send_cancel (crm->qe); + crm->qe = NULL; + } + if ( (1 == crm->num_transmissions) && + (NULL != cti) ) + { + GCC_ack_observed (cti); + if (0 == memcmp (cti, + &crm->connection_taken, + sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier))) + { + GCC_latency_observed (cti, + GNUNET_TIME_absolute_get_duration (crm->first_transmission_time)); + } + } + GNUNET_free (crm->data_message); + GNUNET_free (crm); + send_ack_to_client (ch, + (NULL == ch->owner) + ? GNUNET_NO + : GNUNET_YES); +} + + +/** + * We got an acknowledgement for payload data for a channel. + * Possibly resume transmissions. + * + * @param ch channel that got the ack + * @param cti identifier of the connection that delivered the message + * @param ack details about what was received + */ +void +GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti, + const struct GNUNET_CADET_ChannelDataAckMessage *ack) +{ + struct CadetReliableMessage *crm; + struct CadetReliableMessage *crmn; + int found; + uint32_t mid_base; + uint64_t mid_mask; + unsigned int delta; - if (GNUNET_YES == ch->client_allowed) - return; /* client already allowed! */ - if (CADET_CHANNEL_READY != ch->state) + GNUNET_break (GNUNET_NO == ch->is_loopback); + if (GNUNET_NO == ch->reliable) { - /* destination did not yet ACK our CREATE! */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Channel %s not yet ready, throttling client until ACK.\n", - GCCH_2s (ch)); + /* not expecting ACKs on unreliable channel, odd */ + GNUNET_break_op (0); return; } - if (ch->pending_messages > ch->max_pending_messages) + /* mid_base is the MID of the next message that the + other peer expects (i.e. that is missing!), everything + LOWER (but excluding mid_base itself) was received. */ + mid_base = ntohl (ack->mid.mid); + mid_mask = GNUNET_htonll (ack->futures); + found = GNUNET_NO; + for (crm = ch->head_sent; + NULL != crm; + crm = crmn) { - /* Too many messages in queue. */ + crmn = crm->next; + delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base); + if (delta >= UINT_MAX - ch->max_pending_messages) + { + /* overflow, means crm was a bit in the past, so this ACK counts for it. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got DATA_ACK with base %u satisfying past message %u on %s\n", + (unsigned int) mid_base, + ntohl (crm->data_message->mid.mid), + GCCH_2s (ch)); + handle_matching_ack (ch, + cti, + crm); + found = GNUNET_YES; + continue; + } + delta--; + if (delta >= 64) + continue; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Message queue still too long on channel %s, throttling client until ACK.\n", - GCCH_2s (ch)); - return; + "Testing bit %llX for mid %u (base: %u)\n", + (1LLU << delta), + ntohl (crm->data_message->mid.mid), + mid_base); + if (0 != (mid_mask & (1LLU << delta))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got DATA_ACK with mask for %u on %s\n", + ntohl (crm->data_message->mid.mid), + GCCH_2s (ch)); + handle_matching_ack (ch, + cti, + crm); + found = GNUNET_YES; + } } - if ( (NULL != ch->head_sent) && - (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) ) + if (GNUNET_NO == found) { + /* ACK for message we already dropped, might have been a + duplicate ACK? Ignore. */ LOG (GNUNET_ERROR_TYPE_DEBUG, - "Gap in ACKs too big on channel %s, throttling client until ACK.\n", + "Duplicate DATA_ACK on %s, ignoring\n", GCCH_2s (ch)); + GNUNET_STATISTICS_update (stats, + "# duplicate DATA_ACKs", + 1, + GNUNET_NO); return; } - ch->client_allowed = GNUNET_YES; + if (NULL != ch->retry_data_task) + { + GNUNET_SCHEDULER_cancel (ch->retry_data_task); + ch->retry_data_task = NULL; + } + if ( (NULL != ch->head_sent) && + (NULL == ch->head_sent->qe) ) + ch->retry_data_task + = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry, + &retry_transmission, + ch); +} + +/** + * Destroy channel, based on the other peer closing the + * connection. Also needs to remove this channel from + * the tunnel. + * + * @param ch channel to destroy + * @param cti identifier of the connection that delivered the message, + * NULL if we are simulating receiving a destroy due to shutdown + */ +void +GCCH_handle_remote_destroy (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti) +{ + struct CadetChannelClient *ccc; + GNUNET_assert (GNUNET_NO == ch->is_loopback); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending local ack to channel %s client\n", + "Received remote channel DESTROY for %s\n", GCCH_2s (ch)); - env = GNUNET_MQ_msg (msg, - GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); - msg->ccn = ch->ccn; - GSC_send_to_client (ch->owner ? ch->owner : ch->dest, - env); + if (GNUNET_YES == ch->destroy) + { + /* Local client already gone, this is instant-death. */ + channel_destroy (ch); + return; + } + ccc = (NULL != ch->owner) ? ch->owner : ch->dest; + if (NULL != ccc->head_recv) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Lost end of transmission due to remote shutdown on %s\n", + GCCH_2s (ch)); + /* FIXME: change API to notify client about truncated transmission! */ + } + ch->destroy = GNUNET_YES; + GSC_handle_remote_channel_destroy (ccc->c, + ccc->ccn, + ch); + channel_destroy (ch); +} + + +/** + * Test if element @a e1 comes before element @a e2. + * + * @param cls closure, to a flag where we indicate duplicate packets + * @param crm1 an element of to sort + * @param crm2 another element to sort + * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO + */ +static int +cmp_crm_by_next_retry (void *cls, + struct CadetReliableMessage *crm1, + struct CadetReliableMessage *crm2) +{ + if (crm1->next_retry.abs_value_us < + crm2->next_retry.abs_value_us) + return GNUNET_YES; + return GNUNET_NO; } @@ -1114,61 +1603,85 @@ GCCH_check_allow_client (struct CadetChannel *ch) * wait for ACK (or retransmit). * * @param cls the `struct CadetReliableMessage` that was sent + * @param cid identifier of the connection within the tunnel, NULL + * if transmission failed */ static void -data_sent_cb (void *cls) +data_sent_cb (void *cls, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) { struct CadetReliableMessage *crm = cls; struct CadetChannel *ch = crm->ch; - struct CadetReliableMessage *off; + GNUNET_assert (GNUNET_NO == ch->is_loopback); + GNUNET_assert (NULL != crm->qe); crm->qe = NULL; GNUNET_CONTAINER_DLL_remove (ch->head_sent, ch->tail_sent, crm); if (GNUNET_NO == ch->reliable) { + GNUNET_free (crm->data_message); GNUNET_free (crm); ch->pending_messages--; - GCCH_check_allow_client (ch); + send_ack_to_client (ch, + (NULL == ch->owner) + ? GNUNET_NO + : GNUNET_YES); return; } - if (0 == crm->retry_delay.rel_value_us) - crm->retry_delay = ch->expected_delay; - crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay); - - /* find position for re-insertion into the DLL */ - if ( (NULL == ch->head_sent) || - (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) ) + if (NULL == cid) { - /* insert at HEAD, also (re)schedule retry task! */ - GNUNET_CONTAINER_DLL_insert (ch->head_sent, - ch->tail_sent, - crm); - if (NULL != ch->retry_task) - GNUNET_SCHEDULER_cancel (ch->retry_task); - ch->retry_task = GNUNET_SCHEDULER_add_delayed (crm->retry_delay, - &retry_transmission, - ch); - return; + /* There was an error sending. */ + crm->num_transmissions = GNUNET_SYSERR; } - for (off = ch->head_sent; NULL != off; off = off->next) - if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us) - break; - if (NULL == off) + else if (GNUNET_SYSERR != crm->num_transmissions) { - /* insert at tail */ - GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent, + /* Increment transmission counter, and possibly store @a cid + if this was the first transmission. */ + crm->num_transmissions++; + if (1 == crm->num_transmissions) + { + crm->first_transmission_time = GNUNET_TIME_absolute_get (); + crm->connection_taken = *cid; + GCC_ack_expected (cid); + } + } + if ( (0 == crm->retry_delay.rel_value_us) && + (NULL != cid) ) + { + struct CadetConnection *cc = GCC_lookup (cid); + + if (NULL != cc) + crm->retry_delay = GCC_get_metrics (cc)->aged_latency; + else + crm->retry_delay = ch->retry_time; + } + crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay); + crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay, + MIN_RTT_DELAY); + crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay); + + GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage, + cmp_crm_by_next_retry, + NULL, + ch->head_sent, ch->tail_sent, crm); - } - else + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message %u sent, next transmission on %s in %s\n", + (unsigned int) ntohl (crm->data_message->mid.mid), + GCCH_2s (ch), + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry), + GNUNET_YES)); + if (NULL == ch->head_sent->qe) { - /* insert before off */ - GNUNET_CONTAINER_DLL_insert_after (ch->head_sent, - ch->tail_sent, - off->prev, - crm); + if (NULL != ch->retry_data_task) + GNUNET_SCHEDULER_cancel (ch->retry_data_task); + ch->retry_data_task + = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry, + &retry_transmission, + ch); } } @@ -1181,102 +1694,221 @@ data_sent_cb (void *cls) * buffer space in the tunnel. * * @param ch Channel. - * @param message payload to transmit. + * @param sender_ccn ccn of the sender + * @param buf payload to transmit. + * @param buf_len number of bytes in @a buf * @return #GNUNET_OK if everything goes well, * #GNUNET_SYSERR in case of an error. */ int GCCH_handle_local_data (struct CadetChannel *ch, - const struct GNUNET_MessageHeader *message) + struct GNUNET_CADET_ClientChannelNumber sender_ccn, + const char *buf, + size_t buf_len) { - uint16_t payload_size = ntohs (message->size); struct CadetReliableMessage *crm; - if (GNUNET_NO == ch->client_allowed) + if (ch->pending_messages > ch->max_pending_messages) { - GNUNET_break_op (0); + GNUNET_break (0); return GNUNET_SYSERR; } - ch->client_allowed = GNUNET_NO; + if (GNUNET_YES == ch->destroy) + { + /* we are going down, drop messages */ + return GNUNET_OK; + } ch->pending_messages++; + if (GNUNET_YES == ch->is_loopback) + { + struct CadetChannelClient *receiver; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_CADET_LocalData *ld; + int to_owner; + + env = GNUNET_MQ_msg_extra (ld, + buf_len, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); + if ( (NULL != ch->owner) && + (sender_ccn.channel_of_client == + ch->owner->ccn.channel_of_client) ) + { + receiver = ch->dest; + to_owner = GNUNET_NO; + } + else if ( (NULL != ch->dest) && + (sender_ccn.channel_of_client == + ch->dest->ccn.channel_of_client) ) + { + receiver = ch->owner; + to_owner = GNUNET_YES; + } + else + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + ld->ccn = receiver->ccn; + GNUNET_memcpy (&ld[1], + buf, + buf_len); + if (GNUNET_YES == receiver->client_ready) + { + GSC_send_to_client (receiver->c, + env); + send_ack_to_client (ch, + to_owner); + } + else + { + struct CadetOutOfOrderMessage *oom; + + oom = GNUNET_new (struct CadetOutOfOrderMessage); + oom->env = env; + GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv, + receiver->tail_recv, + oom); + receiver->num_recv++; + } + return GNUNET_OK; + } + /* Everything is correct, send the message. */ - crm = GNUNET_malloc (sizeof (*crm) + payload_size); + crm = GNUNET_malloc (sizeof (*crm)); crm->ch = ch; - crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + payload_size); - crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA); + crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + + buf_len); + crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len); + crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA); ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1); - crm->data_message.mid = ch->mid_send; - crm->data_message.ctn = ch->ctn; - GNUNET_memcpy (&crm[1], - message, - payload_size); - GNUNET_CONTAINER_DLL_insert (ch->head_sent, - ch->tail_sent, - crm); + crm->data_message->mid = ch->mid_send; + crm->data_message->ctn = ch->ctn; + GNUNET_memcpy (&crm->data_message[1], + buf, + buf_len); + GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent, + ch->tail_sent, + crm); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending %u bytes from local client to channel %s\n", - payload_size, - GCCH_2s (ch)); + "Sending message %u from local client to %s with %u bytes\n", + ntohl (crm->data_message->mid.mid), + GCCH_2s (ch), + buf_len); + if (NULL != ch->retry_data_task) + { + GNUNET_SCHEDULER_cancel (ch->retry_data_task); + ch->retry_data_task = NULL; + } crm->qe = GCT_send (ch->t, - &crm->data_message.header, + &crm->data_message->header, &data_sent_cb, crm); - GCCH_check_allow_client (ch); + GNUNET_assert (NULL == ch->retry_data_task); return GNUNET_OK; } /** - * Try to deliver messages to the local client, if it is ready for more. + * Handle ACK from client on local channel. Means the client is ready + * for more data, see if we have any for it. * - * @param ch channel to process + * @param ch channel to destroy + * @param client_ccn ccn of the client sending the ack */ -static void -send_client_buffered_data (struct CadetChannel *ch) +void +GCCH_handle_local_ack (struct CadetChannel *ch, + struct GNUNET_CADET_ClientChannelNumber client_ccn) { + struct CadetChannelClient *ccc; struct CadetOutOfOrderMessage *com; - if (GNUNET_NO == ch->client_ready) - return; /* client not ready */ - com = ch->head_recv; + if ( (NULL != ch->owner) && + (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) ) + ccc = ch->owner; + else if ( (NULL != ch->dest) && + (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) ) + ccc = ch->dest; + else + GNUNET_assert (0); + ccc->client_ready = GNUNET_YES; + com = ccc->head_recv; if (NULL == com) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n", + GSC_2s (ccc->c), + ntohl (client_ccn.channel_of_client), + GCCH_2s (ch), + ntohl (ccc->ccn.channel_of_client), + ccc); return; /* none pending */ + } + if (GNUNET_YES == ch->is_loopback) + { + int to_owner; + + /* Messages are always in-order, just send */ + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, + com); + ccc->num_recv--; + GSC_send_to_client (ccc->c, + com->env); + /* Notify sender that we can receive more */ + if (ccc->ccn.channel_of_client == + ch->owner->ccn.channel_of_client) + { + to_owner = GNUNET_NO; + } + else + { + GNUNET_assert (ccc->ccn.channel_of_client == + ch->dest->ccn.channel_of_client); + to_owner = GNUNET_YES; + } + send_ack_to_client (ch, + to_owner); + GNUNET_free (com); + return; + } + if ( (com->mid.mid != ch->mid_recv.mid) && - (GNUNET_NO == ch->out_of_order) ) + (GNUNET_NO == ch->out_of_order) && + (GNUNET_YES == ch->reliable) ) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n", + GSC_2s (ccc->c), + ntohl (ccc->ccn.channel_of_client), + ntohl (com->mid.mid), + ntohl (ch->mid_recv.mid)); return; /* missing next one in-order */ + } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Passing payload message to client on channel %s\n", - GCCH_2s (ch)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got LOCAL_ACK, giving payload message %u to %s-%X on %s\n", + ntohl (com->mid.mid), + GSC_2s (ccc->c), + ntohl (ccc->ccn.channel_of_client), + GCCH_2s (ch)); /* all good, pass next message to client */ - GNUNET_CONTAINER_DLL_remove (ch->head_recv, - ch->tail_recv, + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, com); + ccc->num_recv--; /* FIXME: if unreliable, this is not aggressive enough, as it would be OK to have lost some! */ + ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid)); ch->mid_futures >>= 1; /* equivalent to division by 2 */ - GSC_send_to_client (ch->owner ? ch->owner : ch->dest, + ccc->client_ready = GNUNET_NO; + GSC_send_to_client (ccc->c, com->env); GNUNET_free (com); - if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) && - (GNUNET_YES == ch->reliable) ) - { - /* The next 15 messages were also already received (0xFF), this - suggests that the sender may be blocked on flow control - urgently waiting for an ACK from us. (As we have an inherent - maximum of 64 bits, and 15 is getting too close for comfort.) - So we should send one now. */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sender on channel %s likely blocked on flow-control, sending ACK now.\n", - GCCH_2s (ch)); - if (GNUNET_YES == ch->reliable) - send_channel_data_ack (ch); - } - - if (NULL != ch->head_recv) + send_channel_data_ack (ch); + if (NULL != ccc->head_recv) return; if (GNUNET_NO == ch->destroy) return; @@ -1286,19 +1918,6 @@ send_client_buffered_data (struct CadetChannel *ch) } -/** - * Handle ACK from client on local channel. - * - * @param ch channel to destroy - */ -void -GCCH_handle_local_ack (struct CadetChannel *ch) -{ - ch->client_ready = GNUNET_YES; - send_client_buffered_data (ch); -} - - #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__) @@ -1326,7 +1945,7 @@ GCCH_debug (struct CadetChannel *ch, return; } LOG2 (level, - "CHN Channel %s:%X (%p)\n", + "CHN %s:%X (%p)\n", GCT_2s (ch->t), ch->ctn, ch); @@ -1334,17 +1953,17 @@ GCCH_debug (struct CadetChannel *ch, { LOG2 (level, "CHN origin %s ready %s local-id: %u\n", - GSC_2s (ch->owner), - ch->client_ready ? "YES" : "NO", - ntohl (ch->ccn.channel_of_client)); + GSC_2s (ch->owner->c), + ch->owner->client_ready ? "YES" : "NO", + ntohl (ch->owner->ccn.channel_of_client)); } if (NULL != ch->dest) { LOG2 (level, "CHN destination %s ready %s local-id: %u\n", - GSC_2s (ch->dest), - ch->client_ready ? "YES" : "NO", - ntohl (ch->ccn.channel_of_client)); + GSC_2s (ch->dest->c), + ch->dest->client_ready ? "YES" : "NO", + ntohl (ch->dest->ccn.channel_of_client)); } LOG2 (level, "CHN Message IDs recv: %d (%LLX), send: %d\n",