X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fcadet%2Fgnunet-service-cadet-new_channel.c;h=9d9edc28dae8b5db3303288c9845b7533e3d9180;hb=dd5a385e16168f9d4fe43dde53e49b77a15afa6e;hp=74aafe5a1780030950a7b67e66447358e6f27eff;hpb=3bdda27bbeb0b69bd2c5a73022b237ae0342e9c1;p=oweals%2Fgnunet.git diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c index 74aafe5a1..9d9edc28d 100644 --- a/src/cadet/gnunet-service-cadet-new_channel.c +++ b/src/cadet/gnunet-service-cadet-new_channel.c @@ -1,4 +1,3 @@ - /* This file is part of GNUnet. Copyright (C) 2001-2017 GNUnet e.V. @@ -25,13 +24,14 @@ * @author Christian Grothoff * * TODO: - * - introduce shutdown so we can have half-closed channels, modify - * destroy to include MID to have FIN-ACK equivalents, etc. - * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! - * - check that '0xFFULL' really is sufficient for flow control! - * - revisit handling of 'unreliable' traffic! - * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'. - * - figure out flow control without ACKs (unreliable traffic!) + * - Congestion/flow control: + * + estimate max bandwidth using bursts and use to for CONGESTION CONTROL! + * (and figure out how/where to use this!) + * + figure out flow control without ACKs (unreliable traffic!) + * - revisit handling of 'unbuffered' traffic! + * (need to push down through tunnel into connection selection) + * - revisit handling of 'buffered' traffic: 4 is a rather small buffer; maybe + * reserve more bits in 'options' to allow for buffer size control? */ #include "platform.h" #include "gnunet_util_lib.h" @@ -57,6 +57,23 @@ */ #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30) +/** + * How long do we wait at least before retransmitting ever? + */ +#define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75) + +/** + * Maximum message ID into the future we accept for out-of-order messages. + * If the message is more than this into the future, we drop it. This is + * important both to detect values that are actually in the past, as well + * as to limit adversarially triggerable memory consumption. + * + * Note that right now we have "max_pending_messages = 4" hard-coded in + * the logic below, so a value of 4 would suffice here. But we plan to + * allow larger windows in the future... + */ +#define MAX_OUT_OF_ORDER_DISTANCE 1024 + /** * All the states a connection can be in. @@ -109,6 +126,11 @@ struct CadetReliableMessage */ struct CadetTunnelQueueEntry *qe; + /** + * Data message we are trying to send. + */ + struct GNUNET_CADET_ChannelAppDataMessage *data_message; + /** * How soon should we retry if we fail to get an ACK? * Messages in the queue are sorted by this value. @@ -122,11 +144,24 @@ struct CadetReliableMessage struct GNUNET_TIME_Relative retry_delay; /** - * Data message we are trying to send. + * Time when we first successfully transmitted the message + * (that is, set @e num_transmissions to 1). */ - struct GNUNET_CADET_ChannelAppDataMessage data_message; + struct GNUNET_TIME_Absolute first_transmission_time; + + /** + * Identifier of the connection that this message took when it + * was first transmitted. Only useful if @e num_transmissions is 1. + */ + struct GNUNET_CADET_ConnectionTunnelIdentifier connection_taken; + + /** + * How often was this message transmitted? #GNUNET_SYSERR if there + * was an error transmitting the message, #GNUNET_NO if it was not + * yet transmitted ever, otherwise the number of (re) transmissions. + */ + int num_transmissions; - /* followed by variable-size payload */ }; @@ -159,6 +194,51 @@ struct CadetOutOfOrderMessage }; +/** + * Client endpoint of a `struct CadetChannel`. A channel may be a + * loopback channel, in which case it has two of these endpoints. + * Note that flow control also is required in both directions. + */ +struct CadetChannelClient +{ + /** + * Client handle. Not by itself sufficient to designate + * the client endpoint, as the same client handle may + * be used for both the owner and the destination, and + * we thus also need the channel ID to identify the client. + */ + struct CadetClient *c; + + /** + * Head of DLL of messages received out of order or while client was unready. + */ + struct CadetOutOfOrderMessage *head_recv; + + /** + * Tail DLL of messages received out of order or while client was unready. + */ + struct CadetOutOfOrderMessage *tail_recv; + + /** + * Local tunnel number for this client. + * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI, + * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) + */ + struct GNUNET_CADET_ClientChannelNumber ccn; + + /** + * Number of entries currently in @a head_recv DLL. + */ + unsigned int num_recv; + + /** + * Can we send data to the client? + */ + int client_ready; + +}; + + /** * Struct containing all information regarding a channel to a remote client. */ @@ -173,13 +253,13 @@ struct CadetChannel * Client owner of the tunnel, if any. * (Used if this channel represends the initiating end of the tunnel.) */ - struct CadetClient *owner; + struct CadetChannelClient *owner; /** * Client destination of the tunnel, if any. * (Used if this channel represents the listening end of the tunnel.) */ - struct CadetClient *dest; + struct CadetChannelClient *dest; /** * Last entry in the tunnel's queue relating to control messages @@ -199,16 +279,6 @@ struct CadetChannel */ struct CadetReliableMessage *tail_sent; - /** - * Head of DLL of messages received out of order or while client was unready. - */ - struct CadetOutOfOrderMessage *head_recv; - - /** - * Tail DLL of messages received out of order or while client was unready. - */ - struct CadetOutOfOrderMessage *tail_recv; - /** * Task to resend/poll in case no ACK is received. */ @@ -234,11 +304,6 @@ struct CadetChannel */ struct GNUNET_TIME_Relative retry_time; - /** - * How long does it usually take to get an ACK. - */ - struct GNUNET_TIME_Relative expected_delay; - /** * Bitfield of already-received messages past @e mid_recv. */ @@ -270,27 +335,16 @@ struct CadetChannel */ struct GNUNET_CADET_ChannelTunnelNumber ctn; - /** - * Local tunnel number for local client @e owner owning the channel. - * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) - */ - struct GNUNET_CADET_ClientChannelNumber ccn_owner; - - /** - * Local tunnel number for local client @e dest owning the channel. - * (< #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) - */ - struct GNUNET_CADET_ClientChannelNumber ccn_dest; - /** * Channel state. */ enum CadetChannelState state; /** - * Can we send data to the client? + * Count how many ACKs we skipped, used to prevent long + * sequences of ACK skipping. */ - int client_ready; + unsigned int skip_ack_series; /** * Is the tunnel bufferless (minimum latency)? @@ -342,8 +396,8 @@ GCCH_2s (const struct CadetChannel *ch) : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))), GNUNET_h2s (&ch->port), ch->ctn, - ntohl (ch->ccn_owner.channel_of_client), - ntohl (ch->ccn_dest.channel_of_client)); + (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client), + (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client)); return buf; } @@ -362,6 +416,29 @@ GCCH_get_id (const struct CadetChannel *ch) } +/** + * Release memory associated with @a ccc + * + * @param ccc data structure to clean up + */ +static void +free_channel_client (struct CadetChannelClient *ccc) +{ + struct CadetOutOfOrderMessage *com; + + while (NULL != (com = ccc->head_recv)) + { + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, + com); + ccc->num_recv--; + GNUNET_MQ_discard (com->env); + GNUNET_free (com); + } + GNUNET_free (ccc); +} + + /** * Destroy the given channel. * @@ -371,7 +448,6 @@ static void channel_destroy (struct CadetChannel *ch) { struct CadetReliableMessage *crm; - struct CadetOutOfOrderMessage *com; while (NULL != (crm = ch->head_sent)) { @@ -384,15 +460,18 @@ channel_destroy (struct CadetChannel *ch) GNUNET_CONTAINER_DLL_remove (ch->head_sent, ch->tail_sent, crm); + GNUNET_free (crm->data_message); GNUNET_free (crm); } - while (NULL != (com = ch->head_recv)) + if (NULL != ch->owner) { - GNUNET_CONTAINER_DLL_remove (ch->head_recv, - ch->tail_recv, - com); - GNUNET_MQ_discard (com->env); - GNUNET_free (com); + free_channel_client (ch->owner); + ch->owner = NULL; + } + if (NULL != ch->dest) + { + free_channel_client (ch->dest); + ch->dest = NULL; } if (NULL != ch->last_control_qe) { @@ -434,9 +513,12 @@ send_channel_open (void *cls); * create message. Delays for a bit until we retry. * * @param cls our `struct CadetChannel`. + * @param cid identifier of the connection within the tunnel, NULL + * if transmission failed */ static void -channel_open_sent_cb (void *cls) +channel_open_sent_cb (void *cls, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) { struct CadetChannel *ch = cls; @@ -444,7 +526,7 @@ channel_open_sent_cb (void *cls) ch->last_control_qe = NULL; ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sent CHANNEL_OPEN on %s, retrying in %s\n", + "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n", GCCH_2s (ch), GNUNET_STRINGS_relative_time_to_string (ch->retry_time, GNUNET_YES)); @@ -484,10 +566,13 @@ send_channel_open (void *cls) msgcc.port = ch->port; msgcc.ctn = ch->ctn; ch->state = CADET_CHANNEL_OPEN_SENT; + if (NULL != ch->last_control_qe) + GCT_send_cancel (ch->last_control_qe); ch->last_control_qe = GCT_send (ch->t, &msgcc.header, &channel_open_sent_cb, ch); + GNUNET_assert (NULL == ch->retry_control_task); } @@ -532,23 +617,31 @@ GCCH_channel_local_new (struct CadetClient *owner, uint32_t options) { struct CadetChannel *ch; + struct CadetChannelClient *ccco; + + ccco = GNUNET_new (struct CadetChannelClient); + ccco->c = owner; + ccco->ccn = ccn; + ccco->client_ready = GNUNET_YES; ch = GNUNET_new (struct CadetChannel); + ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */ ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */ - ch->owner = owner; - ch->ccn_owner = ccn; + ch->owner = ccco; ch->port = *port; if (0 == memcmp (&my_full_id, GCP_get_id (destination), sizeof (struct GNUNET_PeerIdentity))) { + struct CadetClient *c; + ch->is_loopback = GNUNET_YES; - ch->dest = GNUNET_CONTAINER_multihashmap_get (open_ports, - port); - if (NULL == ch->dest) + c = GNUNET_CONTAINER_multihashmap_get (open_ports, + port); + if (NULL == c) { /* port closed, wait for it to possibly open */ (void) GNUNET_CONTAINER_multihashmap_put (loose_channels, @@ -561,8 +654,11 @@ GCCH_channel_local_new (struct CadetClient *owner, } else { + ch->dest = GNUNET_new (struct CadetChannelClient); + ch->dest->c = c; + ch->dest->client_ready = GNUNET_YES; GCCH_bind (ch, - ch->dest); + ch->dest->c); } } else @@ -676,9 +772,12 @@ GCCH_channel_incoming_new (struct CadetTunnel *t, * ACKs for ACKs ;-). * * @param cls our `struct CadetChannel`. + * @param cid identifier of the connection within the tunnel, NULL + * if transmission failed */ static void -send_ack_cb (void *cls) +send_ack_cb (void *cls, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) { struct CadetChannel *ch = cls; @@ -697,11 +796,18 @@ send_channel_data_ack (struct CadetChannel *ch) { struct GNUNET_CADET_ChannelDataAckMessage msg; + if (GNUNET_NO == ch->reliable) + return; /* no ACKs */ msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK); msg.header.size = htons (sizeof (msg)); msg.ctn = ch->ctn; - msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1); + msg.mid.mid = htonl (ntohl (ch->mid_recv.mid)); msg.futures = GNUNET_htonll (ch->mid_futures); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending DATA_ACK %u:%llX via %s\n", + (unsigned int) ntohl (msg.mid.mid), + (unsigned long long) ch->mid_futures, + GCCH_2s (ch)); if (NULL != ch->last_control_qe) GCT_send_cancel (ch->last_control_qe); ch->last_control_qe = GCT_send (ch->t, @@ -746,26 +852,28 @@ send_open_ack (void *cls) * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK. * * @param ch channel that got the duplicate open + * @param cti identifier of the connection that delivered the message */ void -GCCH_handle_duplicate_open (struct CadetChannel *ch) +GCCH_handle_duplicate_open (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti) { if (NULL == ch->dest) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring duplicate channel OPEN on %s: port is closed\n", + "Ignoring duplicate CHANNEL_OPEN on %s: port is closed\n", GCCH_2s (ch)); return; } if (NULL != ch->retry_control_task) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring duplicate channel OPEN on %s: control message is pending\n", + "Ignoring duplicate CHANNEL_OPEN on %s: control message is pending\n", GCCH_2s (ch)); return; } LOG (GNUNET_ERROR_TYPE_DEBUG, - "Retransmitting OPEN_ACK on %s\n", + "Retransmitting CHANNEL_OPEN_ACK on %s\n", GCCH_2s (ch)); ch->retry_control_task = GNUNET_SCHEDULER_add_now (&send_open_ack, @@ -786,20 +894,27 @@ send_ack_to_client (struct CadetChannel *ch, { struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_LocalAck *ack; - struct CadetClient *c; + struct CadetChannelClient *ccc; + ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest; + if (NULL == ccc) + { + /* This can happen if we are just getting ACKs after + our local client already disconnected. */ + GNUNET_assert (GNUNET_YES == ch->destroy); + return; + } env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); - ack->ccn = (GNUNET_YES == to_owner) ? ch->ccn_owner : ch->ccn_dest; - c = (GNUNET_YES == to_owner) - ? ch->owner - : ch->dest; + ack->ccn = ccc->ccn; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n", - GSC_2s (c), + "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n", + GSC_2s (ccc->c), (GNUNET_YES == to_owner) ? "owner" : "dest", - ntohl (ack->ccn.channel_of_client)); - GSC_send_to_client (c, + ntohl (ack->ccn.channel_of_client), + ch->pending_messages, + ch->max_pending_messages); + GSC_send_to_client (ccc->c, env); } @@ -817,6 +932,7 @@ GCCH_bind (struct CadetChannel *ch, struct CadetClient *c) { uint32_t options; + struct CadetChannelClient *cccd; LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding %s from %s to port %s of %s\n", @@ -837,22 +953,26 @@ GCCH_bind (struct CadetChannel *ch, options |= GNUNET_CADET_OPTION_RELIABLE; if (ch->out_of_order) options |= GNUNET_CADET_OPTION_OUT_OF_ORDER; - ch->dest = c; - ch->ccn_dest = GSC_bind (c, - ch, - (GNUNET_YES == ch->is_loopback) - ? GCP_get (&my_full_id, - GNUNET_YES) - : GCT_get_destination (ch->t), - &ch->port, - options); - GNUNET_assert (ntohl (ch->ccn_dest.channel_of_client) < + cccd = GNUNET_new (struct CadetChannelClient); + ch->dest = cccd; + cccd->c = c; + cccd->client_ready = GNUNET_YES; + cccd->ccn = GSC_bind (c, + ch, + (GNUNET_YES == ch->is_loopback) + ? GCP_get (&my_full_id, + GNUNET_YES) + : GCT_get_destination (ch->t), + &ch->port, + options); + GNUNET_assert (ntohl (cccd->ccn.channel_of_client) < GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); - ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */ + ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */ if (GNUNET_YES == ch->is_loopback) { ch->state = CADET_CHANNEL_OPEN_SENT; - GCCH_handle_channel_open_ack (ch); + GCCH_handle_channel_open_ack (ch, + NULL); } else { @@ -862,7 +982,7 @@ GCCH_bind (struct CadetChannel *ch, ch); } /* give client it's initial supply of ACKs */ - GNUNET_assert (ntohl (ch->ccn_dest.channel_of_client) < + GNUNET_assert (ntohl (cccd->ccn.channel_of_client) < GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); for (unsigned int i=0;imax_pending_messages;i++) send_ack_to_client (ch, @@ -876,22 +996,37 @@ GCCH_bind (struct CadetChannel *ch, * * @param ch channel to destroy * @param c client that caused the destruction + * @param ccn client number of the client @a c */ void GCCH_channel_local_destroy (struct CadetChannel *ch, - struct CadetClient *c) + struct CadetClient *c, + struct GNUNET_CADET_ClientChannelNumber ccn) { LOG (GNUNET_ERROR_TYPE_DEBUG, "%s asks for destruction of %s\n", GSC_2s (c), GCCH_2s (ch)); GNUNET_assert (NULL != c); - if (c == ch->owner) + if ( (NULL != ch->owner) && + (c == ch->owner->c) && + (ccn.channel_of_client == ch->owner->ccn.channel_of_client) ) + { + free_channel_client (ch->owner); ch->owner = NULL; - else if (c == ch->dest) + } + else if ( (NULL != ch->dest) && + (c == ch->dest->c) && + (ccn.channel_of_client == ch->dest->ccn.channel_of_client) ) + { + free_channel_client (ch->dest); ch->dest = NULL; + } else + { GNUNET_assert (0); + } + if (GNUNET_YES == ch->destroy) { /* other end already destroyed, with the local client gone, no need @@ -909,7 +1044,10 @@ GCCH_channel_local_destroy (struct CadetChannel *ch, return; } /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */ - if (CADET_CHANNEL_NEW != ch->state) + if (CADET_CHANNEL_NEW == ch->state) + GSC_drop_loose_channel (&ch->port, + ch); + else GCT_send_channel_destroy (ch->t, ch->ctn); /* Nothing left to do, just finish destruction */ @@ -922,9 +1060,11 @@ GCCH_channel_local_destroy (struct CadetChannel *ch, * (the port is open on the other side). Begin transmissions. * * @param ch channel to destroy + * @param cti identifier of the connection that delivered the message */ void -GCCH_handle_channel_open_ack (struct CadetChannel *ch) +GCCH_handle_channel_open_ack (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti) { switch (ch->state) { @@ -971,26 +1111,24 @@ GCCH_handle_channel_open_ack (struct CadetChannel *ch) /** * Test if element @a e1 comes before element @a e2. * - * TODO: use opportunity to create generic list insertion sort - * logic in container! - * - * @param cls closure, our `struct CadetChannel` - * @param e1 an element of to sort - * @param e2 another element to sort + * @param cls closure, to a flag where we indicate duplicate packets + * @param m1 a message of to sort + * @param m2 another message to sort * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO */ static int is_before (void *cls, - void *e1, - void *e2) + struct CadetOutOfOrderMessage *m1, + struct CadetOutOfOrderMessage *m2) { - struct CadetOutOfOrderMessage *m1 = e1; - struct CadetOutOfOrderMessage *m2 = e2; + int *duplicate = cls; uint32_t v1 = ntohl (m1->mid.mid); uint32_t v2 = ntohl (m2->mid.mid); uint32_t delta; - delta = v1 - v2; + delta = v2 - v1; + if (0 == delta) + *duplicate = GNUNET_YES; if (delta > (uint32_t) INT_MAX) { /* in overflow range, we can safely assume we wrapped around */ @@ -998,6 +1136,7 @@ is_before (void *cls, } else { + /* result is small, thus v2 > v1, thus m1 < m2 */ return GNUNET_YES; } } @@ -1008,91 +1147,291 @@ is_before (void *cls, * and send an ACK to the other end (once flow control allows it!) * * @param ch channel that got data + * @param cti identifier of the connection that delivered the message + * @param msg message that was received */ void GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti, const struct GNUNET_CADET_ChannelAppDataMessage *msg) { struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_LocalData *ld; - struct CadetOutOfOrderMessage *com; + struct CadetChannelClient *ccc; size_t payload_size; + struct CadetOutOfOrderMessage *com; + int duplicate; + uint32_t mid_min; + uint32_t mid_max; + uint32_t mid_msg; + uint32_t delta; GNUNET_assert (GNUNET_NO == ch->is_loopback); + if ( (GNUNET_YES == ch->destroy) && + (NULL == ch->owner) && + (NULL == ch->dest) ) + { + /* This client is gone, but we still have messages to send to + the other end (which is why @a ch is not yet dead). However, + we cannot pass messages to our client anymore. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Dropping incoming payload on %s as this end is already closed\n", + GCCH_2s (ch)); + /* send back DESTROY notification to stop further retransmissions! */ + GCT_send_channel_destroy (ch->t, + ch->ctn); + return; + } payload_size = ntohs (msg->header.size) - sizeof (*msg); env = GNUNET_MQ_msg_extra (ld, payload_size, GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); - ld->ccn = (NULL == ch->dest) ? ch->ccn_owner : ch->ccn_dest; + ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn; GNUNET_memcpy (&ld[1], &msg[1], payload_size); - if ( (GNUNET_YES == ch->client_ready) && + ccc = (NULL != ch->owner) ? ch->owner : ch->dest; + if ( (GNUNET_YES == ccc->client_ready) && ( (GNUNET_YES == ch->out_of_order) || (msg->mid.mid == ch->mid_recv.mid) ) ) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Giving %u bytes of payload from %s to client %s\n", + "Giving %u bytes of payload with MID %u from %s to client %s\n", (unsigned int) payload_size, + ntohl (msg->mid.mid), GCCH_2s (ch), - GSC_2s (ch->owner ? ch->owner : ch->dest)); - GSC_send_to_client (ch->owner ? ch->owner : ch->dest, + GSC_2s (ccc->c)); + ccc->client_ready = GNUNET_NO; + GSC_send_to_client (ccc->c, env); ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); ch->mid_futures >>= 1; + send_channel_data_ack (ch); + return; } - else - { - /* FIXME-SECURITY: if the element is WAY too far ahead, - drop it (can't buffer too much!) */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n", - (GNUNET_YES == ch->client_ready) - ? "out-of-order" - : "client-not-ready", - (unsigned int) payload_size, - GCCH_2s (ch), - ntohl (msg->mid.mid), - ntohl (ch->mid_recv.mid)); - com = GNUNET_new (struct CadetOutOfOrderMessage); - com->mid = msg->mid; - com->env = env; - /* sort into list ordered by "is_before" */ - if ( (NULL == ch->head_recv) || - (GNUNET_YES == is_before (ch, - com, - ch->head_recv)) ) + if (GNUNET_YES == ch->reliable) + { + /* check if message ought to be dropped because it is ancient/too distant/duplicate */ + mid_min = ntohl (ch->mid_recv.mid); + mid_max = mid_min + ch->max_pending_messages; + mid_msg = ntohl (msg->mid.mid); + if ( ( (uint32_t) (mid_msg - mid_min) > ch->max_pending_messages) || + ( (uint32_t) (mid_max - mid_msg) > ch->max_pending_messages) ) { - GNUNET_CONTAINER_DLL_insert (ch->head_recv, - ch->tail_recv, - com); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s at %u drops ancient or far-future message %u\n", + GCCH_2s (ch), + (unsigned int) mid_min, + ntohl (msg->mid.mid)); + + GNUNET_STATISTICS_update (stats, + "# duplicate DATA (ancient or future)", + 1, + GNUNET_NO); + GNUNET_MQ_discard (env); + send_channel_data_ack (ch); + return; } - else + /* mark bit for future ACKs */ + delta = mid_msg - mid_min - 1; /* overflow/underflow are OK here */ + if (delta < 64) { - struct CadetOutOfOrderMessage *pos; - - for (pos = ch->head_recv; - NULL != pos; - pos = pos->next) + if (0 != (ch->mid_futures & (1LLU << delta))) { - if (GNUNET_YES != - is_before (ch, - pos, - com)) - break; + /* Duplicate within the queue, drop also */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Duplicate payload of %u bytes on %s (mid %u) dropped\n", + (unsigned int) payload_size, + GCCH_2s (ch), + ntohl (msg->mid.mid)); + GNUNET_STATISTICS_update (stats, + "# duplicate DATA", + 1, + GNUNET_NO); + GNUNET_MQ_discard (env); + send_channel_data_ack (ch); + return; } - if (NULL == pos) - GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv, - ch->tail_recv, - com); - else - GNUNET_CONTAINER_DLL_insert_after (ch->head_recv, - ch->tail_recv, - com, - pos->prev); + ch->mid_futures |= (1LLU << delta); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Marked bit %llX for mid %u (base: %u); now: %llX\n", + (1LLU << delta), + mid_msg, + mid_min, + ch->mid_futures); } } + else /* ! ch->reliable */ + { + /* Channel is unreliable, so we do not ACK. But we also cannot + allow buffering everything, so check if we have space... */ + if (ccc->num_recv >= ch->max_pending_messages) + { + struct CadetOutOfOrderMessage *drop; + + /* Yep, need to drop. Drop the oldest message in + the buffer. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queue full due slow client on %s, dropping oldest message\n", + GCCH_2s (ch)); + GNUNET_STATISTICS_update (stats, + "# messages dropped due to slow client", + 1, + GNUNET_NO); + drop = ccc->head_recv; + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, + drop); + ccc->num_recv--; + GNUNET_MQ_discard (drop->env); + GNUNET_free (drop); + } + } + + /* Insert message into sorted out-of-order queue */ + com = GNUNET_new (struct CadetOutOfOrderMessage); + com->mid = msg->mid; + com->env = env; + duplicate = GNUNET_NO; + GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage, + is_before, + &duplicate, + ccc->head_recv, + ccc->tail_recv, + com); + ccc->num_recv++; + if (GNUNET_YES == duplicate) + { + /* Duplicate within the queue, drop also (this is not covered by + the case above if "delta" >= 64, which could be the case if + max_pending_messages is also >= 64 or if our client is unready + and we are seeing retransmissions of the message our client is + blocked on. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Duplicate payload of %u bytes on %s (mid %u) dropped\n", + (unsigned int) payload_size, + GCCH_2s (ch), + ntohl (msg->mid.mid)); + GNUNET_STATISTICS_update (stats, + "# duplicate DATA", + 1, + GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, + com); + ccc->num_recv--; + GNUNET_MQ_discard (com->env); + GNUNET_free (com); + send_channel_data_ack (ch); + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n", + (GNUNET_YES == ccc->client_ready) + ? "out-of-order" + : "client-not-ready", + (unsigned int) payload_size, + GCCH_2s (ch), + ntohl (ccc->ccn.channel_of_client), + ccc, + ntohl (msg->mid.mid), + ntohl (ch->mid_recv.mid)); + /* NOTE: this ACK we _could_ skip, as the packet is out-of-order and + the sender may already be transmitting the previous one. Needs + experimental evaluation to see if/when this ACK helps or + hurts. (We might even want another option.) */ + send_channel_data_ack (ch); +} + + +/** + * Function called once the tunnel has sent one of our messages. + * If the message is unreliable, simply frees the `crm`. If the + * message was reliable, calculate retransmission time and + * wait for ACK (or retransmit). + * + * @param cls the `struct CadetReliableMessage` that was sent + * @param cid identifier of the connection within the tunnel, NULL + * if transmission failed + */ +static void +data_sent_cb (void *cls, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid); + + +/** + * We need to retry a transmission, the last one took too long to + * be acknowledged. + * + * @param cls the `struct CadetChannel` where we need to retransmit + */ +static void +retry_transmission (void *cls) +{ + struct CadetChannel *ch = cls; + struct CadetReliableMessage *crm = ch->head_sent; + + ch->retry_data_task = NULL; + GNUNET_assert (NULL == crm->qe); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Retrying transmission on %s of message %u\n", + GCCH_2s (ch), + (unsigned int) ntohl (crm->data_message->mid.mid)); + crm->qe = GCT_send (ch->t, + &crm->data_message->header, + &data_sent_cb, + crm); + GNUNET_assert (NULL == ch->retry_data_task); +} + + +/** + * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from + * the queue and tell our client that it can send more. + * + * @param ch the channel that got the PLAINTEXT_DATA_ACK + * @param cti identifier of the connection that delivered the message + * @param crm the message that got acknowledged + */ +static void +handle_matching_ack (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti, + struct CadetReliableMessage *crm) +{ + GNUNET_CONTAINER_DLL_remove (ch->head_sent, + ch->tail_sent, + crm); + ch->pending_messages--; + GNUNET_assert (ch->pending_messages < ch->max_pending_messages); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received DATA_ACK on %s for message %u (%u ACKs pending)\n", + GCCH_2s (ch), + (unsigned int) ntohl (crm->data_message->mid.mid), + ch->pending_messages); + if (NULL != crm->qe) + { + GCT_send_cancel (crm->qe); + crm->qe = NULL; + } + if ( (1 == crm->num_transmissions) && + (NULL != cti) ) + { + GCC_ack_observed (cti); + if (0 == memcmp (cti, + &crm->connection_taken, + sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier))) + { + GCC_latency_observed (cti, + GNUNET_TIME_absolute_get_duration (crm->first_transmission_time)); + } + } + GNUNET_free (crm->data_message); + GNUNET_free (crm); + send_ack_to_client (ch, + (NULL == ch->owner) + ? GNUNET_NO + : GNUNET_YES); } @@ -1101,13 +1440,20 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, * Possibly resume transmissions. * * @param ch channel that got the ack + * @param cti identifier of the connection that delivered the message * @param ack details about what was received */ void GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti, const struct GNUNET_CADET_ChannelDataAckMessage *ack) { struct CadetReliableMessage *crm; + struct CadetReliableMessage *crmn; + int found; + uint32_t mid_base; + uint64_t mid_mask; + unsigned int delta; GNUNET_break (GNUNET_NO == ch->is_loopback); if (GNUNET_NO == ch->reliable) @@ -1116,12 +1462,53 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, GNUNET_break_op (0); return; } + /* mid_base is the MID of the next message that the + other peer expects (i.e. that is missing!), everything + LOWER (but excluding mid_base itself) was received. */ + mid_base = ntohl (ack->mid.mid); + mid_mask = GNUNET_htonll (ack->futures); + found = GNUNET_NO; for (crm = ch->head_sent; NULL != crm; - crm = crm->next) - if (ack->mid.mid == crm->data_message.mid.mid) - break; - if (NULL == crm) + crm = crmn) + { + crmn = crm->next; + delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base); + if (delta >= UINT_MAX - ch->max_pending_messages) + { + /* overflow, means crm was a bit in the past, so this ACK counts for it. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got DATA_ACK with base %u satisfying past message %u on %s\n", + (unsigned int) mid_base, + ntohl (crm->data_message->mid.mid), + GCCH_2s (ch)); + handle_matching_ack (ch, + cti, + crm); + found = GNUNET_YES; + continue; + } + delta--; + if (delta >= 64) + continue; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Testing bit %llX for mid %u (base: %u)\n", + (1LLU << delta), + ntohl (crm->data_message->mid.mid), + mid_base); + if (0 != (mid_mask & (1LLU << delta))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got DATA_ACK with mask for %u on %s\n", + ntohl (crm->data_message->mid.mid), + GCCH_2s (ch)); + handle_matching_ack (ch, + cti, + crm); + found = GNUNET_YES; + } + } + if (GNUNET_NO == found) { /* ACK for message we already dropped, might have been a duplicate ACK? Ignore. */ @@ -1134,25 +1521,17 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, GNUNET_NO); return; } - GNUNET_CONTAINER_DLL_remove (ch->head_sent, - ch->tail_sent, - crm); - ch->pending_messages--; - send_ack_to_client (ch, - (NULL == ch->owner) - ? GNUNET_NO - : GNUNET_YES); - GNUNET_free (crm); - GNUNET_assert (ch->pending_messages < ch->max_pending_messages); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received DATA_ACK on %s for message %u (%u ACKs pending)\n", - GCCH_2s (ch), - (unsigned int) ntohl (ack->mid.mid), - ch->pending_messages); - send_ack_to_client (ch, - (NULL == ch->owner) - ? GNUNET_NO - : GNUNET_YES); + if (NULL != ch->retry_data_task) + { + GNUNET_SCHEDULER_cancel (ch->retry_data_task); + ch->retry_data_task = NULL; + } + if ( (NULL != ch->head_sent) && + (NULL == ch->head_sent->qe) ) + ch->retry_data_task + = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry, + &retry_transmission, + ch); } @@ -1162,10 +1541,15 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, * the tunnel. * * @param ch channel to destroy + * @param cti identifier of the connection that delivered the message, + * NULL if we are simulating receiving a destroy due to shutdown */ void -GCCH_handle_remote_destroy (struct CadetChannel *ch) +GCCH_handle_remote_destroy (struct CadetChannel *ch, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti) { + struct CadetChannelClient *ccc; + GNUNET_assert (GNUNET_NO == ch->is_loopback); LOG (GNUNET_ERROR_TYPE_DEBUG, "Received remote channel DESTROY for %s\n", @@ -1176,7 +1560,8 @@ GCCH_handle_remote_destroy (struct CadetChannel *ch) channel_destroy (ch); return; } - if (NULL != ch->head_recv) + ccc = (NULL != ch->owner) ? ch->owner : ch->dest; + if (NULL != ccc->head_recv) { LOG (GNUNET_ERROR_TYPE_WARNING, "Lost end of transmission due to remote shutdown on %s\n", @@ -1184,43 +1569,30 @@ GCCH_handle_remote_destroy (struct CadetChannel *ch) /* FIXME: change API to notify client about truncated transmission! */ } ch->destroy = GNUNET_YES; - GSC_handle_remote_channel_destroy ((NULL != ch->owner) ? ch->owner : ch->dest, - (NULL != ch->owner) ? ch->ccn_owner : ch->ccn_dest, + GSC_handle_remote_channel_destroy (ccc->c, + ccc->ccn, ch); channel_destroy (ch); } /** - * Function called once the tunnel has sent one of our messages. - * If the message is unreliable, simply frees the `crm`. If the - * message was reliable, calculate retransmission time and - * wait for ACK (or retransmit). - * - * @param cls the `struct CadetReliableMessage` that was sent - */ -static void -data_sent_cb (void *cls); - - -/** - * We need to retry a transmission, the last one took too long to - * be acknowledged. + * Test if element @a e1 comes before element @a e2. * - * @param cls the `struct CadetChannel` where we need to retransmit + * @param cls closure, to a flag where we indicate duplicate packets + * @param crm1 an element of to sort + * @param crm2 another element to sort + * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO */ -static void -retry_transmission (void *cls) +static int +cmp_crm_by_next_retry (void *cls, + struct CadetReliableMessage *crm1, + struct CadetReliableMessage *crm2) { - struct CadetChannel *ch = cls; - struct CadetReliableMessage *crm = ch->head_sent; - - ch->retry_data_task = NULL; - GNUNET_assert (NULL == crm->qe); - crm->qe = GCT_send (ch->t, - &crm->data_message.header, - &data_sent_cb, - crm); + if (crm1->next_retry.abs_value_us < + crm2->next_retry.abs_value_us) + return GNUNET_YES; + return GNUNET_NO; } @@ -1231,21 +1603,25 @@ retry_transmission (void *cls) * wait for ACK (or retransmit). * * @param cls the `struct CadetReliableMessage` that was sent + * @param cid identifier of the connection within the tunnel, NULL + * if transmission failed */ static void -data_sent_cb (void *cls) +data_sent_cb (void *cls, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) { struct CadetReliableMessage *crm = cls; struct CadetChannel *ch = crm->ch; - struct CadetReliableMessage *off; GNUNET_assert (GNUNET_NO == ch->is_loopback); + GNUNET_assert (NULL != crm->qe); crm->qe = NULL; GNUNET_CONTAINER_DLL_remove (ch->head_sent, ch->tail_sent, crm); if (GNUNET_NO == ch->reliable) { + GNUNET_free (crm->data_message); GNUNET_free (crm); ch->pending_messages--; send_ack_to_client (ch, @@ -1254,43 +1630,58 @@ data_sent_cb (void *cls) : GNUNET_YES); return; } - if (0 == crm->retry_delay.rel_value_us) - crm->retry_delay = ch->expected_delay; - crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay); - - /* find position for re-insertion into the DLL */ - if ( (NULL == ch->head_sent) || - (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) ) + if (NULL == cid) { - /* insert at HEAD, also (re)schedule retry task! */ - GNUNET_CONTAINER_DLL_insert (ch->head_sent, - ch->tail_sent, - crm); - if (NULL != ch->retry_data_task) - GNUNET_SCHEDULER_cancel (ch->retry_data_task); - ch->retry_data_task - = GNUNET_SCHEDULER_add_delayed (crm->retry_delay, - &retry_transmission, - ch); - return; + /* There was an error sending. */ + crm->num_transmissions = GNUNET_SYSERR; } - for (off = ch->head_sent; NULL != off; off = off->next) - if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us) - break; - if (NULL == off) + else if (GNUNET_SYSERR != crm->num_transmissions) { - /* insert at tail */ - GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent, + /* Increment transmission counter, and possibly store @a cid + if this was the first transmission. */ + crm->num_transmissions++; + if (1 == crm->num_transmissions) + { + crm->first_transmission_time = GNUNET_TIME_absolute_get (); + crm->connection_taken = *cid; + GCC_ack_expected (cid); + } + } + if ( (0 == crm->retry_delay.rel_value_us) && + (NULL != cid) ) + { + struct CadetConnection *cc = GCC_lookup (cid); + + if (NULL != cc) + crm->retry_delay = GCC_get_metrics (cc)->aged_latency; + else + crm->retry_delay = ch->retry_time; + } + crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay); + crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay, + MIN_RTT_DELAY); + crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay); + + GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage, + cmp_crm_by_next_retry, + NULL, + ch->head_sent, ch->tail_sent, crm); - } - else + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message %u sent, next transmission on %s in %s\n", + (unsigned int) ntohl (crm->data_message->mid.mid), + GCCH_2s (ch), + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry), + GNUNET_YES)); + if (NULL == ch->head_sent->qe) { - /* insert before off */ - GNUNET_CONTAINER_DLL_insert_after (ch->head_sent, - ch->tail_sent, - off->prev, - crm); + if (NULL != ch->retry_data_task) + GNUNET_SCHEDULER_cancel (ch->retry_data_task); + ch->retry_data_task + = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry, + &retry_transmission, + ch); } } @@ -1322,11 +1713,16 @@ GCCH_handle_local_data (struct CadetChannel *ch, GNUNET_break (0); return GNUNET_SYSERR; } + if (GNUNET_YES == ch->destroy) + { + /* we are going down, drop messages */ + return GNUNET_OK; + } ch->pending_messages++; if (GNUNET_YES == ch->is_loopback) { - struct CadetClient *receiver; + struct CadetChannelClient *receiver; struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_LocalData *ld; int to_owner; @@ -1334,108 +1730,185 @@ GCCH_handle_local_data (struct CadetChannel *ch, env = GNUNET_MQ_msg_extra (ld, buf_len, GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); - if (sender_ccn.channel_of_client == - ch->ccn_owner.channel_of_client) + if ( (NULL != ch->owner) && + (sender_ccn.channel_of_client == + ch->owner->ccn.channel_of_client) ) { receiver = ch->dest; - ld->ccn = ch->ccn_dest; to_owner = GNUNET_NO; } - else + else if ( (NULL != ch->dest) && + (sender_ccn.channel_of_client == + ch->dest->ccn.channel_of_client) ) { - GNUNET_assert (sender_ccn.channel_of_client == - ch->ccn_dest.channel_of_client); receiver = ch->owner; - ld->ccn = ch->ccn_owner; to_owner = GNUNET_YES; } + else + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + ld->ccn = receiver->ccn; GNUNET_memcpy (&ld[1], buf, buf_len); - /* FIXME: this does not provide for flow control! */ - GSC_send_to_client (receiver, - env); - send_ack_to_client (ch, - to_owner); + if (GNUNET_YES == receiver->client_ready) + { + GSC_send_to_client (receiver->c, + env); + send_ack_to_client (ch, + to_owner); + } + else + { + struct CadetOutOfOrderMessage *oom; + + oom = GNUNET_new (struct CadetOutOfOrderMessage); + oom->env = env; + GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv, + receiver->tail_recv, + oom); + receiver->num_recv++; + } return GNUNET_OK; } /* Everything is correct, send the message. */ - crm = GNUNET_malloc (sizeof (*crm) + buf_len); + crm = GNUNET_malloc (sizeof (*crm)); crm->ch = ch; - crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len); - crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA); + crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + + buf_len); + crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len); + crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA); ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1); - crm->data_message.mid = ch->mid_send; - crm->data_message.ctn = ch->ctn; - GNUNET_memcpy (&crm[1], + crm->data_message->mid = ch->mid_send; + crm->data_message->ctn = ch->ctn; + GNUNET_memcpy (&crm->data_message[1], buf, buf_len); - GNUNET_CONTAINER_DLL_insert (ch->head_sent, - ch->tail_sent, - crm); + GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent, + ch->tail_sent, + crm); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending %u bytes from local client to %s\n", - buf_len, - GCCH_2s (ch)); + "Sending message %u from local client to %s with %u bytes\n", + ntohl (crm->data_message->mid.mid), + GCCH_2s (ch), + buf_len); + if (NULL != ch->retry_data_task) + { + GNUNET_SCHEDULER_cancel (ch->retry_data_task); + ch->retry_data_task = NULL; + } crm->qe = GCT_send (ch->t, - &crm->data_message.header, + &crm->data_message->header, &data_sent_cb, crm); + GNUNET_assert (NULL == ch->retry_data_task); return GNUNET_OK; } /** - * Try to deliver messages to the local client, if it is ready for more. + * Handle ACK from client on local channel. Means the client is ready + * for more data, see if we have any for it. * - * @param ch channel to process + * @param ch channel to destroy + * @param client_ccn ccn of the client sending the ack */ -static void -send_client_buffered_data (struct CadetChannel *ch) +void +GCCH_handle_local_ack (struct CadetChannel *ch, + struct GNUNET_CADET_ClientChannelNumber client_ccn) { + struct CadetChannelClient *ccc; struct CadetOutOfOrderMessage *com; - if (GNUNET_NO == ch->client_ready) - return; /* client not ready */ - com = ch->head_recv; + if ( (NULL != ch->owner) && + (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) ) + ccc = ch->owner; + else if ( (NULL != ch->dest) && + (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) ) + ccc = ch->dest; + else + GNUNET_assert (0); + ccc->client_ready = GNUNET_YES; + com = ccc->head_recv; if (NULL == com) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n", + GSC_2s (ccc->c), + ntohl (client_ccn.channel_of_client), + GCCH_2s (ch), + ntohl (ccc->ccn.channel_of_client), + ccc); return; /* none pending */ + } + if (GNUNET_YES == ch->is_loopback) + { + int to_owner; + + /* Messages are always in-order, just send */ + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, + com); + ccc->num_recv--; + GSC_send_to_client (ccc->c, + com->env); + /* Notify sender that we can receive more */ + if (ccc->ccn.channel_of_client == + ch->owner->ccn.channel_of_client) + { + to_owner = GNUNET_NO; + } + else + { + GNUNET_assert (ccc->ccn.channel_of_client == + ch->dest->ccn.channel_of_client); + to_owner = GNUNET_YES; + } + send_ack_to_client (ch, + to_owner); + GNUNET_free (com); + return; + } + if ( (com->mid.mid != ch->mid_recv.mid) && - (GNUNET_NO == ch->out_of_order) ) + (GNUNET_NO == ch->out_of_order) && + (GNUNET_YES == ch->reliable) ) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n", + GSC_2s (ccc->c), + ntohl (ccc->ccn.channel_of_client), + ntohl (com->mid.mid), + ntohl (ch->mid_recv.mid)); return; /* missing next one in-order */ + } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Passing payload message to client on %s\n", - GCCH_2s (ch)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got LOCAL_ACK, giving payload message %u to %s-%X on %s\n", + ntohl (com->mid.mid), + GSC_2s (ccc->c), + ntohl (ccc->ccn.channel_of_client), + GCCH_2s (ch)); /* all good, pass next message to client */ - GNUNET_CONTAINER_DLL_remove (ch->head_recv, - ch->tail_recv, + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, com); + ccc->num_recv--; /* FIXME: if unreliable, this is not aggressive enough, as it would be OK to have lost some! */ + ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid)); ch->mid_futures >>= 1; /* equivalent to division by 2 */ - GSC_send_to_client (ch->owner ? ch->owner : ch->dest, + ccc->client_ready = GNUNET_NO; + GSC_send_to_client (ccc->c, com->env); GNUNET_free (com); - if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) && - (GNUNET_YES == ch->reliable) ) - { - /* The next 15 messages were also already received (0xFF), this - suggests that the sender may be blocked on flow control - urgently waiting for an ACK from us. (As we have an inherent - maximum of 64 bits, and 15 is getting too close for comfort.) - So we should send one now. */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sender on %s likely blocked on flow-control, sending ACK now.\n", - GCCH_2s (ch)); - if (GNUNET_YES == ch->reliable) - send_channel_data_ack (ch); - } - - if (NULL != ch->head_recv) + send_channel_data_ack (ch); + if (NULL != ccc->head_recv) return; if (GNUNET_NO == ch->destroy) return; @@ -1445,23 +1918,6 @@ send_client_buffered_data (struct CadetChannel *ch) } -/** - * Handle ACK from client on local channel. - * - * @param ch channel to destroy - * @param client_ccn ccn of the client sending the ack - */ -void -GCCH_handle_local_ack (struct CadetChannel *ch, - struct GNUNET_CADET_ClientChannelNumber client_ccn) -{ - ch->client_ready = GNUNET_YES; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got LOCAL_ACK, client ready to receive more data!\n"); - send_client_buffered_data (ch); -} - - #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__) @@ -1497,17 +1953,17 @@ GCCH_debug (struct CadetChannel *ch, { LOG2 (level, "CHN origin %s ready %s local-id: %u\n", - GSC_2s (ch->owner), - ch->client_ready ? "YES" : "NO", - ntohl (ch->ccn_owner.channel_of_client)); + GSC_2s (ch->owner->c), + ch->owner->client_ready ? "YES" : "NO", + ntohl (ch->owner->ccn.channel_of_client)); } if (NULL != ch->dest) { LOG2 (level, "CHN destination %s ready %s local-id: %u\n", - GSC_2s (ch->dest), - ch->client_ready ? "YES" : "NO", - ntohl (ch->ccn_dest.channel_of_client)); + GSC_2s (ch->dest->c), + ch->dest->client_ready ? "YES" : "NO", + ntohl (ch->dest->ccn.channel_of_client)); } LOG2 (level, "CHN Message IDs recv: %d (%LLX), send: %d\n",