-
/*
This file is part of GNUnet.
Copyright (C) 2001-2017 GNUnet e.V.
* @author Christian Grothoff
*
* TODO:
- * - introduce shutdown so we can have half-closed channels, modify
- * destroy to include MID to have FIN-ACK equivalents, etc.
- * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
- * - check that '0xFFULL' really is sufficient for flow control!
- * - revisit handling of 'unreliable' traffic!
- * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
- * - figure out flow control without ACKs (unreliable traffic!)
+ * - Congestion/flow control:
+ * + calculate current RTT if possible, use that for initial retransmissions
+ * (NOTE: needs us to learn which connection the tunnel uses for the message!)
+ * + estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
+ * (and figure out how/where to use this!)
+ * + figure out flow control without ACKs (unreliable traffic!)
+ * - revisit handling of 'unbuffered' traffic!
+ * (need to push down through tunnel into connection selection)
+ * - revisit handling of 'buffered' traffic: 4 is a rather small buffer; maybe
+ * reserve more bits in 'options' to allow for buffer size control?
*/
#include "platform.h"
#include "gnunet_util_lib.h"
*/
#define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
+/**
+ * How long do we wait at least before retransmitting ever?
+ */
+#define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
+
+/**
+ * Maximum message ID into the future we accept for out-of-order messages.
+ * If the message is more than this into the future, we drop it. This is
+ * important both to detect values that are actually in the past, as well
+ * as to limit adversarially triggerable memory consumption.
+ *
+ * Note that right now we have "max_pending_messages = 4" hard-coded in
+ * the logic below, so a value of 4 would suffice here. But we plan to
+ * allow larger windows in the future...
+ */
+#define MAX_OUT_OF_ORDER_DISTANCE 1024
+
/**
* All the states a connection can be in.
*/
struct CadetTunnelQueueEntry *qe;
+ /**
+ * Data message we are trying to send.
+ */
+ struct GNUNET_CADET_ChannelAppDataMessage *data_message;
+
/**
* How soon should we retry if we fail to get an ACK?
* Messages in the queue are sorted by this value.
struct GNUNET_TIME_Relative retry_delay;
/**
- * Data message we are trying to send.
+ * Time when we first successfully transmitted the message
+ * (that is, set @e num_transmissions to 1).
*/
- struct GNUNET_CADET_ChannelAppDataMessage data_message;
+ struct GNUNET_TIME_Absolute first_transmission_time;
+
+ /**
+ * Identifier of the connection that this message took when it
+ * was first transmitted. Only useful if @e num_transmissions is 1.
+ */
+ struct GNUNET_CADET_ConnectionTunnelIdentifier connection_taken;
+
+ /**
+ * How often was this message transmitted? #GNUNET_SYSERR if there
+ * was an error transmitting the message, #GNUNET_NO if it was not
+ * yet transmitted ever, otherwise the number of (re) transmissions.
+ */
+ int num_transmissions;
- /* followed by variable-size payload */
};
};
+/**
+ * Client endpoint of a `struct CadetChannel`. A channel may be a
+ * loopback channel, in which case it has two of these endpoints.
+ * Note that flow control also is required in both directions.
+ */
+struct CadetChannelClient
+{
+ /**
+ * Client handle. Not by itself sufficient to designate
+ * the client endpoint, as the same client handle may
+ * be used for both the owner and the destination, and
+ * we thus also need the channel ID to identify the client.
+ */
+ struct CadetClient *c;
+
+ /**
+ * Head of DLL of messages received out of order or while client was unready.
+ */
+ struct CadetOutOfOrderMessage *head_recv;
+
+ /**
+ * Tail DLL of messages received out of order or while client was unready.
+ */
+ struct CadetOutOfOrderMessage *tail_recv;
+
+ /**
+ * Local tunnel number for this client.
+ * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
+ * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
+ */
+ struct GNUNET_CADET_ClientChannelNumber ccn;
+
+ /**
+ * Number of entries currently in @a head_recv DLL.
+ */
+ unsigned int num_recv;
+
+ /**
+ * Can we send data to the client?
+ */
+ int client_ready;
+
+};
+
+
/**
* Struct containing all information regarding a channel to a remote client.
*/
* Client owner of the tunnel, if any.
* (Used if this channel represends the initiating end of the tunnel.)
*/
- struct CadetClient *owner;
+ struct CadetChannelClient *owner;
/**
* Client destination of the tunnel, if any.
* (Used if this channel represents the listening end of the tunnel.)
*/
- struct CadetClient *dest;
+ struct CadetChannelClient *dest;
/**
* Last entry in the tunnel's queue relating to control messages
*/
struct CadetReliableMessage *tail_sent;
- /**
- * Head of DLL of messages received out of order or while client was unready.
- */
- struct CadetOutOfOrderMessage *head_recv;
-
- /**
- * Tail DLL of messages received out of order or while client was unready.
- */
- struct CadetOutOfOrderMessage *tail_recv;
-
/**
* Task to resend/poll in case no ACK is received.
*/
*/
struct GNUNET_CADET_ChannelTunnelNumber ctn;
- /**
- * Local tunnel number for local client @e owner owning the channel.
- * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
- */
- struct GNUNET_CADET_ClientChannelNumber ccn_owner;
-
- /**
- * Local tunnel number for local client @e dest owning the channel.
- * (< #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
- */
- struct GNUNET_CADET_ClientChannelNumber ccn_dest;
-
/**
* Channel state.
*/
enum CadetChannelState state;
/**
- * Can we send data to the client?
+ * Count how many ACKs we skipped, used to prevent long
+ * sequences of ACK skipping.
*/
- int client_ready;
+ unsigned int skip_ack_series;
/**
* Is the tunnel bufferless (minimum latency)?
: GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
GNUNET_h2s (&ch->port),
ch->ctn,
- ntohl (ch->ccn_owner.channel_of_client),
- ntohl (ch->ccn_dest.channel_of_client));
+ (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
+ (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
return buf;
}
}
+/**
+ * Release memory associated with @a ccc
+ *
+ * @param ccc data structure to clean up
+ */
+static void
+free_channel_client (struct CadetChannelClient *ccc)
+{
+ struct CadetOutOfOrderMessage *com;
+
+ while (NULL != (com = ccc->head_recv))
+ {
+ GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
+ ccc->tail_recv,
+ com);
+ ccc->num_recv--;
+ GNUNET_MQ_discard (com->env);
+ GNUNET_free (com);
+ }
+ GNUNET_free (ccc);
+}
+
+
/**
* Destroy the given channel.
*
channel_destroy (struct CadetChannel *ch)
{
struct CadetReliableMessage *crm;
- struct CadetOutOfOrderMessage *com;
while (NULL != (crm = ch->head_sent))
{
GNUNET_CONTAINER_DLL_remove (ch->head_sent,
ch->tail_sent,
crm);
+ GNUNET_free (crm->data_message);
GNUNET_free (crm);
}
- while (NULL != (com = ch->head_recv))
+ if (NULL != ch->owner)
{
- GNUNET_CONTAINER_DLL_remove (ch->head_recv,
- ch->tail_recv,
- com);
- GNUNET_MQ_discard (com->env);
- GNUNET_free (com);
+ free_channel_client (ch->owner);
+ ch->owner = NULL;
+ }
+ if (NULL != ch->dest)
+ {
+ free_channel_client (ch->dest);
+ ch->dest = NULL;
}
if (NULL != ch->last_control_qe)
{
* create message. Delays for a bit until we retry.
*
* @param cls our `struct CadetChannel`.
+ * @param cid identifier of the connection within the tunnel, NULL
+ * if transmission failed
*/
static void
-channel_open_sent_cb (void *cls)
+channel_open_sent_cb (void *cls,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
{
struct CadetChannel *ch = cls;
ch->last_control_qe = NULL;
ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sent CHANNEL_OPEN on %s, retrying in %s\n",
+ "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
GCCH_2s (ch),
GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
GNUNET_YES));
&msgcc.header,
&channel_open_sent_cb,
ch);
+ GNUNET_assert (NULL == ch->retry_control_task);
}
uint32_t options)
{
struct CadetChannel *ch;
+ struct CadetChannelClient *ccco;
+
+ ccco = GNUNET_new (struct CadetChannelClient);
+ ccco->c = owner;
+ ccco->ccn = ccn;
+ ccco->client_ready = GNUNET_YES;
ch = GNUNET_new (struct CadetChannel);
+ ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
- ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
- ch->owner = owner;
- ch->ccn_owner = ccn;
+ ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
+ ch->owner = ccco;
ch->port = *port;
if (0 == memcmp (&my_full_id,
GCP_get_id (destination),
sizeof (struct GNUNET_PeerIdentity)))
{
+ struct CadetClient *c;
+
ch->is_loopback = GNUNET_YES;
- ch->dest = GNUNET_CONTAINER_multihashmap_get (open_ports,
- port);
- if (NULL == ch->dest)
+ c = GNUNET_CONTAINER_multihashmap_get (open_ports,
+ port);
+ if (NULL == c)
{
/* port closed, wait for it to possibly open */
(void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
}
else
{
+ ch->dest = GNUNET_new (struct CadetChannelClient);
+ ch->dest->c = c;
+ ch->dest->client_ready = GNUNET_YES;
GCCH_bind (ch,
- ch->dest);
+ ch->dest->c);
}
}
else
ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
- ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
+ ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
GNUNET_STATISTICS_update (stats,
"# channels",
1,
* ACKs for ACKs ;-).
*
* @param cls our `struct CadetChannel`.
+ * @param cid identifier of the connection within the tunnel, NULL
+ * if transmission failed
*/
static void
-send_ack_cb (void *cls)
+send_ack_cb (void *cls,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
{
struct CadetChannel *ch = cls;
{
struct GNUNET_CADET_ChannelDataAckMessage msg;
+ if (GNUNET_NO == ch->reliable)
+ return; /* no ACKs */
msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
msg.header.size = htons (sizeof (msg));
msg.ctn = ch->ctn;
- msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
+ msg.mid.mid = htonl (ntohl (ch->mid_recv.mid));
msg.futures = GNUNET_htonll (ch->mid_futures);
if (NULL != ch->last_control_qe)
GCT_send_cancel (ch->last_control_qe);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending DATA_ACK %u:%llX via %s\n",
+ (unsigned int) ntohl (msg.mid.mid),
+ (unsigned long long) ch->mid_futures,
+ GCCH_2s (ch));
ch->last_control_qe = GCT_send (ch->t,
&msg.header,
&send_ack_cb,
* #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
*
* @param ch channel that got the duplicate open
+ * @param cti identifier of the connection that delivered the message
*/
void
-GCCH_handle_duplicate_open (struct CadetChannel *ch)
+GCCH_handle_duplicate_open (struct CadetChannel *ch,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
{
if (NULL == ch->dest)
{
/**
- * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL ACK to the client to solicit more messages.
+ * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
*
* @param ch channel the ack is for
- * @param c client to send the ACK to
+ * @param to_owner #GNUNET_YES to send to owner,
+ * #GNUNET_NO to send to dest
*/
static void
send_ack_to_client (struct CadetChannel *ch,
- struct CadetClient *c)
+ int to_owner)
{
struct GNUNET_MQ_Envelope *env;
struct GNUNET_CADET_LocalAck *ack;
+ struct CadetChannelClient *ccc;
+ ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
+ if (NULL == ccc)
+ {
+ /* This can happen if we are just getting ACKs after
+ our local client already disconnected. */
+ GNUNET_assert (GNUNET_YES == ch->destroy);
+ return;
+ }
env = GNUNET_MQ_msg (ack,
GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
- ack->ccn = (c == ch->owner) ? ch->ccn_owner : ch->ccn_dest;
- GSC_send_to_client (c,
+ ack->ccn = ccc->ccn;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
+ GSC_2s (ccc->c),
+ (GNUNET_YES == to_owner) ? "owner" : "dest",
+ ntohl (ack->ccn.channel_of_client),
+ ch->pending_messages,
+ ch->max_pending_messages);
+ GSC_send_to_client (ccc->c,
env);
}
struct CadetClient *c)
{
uint32_t options;
+ struct CadetChannelClient *cccd;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Binding %s from %s to port %s of %s\n",
options |= GNUNET_CADET_OPTION_RELIABLE;
if (ch->out_of_order)
options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
- ch->dest = c;
- ch->ccn_dest = GSC_bind (c,
- ch,
- (GNUNET_YES == ch->is_loopback)
- ? GCP_get (&my_full_id,
- GNUNET_YES)
- : GCT_get_destination (ch->t),
- &ch->port,
- options);
- ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
+ cccd = GNUNET_new (struct CadetChannelClient);
+ ch->dest = cccd;
+ cccd->c = c;
+ cccd->client_ready = GNUNET_YES;
+ cccd->ccn = GSC_bind (c,
+ ch,
+ (GNUNET_YES == ch->is_loopback)
+ ? GCP_get (&my_full_id,
+ GNUNET_YES)
+ : GCT_get_destination (ch->t),
+ &ch->port,
+ options);
+ GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
+ GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
+ ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
if (GNUNET_YES == ch->is_loopback)
{
ch->state = CADET_CHANNEL_OPEN_SENT;
- GCCH_handle_channel_open_ack (ch);
+ GCCH_handle_channel_open_ack (ch,
+ NULL);
}
else
{
ch);
}
/* give client it's initial supply of ACKs */
+ GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
+ GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
for (unsigned int i=0;i<ch->max_pending_messages;i++)
send_ack_to_client (ch,
- ch->dest);
+ GNUNET_NO);
}
*
* @param ch channel to destroy
* @param c client that caused the destruction
+ * @param ccn client number of the client @a c
*/
void
GCCH_channel_local_destroy (struct CadetChannel *ch,
- struct CadetClient *c)
+ struct CadetClient *c,
+ struct GNUNET_CADET_ClientChannelNumber ccn)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s asks for destruction of %s\n",
GSC_2s (c),
GCCH_2s (ch));
GNUNET_assert (NULL != c);
- if (c == ch->owner)
+ if ( (NULL != ch->owner) &&
+ (c == ch->owner->c) &&
+ (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
+ {
+ free_channel_client (ch->owner);
ch->owner = NULL;
- else if (c == ch->dest)
+ }
+ else if ( (NULL != ch->dest) &&
+ (c == ch->dest->c) &&
+ (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
+ {
+ free_channel_client (ch->dest);
ch->dest = NULL;
+ }
else
+ {
GNUNET_assert (0);
+ }
+
if (GNUNET_YES == ch->destroy)
{
/* other end already destroyed, with the local client gone, no need
* (the port is open on the other side). Begin transmissions.
*
* @param ch channel to destroy
+ * @param cti identifier of the connection that delivered the message
*/
void
-GCCH_handle_channel_open_ack (struct CadetChannel *ch)
+GCCH_handle_channel_open_ack (struct CadetChannel *ch,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
{
switch (ch->state)
{
return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received channel OPEN_ACK for waiting %s, entering READY state\n",
+ "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
GCCH_2s (ch));
if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
{
to be buffered! */
for (unsigned int i=0;i<ch->max_pending_messages;i++)
send_ack_to_client (ch,
- ch->owner);
+ GNUNET_YES);
break;
case CADET_CHANNEL_READY:
/* duplicate ACK, maybe we retried the CREATE. Ignore. */
/**
* Test if element @a e1 comes before element @a e2.
*
- * TODO: use opportunity to create generic list insertion sort
- * logic in container!
- *
- * @param cls closure, our `struct CadetChannel`
- * @param e1 an element of to sort
- * @param e2 another element to sort
+ * @param cls closure, to a flag where we indicate duplicate packets
+ * @param m1 a message of to sort
+ * @param m2 another message to sort
* @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
*/
static int
is_before (void *cls,
- void *e1,
- void *e2)
+ struct CadetOutOfOrderMessage *m1,
+ struct CadetOutOfOrderMessage *m2)
{
- struct CadetOutOfOrderMessage *m1 = e1;
- struct CadetOutOfOrderMessage *m2 = e2;
+ int *duplicate = cls;
uint32_t v1 = ntohl (m1->mid.mid);
uint32_t v2 = ntohl (m2->mid.mid);
uint32_t delta;
- delta = v1 - v2;
+ delta = v2 - v1;
+ if (0 == delta)
+ *duplicate = GNUNET_YES;
if (delta > (uint32_t) INT_MAX)
{
/* in overflow range, we can safely assume we wrapped around */
}
else
{
+ /* result is small, thus v2 > v1, thus m1 < m2 */
return GNUNET_YES;
}
}
* and send an ACK to the other end (once flow control allows it!)
*
* @param ch channel that got data
+ * @param cti identifier of the connection that delivered the message
+ * @param msg message that was received
*/
void
GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
const struct GNUNET_CADET_ChannelAppDataMessage *msg)
{
struct GNUNET_MQ_Envelope *env;
struct GNUNET_CADET_LocalData *ld;
- struct CadetOutOfOrderMessage *com;
+ struct CadetChannelClient *ccc;
size_t payload_size;
+ struct CadetOutOfOrderMessage *com;
+ int duplicate;
+ uint32_t mid_min;
+ uint32_t mid_max;
+ uint32_t mid_msg;
+ uint32_t delta;
GNUNET_assert (GNUNET_NO == ch->is_loopback);
+ if ( (GNUNET_YES == ch->destroy) &&
+ (NULL == ch->owner) &&
+ (NULL == ch->dest) )
+ {
+ /* This client is gone, but we still have messages to send to
+ the other end (which is why @a ch is not yet dead). However,
+ we cannot pass messages to our client anymore. */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Dropping incoming payload on %s as this end is already closed\n",
+ GCCH_2s (ch));
+ /* send back DESTROY notification to stop further retransmissions! */
+ GCT_send_channel_destroy (ch->t,
+ ch->ctn);
+ return;
+ }
payload_size = ntohs (msg->header.size) - sizeof (*msg);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receicved %u bytes of application data on %s\n",
- (unsigned int) payload_size,
- GCCH_2s (ch));
env = GNUNET_MQ_msg_extra (ld,
payload_size,
GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
- ld->ccn = (NULL == ch->dest) ? ch->ccn_owner : ch->ccn_dest;
+ ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
GNUNET_memcpy (&ld[1],
&msg[1],
payload_size);
- if ( (GNUNET_YES == ch->client_ready) &&
+ ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
+ if ( (GNUNET_YES == ccc->client_ready) &&
( (GNUNET_YES == ch->out_of_order) ||
(msg->mid.mid == ch->mid_recv.mid) ) )
{
- GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Giving %u bytes of payload with MID %u from %s to client %s\n",
+ (unsigned int) payload_size,
+ ntohl (msg->mid.mid),
+ GCCH_2s (ch),
+ GSC_2s (ccc->c));
+ ccc->client_ready = GNUNET_NO;
+ GSC_send_to_client (ccc->c,
env);
ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
ch->mid_futures >>= 1;
+ send_channel_data_ack (ch);
+ return;
}
- else
+
+ if (GNUNET_YES == ch->reliable)
{
- /* FIXME-SECURITY: if the element is WAY too far ahead,
- drop it (can't buffer too much!) */
- com = GNUNET_new (struct CadetOutOfOrderMessage);
- com->mid = msg->mid;
- com->env = env;
- /* sort into list ordered by "is_before" */
- if ( (NULL == ch->head_recv) ||
- (GNUNET_YES == is_before (ch,
- com,
- ch->head_recv)) )
+ /* check if message ought to be dropped because it is ancient/too distant/duplicate */
+ mid_min = ntohl (ch->mid_recv.mid);
+ mid_max = mid_min + ch->max_pending_messages;
+ mid_msg = ntohl (msg->mid.mid);
+ if ( ( (uint32_t) (mid_msg - mid_min) > ch->max_pending_messages) ||
+ ( (uint32_t) (mid_max - mid_msg) > ch->max_pending_messages) )
{
- GNUNET_CONTAINER_DLL_insert (ch->head_recv,
- ch->tail_recv,
- com);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s at %u drops ancient or far-future message %u\n",
+ GCCH_2s (ch),
+ (unsigned int) mid_min,
+ ntohl (msg->mid.mid));
+
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate DATA (ancient or future)",
+ 1,
+ GNUNET_NO);
+ GNUNET_MQ_discard (env);
+ send_channel_data_ack (ch);
+ return;
}
- else
+ /* mark bit for future ACKs */
+ delta = mid_msg - mid_min - 1; /* overflow/underflow are OK here */
+ if (delta < 64)
{
- struct CadetOutOfOrderMessage *pos;
-
- for (pos = ch->head_recv;
- NULL != pos;
- pos = pos->next)
+ if (0 != (ch->mid_futures & (1LLU << delta)))
{
- if (GNUNET_YES !=
- is_before (ch,
- pos,
- com))
- break;
+ /* Duplicate within the queue, drop also */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
+ (unsigned int) payload_size,
+ GCCH_2s (ch),
+ ntohl (msg->mid.mid));
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate DATA",
+ 1,
+ GNUNET_NO);
+ GNUNET_MQ_discard (env);
+ send_channel_data_ack (ch);
+ return;
}
- if (NULL == pos)
- GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
- ch->tail_recv,
- com);
- else
- GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
- ch->tail_recv,
- com,
- pos->prev);
+ ch->mid_futures |= (1LLU << delta);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Marked bit %llX for mid %u (base: %u); now: %llX\n",
+ (1LLU << delta),
+ mid_msg,
+ mid_min,
+ ch->mid_futures);
}
}
+ else /* ! ch->reliable */
+ {
+ /* Channel is unreliable, so we do not ACK. But we also cannot
+ allow buffering everything, so check if we have space... */
+ if (ccc->num_recv >= ch->max_pending_messages)
+ {
+ struct CadetOutOfOrderMessage *drop;
+
+ /* Yep, need to drop. Drop the oldest message in
+ the buffer. */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Queue full due slow client on %s, dropping oldest message\n",
+ GCCH_2s (ch));
+ GNUNET_STATISTICS_update (stats,
+ "# messages dropped due to slow client",
+ 1,
+ GNUNET_NO);
+ drop = ccc->head_recv;
+ GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
+ ccc->tail_recv,
+ drop);
+ ccc->num_recv--;
+ GNUNET_MQ_discard (drop->env);
+ GNUNET_free (drop);
+ }
+ }
+
+ /* Insert message into sorted out-of-order queue */
+ com = GNUNET_new (struct CadetOutOfOrderMessage);
+ com->mid = msg->mid;
+ com->env = env;
+ duplicate = GNUNET_NO;
+ GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
+ is_before,
+ &duplicate,
+ ccc->head_recv,
+ ccc->tail_recv,
+ com);
+ ccc->num_recv++;
+ if (GNUNET_YES == duplicate)
+ {
+ /* Duplicate within the queue, drop also (this is not covered by
+ the case above if "delta" >= 64, which could be the case if
+ max_pending_messages is also >= 64 or if our client is unready
+ and we are seeing retransmissions of the message our client is
+ blocked on. */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
+ (unsigned int) payload_size,
+ GCCH_2s (ch),
+ ntohl (msg->mid.mid));
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate DATA",
+ 1,
+ GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
+ ccc->tail_recv,
+ com);
+ ccc->num_recv--;
+ GNUNET_MQ_discard (com->env);
+ GNUNET_free (com);
+ send_channel_data_ack (ch);
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
+ (GNUNET_YES == ccc->client_ready)
+ ? "out-of-order"
+ : "client-not-ready",
+ (unsigned int) payload_size,
+ GCCH_2s (ch),
+ ntohl (ccc->ccn.channel_of_client),
+ ccc,
+ ntohl (msg->mid.mid),
+ ntohl (ch->mid_recv.mid));
+ /* NOTE: this ACK we _could_ skip, as the packet is out-of-order and
+ the sender may already be transmitting the previous one. Needs
+ experimental evaluation to see if/when this ACK helps or
+ hurts. (We might even want another option.) */
+ send_channel_data_ack (ch);
+}
+
+
+/**
+ * Function called once the tunnel has sent one of our messages.
+ * If the message is unreliable, simply frees the `crm`. If the
+ * message was reliable, calculate retransmission time and
+ * wait for ACK (or retransmit).
+ *
+ * @param cls the `struct CadetReliableMessage` that was sent
+ * @param cid identifier of the connection within the tunnel, NULL
+ * if transmission failed
+ */
+static void
+data_sent_cb (void *cls,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid);
+
+
+/**
+ * We need to retry a transmission, the last one took too long to
+ * be acknowledged.
+ *
+ * @param cls the `struct CadetChannel` where we need to retransmit
+ */
+static void
+retry_transmission (void *cls)
+{
+ struct CadetChannel *ch = cls;
+ struct CadetReliableMessage *crm = ch->head_sent;
+
+ ch->retry_data_task = NULL;
+ GNUNET_assert (NULL == crm->qe);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Retrying transmission on %s of message %u\n",
+ GCCH_2s (ch),
+ (unsigned int) ntohl (crm->data_message->mid.mid));
+ crm->qe = GCT_send (ch->t,
+ &crm->data_message->header,
+ &data_sent_cb,
+ crm);
+ GNUNET_assert (NULL == ch->retry_data_task);
+}
+
+
+/**
+ * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
+ * the queue and tell our client that it can send more.
+ *
+ * @param ch the channel that got the PLAINTEXT_DATA_ACK
+ * @param cti identifier of the connection that delivered the message
+ * @param crm the message that got acknowledged
+ */
+static void
+handle_matching_ack (struct CadetChannel *ch,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
+ struct CadetReliableMessage *crm)
+{
+ GNUNET_CONTAINER_DLL_remove (ch->head_sent,
+ ch->tail_sent,
+ crm);
+ ch->pending_messages--;
+ GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
+ GCCH_2s (ch),
+ (unsigned int) ntohl (crm->data_message->mid.mid),
+ ch->pending_messages);
+ if (NULL != crm->qe)
+ {
+ GCT_send_cancel (crm->qe);
+ crm->qe = NULL;
+ }
+ if ( (1 == crm->num_transmissions) &&
+ (NULL != cti) )
+ {
+ GCC_ack_observed (cti);
+ if (0 == memcmp (cti,
+ &crm->connection_taken,
+ sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier)))
+ {
+ GCC_latency_observed (cti,
+ GNUNET_TIME_absolute_get_duration (crm->first_transmission_time));
+ }
+ }
+ GNUNET_free (crm->data_message);
+ GNUNET_free (crm);
+ send_ack_to_client (ch,
+ (NULL == ch->owner)
+ ? GNUNET_NO
+ : GNUNET_YES);
}
* Possibly resume transmissions.
*
* @param ch channel that got the ack
+ * @param cti identifier of the connection that delivered the message
* @param ack details about what was received
*/
void
GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
const struct GNUNET_CADET_ChannelDataAckMessage *ack)
{
struct CadetReliableMessage *crm;
+ struct CadetReliableMessage *crmn;
+ int found;
+ uint32_t mid_base;
+ uint64_t mid_mask;
+ unsigned int delta;
GNUNET_break (GNUNET_NO == ch->is_loopback);
if (GNUNET_NO == ch->reliable)
GNUNET_break_op (0);
return;
}
+ /* mid_base is the MID of the next message that the
+ other peer expects (i.e. that is missing!), everything
+ LOWER (but excluding mid_base itself) was received. */
+ mid_base = ntohl (ack->mid.mid);
+ mid_mask = GNUNET_htonll (ack->futures);
+ found = GNUNET_NO;
for (crm = ch->head_sent;
NULL != crm;
- crm = crm->next)
- if (ack->mid.mid == crm->data_message.mid.mid)
- break;
- if (NULL == crm)
+ crm = crmn)
+ {
+ crmn = crm->next;
+ delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base);
+ if (delta >= UINT_MAX - ch->max_pending_messages)
+ {
+ /* overflow, means crm was a bit in the past, so this ACK counts for it. */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got DATA_ACK with base %u satisfying past message %u on %s\n",
+ (unsigned int) mid_base,
+ ntohl (crm->data_message->mid.mid),
+ GCCH_2s (ch));
+ handle_matching_ack (ch,
+ cti,
+ crm);
+ found = GNUNET_YES;
+ continue;
+ }
+ delta--;
+ if (delta >= 64)
+ continue;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Testing bit %llX for mid %u (base: %u)\n",
+ (1LLU << delta),
+ ntohl (crm->data_message->mid.mid),
+ mid_base);
+ if (0 != (mid_mask & (1LLU << delta)))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got DATA_ACK with mask for %u on %s\n",
+ ntohl (crm->data_message->mid.mid),
+ GCCH_2s (ch));
+ handle_matching_ack (ch,
+ cti,
+ crm);
+ found = GNUNET_YES;
+ }
+ }
+ if (GNUNET_NO == found)
{
/* ACK for message we already dropped, might have been a
duplicate ACK? Ignore. */
GNUNET_NO);
return;
}
- GNUNET_CONTAINER_DLL_remove (ch->head_sent,
- ch->tail_sent,
- crm);
- ch->pending_messages--;
- GNUNET_free (crm);
- GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
- GCCH_2s (ch),
- (unsigned int) ntohl (ack->mid.mid),
- ch->pending_messages);
- send_ack_to_client (ch,
- (NULL == ch->owner) ? ch->dest : ch->owner);
+ if (NULL != ch->retry_data_task)
+ {
+ GNUNET_SCHEDULER_cancel (ch->retry_data_task);
+ ch->retry_data_task = NULL;
+ }
+ if ( (NULL != ch->head_sent) &&
+ (NULL == ch->head_sent->qe) )
+ ch->retry_data_task
+ = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
+ &retry_transmission,
+ ch);
}
* the tunnel.
*
* @param ch channel to destroy
+ * @param cti identifier of the connection that delivered the message,
+ * NULL if we are simulating receiving a destroy due to shutdown
*/
void
-GCCH_handle_remote_destroy (struct CadetChannel *ch)
+GCCH_handle_remote_destroy (struct CadetChannel *ch,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
{
+ struct CadetChannelClient *ccc;
+
GNUNET_assert (GNUNET_NO == ch->is_loopback);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received remote channel DESTROY for %s\n",
channel_destroy (ch);
return;
}
- if (NULL != ch->head_recv)
+ ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
+ if (NULL != ccc->head_recv)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Lost end of transmission due to remote shutdown on %s\n",
/* FIXME: change API to notify client about truncated transmission! */
}
ch->destroy = GNUNET_YES;
- GSC_handle_remote_channel_destroy ((NULL != ch->owner) ? ch->owner : ch->dest,
- (NULL != ch->owner) ? ch->ccn_owner : ch->ccn_dest,
+ GSC_handle_remote_channel_destroy (ccc->c,
+ ccc->ccn,
ch);
channel_destroy (ch);
}
/**
- * Function called once the tunnel has sent one of our messages.
- * If the message is unreliable, simply frees the `crm`. If the
- * message was reliable, calculate retransmission time and
- * wait for ACK (or retransmit).
- *
- * @param cls the `struct CadetReliableMessage` that was sent
- */
-static void
-data_sent_cb (void *cls);
-
-
-/**
- * We need to retry a transmission, the last one took too long to
- * be acknowledged.
- *
- * @param cls the `struct CadetChannel` where we need to retransmit
- */
-static void
-retry_transmission (void *cls)
-{
- struct CadetChannel *ch = cls;
- struct CadetReliableMessage *crm = ch->head_sent;
-
- ch->retry_data_task = NULL;
- GNUNET_assert (NULL == crm->qe);
- crm->qe = GCT_send (ch->t,
- &crm->data_message.header,
- &data_sent_cb,
- crm);
-}
-
-
-/**
- * Check if we can now allow the client to transmit, and if so,
- * let the client know about it.
+ * Test if element @a e1 comes before element @a e2.
*
- * @param ch channel to check
+ * @param cls closure, to a flag where we indicate duplicate packets
+ * @param crm1 an element of to sort
+ * @param crm2 another element to sort
+ * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
*/
-static void
-GCCH_check_allow_client (struct CadetChannel *ch)
+static int
+cmp_crm_by_next_retry (void *cls,
+ struct CadetReliableMessage *crm1,
+ struct CadetReliableMessage *crm2)
{
- struct GNUNET_MQ_Envelope *env;
- struct GNUNET_CADET_LocalAck *msg;
-
- if (CADET_CHANNEL_READY != ch->state)
- {
- /* destination did not yet ACK our CREATE! */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s not yet ready, throttling client until ACK.\n",
- GCCH_2s (ch));
- return;
- }
- if (ch->pending_messages > ch->max_pending_messages)
- {
- /* Too many messages in queue. */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Message queue still too long on %s, throttling client until ACK.\n",
- GCCH_2s (ch));
- return;
- }
- if ( (NULL != ch->head_sent) &&
- (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) )
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Gap in ACKs too big on %s, throttling client until ACK.\n",
- GCCH_2s (ch));
- return;
- }
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sending local ack to %s client\n",
- GCCH_2s (ch));
- env = GNUNET_MQ_msg (msg,
- GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
- msg->ccn = (NULL != ch->owner) ? ch->ccn_owner : ch->ccn_dest;
- GSC_send_to_client ((NULL != ch->owner) ? ch->owner : ch->dest,
- env);
+ if (crm1->next_retry.abs_value_us <
+ crm2->next_retry.abs_value_us)
+ return GNUNET_YES;
+ return GNUNET_NO;
}
* wait for ACK (or retransmit).
*
* @param cls the `struct CadetReliableMessage` that was sent
+ * @param cid identifier of the connection within the tunnel, NULL
+ * if transmission failed
*/
static void
-data_sent_cb (void *cls)
+data_sent_cb (void *cls,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
{
struct CadetReliableMessage *crm = cls;
struct CadetChannel *ch = crm->ch;
- struct CadetReliableMessage *off;
+ GNUNET_assert (GNUNET_NO == ch->is_loopback);
+ GNUNET_assert (NULL != crm->qe);
crm->qe = NULL;
GNUNET_CONTAINER_DLL_remove (ch->head_sent,
ch->tail_sent,
crm);
if (GNUNET_NO == ch->reliable)
{
+ GNUNET_free (crm->data_message);
GNUNET_free (crm);
ch->pending_messages--;
- GCCH_check_allow_client (ch);
+ send_ack_to_client (ch,
+ (NULL == ch->owner)
+ ? GNUNET_NO
+ : GNUNET_YES);
return;
}
+ if (NULL == cid)
+ {
+ /* There was an error sending. */
+ crm->num_transmissions = GNUNET_SYSERR;
+ }
+ else if (GNUNET_SYSERR != crm->num_transmissions)
+ {
+ /* Increment transmission counter, and possibly store @a cid
+ if this was the first transmission. */
+ crm->num_transmissions++;
+ if (1 == crm->num_transmissions)
+ {
+ crm->first_transmission_time = GNUNET_TIME_absolute_get ();
+ crm->connection_taken = *cid;
+ GCC_ack_expected (cid);
+ }
+ }
if (0 == crm->retry_delay.rel_value_us)
crm->retry_delay = ch->expected_delay;
+ else
+ crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
+ crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
+ MIN_RTT_DELAY);
crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
- /* find position for re-insertion into the DLL */
- if ( (NULL == ch->head_sent) ||
- (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
+ GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
+ cmp_crm_by_next_retry,
+ NULL,
+ ch->head_sent,
+ ch->tail_sent,
+ crm);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Message %u sent, next transmission on %s in %s\n",
+ (unsigned int) ntohl (crm->data_message->mid.mid),
+ GCCH_2s (ch),
+ GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
+ GNUNET_YES));
+ if (NULL == ch->head_sent->qe)
{
- /* insert at HEAD, also (re)schedule retry task! */
- GNUNET_CONTAINER_DLL_insert (ch->head_sent,
- ch->tail_sent,
- crm);
if (NULL != ch->retry_data_task)
GNUNET_SCHEDULER_cancel (ch->retry_data_task);
ch->retry_data_task
- = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
- &retry_transmission,
- ch);
- return;
- }
- for (off = ch->head_sent; NULL != off; off = off->next)
- if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
- break;
- if (NULL == off)
- {
- /* insert at tail */
- GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
- ch->tail_sent,
- crm);
- }
- else
- {
- /* insert before off */
- GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
- ch->tail_sent,
- off->prev,
- crm);
+ = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
+ &retry_transmission,
+ ch);
}
}
* buffer space in the tunnel.
*
* @param ch Channel.
- * @param sender client sending the data
+ * @param sender_ccn ccn of the sender
* @param buf payload to transmit.
* @param buf_len number of bytes in @a buf
* @return #GNUNET_OK if everything goes well,
*/
int
GCCH_handle_local_data (struct CadetChannel *ch,
- struct CadetClient *sender,
+ struct GNUNET_CADET_ClientChannelNumber sender_ccn,
const char *buf,
size_t buf_len)
{
if (GNUNET_YES == ch->is_loopback)
{
- struct CadetClient *receiver;
+ struct CadetChannelClient *receiver;
struct GNUNET_MQ_Envelope *env;
struct GNUNET_CADET_LocalData *ld;
+ int to_owner;
env = GNUNET_MQ_msg_extra (ld,
buf_len,
GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
- receiver = (ch->owner == sender) ? ch->dest : ch->owner;
- ld->ccn = (ch->owner == sender) ? ch->ccn_dest : ch->ccn_owner;
+ if (sender_ccn.channel_of_client ==
+ ch->owner->ccn.channel_of_client)
+ {
+ receiver = ch->dest;
+ to_owner = GNUNET_NO;
+ }
+ else
+ {
+ GNUNET_assert (sender_ccn.channel_of_client ==
+ ch->dest->ccn.channel_of_client);
+ receiver = ch->owner;
+ to_owner = GNUNET_YES;
+ }
+ ld->ccn = receiver->ccn;
GNUNET_memcpy (&ld[1],
buf,
buf_len);
- /* FIXME: this does not provide for flow control! */
- GSC_send_to_client (receiver,
- env);
- send_ack_to_client (ch,
- sender);
+ if (GNUNET_YES == receiver->client_ready)
+ {
+ GSC_send_to_client (receiver->c,
+ env);
+ send_ack_to_client (ch,
+ to_owner);
+ }
+ else
+ {
+ struct CadetOutOfOrderMessage *oom;
+
+ oom = GNUNET_new (struct CadetOutOfOrderMessage);
+ oom->env = env;
+ GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
+ receiver->tail_recv,
+ oom);
+ receiver->num_recv++;
+ }
return GNUNET_OK;
}
/* Everything is correct, send the message. */
- crm = GNUNET_malloc (sizeof (*crm) + buf_len);
+ crm = GNUNET_malloc (sizeof (*crm));
crm->ch = ch;
- crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
- crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
+ crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
+ + buf_len);
+ crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
+ crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
- crm->data_message.mid = ch->mid_send;
- crm->data_message.ctn = ch->ctn;
- GNUNET_memcpy (&crm[1],
+ crm->data_message->mid = ch->mid_send;
+ crm->data_message->ctn = ch->ctn;
+ GNUNET_memcpy (&crm->data_message[1],
buf,
buf_len);
- GNUNET_CONTAINER_DLL_insert (ch->head_sent,
- ch->tail_sent,
- crm);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
+ ch->tail_sent,
+ crm);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sending %u bytes from local client to %s\n",
- buf_len,
- GCCH_2s (ch));
+ "Sending message %u from local client to %s with %u bytes\n",
+ ntohl (crm->data_message->mid.mid),
+ GCCH_2s (ch),
+ buf_len);
+ if (NULL != ch->retry_data_task)
+ {
+ GNUNET_SCHEDULER_cancel (ch->retry_data_task);
+ ch->retry_data_task = NULL;
+ }
crm->qe = GCT_send (ch->t,
- &crm->data_message.header,
+ &crm->data_message->header,
&data_sent_cb,
crm);
- GCCH_check_allow_client (ch);
+ GNUNET_assert (NULL == ch->retry_data_task);
return GNUNET_OK;
}
/**
- * Try to deliver messages to the local client, if it is ready for more.
+ * Handle ACK from client on local channel. Means the client is ready
+ * for more data, see if we have any for it.
*
- * @param ch channel to process
+ * @param ch channel to destroy
+ * @param client_ccn ccn of the client sending the ack
*/
-static void
-send_client_buffered_data (struct CadetChannel *ch)
+void
+GCCH_handle_local_ack (struct CadetChannel *ch,
+ struct GNUNET_CADET_ClientChannelNumber client_ccn)
{
+ struct CadetChannelClient *ccc;
struct CadetOutOfOrderMessage *com;
- if (GNUNET_NO == ch->client_ready)
- return; /* client not ready */
- com = ch->head_recv;
+ if ( (NULL != ch->owner) &&
+ (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
+ ccc = ch->owner;
+ else if ( (NULL != ch->dest) &&
+ (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
+ ccc = ch->dest;
+ else
+ GNUNET_assert (0);
+ ccc->client_ready = GNUNET_YES;
+ com = ccc->head_recv;
if (NULL == com)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
+ GSC_2s (ccc->c),
+ ntohl (client_ccn.channel_of_client),
+ GCCH_2s (ch),
+ ntohl (ccc->ccn.channel_of_client),
+ ccc);
return; /* none pending */
+ }
+ if (GNUNET_YES == ch->is_loopback)
+ {
+ int to_owner;
+
+ /* Messages are always in-order, just send */
+ GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
+ ccc->tail_recv,
+ com);
+ ccc->num_recv--;
+ GSC_send_to_client (ccc->c,
+ com->env);
+ /* Notify sender that we can receive more */
+ if (ccc->ccn.channel_of_client ==
+ ch->owner->ccn.channel_of_client)
+ {
+ to_owner = GNUNET_NO;
+ }
+ else
+ {
+ GNUNET_assert (ccc->ccn.channel_of_client ==
+ ch->dest->ccn.channel_of_client);
+ to_owner = GNUNET_YES;
+ }
+ send_ack_to_client (ch,
+ to_owner);
+ GNUNET_free (com);
+ return;
+ }
+
if ( (com->mid.mid != ch->mid_recv.mid) &&
- (GNUNET_NO == ch->out_of_order) )
+ (GNUNET_NO == ch->out_of_order) &&
+ (GNUNET_YES == ch->reliable) )
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
+ GSC_2s (ccc->c),
+ ntohl (ccc->ccn.channel_of_client),
+ ntohl (com->mid.mid),
+ ntohl (ch->mid_recv.mid));
return; /* missing next one in-order */
+ }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Passing payload message to client on %s\n",
- GCCH_2s (ch));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got LOCAL_ACK, giving payload message %u to %s-%X on %s\n",
+ ntohl (com->mid.mid),
+ GSC_2s (ccc->c),
+ ntohl (ccc->ccn.channel_of_client),
+ GCCH_2s (ch));
/* all good, pass next message to client */
- GNUNET_CONTAINER_DLL_remove (ch->head_recv,
- ch->tail_recv,
+ GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
+ ccc->tail_recv,
com);
+ ccc->num_recv--;
/* FIXME: if unreliable, this is not aggressive
enough, as it would be OK to have lost some! */
+
ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
ch->mid_futures >>= 1; /* equivalent to division by 2 */
- GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
+ ccc->client_ready = GNUNET_NO;
+ GSC_send_to_client (ccc->c,
com->env);
GNUNET_free (com);
- if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
- (GNUNET_YES == ch->reliable) )
- {
- /* The next 15 messages were also already received (0xFF), this
- suggests that the sender may be blocked on flow control
- urgently waiting for an ACK from us. (As we have an inherent
- maximum of 64 bits, and 15 is getting too close for comfort.)
- So we should send one now. */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sender on %s likely blocked on flow-control, sending ACK now.\n",
- GCCH_2s (ch));
- if (GNUNET_YES == ch->reliable)
- send_channel_data_ack (ch);
- }
-
- if (NULL != ch->head_recv)
+ send_channel_data_ack (ch);
+ if (NULL != ccc->head_recv)
return;
if (GNUNET_NO == ch->destroy)
return;
}
-/**
- * Handle ACK from client on local channel.
- *
- * @param ch channel to destroy
- */
-void
-GCCH_handle_local_ack (struct CadetChannel *ch)
-{
- ch->client_ready = GNUNET_YES;
- send_client_buffered_data (ch);
-}
-
-
#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
{
LOG2 (level,
"CHN origin %s ready %s local-id: %u\n",
- GSC_2s (ch->owner),
- ch->client_ready ? "YES" : "NO",
- ntohl (ch->ccn_owner.channel_of_client));
+ GSC_2s (ch->owner->c),
+ ch->owner->client_ready ? "YES" : "NO",
+ ntohl (ch->owner->ccn.channel_of_client));
}
if (NULL != ch->dest)
{
LOG2 (level,
"CHN destination %s ready %s local-id: %u\n",
- GSC_2s (ch->dest),
- ch->client_ready ? "YES" : "NO",
- ntohl (ch->ccn_dest.channel_of_client));
+ GSC_2s (ch->dest->c),
+ ch->dest->client_ready ? "YES" : "NO",
+ ntohl (ch->dest->ccn.channel_of_client));
}
LOG2 (level,
"CHN Message IDs recv: %d (%LLX), send: %d\n",