/*
This file is part of GNUnet.
- (C) 2013 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2001-2017 GNUnet e.V.
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
+ Affero General Public License for more details.
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ SPDX-License-Identifier: AGPL3.0-or-later
+*/
+/**
+ * @file cadet/gnunet-service-cadet_channel.c
+ * @brief logical links between CADET clients
+ * @author Bartlomiej Polot
+ * @author Christian Grothoff
+ *
+ * TODO:
+ * - Congestion/flow control:
+ * + estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
+ * (and figure out how/where to use this!)
+ * + figure out flow control without ACKs (unreliable traffic!)
+ * - revisit handling of 'unbuffered' traffic!
+ * (need to push down through tunnel into connection selection)
+ * - revisit handling of 'buffered' traffic: 4 is a rather small buffer; maybe
+ * reserve more bits in 'options' to allow for buffer size control?
+ */
#include "platform.h"
-#include "gnunet_util_lib.h"
-
+#include "cadet.h"
#include "gnunet_statistics_service.h"
+#include "gnunet-service-cadet_channel.h"
+#include "gnunet-service-cadet_connection.h"
+#include "gnunet-service-cadet_tunnels.h"
+#include "gnunet-service-cadet_paths.h"
-#include "cadet.h"
-#include "cadet_protocol.h"
+#define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
-#include "gnunet-service-cadet_channel.h"
-#include "gnunet-service-cadet_local.h"
-#include "gnunet-service-cadet_tunnel.h"
-#include "gnunet-service-cadet_peer.h"
+/**
+ * How long do we initially wait before retransmitting?
+ */
+#define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
-#define LOG(level, ...) GNUNET_log_from(level,"cadet-chn",__VA_ARGS__)
+/**
+ * How long do we wait before dropping state about incoming
+ * connection to closed port?
+ */
+#define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
+
+/**
+ * How long do we wait at least before retransmitting ever?
+ */
+#define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
-#define CADET_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(\
- GNUNET_TIME_UNIT_MILLISECONDS, 250)
-#define CADET_RETRANSMIT_MARGIN 4
+/**
+ * Maximum message ID into the future we accept for out-of-order messages.
+ * If the message is more than this into the future, we drop it. This is
+ * important both to detect values that are actually in the past, as well
+ * as to limit adversarially triggerable memory consumption.
+ *
+ * Note that right now we have "max_pending_messages = 4" hard-coded in
+ * the logic below, so a value of 4 would suffice here. But we plan to
+ * allow larger windows in the future...
+ */
+#define MAX_OUT_OF_ORDER_DISTANCE 1024
/**
- * All the states a connection can be in.
+ * All the states a channel can be in.
*/
enum CadetChannelState
{
CADET_CHANNEL_NEW,
/**
- * Connection create message sent, waiting for ACK.
+ * Channel is to a port that is not open, we're waiting for the
+ * port to be opened.
+ */
+ CADET_CHANNEL_LOOSE,
+
+ /**
+ * CHANNEL_OPEN message sent, waiting for CHANNEL_OPEN_ACK.
*/
- CADET_CHANNEL_SENT,
+ CADET_CHANNEL_OPEN_SENT,
/**
* Connection confirmed, ready to carry traffic.
*/
- CADET_CHANNEL_READY,
+ CADET_CHANNEL_READY
};
/**
- * Info holder for channel messages in queues.
+ * Info needed to retry a message in case it gets lost.
+ * Note that we DO use this structure also for unreliable
+ * messages.
*/
-struct CadetChannelQueue
+struct CadetReliableMessage
{
/**
- * Tunnel Queue.
+ * Double linked list, FIFO style
*/
- struct CadetTunnelQueue *tq;
+ struct CadetReliableMessage *next;
/**
- * Message type (DATA/DATA_ACK)
+ * Double linked list, FIFO style
*/
- uint16_t type;
+ struct CadetReliableMessage *prev;
/**
- * Message copy (for DATAs, to start retransmission timer)
+ * Which channel is this message in?
*/
- struct CadetReliableMessage *copy;
+ struct CadetChannel *ch;
/**
- * Reliability (for DATA_ACKs, to access rel->ack_q)
+ * Entry in the tunnels queue for this message, NULL if it has left
+ * the tunnel. Used to cancel transmission in case we receive an
+ * ACK in time.
*/
- struct CadetChannelReliability *rel;
-};
+ struct CadetTunnelQueueEntry *qe;
+ /**
+ * Data message we are trying to send.
+ */
+ struct GNUNET_CADET_ChannelAppDataMessage *data_message;
-/**
- * Info needed to retry a message in case it gets lost.
- */
-struct CadetReliableMessage
-{
- /**
- * Double linked list, FIFO style
- */
- struct CadetReliableMessage *next;
- struct CadetReliableMessage *prev;
-
- /**
- * Type of message (payload, channel management).
- */
- int16_t type;
+ /**
+ * How soon should we retry if we fail to get an ACK?
+ * Messages in the queue are sorted by this value.
+ */
+ struct GNUNET_TIME_Absolute next_retry;
- /**
- * Tunnel Reliability queue this message is in.
- */
- struct CadetChannelReliability *rel;
+ /**
+ * How long do we wait for an ACK after transmission?
+ * Use for the back-off calculation.
+ */
+ struct GNUNET_TIME_Relative retry_delay;
- /**
- * ID of the message (ACK needed to free)
- */
- uint32_t mid;
+ /**
+ * Time when we first successfully transmitted the message
+ * (that is, set @e num_transmissions to 1).
+ */
+ struct GNUNET_TIME_Absolute first_transmission_time;
/**
- * Tunnel Queue.
+ * Identifier of the connection that this message took when it
+ * was first transmitted. Only useful if @e num_transmissions is 1.
*/
- struct CadetChannelQueue *chq;
+ struct GNUNET_CADET_ConnectionTunnelIdentifier connection_taken;
- /**
- * When was this message issued (to calculate ACK delay)
- */
- struct GNUNET_TIME_Absolute timestamp;
+ /**
+ * How often was this message transmitted? #GNUNET_SYSERR if there
+ * was an error transmitting the message, #GNUNET_NO if it was not
+ * yet transmitted ever, otherwise the number of (re) transmissions.
+ */
+ int num_transmissions;
- /* struct GNUNET_CADET_Data with payload */
};
/**
- * Info about the traffic state for a client in a channel.
+ * List of received out-of-order data messages.
*/
-struct CadetChannelReliability
+struct CadetOutOfOrderMessage
{
- /**
- * Channel this is about.
- */
- struct CadetChannel *ch;
+ /**
+ * Double linked list, FIFO style
+ */
+ struct CadetOutOfOrderMessage *next;
- /**
- * DLL of messages sent and not yet ACK'd.
- */
- struct CadetReliableMessage *head_sent;
- struct CadetReliableMessage *tail_sent;
+ /**
+ * Double linked list, FIFO style
+ */
+ struct CadetOutOfOrderMessage *prev;
- /**
- * DLL of messages received out of order.
- */
- struct CadetReliableMessage *head_recv;
- struct CadetReliableMessage *tail_recv;
+ /**
+ * ID of the message (messages up to this point needed
+ * before we give this one to the client).
+ */
+ struct ChannelMessageIdentifier mid;
- /**
- * Messages received.
- */
- unsigned int n_recv;
+ /**
+ * The envelope with the payload of the out-of-order message
+ */
+ struct GNUNET_MQ_Envelope *env;
- /**
- * Next MID to use for outgoing traffic.
- */
- uint32_t mid_send;
+};
- /**
- * Next MID expected for incoming traffic.
- */
- uint32_t mid_recv;
- /**
- * Handle for queued unique data CREATE, DATA_ACK.
- */
- struct CadetChannelQueue *uniq;
+/**
+ * Client endpoint of a `struct CadetChannel`. A channel may be a
+ * loopback channel, in which case it has two of these endpoints.
+ * Note that flow control also is required in both directions.
+ */
+struct CadetChannelClient
+{
+ /**
+ * Client handle. Not by itself sufficient to designate
+ * the client endpoint, as the same client handle may
+ * be used for both the owner and the destination, and
+ * we thus also need the channel ID to identify the client.
+ */
+ struct CadetClient *c;
- /**
- * Can we send data to the client?
- */
- int client_ready;
+ /**
+ * Head of DLL of messages received out of order or while client was unready.
+ */
+ struct CadetOutOfOrderMessage *head_recv;
/**
- * Can the client send data to us?
+ * Tail DLL of messages received out of order or while client was unready.
*/
- int client_allowed;
+ struct CadetOutOfOrderMessage *tail_recv;
- /**
- * Task to resend/poll in case no ACK is received.
- */
- GNUNET_SCHEDULER_TaskIdentifier retry_task;
+ /**
+ * Local tunnel number for this client.
+ * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
+ * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
+ */
+ struct GNUNET_CADET_ClientChannelNumber ccn;
- /**
- * Counter for exponential backoff.
- */
- struct GNUNET_TIME_Relative retry_timer;
+ /**
+ * Number of entries currently in @a head_recv DLL.
+ */
+ unsigned int num_recv;
+
+ /**
+ * Can we send data to the client?
+ */
+ int client_ready;
- /**
- * How long does it usually take to get an ACK.
- */
- struct GNUNET_TIME_Relative expected_delay;
};
*/
struct CadetChannel
{
- /**
- * Tunnel this channel is in.
- */
+ /**
+ * Tunnel this channel is in.
+ */
struct CadetTunnel *t;
- /**
- * Destination port of the channel.
- */
- uint32_t port;
+ /**
+ * Client owner of the tunnel, if any.
+ * (Used if this channel represends the initiating end of the tunnel.)
+ */
+ struct CadetChannelClient *owner;
- /**
- * Global channel number ( < GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
- */
- CADET_ChannelNumber gid;
+ /**
+ * Client destination of the tunnel, if any.
+ * (Used if this channel represents the listening end of the tunnel.)
+ */
+ struct CadetChannelClient *dest;
- /**
- * Local tunnel number for root (owner) client.
- * ( >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
- */
- CADET_ChannelNumber lid_root;
+ /**
+ * Last entry in the tunnel's queue relating to control messages
+ * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
+ * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel
+ * transmission in case we receive updated information.
+ */
+ struct CadetTunnelQueueEntry *last_control_qe;
- /**
- * Local tunnel number for local destination clients (incoming number)
- * ( >= GNUNET_CADET_LOCAL_CHANNEL_ID_SERV or 0).
- */
- CADET_ChannelNumber lid_dest;
+ /**
+ * Head of DLL of messages sent and not yet ACK'd.
+ */
+ struct CadetReliableMessage *head_sent;
- /**
- * Channel state.
- */
- enum CadetChannelState state;
+ /**
+ * Tail of DLL of messages sent and not yet ACK'd.
+ */
+ struct CadetReliableMessage *tail_sent;
- /**
- * Is the tunnel bufferless (minimum latency)?
- */
- int nobuffer;
+ /**
+ * Task to resend/poll in case no ACK is received.
+ */
+ struct GNUNET_SCHEDULER_Task *retry_control_task;
- /**
- * Is the tunnel reliable?
- */
- int reliable;
+ /**
+ * Task to resend/poll in case no ACK is received.
+ */
+ struct GNUNET_SCHEDULER_Task *retry_data_task;
- /**
- * Last time the channel was used
- */
+ /**
+ * Last time the channel was used
+ */
struct GNUNET_TIME_Absolute timestamp;
- /**
- * Client owner of the tunnel, if any
- */
- struct CadetClient *root;
+ /**
+ * Destination port of the channel.
+ */
+ struct GNUNET_HashCode port;
- /**
- * Client destination of the tunnel, if any.
- */
- struct CadetClient *dest;
+ /**
+ * Hash'ed port of the channel with initiator and destination PID.
+ */
+ struct GNUNET_HashCode h_port;
- /**
- * Flag to signal the destruction of the channel.
- * If this is set GNUNET_YES the channel will be destroyed
- * when the queue is empty.
- */
- int destroy;
+ /**
+ * Counter for exponential backoff.
+ */
+ struct GNUNET_TIME_Relative retry_time;
- /**
- * Total (reliable) messages pending ACK for this channel.
- */
+ /**
+ * Bitfield of already-received messages past @e mid_recv.
+ */
+ uint64_t mid_futures;
+
+ /**
+ * Next MID expected for incoming traffic.
+ */
+ struct ChannelMessageIdentifier mid_recv;
+
+ /**
+ * Next MID to use for outgoing traffic.
+ */
+ struct ChannelMessageIdentifier mid_send;
+
+ /**
+ * Total (reliable) messages pending ACK for this channel.
+ */
unsigned int pending_messages;
- /**
- * Reliability data.
- * Only present (non-NULL) at the owner of a tunnel.
- */
- struct CadetChannelReliability *root_rel;
+ /**
+ * Maximum (reliable) messages pending ACK for this channel
+ * before we throttle the client.
+ */
+ unsigned int max_pending_messages;
- /**
- * Reliability data.
- * Only present (non-NULL) at the destination of a tunnel.
- */
- struct CadetChannelReliability *dest_rel;
+ /**
+ * Number identifying this channel in its tunnel.
+ */
+ struct GNUNET_CADET_ChannelTunnelNumber ctn;
-};
+ /**
+ * Channel state.
+ */
+ enum CadetChannelState state;
+ /**
+ * Count how many ACKs we skipped, used to prevent long
+ * sequences of ACK skipping.
+ */
+ unsigned int skip_ack_series;
-/******************************************************************************/
-/******************************* GLOBALS ***********************************/
-/******************************************************************************/
+ /**
+ * Is the tunnel bufferless (minimum latency)?
+ */
+ int nobuffer;
-/**
- * Global handle to the statistics service.
- */
-extern struct GNUNET_STATISTICS_Handle *stats;
+ /**
+ * Is the tunnel reliable?
+ */
+ int reliable;
-/**
- * Local peer own ID (memory efficient handle).
- */
-extern GNUNET_PEER_Id myid;
+ /**
+ * Is the tunnel out-of-order?
+ */
+ int out_of_order;
+ /**
+ * Is this channel a loopback channel, where the destination is us again?
+ */
+ int is_loopback;
+
+ /**
+ * Flag to signal the destruction of the channel. If this is set to
+ * #GNUNET_YES the channel will be destroyed once the queue is
+ * empty.
+ */
+ int destroy;
-/******************************************************************************/
-/******************************** STATIC ***********************************/
-/******************************************************************************/
+};
/**
- * Destroy a reliable message after it has been acknowledged, either by
- * direct mid ACK or bitfield. Updates the appropriate data structures and
- * timers and frees all memory.
+ * Get the static string for identification of the channel.
*
- * @param copy Message that is no longer needed: remote peer got it.
- * @param update_time Is the timing information relevant?
- * If this message is ACK in a batch the timing information
- * is skewed by the retransmission, count only for the
- * retransmitted message.
+ * @param ch Channel.
*
- * @return #GNUNET_YES if channel was destroyed as a result of the call,
- * #GNUNET_NO otherwise.
+ * @return Static string with the channel IDs.
*/
-static int
-rel_message_free (struct CadetReliableMessage *copy, int update_time);
+const char *
+GCCH_2s (const struct CadetChannel *ch)
+{
+ static char buf[128];
+
+ GNUNET_snprintf (buf,
+ sizeof (buf),
+ "Channel %s:%s ctn:%X(%X/%X)",
+ (GNUNET_YES == ch->is_loopback)
+ ? "loopback"
+ : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
+ GNUNET_h2s (&ch->port),
+ ch->ctn,
+ (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
+ (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
+ return buf;
+}
-/**
- * send a channel create message.
- *
- * @param ch Channel for which to send.
- */
-static void
-send_create (struct CadetChannel *ch);
/**
- * Confirm we got a channel create, FWD ack.
+ * Hash the @a port and @a initiator and @a listener to
+ * calculate the "challenge" @a h_port we send to the other
+ * peer on #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN.
*
- * @param ch The channel to confirm.
- * @param fwd Should we send a FWD ACK? (going dest->root)
- * @param reaction This ACK is a reaction to a duplicate CREATE, don't save.
+ * @param[out] h_port set to the hash of @a port, @a initiator and @a listener
+ * @param port cadet port, as seen by CADET clients
+ * @param listener peer that is listining on @a port
*/
-static void
-send_ack (struct CadetChannel *ch, int fwd, int reaction);
-
+void
+GCCH_hash_port (struct GNUNET_HashCode *h_port,
+ const struct GNUNET_HashCode *port,
+ const struct GNUNET_PeerIdentity *listener)
+{
+ struct GNUNET_HashContext *hc;
+
+ hc = GNUNET_CRYPTO_hash_context_start ();
+ GNUNET_CRYPTO_hash_context_read (hc,
+ port,
+ sizeof (*port));
+ GNUNET_CRYPTO_hash_context_read (hc,
+ listener,
+ sizeof (*listener));
+ GNUNET_CRYPTO_hash_context_finish (hc,
+ h_port);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Calculated port hash %s\n",
+ GNUNET_h2s (h_port));
+}
/**
- * Test if the channel is loopback: both root and dest are on the local peer.
+ * Get the channel's public ID.
*
- * @param ch Channel to test.
+ * @param ch Channel.
*
- * @return #GNUNET_YES if channel is loopback, #GNUNET_NO otherwise.
+ * @return ID used to identify the channel with the remote peer.
*/
-static int
-is_loopback (const struct CadetChannel *ch)
+struct GNUNET_CADET_ChannelTunnelNumber
+GCCH_get_id (const struct CadetChannel *ch)
{
- if (NULL != ch->t)
- return GCT_is_loopback (ch->t);
-
- return (NULL != ch->root && NULL != ch->dest);
+ return ch->ctn;
}
/**
- * Save a copy of the data message for later retransmission.
+ * Release memory associated with @a ccc
*
- * @param msg Message to copy.
- * @param mid Message ID.
- * @param rel Reliability data for retransmission.
+ * @param ccc data structure to clean up
*/
-static struct CadetReliableMessage *
-copy_message (const struct GNUNET_CADET_Data *msg, uint32_t mid,
- struct CadetChannelReliability *rel)
+static void
+free_channel_client (struct CadetChannelClient *ccc)
{
- struct CadetReliableMessage *copy;
- uint16_t size;
-
- size = ntohs (msg->header.size);
- copy = GNUNET_malloc (sizeof (*copy) + size);
- copy->mid = mid;
- copy->rel = rel;
- copy->type = GNUNET_MESSAGE_TYPE_CADET_DATA;
- memcpy (©[1], msg, size);
+ struct CadetOutOfOrderMessage *com;
- return copy;
+ while (NULL != (com = ccc->head_recv))
+ {
+ GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
+ ccc->tail_recv,
+ com);
+ ccc->num_recv--;
+ GNUNET_MQ_discard (com->env);
+ GNUNET_free (com);
+ }
+ GNUNET_free (ccc);
}
+
/**
- * We have received a message out of order, or the client is not ready.
- * Buffer it until we receive an ACK from the client or the missing
- * message from the channel.
+ * Destroy the given channel.
*
- * @param msg Message to buffer (MUST be of type CADET_DATA).
- * @param rel Reliability data to the corresponding direction.
+ * @param ch channel to destroy
*/
static void
-add_buffered_data (const struct GNUNET_CADET_Data *msg,
- struct CadetChannelReliability *rel)
+channel_destroy (struct CadetChannel *ch)
{
- struct CadetReliableMessage *copy;
- struct CadetReliableMessage *prev;
- uint32_t mid;
-
- mid = ntohl (msg->mid);
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "add_buffered_data %u\n", mid);
+ struct CadetReliableMessage *crm;
- rel->n_recv++;
-
- // FIXME do something better than O(n), although n < 64...
- // FIXME start from the end (most messages are the latest ones)
- for (prev = rel->head_recv; NULL != prev; prev = prev->next)
+ while (NULL != (crm = ch->head_sent))
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " prev %u\n", prev->mid);
- if (prev->mid == mid)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " already there!\n");
- return;
- }
- else if (GC_is_pid_bigger (prev->mid, mid))
+ GNUNET_assert (ch == crm->ch);
+ if (NULL != crm->qe)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " bingo!\n");
- copy = copy_message (msg, mid, rel);
- GNUNET_CONTAINER_DLL_insert_before (rel->head_recv, rel->tail_recv,
- prev, copy);
- return;
+ GCT_send_cancel (crm->qe);
+ crm->qe = NULL;
}
+ GNUNET_CONTAINER_DLL_remove (ch->head_sent,
+ ch->tail_sent,
+ crm);
+ GNUNET_free (crm->data_message);
+ GNUNET_free (crm);
+ }
+ if (CADET_CHANNEL_LOOSE == ch->state)
+ {
+ GSC_drop_loose_channel (&ch->h_port,
+ ch);
+ }
+ if (NULL != ch->owner)
+ {
+ free_channel_client (ch->owner);
+ ch->owner = NULL;
+ }
+ if (NULL != ch->dest)
+ {
+ free_channel_client (ch->dest);
+ ch->dest = NULL;
+ }
+ if (NULL != ch->last_control_qe)
+ {
+ GCT_send_cancel (ch->last_control_qe);
+ ch->last_control_qe = NULL;
+ }
+ if (NULL != ch->retry_data_task)
+ {
+ GNUNET_SCHEDULER_cancel (ch->retry_data_task);
+ ch->retry_data_task = NULL;
+ }
+ if (NULL != ch->retry_control_task)
+ {
+ GNUNET_SCHEDULER_cancel (ch->retry_control_task);
+ ch->retry_control_task = NULL;
+ }
+ if (GNUNET_NO == ch->is_loopback)
+ {
+ GCT_remove_channel (ch->t,
+ ch,
+ ch->ctn);
+ ch->t = NULL;
}
- copy = copy_message (msg, mid, rel);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " insert at tail!\n");
- GNUNET_CONTAINER_DLL_insert_tail (rel->head_recv, rel->tail_recv, copy);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "add_buffered_data END\n");
+ GNUNET_free (ch);
}
/**
- * Add a destination client to a channel, initializing all data structures
- * in the channel and the client.
+ * Send a channel create message.
*
- * @param ch Channel to which add the destination.
- * @param c Client which to add to the channel.
+ * @param cls Channel for which to send.
*/
static void
-add_destination (struct CadetChannel *ch, struct CadetClient *c)
-{
- if (NULL != ch->dest)
- {
- GNUNET_break (0);
- return;
- }
-
- /* Assign local id as destination */
- ch->lid_dest = GML_get_next_chid (c);
-
- /* Store in client's hashmap */
- GML_channel_add (c, ch->lid_dest, ch);
-
- GNUNET_break (NULL == ch->dest_rel);
- ch->dest_rel = GNUNET_new (struct CadetChannelReliability);
- ch->dest_rel->ch = ch;
- ch->dest_rel->expected_delay.rel_value_us = 0;
- ch->dest_rel->retry_timer = CADET_RETRANSMIT_TIME;
-
- ch->dest = c;
-}
+send_channel_open (void *cls);
/**
- * Set options in a channel, extracted from a bit flag field.
+ * Function called once the tunnel confirms that we sent the
+ * create message. Delays for a bit until we retry.
*
- * @param ch Channel to set options to.
- * @param options Bit array in host byte order.
+ * @param cls our `struct CadetChannel`.
+ * @param cid identifier of the connection within the tunnel, NULL
+ * if transmission failed
*/
static void
-channel_set_options (struct CadetChannel *ch, uint32_t options)
+channel_open_sent_cb (void *cls,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
{
- ch->nobuffer = (options & GNUNET_CADET_OPTION_NOBUFFER) != 0 ?
- GNUNET_YES : GNUNET_NO;
- ch->reliable = (options & GNUNET_CADET_OPTION_RELIABLE) != 0 ?
- GNUNET_YES : GNUNET_NO;
+ struct CadetChannel *ch = cls;
+
+ GNUNET_assert (NULL != ch->last_control_qe);
+ ch->last_control_qe = NULL;
+ ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
+ GCCH_2s (ch),
+ GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
+ GNUNET_YES));
+ ch->retry_control_task
+ = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
+ &send_channel_open,
+ ch);
}
/**
- * Get a bit flag field with the options of a channel.
- *
- * @param ch Channel to get options from.
+ * Send a channel open message.
*
- * @return Bit array in host byte order.
+ * @param cls Channel for which to send.
*/
-static uint32_t
-channel_get_options (struct CadetChannel *ch)
+static void
+send_channel_open (void *cls)
{
+ struct CadetChannel *ch = cls;
+ struct GNUNET_CADET_ChannelOpenMessage msgcc;
uint32_t options;
+ ch->retry_control_task = NULL;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending CHANNEL_OPEN message for %s\n",
+ GCCH_2s (ch));
options = 0;
if (ch->nobuffer)
options |= GNUNET_CADET_OPTION_NOBUFFER;
if (ch->reliable)
options |= GNUNET_CADET_OPTION_RELIABLE;
-
- return options;
+ if (ch->out_of_order)
+ options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
+ msgcc.header.size = htons (sizeof (msgcc));
+ msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
+ msgcc.opt = htonl (options);
+ msgcc.h_port = ch->h_port;
+ msgcc.ctn = ch->ctn;
+ ch->state = CADET_CHANNEL_OPEN_SENT;
+ if (NULL != ch->last_control_qe)
+ GCT_send_cancel (ch->last_control_qe);
+ ch->last_control_qe = GCT_send (ch->t,
+ &msgcc.header,
+ &channel_open_sent_cb,
+ ch);
+ GNUNET_assert (NULL == ch->retry_control_task);
}
/**
- * Notify a client that the channel is no longer valid.
- *
- * @param ch Channel that is destroyed.
- * @param local_only Should we avoid sending it to other peers?
+ * Function called once and only once after a channel was bound
+ * to its tunnel via #GCT_add_channel() is ready for transmission.
+ * Note that this is only the case for channels that this peer
+ * initiates, as for incoming channels we assume that they are
+ * ready for transmission immediately upon receiving the open
+ * message. Used to bootstrap the #GCT_send() process.
+ *
+ * @param ch the channel for which the tunnel is now ready
*/
-static void
-send_destroy (struct CadetChannel *ch, int local_only)
+void
+GCCH_tunnel_up (struct CadetChannel *ch)
{
- struct GNUNET_CADET_ChannelManage msg;
-
- msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY);
- msg.header.size = htons (sizeof (msg));
- msg.chid = htonl (ch->gid);
-
- /* If root is not NULL, notify.
- * If it's NULL, check lid_root. When a local destroy comes in, root
- * is set to NULL but lid_root is left untouched. In this case, do nothing,
- * the client is the one who requested the channel to be destroyed.
- */
- if (NULL != ch->root)
- GML_send_channel_destroy (ch->root, ch->lid_root);
- else if (0 == ch->lid_root && GNUNET_NO == local_only)
- GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL);
-
- if (NULL != ch->dest)
- GML_send_channel_destroy (ch->dest, ch->lid_dest);
- else if (0 == ch->lid_dest && GNUNET_NO == local_only)
- GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_YES, NULL);
+ GNUNET_assert (NULL == ch->retry_control_task);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Tunnel up, sending CHANNEL_OPEN on %s now\n",
+ GCCH_2s (ch));
+ ch->retry_control_task
+ = GNUNET_SCHEDULER_add_now (&send_channel_open,
+ ch);
}
/**
- * Notify the destination client that a new incoming channel was created.
+ * Create a new channel.
*
- * @param ch Channel that was created.
+ * @param owner local client owning the channel
+ * @param ccn local number of this channel at the @a owner
+ * @param destination peer to which we should build the channel
+ * @param port desired port at @a destination
+ * @param options options for the channel
+ * @return handle to the new channel
*/
-static void
-send_client_create (struct CadetChannel *ch)
+struct CadetChannel *
+GCCH_channel_local_new (struct CadetClient *owner,
+ struct GNUNET_CADET_ClientChannelNumber ccn,
+ struct CadetPeer *destination,
+ const struct GNUNET_HashCode *port,
+ uint32_t options)
{
- uint32_t opt;
-
- if (NULL == ch->dest)
- return;
-
- opt = 0;
- opt |= GNUNET_YES == ch->reliable ? GNUNET_CADET_OPTION_RELIABLE : 0;
- opt |= GNUNET_YES == ch->nobuffer ? GNUNET_CADET_OPTION_NOBUFFER : 0;
- GML_send_channel_create (ch->dest, ch->lid_dest, ch->port, opt,
- GCT_get_destination (ch->t));
-
-}
+ struct CadetChannel *ch;
+ struct CadetChannelClient *ccco;
+ ccco = GNUNET_new (struct CadetChannelClient);
+ ccco->c = owner;
+ ccco->ccn = ccn;
+ ccco->client_ready = GNUNET_YES;
-/**
- * Send data to a client.
- *
- * If the client is ready, send directly, otherwise buffer while listening
- * for a local ACK.
- *
- * @param ch Channel
- * @param msg Message.
- * @param fwd Is this a fwd (root->dest) message?
- */
-static void
-send_client_data (struct CadetChannel *ch,
- const struct GNUNET_CADET_Data *msg,
- int fwd)
-{
- if (fwd)
- {
- if (ch->dest_rel->client_ready)
- GML_send_data (ch->dest, msg, ch->lid_dest);
+ ch = GNUNET_new (struct CadetChannel);
+ ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
+ ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
+ ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
+ ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
+ ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
+ ch->owner = ccco;
+ ch->port = *port;
+ GCCH_hash_port (&ch->h_port,
+ port,
+ GCP_get_id (destination));
+ if (0 == GNUNET_memcmp (&my_full_id,
+ GCP_get_id (destination)))
+ {
+ struct OpenPort *op;
+
+ ch->is_loopback = GNUNET_YES;
+ op = GNUNET_CONTAINER_multihashmap_get (open_ports,
+ &ch->h_port);
+ if (NULL == op)
+ {
+ /* port closed, wait for it to possibly open */
+ ch->state = CADET_CHANNEL_LOOSE;
+ (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
+ &ch->h_port,
+ ch,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Created loose incoming loopback channel to port %s\n",
+ GNUNET_h2s (&ch->port));
+ }
else
- add_buffered_data (msg, ch->dest_rel);
+ {
+ GCCH_bind (ch,
+ op->c,
+ &op->port);
+ }
}
else
{
- if (ch->root_rel->client_ready)
- GML_send_data (ch->root, msg, ch->lid_root);
- else
- add_buffered_data (msg, ch->root_rel);
+ ch->t = GCP_get_tunnel (destination,
+ GNUNET_YES);
+ ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
+ ch->ctn = GCT_add_channel (ch->t,
+ ch);
}
+ GNUNET_STATISTICS_update (stats,
+ "# channels",
+ 1,
+ GNUNET_NO);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Created channel to port %s at peer %s for %s using %s\n",
+ GNUNET_h2s (port),
+ GCP_2s (destination),
+ GSC_2s (owner),
+ (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
+ return ch;
}
/**
- * Send a buffered message to the client, for in order delivery or
- * as result of client ACK.
+ * We had an incoming channel to a port that is closed.
+ * It has not been opened for a while, drop it.
*
- * @param ch Channel on which to empty the message buffer.
- * @param c Client to send to.
- * @param fwd Is this to send FWD data?.
+ * @param cls the channel to drop
*/
static void
-send_client_buffered_data (struct CadetChannel *ch,
- struct CadetClient *c,
- int fwd)
+timeout_closed_cb (void *cls)
{
- struct CadetReliableMessage *copy;
- struct CadetChannelReliability *rel;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data\n");
- rel = fwd ? ch->dest_rel : ch->root_rel;
- if (GNUNET_NO == rel->client_ready)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "client not ready\n");
- return;
- }
+ struct CadetChannel *ch = cls;
- copy = rel->head_recv;
- /* We never buffer channel management messages */
- if (NULL != copy)
- {
- if (copy->mid == rel->mid_recv || GNUNET_NO == ch->reliable)
- {
- struct GNUNET_CADET_Data *msg = (struct GNUNET_CADET_Data *) ©[1];
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, " have %u! now expecting %u\n",
- copy->mid, rel->mid_recv + 1);
- send_client_data (ch, msg, fwd);
- rel->n_recv--;
- rel->mid_recv++;
- GCCH_send_data_ack (ch, fwd);
- GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE RECV %p\n", copy);
- GNUNET_free (copy);
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " reliable && don't have %u, next is %u\n",
- rel->mid_recv, copy->mid);
- if (GNUNET_YES == ch->destroy)
- {
- /* We don't have the next data piece and the remote peer has closed the
- * channel. We won't receive it anymore, so just destroy the channel.
- * FIXME: wait some time to allow other connections to
- * deliver missing messages
- */
- send_destroy (ch, GNUNET_YES);
- GCCH_destroy (ch);
- }
- }
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data END\n");
+ ch->retry_control_task = NULL;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Closing incoming channel to port %s from peer %s due to timeout\n",
+ GNUNET_h2s (&ch->port),
+ GCP_2s (GCT_get_destination (ch->t)));
+ channel_destroy (ch);
}
/**
- * Allow a client to send more data.
+ * Create a new channel based on a request coming in over the network.
*
- * In case the client was already allowed to send data, do nothing.
- *
- * @param ch Channel.
- * @param fwd Is this a FWD ACK? (FWD ACKs are sent to root)
+ * @param t tunnel to the remote peer
+ * @param ctn identifier of this channel in the tunnel
+ * @param h_port desired hash of local port
+ * @param options options for the channel
+ * @return handle to the new channel
*/
-static void
-send_client_ack (struct CadetChannel *ch, int fwd)
+struct CadetChannel *
+GCCH_channel_incoming_new (struct CadetTunnel *t,
+ struct GNUNET_CADET_ChannelTunnelNumber ctn,
+ const struct GNUNET_HashCode *h_port,
+ uint32_t options)
{
- struct CadetChannelReliability *rel = fwd ? ch->root_rel : ch->dest_rel;
- struct CadetClient *c = fwd ? ch->root : ch->dest;
-
- if (NULL == c)
- {
- GNUNET_break (GNUNET_NO != ch->destroy);
- return;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- " sending %s ack to client on channel %s\n",
- GC_f2s (fwd), GCCH_2s (ch));
+ struct CadetChannel *ch;
+ struct OpenPort *op;
- if (NULL == rel)
- {
- GNUNET_break (0);
- return;
+ ch = GNUNET_new (struct CadetChannel);
+ ch->h_port = *h_port;
+ ch->t = t;
+ ch->ctn = ctn;
+ ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
+ ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
+ ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
+ ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
+ ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
+ GNUNET_STATISTICS_update (stats,
+ "# channels",
+ 1,
+ GNUNET_NO);
+
+ op = GNUNET_CONTAINER_multihashmap_get (open_ports,
+ h_port);
+ if (NULL == op)
+ {
+ /* port closed, wait for it to possibly open */
+ ch->state = CADET_CHANNEL_LOOSE;
+ (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
+ &ch->h_port,
+ ch,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ GNUNET_assert (NULL == ch->retry_control_task);
+ ch->retry_control_task
+ = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
+ &timeout_closed_cb,
+ ch);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Created loose incoming channel to port %s from peer %s\n",
+ GNUNET_h2s (&ch->port),
+ GCP_2s (GCT_get_destination (ch->t)));
}
-
- if (GNUNET_YES == rel->client_allowed)
+ else
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " already allowed\n");
- return;
+ GCCH_bind (ch,
+ op->c,
+ &op->port);
}
- rel->client_allowed = GNUNET_YES;
-
- GML_send_ack (c, fwd ? ch->lid_root : ch->lid_dest);
+ GNUNET_STATISTICS_update (stats,
+ "# channels",
+ 1,
+ GNUNET_NO);
+ return ch;
}
/**
- * Notify the root that the destination rejected the channel.
+ * Function called once the tunnel confirms that we sent the
+ * ACK message. Just remembers it was sent, we do not expect
+ * ACKs for ACKs ;-).
*
- * @param ch Rejected channel.
+ * @param cls our `struct CadetChannel`.
+ * @param cid identifier of the connection within the tunnel, NULL
+ * if transmission failed
*/
static void
-send_client_nack (struct CadetChannel *ch)
+send_ack_cb (void *cls,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
{
- if (NULL == ch->root)
- {
- GNUNET_break (0);
- return;
- }
- GML_send_channel_nack (ch->root, ch->lid_root);
+ struct CadetChannel *ch = cls;
+
+ GNUNET_assert (NULL != ch->last_control_qe);
+ ch->last_control_qe = NULL;
}
/**
- * We haven't received an ACK after a certain time: restransmit the message.
+ * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
*
- * @param cls Closure (CadetChannelReliability with the message to restransmit)
- * @param tc TaskContext.
+ * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
*/
static void
-channel_retransmit_message (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+send_channel_data_ack (struct CadetChannel *ch)
{
- struct CadetChannelReliability *rel = cls;
- struct CadetReliableMessage *copy;
- struct CadetChannel *ch;
- struct GNUNET_CADET_Data *payload;
- int fwd;
-
- rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- return;
-
- ch = rel->ch;
- copy = rel->head_sent;
- if (NULL == copy)
- {
- GNUNET_break (0);
- return;
- }
-
- payload = (struct GNUNET_CADET_Data *) ©[1];
- fwd = (rel == ch->root_rel);
-
- /* Message not found in the queue that we are going to use. */
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %u\n", copy->mid);
+ struct GNUNET_CADET_ChannelDataAckMessage msg;
- GCCH_send_prebuilt_message (&payload->header, ch, fwd, copy);
- GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
+ if (GNUNET_NO == ch->reliable)
+ return; /* no ACKs */
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
+ msg.header.size = htons (sizeof (msg));
+ msg.ctn = ch->ctn;
+ msg.mid.mid = htonl (ntohl (ch->mid_recv.mid));
+ msg.futures = GNUNET_htonll (ch->mid_futures);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending DATA_ACK %u:%llX via %s\n",
+ (unsigned int) ntohl (msg.mid.mid),
+ (unsigned long long) ch->mid_futures,
+ GCCH_2s (ch));
+ if (NULL != ch->last_control_qe)
+ GCT_send_cancel (ch->last_control_qe);
+ ch->last_control_qe = GCT_send (ch->t,
+ &msg.header,
+ &send_ack_cb,
+ ch);
}
/**
- * We haven't received an Channel ACK after a certain time: resend the CREATE.
+ * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
+ * connection is up.
*
- * @param cls Closure (CadetChannelReliability of the channel to recreate)
- * @param tc TaskContext.
+ * @param cls the `struct CadetChannel`
*/
static void
-channel_recreate (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+send_open_ack (void *cls)
{
- struct CadetChannelReliability *rel = cls;
-
- rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- return;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RE-CREATE\n");
- GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
-
- if (rel == rel->ch->root_rel)
- {
- send_create (rel->ch);
- }
- else if (rel == rel->ch->dest_rel)
- {
- send_ack (rel->ch, GNUNET_YES, GNUNET_NO);
- }
- else
- {
- GNUNET_break (0);
- }
+ struct CadetChannel *ch = cls;
+ struct GNUNET_CADET_ChannelOpenAckMessage msg;
+ ch->retry_control_task = NULL;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending CHANNEL_OPEN_ACK on %s\n",
+ GCCH_2s (ch));
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
+ msg.header.size = htons (sizeof (msg));
+ msg.reserved = htonl (0);
+ msg.ctn = ch->ctn;
+ msg.port = ch->port;
+ if (NULL != ch->last_control_qe)
+ GCT_send_cancel (ch->last_control_qe);
+ ch->last_control_qe = GCT_send (ch->t,
+ &msg.header,
+ &send_ack_cb,
+ ch);
}
/**
- * Message has been sent: start retransmission timer.
+ * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
+ * this channel. If the binding was successful, (re)transmit the
+ * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
*
- * @param cls Closure (queue structure).
- * @param t Tunnel.
- * @param q Queue handler (no longer valid).
- * @param type Type of message.
- * @param size Size of the message.
+ * @param ch channel that got the duplicate open
+ * @param cti identifier of the connection that delivered the message
*/
-static void
-ch_message_sent (void *cls,
- struct CadetTunnel *t,
- struct CadetTunnelQueue *q,
- uint16_t type, size_t size)
+void
+GCCH_handle_duplicate_open (struct CadetChannel *ch,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
{
- struct CadetChannelQueue *chq = cls;
- struct CadetReliableMessage *copy = chq->copy;
- struct CadetChannelReliability *rel;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "channel message sent callback %s\n",
- GC_m2s (chq->type));
-
- switch (chq->type)
+ if (NULL == ch->dest)
{
- case GNUNET_MESSAGE_TYPE_CADET_DATA:
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT DATA MID %u\n", copy->mid);
- GNUNET_assert (chq == copy->chq);
- copy->timestamp = GNUNET_TIME_absolute_get ();
- rel = copy->rel;
- if (GNUNET_SCHEDULER_NO_TASK == rel->retry_task)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!! scheduling retry in 4 * %s\n",
- GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
- GNUNET_YES));
- if (0 != rel->expected_delay.rel_value_us)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!! delay != 0\n");
- rel->retry_timer =
- GNUNET_TIME_relative_multiply (rel->expected_delay,
- CADET_RETRANSMIT_MARGIN);
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!! delay reset\n");
- rel->retry_timer = CADET_RETRANSMIT_TIME;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!! using delay %s\n",
- GNUNET_STRINGS_relative_time_to_string (rel->retry_timer,
- GNUNET_NO));
- rel->retry_task =
- GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
- &channel_retransmit_message, rel);
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!! retry task %u\n", rel->retry_task);
- }
- copy->chq = NULL;
- break;
-
-
- case GNUNET_MESSAGE_TYPE_CADET_DATA_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK:
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT %s\n", GC_m2s (chq->type));
- rel = chq->rel;
- GNUNET_assert (rel->uniq == chq);
- rel->uniq = NULL;
-
- if (CADET_CHANNEL_READY != rel->ch->state
- && GNUNET_MESSAGE_TYPE_CADET_DATA_ACK != type
- && GNUNET_NO == rel->ch->destroy)
- {
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rel->retry_task);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! STD BACKOFF %s\n",
- GNUNET_STRINGS_relative_time_to_string (rel->retry_timer,
- GNUNET_NO));
- rel->retry_timer = GNUNET_TIME_STD_BACKOFF (rel->retry_timer);
- rel->retry_task = GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
- &channel_recreate, rel);
- }
- break;
-
- default:
- GNUNET_break (0);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Ignoring duplicate CHANNEL_OPEN on %s: port is closed\n",
+ GCCH_2s (ch));
+ return;
}
-
- GNUNET_free (chq);
+ if (NULL != ch->retry_control_task)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Ignoring duplicate CHANNEL_OPEN on %s: control message is pending\n",
+ GCCH_2s (ch));
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Retransmitting CHANNEL_OPEN_ACK on %s\n",
+ GCCH_2s (ch));
+ ch->retry_control_task
+ = GNUNET_SCHEDULER_add_now (&send_open_ack,
+ ch);
}
/**
- * send a channel create message.
+ * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
*
- * @param ch Channel for which to send.
+ * @param ch channel the ack is for
+ * @param to_owner #GNUNET_YES to send to owner,
+ * #GNUNET_NO to send to dest
*/
static void
-send_create (struct CadetChannel *ch)
+send_ack_to_client (struct CadetChannel *ch,
+ int to_owner)
{
- struct GNUNET_CADET_ChannelCreate msgcc;
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_CADET_LocalAck *ack;
+ struct CadetChannelClient *ccc;
- msgcc.header.size = htons (sizeof (msgcc));
- msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE);
- msgcc.chid = htonl (ch->gid);
- msgcc.port = htonl (ch->port);
- msgcc.opt = htonl (channel_get_options (ch));
-
- GCCH_send_prebuilt_message (&msgcc.header, ch, GNUNET_YES, NULL);
+ ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
+ if (NULL == ccc)
+ {
+ /* This can happen if we are just getting ACKs after
+ our local client already disconnected. */
+ GNUNET_assert (GNUNET_YES == ch->destroy);
+ return;
+ }
+ env = GNUNET_MQ_msg (ack,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
+ ack->ccn = ccc->ccn;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
+ GSC_2s (ccc->c),
+ (GNUNET_YES == to_owner) ? "owner" : "dest",
+ ntohl (ack->ccn.channel_of_client),
+ ch->pending_messages,
+ ch->max_pending_messages);
+ GSC_send_to_client (ccc->c,
+ env);
}
/**
- * Confirm we got a channel create or FWD ack.
+ * A client is bound to the port that we have a channel
+ * open to. Send the acknowledgement for the connection
+ * request and establish the link with the client.
*
- * @param ch The channel to confirm.
- * @param fwd Should we send a FWD ACK? (going dest->root)
- * @param reaction This ACK is a reaction to a duplicate CREATE, don't save.
+ * @param ch open incoming channel
+ * @param c client listening on the respective @a port
+ * @param port the port @a is listening on
*/
-static void
-send_ack (struct CadetChannel *ch, int fwd, int reaction)
+void
+GCCH_bind (struct CadetChannel *ch,
+ struct CadetClient *c,
+ const struct GNUNET_HashCode *port)
{
- struct GNUNET_CADET_ChannelManage msg;
-
- msg.header.size = htons (sizeof (msg));
- msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " sending channel %s ack for channel %s\n",
- GC_f2s (fwd), GCCH_2s (ch));
+ uint32_t options;
+ struct CadetChannelClient *cccd;
- msg.chid = htonl (ch->gid);
- GCCH_send_prebuilt_message (&msg.header, ch, !fwd, reaction ? &msg : NULL);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Binding %s from %s to port %s of %s\n",
+ GCCH_2s (ch),
+ GCT_2s (ch->t),
+ GNUNET_h2s (&ch->port),
+ GSC_2s (c));
+ if (NULL != ch->retry_control_task)
+ {
+ /* there might be a timeout task here */
+ GNUNET_SCHEDULER_cancel (ch->retry_control_task);
+ ch->retry_control_task = NULL;
+ }
+ options = 0;
+ if (ch->nobuffer)
+ options |= GNUNET_CADET_OPTION_NOBUFFER;
+ if (ch->reliable)
+ options |= GNUNET_CADET_OPTION_RELIABLE;
+ if (ch->out_of_order)
+ options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
+ cccd = GNUNET_new (struct CadetChannelClient);
+ GNUNET_assert (NULL == ch->dest);
+ ch->dest = cccd;
+ ch->port = *port;
+ cccd->c = c;
+ cccd->client_ready = GNUNET_YES;
+ cccd->ccn = GSC_bind (c,
+ ch,
+ (GNUNET_YES == ch->is_loopback)
+ ? GCP_get (&my_full_id,
+ GNUNET_YES)
+ : GCT_get_destination (ch->t),
+ port,
+ options);
+ GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
+ GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
+ ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
+ if (GNUNET_YES == ch->is_loopback)
+ {
+ ch->state = CADET_CHANNEL_OPEN_SENT;
+ GCCH_handle_channel_open_ack (ch,
+ NULL,
+ port);
+ }
+ else
+ {
+ /* notify other peer that we accepted the connection */
+ ch->state = CADET_CHANNEL_READY;
+ ch->retry_control_task
+ = GNUNET_SCHEDULER_add_now (&send_open_ack,
+ ch);
+ }
+ /* give client it's initial supply of ACKs */
+ GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
+ GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
+ for (unsigned int i=0;i<ch->max_pending_messages;i++)
+ send_ack_to_client (ch,
+ GNUNET_NO);
}
/**
- * Send a message and don't keep any info about it: we won't need to cancel it
- * or resend it.
+ * One of our clients has disconnected, tell the other one that we
+ * are finished. Done asynchronously to avoid concurrent modification
+ * issues if this is the same client.
*
- * @param msg Header of the message to fire away.
- * @param ch Channel on which the message should go.
- * @param force Is this a forced (undroppable) message?
+ * @param cls the `struct CadetChannel` where one of the ends is now dead
*/
static void
-fire_and_forget (const struct GNUNET_MessageHeader *msg,
- struct CadetChannel *ch,
- int force)
+signal_remote_destroy_cb (void *cls)
{
- GNUNET_break (NULL == GCT_send_prebuilt_message (msg, ch->t, NULL,
- force, NULL, NULL));
+ struct CadetChannel *ch = cls;
+ struct CadetChannelClient *ccc;
+
+ /* Find which end is left... */
+ ch->retry_control_task = NULL;
+ ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
+ GSC_handle_remote_channel_destroy (ccc->c,
+ ccc->ccn,
+ ch);
+ channel_destroy (ch);
}
/**
- * Notify that a channel create didn't succeed.
+ * Destroy locally created channel. Called by the local client, so no
+ * need to tell the client.
*
- * @param ch The channel to reject.
+ * @param ch channel to destroy
+ * @param c client that caused the destruction
+ * @param ccn client number of the client @a c
*/
-static void
-send_nack (struct CadetChannel *ch)
+void
+GCCH_channel_local_destroy (struct CadetChannel *ch,
+ struct CadetClient *c,
+ struct GNUNET_CADET_ClientChannelNumber ccn)
{
- struct GNUNET_CADET_ChannelManage msg;
-
- msg.header.size = htons (sizeof (msg));
- msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- " sending channel NACK for channel %s\n",
+ "%s asks for destruction of %s\n",
+ GSC_2s (c),
GCCH_2s (ch));
+ GNUNET_assert (NULL != c);
+ if ( (NULL != ch->owner) &&
+ (c == ch->owner->c) &&
+ (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
+ {
+ free_channel_client (ch->owner);
+ ch->owner = NULL;
+ }
+ else if ( (NULL != ch->dest) &&
+ (c == ch->dest->c) &&
+ (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
+ {
+ free_channel_client (ch->dest);
+ ch->dest = NULL;
+ }
+ else
+ {
+ GNUNET_assert (0);
+ }
- msg.chid = htonl (ch->gid);
- GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL);
-}
-
-
-/**
- * Destroy all reliable messages queued for a channel,
- * during a channel destruction.
- * Frees the reliability structure itself.
- *
- * @param rel Reliability data for a channel.
- */
-static void
-channel_rel_free_all (struct CadetChannelReliability *rel)
-{
- struct CadetReliableMessage *copy;
- struct CadetReliableMessage *next;
-
- if (NULL == rel)
- return;
-
- for (copy = rel->head_recv; NULL != copy; copy = next)
+ if (GNUNET_YES == ch->destroy)
{
- next = copy->next;
- GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE BATCH RECV %p\n", copy);
- GNUNET_break (NULL == copy->chq);
- GNUNET_free (copy);
+ /* other end already destroyed, with the local client gone, no need
+ to finish transmissions, just destroy immediately. */
+ channel_destroy (ch);
+ return;
}
- for (copy = rel->head_sent; NULL != copy; copy = next)
+ if ( (NULL != ch->head_sent) &&
+ ( (NULL != ch->owner) ||
+ (NULL != ch->dest) ) )
{
- next = copy->next;
- GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE BATCH %p\n", copy);
- if (NULL != copy->chq)
- {
- if (NULL != copy->chq->tq)
- {
- GCT_cancel (copy->chq->tq);
- /* ch_message_sent will free copy->q */
- }
- else
- {
- GNUNET_free (copy->chq);
- GNUNET_break (0);
- }
- }
- GNUNET_free (copy);
+ /* Wait for other end to destroy us as well,
+ and otherwise allow send queue to be transmitted first */
+ ch->destroy = GNUNET_YES;
+ return;
}
- if (NULL != rel->uniq && NULL != rel->uniq->tq)
+ if ( (GNUNET_YES == ch->is_loopback) &&
+ ( (NULL != ch->owner) ||
+ (NULL != ch->dest) ) )
{
- GCT_cancel (rel->uniq->tq);
- /* ch_message_sent is called freeing uniq */
+ if (NULL != ch->retry_control_task)
+ GNUNET_SCHEDULER_cancel (ch->retry_control_task);
+ ch->retry_control_task
+ = GNUNET_SCHEDULER_add_now (&signal_remote_destroy_cb,
+ ch);
+ return;
}
- if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task)
+ if (GNUNET_NO == ch->is_loopback)
{
- GNUNET_SCHEDULER_cancel (rel->retry_task);
- rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
+ switch (ch->state)
+ {
+ case CADET_CHANNEL_NEW:
+ /* We gave up on a channel that we created as a client to a remote
+ target, but that never went anywhere. Nothing to do here. */
+ break;
+ case CADET_CHANNEL_LOOSE:
+ break;
+ default:
+ GCT_send_channel_destroy (ch->t,
+ ch->ctn);
+ }
}
- GNUNET_free (rel);
+ /* Nothing left to do, just finish destruction */
+ channel_destroy (ch);
}
/**
- * Mark future messages as ACK'd.
+ * We got an acknowledgement for the creation of the channel
+ * (the port is open on the other side). Verify that the
+ * other end really has the right port, and begin transmissions.
*
- * @param rel Reliability data.
- * @param msg DataACK message with a bitfield of future ACK'd messages.
+ * @param ch channel to destroy
+ * @param cti identifier of the connection that delivered the message
+ * @param port port number (needed to verify receiver knows the port)
*/
-static void
-channel_rel_free_sent (struct CadetChannelReliability *rel,
- const struct GNUNET_CADET_DataACK *msg)
+void
+GCCH_handle_channel_open_ack (struct CadetChannel *ch,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
+ const struct GNUNET_HashCode *port)
{
- struct CadetReliableMessage *copy;
- struct CadetReliableMessage *next;
- uint64_t bitfield;
- uint64_t mask;
- uint32_t mid;
- uint32_t target;
- unsigned int i;
-
- bitfield = msg->futures;
- mid = ntohl (msg->mid);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "free_sent_reliable %u %llX\n", mid, bitfield);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " rel %p, head %p\n", rel, rel->head_sent);
- for (i = 0, copy = rel->head_sent;
- i < 64 && NULL != copy && 0 != bitfield;
- i++)
+ switch (ch->state)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " trying bit %u (mid %u)\n", i, mid + i + 1);
- mask = 0x1LL << i;
- if (0 == (bitfield & mask))
- continue;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, " set!\n");
- /* Bit was set, clear the bit from the bitfield */
- bitfield &= ~mask;
-
- /* The i-th bit was set. Do we have that copy? */
- /* Skip copies with mid < target */
- target = mid + i + 1;
- LOG (GNUNET_ERROR_TYPE_DEBUG, " target %u\n", target);
- while (NULL != copy && GC_is_pid_bigger (target, copy->mid))
- copy = copy->next;
-
- /* Did we run out of copies? (previously freed, it's ok) */
- if (NULL == copy)
+ case CADET_CHANNEL_NEW:
+ /* this should be impossible */
+ GNUNET_break (0);
+ break;
+ case CADET_CHANNEL_LOOSE:
+ /* This makes no sense. */
+ GNUNET_break_op (0);
+ break;
+ case CADET_CHANNEL_OPEN_SENT:
+ if (NULL == ch->owner)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "run out of copies...\n");
- return;
+ /* We're not the owner, wrong direction! */
+ GNUNET_break_op (0);
+ return;
}
-
- /* Did we overshoot the target? (previously freed, it's ok) */
- if (GC_is_pid_bigger (copy->mid, target))
+ if (0 != GNUNET_memcmp (&ch->port,
+ port))
+ {
+ /* Other peer failed to provide the right port,
+ refuse connection. */
+ GNUNET_break_op (0);
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
+ GCCH_2s (ch));
+ if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " next copy %u\n", copy->mid);
- continue;
+ GNUNET_SCHEDULER_cancel (ch->retry_control_task);
+ ch->retry_control_task = NULL;
}
-
- /* Now copy->mid == target, free it */
- next = copy->next;
- GNUNET_break (GNUNET_YES != rel_message_free (copy, GNUNET_YES));
- copy = next;
+ ch->state = CADET_CHANNEL_READY;
+ /* On first connect, send client as many ACKs as we allow messages
+ to be buffered! */
+ for (unsigned int i=0;i<ch->max_pending_messages;i++)
+ send_ack_to_client (ch,
+ GNUNET_YES);
+ break;
+ case CADET_CHANNEL_READY:
+ /* duplicate ACK, maybe we retried the CREATE. Ignore. */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received duplicate channel OPEN_ACK for %s\n",
+ GCCH_2s (ch));
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate CREATE_ACKs",
+ 1,
+ GNUNET_NO);
+ break;
}
- LOG (GNUNET_ERROR_TYPE_DEBUG, "free_sent_reliable END\n");
}
/**
- * Destroy a reliable message after it has been acknowledged, either by
- * direct mid ACK or bitfield. Updates the appropriate data structures and
- * timers and frees all memory.
- *
- * @param copy Message that is no longer needed: remote peer got it.
- * @param update_time Is the timing information relevant?
- * If this message is ACK in a batch the timing information
- * is skewed by the retransmission, count only for the
- * retransmitted message.
+ * Test if element @a e1 comes before element @a e2.
*
- * @return #GNUNET_YES if channel was destroyed as a result of the call,
- * #GNUNET_NO otherwise.
+ * @param cls closure, to a flag where we indicate duplicate packets
+ * @param m1 a message of to sort
+ * @param m2 another message to sort
+ * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
*/
static int
-rel_message_free (struct CadetReliableMessage *copy, int update_time)
+is_before (void *cls,
+ struct CadetOutOfOrderMessage *m1,
+ struct CadetOutOfOrderMessage *m2)
{
- struct CadetChannelReliability *rel;
- struct GNUNET_TIME_Relative time;
+ int *duplicate = cls;
+ uint32_t v1 = ntohl (m1->mid.mid);
+ uint32_t v2 = ntohl (m2->mid.mid);
+ uint32_t delta;
- rel = copy->rel;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Freeing %u\n", copy->mid);
- if (update_time)
+ delta = v2 - v1;
+ if (0 == delta)
+ *duplicate = GNUNET_YES;
+ if (delta > (uint32_t) INT_MAX)
{
- time = GNUNET_TIME_absolute_get_duration (copy->timestamp);
- if (0 == rel->expected_delay.rel_value_us)
- rel->expected_delay = time;
- else
- {
- rel->expected_delay.rel_value_us *= 7;
- rel->expected_delay.rel_value_us += time.rel_value_us;
- rel->expected_delay.rel_value_us /= 8;
- }
- LOG (GNUNET_ERROR_TYPE_INFO, "!!! took %s, new delay %s\n",
- GNUNET_STRINGS_relative_time_to_string (time, GNUNET_NO),
- GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
- GNUNET_NO));
- rel->retry_timer = rel->expected_delay;
+ /* in overflow range, we can safely assume we wrapped around */
+ return GNUNET_NO;
}
else
{
- LOG (GNUNET_ERROR_TYPE_INFO, "!!! batch free, ignoring timing\n");
- }
- rel->ch->pending_messages--;
- if (NULL != copy->chq)
- {
- GCT_cancel (copy->chq->tq);
- /* copy->q is set to NULL by ch_message_sent */
- }
- GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE %p\n", copy);
- GNUNET_free (copy);
-
- if (GNUNET_NO != rel->ch->destroy && 0 == rel->ch->pending_messages)
- {
- GCCH_destroy (rel->ch);
+ /* result is small, thus v2 > v1, thus m1 < m2 */
return GNUNET_YES;
}
- return GNUNET_NO;
}
/**
- * Channel was ACK'd by remote peer, mark as ready and cancel retransmission.
+ * We got payload data for a channel. Pass it on to the client
+ * and send an ACK to the other end (once flow control allows it!)
*
- * @param ch Channel to mark as ready.
- * @param fwd Was the ACK message a FWD ACK? (dest->root, SYNACK)
+ * @param ch channel that got data
+ * @param cti identifier of the connection that delivered the message
+ * @param msg message that was received
*/
-static void
-channel_confirm (struct CadetChannel *ch, int fwd)
+void
+GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
+ const struct GNUNET_CADET_ChannelAppDataMessage *msg)
{
- struct CadetChannelReliability *rel;
- enum CadetChannelState oldstate;
-
- rel = fwd ? ch->root_rel : ch->dest_rel;
- if (NULL == rel)
- {
- GNUNET_break (GNUNET_NO != ch->destroy);
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_CADET_LocalData *ld;
+ struct CadetChannelClient *ccc;
+ size_t payload_size;
+ struct CadetOutOfOrderMessage *com;
+ int duplicate;
+ uint32_t mid_min;
+ uint32_t mid_max;
+ uint32_t mid_msg;
+ uint32_t delta;
+
+ GNUNET_assert (GNUNET_NO == ch->is_loopback);
+ if ( (NULL == ch->owner) &&
+ (NULL == ch->dest) )
+ {
+ /* This client is gone, but we still have messages to send to
+ the other end (which is why @a ch is not yet dead). However,
+ we cannot pass messages to our client anymore. */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Dropping incoming payload on %s as this end is already closed\n",
+ GCCH_2s (ch));
+ /* send back DESTROY notification to stop further retransmissions! */
+ if (GNUNET_YES == ch->destroy)
+ GCT_send_channel_destroy (ch->t,
+ ch->ctn);
return;
}
- LOG (GNUNET_ERROR_TYPE_DEBUG, " channel confirm %s %s\n",
- GC_f2s (fwd), GCCH_2s (ch));
- oldstate = ch->state;
- ch->state = CADET_CHANNEL_READY;
-
- if (CADET_CHANNEL_READY != oldstate || GNUNET_YES == is_loopback (ch))
- {
- rel->client_ready = GNUNET_YES;
- rel->expected_delay = rel->retry_timer;
- LOG (GNUNET_ERROR_TYPE_DEBUG, " !! retry timer confirm %s\n",
- GNUNET_STRINGS_relative_time_to_string (rel->retry_timer, GNUNET_NO));
- if (GCT_get_connections_buffer (ch->t) > 0 || GCT_is_loopback (ch->t))
- send_client_ack (ch, fwd);
-
- if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task)
- {
- GNUNET_SCHEDULER_cancel (rel->retry_task);
- rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
- }
- else if (NULL != rel->uniq)
- {
- GCT_cancel (rel->uniq->tq);
- /* ch_message_sent will free and NULL uniq */
- }
- else
+ payload_size = ntohs (msg->header.size) - sizeof (*msg);
+ env = GNUNET_MQ_msg_extra (ld,
+ payload_size,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
+ ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
+ GNUNET_memcpy (&ld[1],
+ &msg[1],
+ payload_size);
+ ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
+ if (GNUNET_YES == ccc->client_ready)
+ {
+ /*
+ * We ad-hoc send the message if
+ * - The channel is out-of-order
+ * - The channel is reliable and MID matches next expected MID
+ * - The channel is unreliable and MID is before lowest seen MID
+ */
+ if ( (GNUNET_YES == ch->out_of_order) ||
+ ((msg->mid.mid == ch->mid_recv.mid) &&
+ (GNUNET_YES == ch->reliable)) ||
+ ((GNUNET_NO == ch->reliable) &&
+ (ntohl (msg->mid.mid) >= ntohl (ch->mid_recv.mid)) &&
+ ((NULL == ccc->head_recv) ||
+ (ntohl (msg->mid.mid) < ntohl (ccc->head_recv->mid.mid)))) )
{
- if (GNUNET_NO == is_loopback (ch))
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Giving %u bytes of payload with MID %u from %s to client %s\n",
+ (unsigned int) payload_size,
+ ntohl (msg->mid.mid),
+ GCCH_2s (ch),
+ GSC_2s (ccc->c));
+ ccc->client_ready = GNUNET_NO;
+ GSC_send_to_client (ccc->c,
+ env);
+ if (GNUNET_NO == ch->out_of_order)
+ ch->mid_recv.mid = htonl (1 + ntohl (msg->mid.mid));
+ else
+ ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
+ ch->mid_futures >>= 1;
+ if ( (GNUNET_YES == ch->out_of_order) &&
+ (GNUNET_NO == ch->reliable) )
{
- /* We SHOULD have been trying to retransmit this! */
- GNUNET_break (0);
+ /* possibly shift by more if we skipped messages */
+ uint64_t delta = htonl (msg->mid.mid) - 1 - ntohl (ch->mid_recv.mid);
+
+ if (delta > 63)
+ ch->mid_futures = 0;
+ else
+ ch->mid_futures >>= delta;
+ ch->mid_recv.mid = htonl (1 + ntohl (msg->mid.mid));
}
+ send_channel_data_ack (ch);
+ return;
}
}
- /* In case of a FWD ACK (SYNACK) send a BCK ACK (ACK). */
- if (GNUNET_YES == fwd)
- send_ack (ch, GNUNET_NO, GNUNET_NO);
-}
-
-
-/**
- * Save a copy to retransmit in case it gets lost.
- *
- * Initializes all needed callbacks and timers.
- *
- * @param ch Channel this message goes on.
- * @param msg Message to copy.
- * @param fwd Is this fwd traffic?
- */
-static struct CadetReliableMessage *
-channel_save_copy (struct CadetChannel *ch,
- const struct GNUNET_MessageHeader *msg,
- int fwd)
-{
- struct CadetChannelReliability *rel;
- struct CadetReliableMessage *copy;
- uint32_t mid;
- uint16_t type;
- uint16_t size;
-
- rel = fwd ? ch->root_rel : ch->dest_rel;
- mid = rel->mid_send - 1;
- type = ntohs (msg->type);
- size = ntohs (msg->size);
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SAVE %u %s\n", mid, GC_m2s (type));
- copy = GNUNET_malloc (sizeof (struct CadetReliableMessage) + size);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " at %p\n", copy);
- copy->mid = mid;
- copy->rel = rel;
- copy->type = type;
- memcpy (©[1], msg, size);
- GNUNET_CONTAINER_DLL_insert_tail (rel->head_sent, rel->tail_sent, copy);
- ch->pending_messages++;
-
- return copy;
-}
-
-
-/**
- * Create a new channel.
- *
- * @param t Tunnel this channel is in.
- * @param owner Client that owns the channel, NULL for foreign channels.
- * @param lid_root Local ID for root client.
- *
- * @return A new initialized channel. NULL on error.
- */
-static struct CadetChannel *
-channel_new (struct CadetTunnel *t,
- struct CadetClient *owner,
- CADET_ChannelNumber lid_root)
-{
- struct CadetChannel *ch;
-
- ch = GNUNET_new (struct CadetChannel);
- ch->root = owner;
- ch->lid_root = lid_root;
- ch->t = t;
-
- GNUNET_STATISTICS_update (stats, "# channels", 1, GNUNET_NO);
-
- if (NULL != owner)
+ if (GNUNET_YES == ch->reliable)
{
- ch->gid = GCT_get_next_chid (t);
- GML_channel_add (owner, lid_root, ch);
+ /* check if message ought to be dropped because it is ancient/too distant/duplicate */
+ mid_min = ntohl (ch->mid_recv.mid);
+ mid_max = mid_min + ch->max_pending_messages;
+ mid_msg = ntohl (msg->mid.mid);
+ if ( ( (uint32_t) (mid_msg - mid_min) > ch->max_pending_messages) ||
+ ( (uint32_t) (mid_max - mid_msg) > ch->max_pending_messages) )
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s at %u drops ancient or far-future message %u\n",
+ GCCH_2s (ch),
+ (unsigned int) mid_min,
+ ntohl (msg->mid.mid));
+
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate DATA (ancient or future)",
+ 1,
+ GNUNET_NO);
+ GNUNET_MQ_discard (env);
+ send_channel_data_ack (ch);
+ return;
+ }
+ /* mark bit for future ACKs */
+ delta = mid_msg - mid_min - 1; /* overflow/underflow are OK here */
+ if (delta < 64)
+ {
+ if (0 != (ch->mid_futures & (1LLU << delta)))
+ {
+ /* Duplicate within the queue, drop also */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
+ (unsigned int) payload_size,
+ GCCH_2s (ch),
+ ntohl (msg->mid.mid));
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate DATA",
+ 1,
+ GNUNET_NO);
+ GNUNET_MQ_discard (env);
+ send_channel_data_ack (ch);
+ return;
+ }
+ ch->mid_futures |= (1LLU << delta);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Marked bit %llX for mid %u (base: %u); now: %llX\n",
+ (1LLU << delta),
+ mid_msg,
+ mid_min,
+ ch->mid_futures);
+ }
}
- GCT_add_channel (t, ch);
-
- return ch;
-}
-
-
-/**
- * Handle a loopback message: call the appropriate handler for the message type.
- *
- * @param ch Channel this message is on.
- * @param msgh Message header.
- * @param fwd Is this FWD traffic?
- */
-void
-handle_loopback (struct CadetChannel *ch,
- const struct GNUNET_MessageHeader *msgh,
- int fwd)
-{
- uint16_t type;
-
- type = ntohs (msgh->type);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Loopback %s %s message!\n",
- GC_f2s (fwd), GC_m2s (type));
-
- switch (type)
+ else /* ! ch->reliable */
{
- case GNUNET_MESSAGE_TYPE_CADET_DATA:
- /* Don't send hop ACK, wait for client to ACK */
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SEND loopback %u (%u)\n",
- ntohl (((struct GNUNET_CADET_Data *) msgh)->mid), ntohs (msgh->size));
- GCCH_handle_data (ch, (struct GNUNET_CADET_Data *) msgh, fwd);
- break;
-
- case GNUNET_MESSAGE_TYPE_CADET_DATA_ACK:
- GCCH_handle_data_ack (ch, (struct GNUNET_CADET_DataACK *) msgh, fwd);
- break;
-
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
- GCCH_handle_create (ch->t,
- (struct GNUNET_CADET_ChannelCreate *) msgh);
- break;
+ struct CadetOutOfOrderMessage *next_msg;
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK:
- GCCH_handle_ack (ch,
- (struct GNUNET_CADET_ChannelManage *) msgh,
- fwd);
- break;
+ /**
+ * We always send if possible in this case.
+ * It is guaranteed that the queued MID < received MID
+ **/
+ if ((NULL != ccc->head_recv) &&
+ (GNUNET_YES == ccc->client_ready))
+ {
+ next_msg = ccc->head_recv;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Giving queued MID %u from %s to client %s\n",
+ ntohl (next_msg->mid.mid),
+ GCCH_2s (ch),
+ GSC_2s (ccc->c));
+ ccc->client_ready = GNUNET_NO;
+ GSC_send_to_client (ccc->c,
+ next_msg->env);
+ ch->mid_recv.mid = htonl (1 + ntohl (next_msg->mid.mid));
+ ch->mid_futures >>= 1;
+ send_channel_data_ack (ch);
+ GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
+ ccc->tail_recv,
+ next_msg);
+ ccc->num_recv--;
+ /* Do not process duplicate MID */
+ if (msg->mid.mid == next_msg->mid.mid) /* Duplicate */
+ {
+ /* Duplicate within the queue, drop */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Message on %s (mid %u) dropped, duplicate\n",
+ GCCH_2s (ch),
+ ntohl (msg->mid.mid));
+ GNUNET_free (next_msg);
+ GNUNET_MQ_discard (env);
+ return;
+ }
+ GNUNET_free (next_msg);
+ }
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK:
- GCCH_handle_nack (ch);
- break;
+ if (ntohl (msg->mid.mid) < ntohl (ch->mid_recv.mid)) /* Old */
+ {
+ /* Duplicate within the queue, drop */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Message on %s (mid %u) dropped, old.\n",
+ GCCH_2s (ch),
+ ntohl (msg->mid.mid));
+ GNUNET_MQ_discard (env);
+ return;
+ }
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
- GCCH_handle_destroy (ch,
- (struct GNUNET_CADET_ChannelManage *) msgh,
- fwd);
- break;
+ /* Channel is unreliable, so we do not ACK. But we also cannot
+ allow buffering everything, so check if we have space... */
+ if (ccc->num_recv >= ch->max_pending_messages)
+ {
+ struct CadetOutOfOrderMessage *drop;
- default:
- GNUNET_break_op (0);
+ /* Yep, need to drop. Drop the oldest message in
+ the buffer. */
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "end-to-end message not known (%u)\n",
- ntohs (msgh->type));
+ "Queue full due slow client on %s, dropping oldest message\n",
+ GCCH_2s (ch));
+ GNUNET_STATISTICS_update (stats,
+ "# messages dropped due to slow client",
+ 1,
+ GNUNET_NO);
+ drop = ccc->head_recv;
+ GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
+ ccc->tail_recv,
+ drop);
+ ccc->num_recv--;
+ GNUNET_MQ_discard (drop->env);
+ GNUNET_free (drop);
+ }
}
-}
-
-
-
-/******************************************************************************/
-/******************************** API ***********************************/
-/******************************************************************************/
-
-/**
- * Destroy a channel and free all resources.
- *
- * @param ch Channel to destroy.
- */
-void
-GCCH_destroy (struct CadetChannel *ch)
-{
- struct CadetClient *c;
- struct CadetTunnel *t;
- if (NULL == ch)
+ /* Insert message into sorted out-of-order queue */
+ com = GNUNET_new (struct CadetOutOfOrderMessage);
+ com->mid = msg->mid;
+ com->env = env;
+ duplicate = GNUNET_NO;
+ GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
+ is_before,
+ &duplicate,
+ ccc->head_recv,
+ ccc->tail_recv,
+ com);
+ ccc->num_recv++;
+ if (GNUNET_YES == duplicate)
+ {
+ /* Duplicate within the queue, drop also (this is not covered by
+ the case above if "delta" >= 64, which could be the case if
+ max_pending_messages is also >= 64 or if our client is unready
+ and we are seeing retransmissions of the message our client is
+ blocked on. */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
+ (unsigned int) payload_size,
+ GCCH_2s (ch),
+ ntohl (msg->mid.mid));
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate DATA",
+ 1,
+ GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
+ ccc->tail_recv,
+ com);
+ ccc->num_recv--;
+ GNUNET_MQ_discard (com->env);
+ GNUNET_free (com);
+ send_channel_data_ack (ch);
return;
- if (2 == ch->destroy)
- return; /* recursive call */
- ch->destroy = 2;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying channel %s:%u\n",
- GCT_2s (ch->t), ch->gid);
- GCCH_debug (ch);
-
- c = ch->root;
- if (NULL != c)
- {
- GML_channel_remove (c, ch->lid_root, ch);
}
-
- c = ch->dest;
- if (NULL != c)
- {
- GML_channel_remove (c, ch->lid_dest, ch);
- }
-
- channel_rel_free_all (ch->root_rel);
- channel_rel_free_all (ch->dest_rel);
-
- t = ch->t;
- GCT_remove_channel (t, ch);
- GNUNET_STATISTICS_update (stats, "# channels", -1, GNUNET_NO);
-
- GNUNET_free (ch);
- GCT_destroy_if_empty (t);
-}
-
-
-/**
- * Get the channel's public ID.
- *
- * @param ch Channel.
- *
- * @return ID used to identify the channel with the remote peer.
- */
-CADET_ChannelNumber
-GCCH_get_id (const struct CadetChannel *ch)
-{
- return ch->gid;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
+ (GNUNET_YES == ccc->client_ready)
+ ? "out-of-order"
+ : "client-not-ready",
+ (unsigned int) payload_size,
+ GCCH_2s (ch),
+ ntohl (ccc->ccn.channel_of_client),
+ ccc,
+ ntohl (msg->mid.mid),
+ ntohl (ch->mid_recv.mid));
+ /* NOTE: this ACK we _could_ skip, as the packet is out-of-order and
+ the sender may already be transmitting the previous one. Needs
+ experimental evaluation to see if/when this ACK helps or
+ hurts. (We might even want another option.) */
+ send_channel_data_ack (ch);
}
/**
- * Get the channel tunnel.
- *
- * @param ch Channel to get the tunnel from.
- *
- * @return tunnel of the channel.
+ * Function called once the tunnel has sent one of our messages.
+ * If the message is unreliable, simply frees the `crm`. If the
+ * message was reliable, calculate retransmission time and
+ * wait for ACK (or retransmit).
+ *
+ * @param cls the `struct CadetReliableMessage` that was sent
+ * @param cid identifier of the connection within the tunnel, NULL
+ * if transmission failed
*/
-struct CadetTunnel *
-GCCH_get_tunnel (const struct CadetChannel *ch)
-{
- return ch->t;
-}
+static void
+data_sent_cb (void *cls,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid);
/**
- * Get free buffer space towards the client on a specific channel.
- *
- * @param ch Channel.
- * @param fwd Is query about FWD traffic?
+ * We need to retry a transmission, the last one took too long to
+ * be acknowledged.
*
- * @return Free buffer space [0 - 64]
+ * @param cls the `struct CadetChannel` where we need to retransmit
*/
-unsigned int
-GCCH_get_buffer (struct CadetChannel *ch, int fwd)
+static void
+retry_transmission (void *cls)
{
- struct CadetChannelReliability *rel;
+ struct CadetChannel *ch = cls;
+ struct CadetReliableMessage *crm = ch->head_sent;
- rel = fwd ? ch->dest_rel : ch->root_rel;
-
- /* If rel is NULL it means that the end is not yet created,
- * most probably is a loopback channel at the point of sending
- * the ChannelCreate to itself.
- */
- if (NULL == rel)
- return 64;
-
- return (64 - rel->n_recv);
+ ch->retry_data_task = NULL;
+ GNUNET_assert (NULL == crm->qe);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Retrying transmission on %s of message %u\n",
+ GCCH_2s (ch),
+ (unsigned int) ntohl (crm->data_message->mid.mid));
+ crm->qe = GCT_send (ch->t,
+ &crm->data_message->header,
+ &data_sent_cb,
+ crm);
+ GNUNET_assert (NULL == ch->retry_data_task);
}
/**
- * Get flow control status of end point: is client allow to send?
+ * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
+ * the queue and tell our client that it can send more.
*
- * @param ch Channel.
- * @param fwd Is query about FWD traffic? (Request root status).
- *
- * @return #GNUNET_YES if client is allowed to send us data.
+ * @param ch the channel that got the PLAINTEXT_DATA_ACK
+ * @param cti identifier of the connection that delivered the message
+ * @param crm the message that got acknowledged
*/
-int
-GCCH_get_allowed (struct CadetChannel *ch, int fwd)
+static void
+handle_matching_ack (struct CadetChannel *ch,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
+ struct CadetReliableMessage *crm)
{
- struct CadetChannelReliability *rel;
-
- rel = fwd ? ch->root_rel : ch->dest_rel;
-
- if (NULL == rel)
+ GNUNET_CONTAINER_DLL_remove (ch->head_sent,
+ ch->tail_sent,
+ crm);
+ ch->pending_messages--;
+ GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
+ GCCH_2s (ch),
+ (unsigned int) ntohl (crm->data_message->mid.mid),
+ ch->pending_messages);
+ if (NULL != crm->qe)
{
- /* Probably shutting down: root/dest NULL'ed to mark disconnection */
- GNUNET_break (GNUNET_NO != ch->destroy);
- return 0;
+ GCT_send_cancel (crm->qe);
+ crm->qe = NULL;
}
-
- return rel->client_allowed;
-}
-
-
-/**
- * Is the root client for this channel on this peer?
- *
- * @param ch Channel.
- * @param fwd Is this for fwd traffic?
- *
- * @return #GNUNET_YES in case it is.
- */
-int
-GCCH_is_origin (struct CadetChannel *ch, int fwd)
-{
- struct CadetClient *c;
-
- c = fwd ? ch->root : ch->dest;
- return NULL != c;
-}
-
-
-/**
- * Is the destination client for this channel on this peer?
- *
- * @param ch Channel.
- * @param fwd Is this for fwd traffic?
- *
- * @return #GNUNET_YES in case it is.
- */
-int
-GCCH_is_terminal (struct CadetChannel *ch, int fwd)
-{
- struct CadetClient *c;
-
- c = fwd ? ch->dest : ch->root;
- return NULL != c;
-}
-
-
-/**
- * Send an end-to-end ACK message for the most recent in-sequence payload.
- *
- * If channel is not reliable, do nothing.
- *
- * @param ch Channel this is about.
- * @param fwd Is for FWD traffic? (ACK dest->owner)
- */
-void
-GCCH_send_data_ack (struct CadetChannel *ch, int fwd)
-{
- struct GNUNET_CADET_DataACK msg;
- struct CadetChannelReliability *rel;
- struct CadetReliableMessage *copy;
- unsigned int delta;
- uint64_t mask;
- uint32_t ack;
-
- if (GNUNET_NO == ch->reliable)
- return;
-
- rel = fwd ? ch->dest_rel : ch->root_rel;
- ack = rel->mid_recv - 1;
- LOG (GNUNET_ERROR_TYPE_INFO, "===> DATA_ACK for %u\n", ack);
-
- msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_DATA_ACK);
- msg.header.size = htons (sizeof (msg));
- msg.chid = htonl (ch->gid);
- msg.mid = htonl (ack);
-
- msg.futures = 0;
- for (copy = rel->head_recv; NULL != copy; copy = copy->next)
+ if ( (1 == crm->num_transmissions) &&
+ (NULL != cti) )
{
- if (copy->type != GNUNET_MESSAGE_TYPE_CADET_DATA)
+ GCC_ack_observed (cti);
+ if (0 == GNUNET_memcmp (cti,
+ &crm->connection_taken))
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " Type %s, expected DATA\n",
- GC_m2s (copy->type));
- continue;
+ GCC_latency_observed (cti,
+ GNUNET_TIME_absolute_get_duration (crm->first_transmission_time));
}
- delta = copy->mid - (ack + 1);
- if (63 < delta)
- break;
- mask = 0x1LL << delta;
- msg.futures |= mask;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- " setting bit for %u (delta %u) (%llX) -> %llX\n",
- copy->mid, delta, mask, msg.futures);
}
- LOG (GNUNET_ERROR_TYPE_DEBUG, " final futures: %llX\n", ack, msg.futures);
-
- GCCH_send_prebuilt_message (&msg.header, ch, !fwd, NULL);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "send_data_ack END\n");
+ GNUNET_free (crm->data_message);
+ GNUNET_free (crm);
+ send_ack_to_client (ch,
+ (NULL == ch->owner)
+ ? GNUNET_NO
+ : GNUNET_YES);
}
/**
- * Allow a client to send us more data, in case it was choked.
+ * We got an acknowledgement for payload data for a channel.
+ * Possibly resume transmissions.
*
- * @param ch Channel.
- * @param fwd Is this about FWD traffic? (Root client).
+ * @param ch channel that got the ack
+ * @param cti identifier of the connection that delivered the message
+ * @param ack details about what was received
*/
void
-GCCH_allow_client (struct CadetChannel *ch, int fwd)
+GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
+ const struct GNUNET_CADET_ChannelDataAckMessage *ack)
{
- struct CadetChannelReliability *rel;
- unsigned int buffer;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "GMCH allow\n");
+ struct CadetReliableMessage *crm;
+ struct CadetReliableMessage *crmn;
+ int found;
+ uint32_t mid_base;
+ uint64_t mid_mask;
+ unsigned int delta;
- if (CADET_CHANNEL_READY != ch->state)
+ GNUNET_break (GNUNET_NO == ch->is_loopback);
+ if (GNUNET_NO == ch->reliable)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " channel not ready yet!\n");
+ /* not expecting ACKs on unreliable channel, odd */
+ GNUNET_break_op (0);
return;
}
-
- if (GNUNET_YES == ch->reliable)
- {
- rel = fwd ? ch->root_rel : ch->dest_rel;
- if (NULL == rel)
- {
- GNUNET_break (GNUNET_NO != ch->destroy);
- return;
- }
- if (NULL != rel->head_sent)
+ /* mid_base is the MID of the next message that the
+ other peer expects (i.e. that is missing!), everything
+ LOWER (but excluding mid_base itself) was received. */
+ mid_base = ntohl (ack->mid.mid);
+ mid_mask = GNUNET_htonll (ack->futures);
+ found = GNUNET_NO;
+ for (crm = ch->head_sent;
+ NULL != crm;
+ crm = crmn)
+ {
+ crmn = crm->next;
+ delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base);
+ if (delta >= UINT_MAX - ch->max_pending_messages)
{
- if (64 <= rel->mid_send - rel->head_sent->mid)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " too big MID gap! Wait for ACK.\n");
- return;
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " gap ok: %u - %u\n",
- rel->head_sent->mid, rel->mid_send);
- struct CadetReliableMessage *aux;
- for (aux = rel->head_sent; NULL != aux; aux = aux->next)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " - sent MID %u\n", aux->mid);
- }
- }
+ /* overflow, means crm was a bit in the past, so this ACK counts for it. */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got DATA_ACK with base %u satisfying past message %u on %s\n",
+ (unsigned int) mid_base,
+ ntohl (crm->data_message->mid.mid),
+ GCCH_2s (ch));
+ handle_matching_ack (ch,
+ cti,
+ crm);
+ found = GNUNET_YES;
+ continue;
}
- else
+ delta--;
+ if (delta >= 64)
+ continue;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Testing bit %llX for mid %u (base: %u)\n",
+ (1LLU << delta),
+ ntohl (crm->data_message->mid.mid),
+ mid_base);
+ if (0 != (mid_mask & (1LLU << delta)))
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " head sent is NULL\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got DATA_ACK with mask for %u on %s\n",
+ ntohl (crm->data_message->mid.mid),
+ GCCH_2s (ch));
+ handle_matching_ack (ch,
+ cti,
+ crm);
+ found = GNUNET_YES;
}
}
-
- if (is_loopback (ch))
- buffer = GCCH_get_buffer (ch, fwd);
- else
- buffer = GCT_get_connections_buffer (ch->t);
-
- if (0 == buffer)
+ if (GNUNET_NO == found)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " no buffer space.\n");
+ /* ACK for message we already dropped, might have been a
+ duplicate ACK? Ignore. */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Duplicate DATA_ACK on %s, ignoring\n",
+ GCCH_2s (ch));
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate DATA_ACKs",
+ 1,
+ GNUNET_NO);
return;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer space %u, allowing\n", buffer);
- send_client_ack (ch, fwd);
+ if (NULL != ch->retry_data_task)
+ {
+ GNUNET_SCHEDULER_cancel (ch->retry_data_task);
+ ch->retry_data_task = NULL;
+ }
+ if ( (NULL != ch->head_sent) &&
+ (NULL == ch->head_sent->qe) )
+ ch->retry_data_task
+ = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
+ &retry_transmission,
+ ch);
}
/**
- * Log channel info.
+ * Destroy channel, based on the other peer closing the
+ * connection. Also needs to remove this channel from
+ * the tunnel.
*
- * @param ch Channel.
+ * @param ch channel to destroy
+ * @param cti identifier of the connection that delivered the message,
+ * NULL if we are simulating receiving a destroy due to shutdown
*/
void
-GCCH_debug (struct CadetChannel *ch)
+GCCH_handle_remote_destroy (struct CadetChannel *ch,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
{
- if (NULL == ch)
+ struct CadetChannelClient *ccc;
+
+ GNUNET_assert (GNUNET_NO == ch->is_loopback);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received remote channel DESTROY for %s\n",
+ GCCH_2s (ch));
+ if (GNUNET_YES == ch->destroy)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "*** DEBUG NULL CHANNEL ***\n");
+ /* Local client already gone, this is instant-death. */
+ channel_destroy (ch);
return;
}
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Channel %s:%X (%p)\n",
- GCT_2s (ch->t), ch->gid, ch);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " root %p/%p\n",
- ch->root, ch->root_rel);
- if (NULL != ch->root)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " cli %s\n", GML_2s (ch->root));
- LOG (GNUNET_ERROR_TYPE_DEBUG, " ready %s\n",
- ch->root_rel->client_ready ? "YES" : "NO");
- LOG (GNUNET_ERROR_TYPE_DEBUG, " id %X\n", ch->lid_root);
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG, " dest %p/%p\n",
- ch->dest, ch->dest_rel);
- if (NULL != ch->dest)
+ ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
+ if ( (NULL != ccc) &&
+ (NULL != ccc->head_recv) )
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " cli %s\n", GML_2s (ch->dest));
- LOG (GNUNET_ERROR_TYPE_DEBUG, " ready %s\n",
- ch->dest_rel->client_ready ? "YES" : "NO");
- LOG (GNUNET_ERROR_TYPE_DEBUG, " id %X\n", ch->lid_dest);
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Lost end of transmission due to remote shutdown on %s\n",
+ GCCH_2s (ch));
+ /* FIXME: change API to notify client about truncated transmission! */
}
+ ch->destroy = GNUNET_YES;
+ if (NULL != ccc)
+ GSC_handle_remote_channel_destroy (ccc->c,
+ ccc->ccn,
+ ch);
+ channel_destroy (ch);
}
/**
- * Handle an ACK given by a client.
+ * Test if element @a e1 comes before element @a e2.
*
- * Mark client as ready and send him any buffered data we could have for him.
- *
- * @param ch Channel.
- * @param fwd Is this a "FWD ACK"? (FWD ACKs are sent by dest and go BCK)
+ * @param cls closure, to a flag where we indicate duplicate packets
+ * @param crm1 an element of to sort
+ * @param crm2 another element to sort
+ * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
*/
-void
-GCCH_handle_local_ack (struct CadetChannel *ch, int fwd)
+static int
+cmp_crm_by_next_retry (void *cls,
+ struct CadetReliableMessage *crm1,
+ struct CadetReliableMessage *crm2)
{
- struct CadetChannelReliability *rel;
- struct CadetClient *c;
-
- rel = fwd ? ch->dest_rel : ch->root_rel;
- c = fwd ? ch->dest : ch->root;
-
- rel->client_ready = GNUNET_YES;
- send_client_buffered_data (ch, c, fwd);
-
- if (GNUNET_YES == ch->destroy && 0 == rel->n_recv)
- {
- send_destroy (ch, GNUNET_YES);
- GCCH_destroy (ch);
- }
- /* if loopback is marked for destruction, no need to ACK to the other peer,
- * it requested the destruction and is already gone, therefore, else if.
- */
- else if (is_loopback (ch))
- {
- unsigned int buffer;
-
- buffer = GCCH_get_buffer (ch, fwd);
- if (0 < buffer)
- GCCH_allow_client (ch, fwd);
-
- return;
- }
- GCT_send_connection_acks (ch->t);
+ if (crm1->next_retry.abs_value_us <
+ crm2->next_retry.abs_value_us)
+ return GNUNET_YES;
+ return GNUNET_NO;
}
/**
- * Handle data given by a client.
- *
- * Check whether the client is allowed to send in this tunnel, save if channel
- * is reliable and send an ACK to the client if there is still buffer space
- * in the tunnel.
- *
- * @param ch Channel.
- * @param c Client which sent the data.
- * @param message Message.
- * @param fwd Is this a FWD data?
- *
- * @return GNUNET_OK if everything goes well, GNUNET_SYSERR in case of en error.
+ * Function called once the tunnel has sent one of our messages.
+ * If the message is unreliable, simply frees the `crm`. If the
+ * message was reliable, calculate retransmission time and
+ * wait for ACK (or retransmit).
+ *
+ * @param cls the `struct CadetReliableMessage` that was sent
+ * @param cid identifier of the connection within the tunnel, NULL
+ * if transmission failed
*/
-int
-GCCH_handle_local_data (struct CadetChannel *ch,
- struct CadetClient *c,
- struct GNUNET_MessageHeader *message,
- int fwd)
+static void
+data_sent_cb (void *cls,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
{
- struct CadetChannelReliability *rel;
- struct GNUNET_CADET_Data *payload;
- size_t size = ntohs (message->size);
- uint16_t p2p_size = sizeof(struct GNUNET_CADET_Data) + size;
- unsigned char cbuf[p2p_size];
-
- /* Is the client in the channel? */
- if ( !( (fwd &&
- ch->root == c)
- ||
- (!fwd &&
- ch->dest == c) ) )
+ struct CadetReliableMessage *crm = cls;
+ struct CadetChannel *ch = crm->ch;
+
+ GNUNET_assert (GNUNET_NO == ch->is_loopback);
+ GNUNET_assert (NULL != crm->qe);
+ crm->qe = NULL;
+ GNUNET_CONTAINER_DLL_remove (ch->head_sent,
+ ch->tail_sent,
+ crm);
+ if (GNUNET_NO == ch->reliable)
{
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
+ GNUNET_free (crm->data_message);
+ GNUNET_free (crm);
+ ch->pending_messages--;
+ send_ack_to_client (ch,
+ (NULL == ch->owner)
+ ? GNUNET_NO
+ : GNUNET_YES);
+ return;
}
-
- rel = fwd ? ch->root_rel : ch->dest_rel;
-
- if (GNUNET_NO == rel->client_allowed)
+ if (NULL == cid)
{
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
+ /* There was an error sending. */
+ crm->num_transmissions = GNUNET_SYSERR;
}
-
- rel->client_allowed = GNUNET_NO;
-
- /* Ok, everything is correct, send the message. */
- payload = (struct GNUNET_CADET_Data *) cbuf;
- payload->mid = htonl (rel->mid_send);
- rel->mid_send++;
- memcpy (&payload[1], message, size);
- payload->header.size = htons (p2p_size);
- payload->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_DATA);
- payload->chid = htonl (ch->gid);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on channel...\n");
- GCCH_send_prebuilt_message (&payload->header, ch, fwd, NULL);
-
- if (is_loopback (ch))
+ else if (GNUNET_SYSERR != crm->num_transmissions)
{
- if (GCCH_get_buffer (ch, fwd) > 0)
- GCCH_allow_client (ch, fwd);
-
- return GNUNET_OK;
+ /* Increment transmission counter, and possibly store @a cid
+ if this was the first transmission. */
+ crm->num_transmissions++;
+ if (1 == crm->num_transmissions)
+ {
+ crm->first_transmission_time = GNUNET_TIME_absolute_get ();
+ crm->connection_taken = *cid;
+ GCC_ack_expected (cid);
+ }
}
-
- if (GCT_get_connections_buffer (ch->t) > 0)
+ if ( (0 == crm->retry_delay.rel_value_us) &&
+ (NULL != cid) )
{
- GCCH_allow_client (ch, fwd);
- }
-
- return GNUNET_OK;
-}
+ struct CadetConnection *cc = GCC_lookup (cid);
-
-/**
- * Handle a channel destroy requested by a client.
- *
- * Destroy the channel and the tunnel in case this was the last channel.
- *
- * @param ch Channel.
- * @param c Client that requested the destruction (to avoid notifying him).
- * @param is_root Is the request coming from root?
- */
-void
-GCCH_handle_local_destroy (struct CadetChannel *ch,
- struct CadetClient *c,
- int is_root)
-{
- ch->destroy = GNUNET_YES;
- /* Cleanup after the tunnel */
- if (GNUNET_NO == is_root && c == ch->dest)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " Client %s is destination.\n", GML_2s (c));
- GML_client_delete_channel (c, ch, ch->lid_dest);
- ch->dest = NULL;
- }
- if (GNUNET_YES == is_root && c == ch->root)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " Client %s is owner.\n", GML_2s (c));
- GML_client_delete_channel (c, ch, ch->lid_root);
- ch->root = NULL;
+ if (NULL != cc)
+ crm->retry_delay = GCC_get_metrics (cc)->aged_latency;
+ else
+ crm->retry_delay = ch->retry_time;
+ }
+ crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
+ crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
+ MIN_RTT_DELAY);
+ crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
+
+ GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
+ cmp_crm_by_next_retry,
+ NULL,
+ ch->head_sent,
+ ch->tail_sent,
+ crm);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Message %u sent, next transmission on %s in %s\n",
+ (unsigned int) ntohl (crm->data_message->mid.mid),
+ GCCH_2s (ch),
+ GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
+ GNUNET_YES));
+ if (NULL == ch->head_sent->qe)
+ {
+ if (NULL != ch->retry_data_task)
+ GNUNET_SCHEDULER_cancel (ch->retry_data_task);
+ ch->retry_data_task
+ = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
+ &retry_transmission,
+ ch);
}
-
- send_destroy (ch, GNUNET_NO);
- if (0 == ch->pending_messages)
- GCCH_destroy (ch);
}
/**
- * Handle a channel create requested by a client.
- *
- * Create the channel and the tunnel in case this was the first0 channel.
+ * Handle data given by a client.
*
- * @param c Client that requested the creation (will be the root).
- * @param msg Create Channel message.
+ * Check whether the client is allowed to send in this tunnel, save if
+ * channel is reliable and send an ACK to the client if there is still
+ * buffer space in the tunnel.
*
- * @return GNUNET_OK if everything went fine, GNUNET_SYSERR otherwise.
+ * @param ch Channel.
+ * @param sender_ccn ccn of the sender
+ * @param buf payload to transmit.
+ * @param buf_len number of bytes in @a buf
+ * @return #GNUNET_OK if everything goes well,
+ * #GNUNET_SYSERR in case of an error.
*/
int
-GCCH_handle_local_create (struct CadetClient *c,
- struct GNUNET_CADET_ChannelMessage *msg)
+GCCH_handle_local_data (struct CadetChannel *ch,
+ struct GNUNET_CADET_ClientChannelNumber sender_ccn,
+ const char *buf,
+ size_t buf_len)
{
- struct CadetChannel *ch;
- struct CadetTunnel *t;
- struct CadetPeer *peer;
- CADET_ChannelNumber chid;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, " towards %s:%u\n",
- GNUNET_i2s (&msg->peer), ntohl (msg->port));
- chid = ntohl (msg->channel_id);
-
- /* Sanity check for duplicate channel IDs */
- if (NULL != GML_channel_get (c, chid))
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
+ struct CadetReliableMessage *crm;
- peer = GCP_get (&msg->peer);
- GCP_add_tunnel (peer);
- t = GCP_get_tunnel (peer);
-
- if (GCP_get_short_id (peer) == myid)
- {
- GCT_change_cstate (t, CADET_TUNNEL_READY);
- }
- else
- {
- /* FIXME change to a tunnel API, eliminate ch <-> peer connection */
- GCP_connect (peer);
- }
-
- /* Create channel */
- ch = channel_new (t, c, chid);
- if (NULL == ch)
+ if (ch->pending_messages >= ch->max_pending_messages)
{
- GNUNET_break (0);
+ GNUNET_break (0); /* Fails: #5370 */
return GNUNET_SYSERR;
}
- ch->port = ntohl (msg->port);
- channel_set_options (ch, ntohl (msg->opt));
-
- /* In unreliable channels, we'll use the DLL to buffer BCK data */
- ch->root_rel = GNUNET_new (struct CadetChannelReliability);
- ch->root_rel->ch = ch;
- ch->root_rel->retry_timer = CADET_RETRANSMIT_TIME;
- ch->root_rel->expected_delay.rel_value_us = 0;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "CREATED CHANNEL %s\n", GCCH_2s (ch));
-
- send_create (ch);
-
- return GNUNET_OK;
-}
-
-
-/**
- * Handler for cadet network payload traffic.
- *
- * @param ch Channel for the message.
- * @param msg Unencryted data message.
- * @param fwd Is this message fwd? This only is meaningful in loopback channels.
- * #GNUNET_YES if message is FWD on the respective channel (loopback)
- * #GNUNET_NO if message is BCK on the respective channel (loopback)
- * #GNUNET_SYSERR if message on a one-ended channel (remote)
- */
-void
-GCCH_handle_data (struct CadetChannel *ch,
- const struct GNUNET_CADET_Data *msg,
- int fwd)
-{
- struct CadetChannelReliability *rel;
- struct CadetClient *c;
- uint32_t mid;
-
- /* If this is a remote (non-loopback) channel, find 'fwd'. */
- if (GNUNET_SYSERR == fwd)
+ if (GNUNET_YES == ch->destroy)
{
- if (is_loopback (ch))
- {
- /* It is a loopback channel after all... */
- GNUNET_break (0);
- return;
- }
- fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
+ /* we are going down, drop messages */
+ return GNUNET_OK;
}
+ ch->pending_messages++;
- /* Initialize FWD/BCK data */
- c = fwd ? ch->dest : ch->root;
- rel = fwd ? ch->dest_rel : ch->root_rel;
-
- if (NULL == c)
+ if (GNUNET_YES == ch->is_loopback)
{
- GNUNET_break (GNUNET_NO != ch->destroy);
- return;
- }
+ struct CadetChannelClient *receiver;
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_CADET_LocalData *ld;
+ int ack_to_owner;
- if (CADET_CHANNEL_READY != ch->state)
- {
- if (GNUNET_NO == fwd)
+ env = GNUNET_MQ_msg_extra (ld,
+ buf_len,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
+ if ( (NULL != ch->owner) &&
+ (sender_ccn.channel_of_client ==
+ ch->owner->ccn.channel_of_client) )
{
- /* If we are the root, this means the other peer has sent traffic before
- * receiving our ACK. Even if the SYNACK goes missing, no traffic should
- * be sent before the ACK.
- */
- GNUNET_break_op (0);
- return;
+ receiver = ch->dest;
+ ack_to_owner = GNUNET_YES;
}
- /* If we are the dest, this means that the SYNACK got to the root but
- * the ACK went missing. Treat this as an ACK.
- */
- channel_confirm (ch, GNUNET_NO);
- }
-
- GNUNET_STATISTICS_update (stats, "# data received", 1, GNUNET_NO);
-
- mid = ntohl (msg->mid);
- LOG (GNUNET_ERROR_TYPE_INFO, "<=== DATA %u %s on channel %s\n",
- mid, GC_f2s (fwd), GCCH_2s (ch));
-
- if (GNUNET_NO == ch->reliable ||
- ( !GC_is_pid_bigger (rel->mid_recv, mid) &&
- GC_is_pid_bigger (rel->mid_recv + 64, mid) ) )
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "RECV %u (%u)\n",
- mid, ntohs (msg->header.size));
- if (GNUNET_YES == ch->reliable)
+ else if ( (NULL != ch->dest) &&
+ (sender_ccn.channel_of_client ==
+ ch->dest->ccn.channel_of_client) )
{
- /* Is this the exact next expected messasge? */
- if (mid == rel->mid_recv)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "as expected, sending to client\n");
- rel->mid_recv++;
- send_client_data (ch, msg, fwd);
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "save for later\n");
- add_buffered_data (msg, rel);
- }
+ receiver = ch->owner;
+ ack_to_owner = GNUNET_NO;
}
else
{
- /* Tunnel is unreliable: send to clients directly */
- /* FIXME: accept Out Of Order traffic */
- rel->mid_recv = mid + 1;
- send_client_data (ch, msg, fwd);
- }
- }
- else
- {
- GNUNET_break_op (GC_is_pid_bigger (rel->mid_recv, mid));
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "MID %u on channel %s not expected (window: %u - %u). Dropping!\n",
- mid, GCCH_2s (ch), rel->mid_recv, rel->mid_recv + 63);
- }
-
- GCCH_send_data_ack (ch, fwd);
-}
-
-
-/**
- * Handler for cadet network traffic end-to-end ACKs.
- *
- * @param ch Channel on which we got this message.
- * @param msg Data message.
- * @param fwd Is this message fwd? This only is meaningful in loopback channels.
- * #GNUNET_YES if message is FWD on the respective channel (loopback)
- * #GNUNET_NO if message is BCK on the respective channel (loopback)
- * #GNUNET_SYSERR if message on a one-ended channel (remote)
- */
-void
-GCCH_handle_data_ack (struct CadetChannel *ch,
- const struct GNUNET_CADET_DataACK *msg,
- int fwd)
-{
- struct CadetChannelReliability *rel;
- struct CadetReliableMessage *copy;
- struct CadetReliableMessage *next;
- uint32_t ack;
- int work;
-
- /* If this is a remote (non-loopback) channel, find 'fwd'. */
- if (GNUNET_SYSERR == fwd)
- {
- if (is_loopback (ch))
- {
- /* It is a loopback channel after all... */
GNUNET_break (0);
- return;
- }
- /* Inverted: if message came 'FWD' is a 'BCK ACK'. */
- fwd = (NULL != ch->dest) ? GNUNET_NO : GNUNET_YES;
- }
-
- ack = ntohl (msg->mid);
- LOG (GNUNET_ERROR_TYPE_INFO, "<=== %s ACK %u + %X\n",
- GC_f2s (fwd), ack, msg->futures);
-
- if (GNUNET_YES == fwd)
- {
- rel = ch->root_rel;
- }
- else
- {
- rel = ch->dest_rel;
- }
- if (NULL == rel)
- {
- GNUNET_break_op (GNUNET_NO != ch->destroy);
- return;
- }
-
- /* Free ACK'd copies: no need to retransmit those anymore FIXME refactor */
- for (work = GNUNET_NO, copy = rel->head_sent; copy != NULL; copy = next)
- {
- if (GC_is_pid_bigger (copy->mid, ack))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " head %u, out!\n", copy->mid);
- channel_rel_free_sent (rel, msg);
- break;
+ return GNUNET_SYSERR;
}
- work = GNUNET_YES;
- LOG (GNUNET_ERROR_TYPE_DEBUG, " id %u\n", copy->mid);
- next = copy->next;
- if (GNUNET_YES == rel_message_free (copy, GNUNET_YES))
- return;
- }
-
- /* ACK client if needed and possible */
- GCCH_allow_client (ch, fwd);
-
- /* If some message was free'd, update the retransmission delay */
- if (GNUNET_YES == work)
- {
- if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task)
+ GNUNET_assert (NULL != receiver);
+ ld->ccn = receiver->ccn;
+ GNUNET_memcpy (&ld[1],
+ buf,
+ buf_len);
+ if (GNUNET_YES == receiver->client_ready)
{
- GNUNET_SCHEDULER_cancel (rel->retry_task);
- rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
- if (NULL != rel->head_sent && NULL == rel->head_sent->chq)
- {
- struct GNUNET_TIME_Absolute new_target;
- struct GNUNET_TIME_Relative delay;
-
- delay = GNUNET_TIME_relative_multiply (rel->retry_timer,
- CADET_RETRANSMIT_MARGIN);
- new_target = GNUNET_TIME_absolute_add (rel->head_sent->timestamp,
- delay);
- delay = GNUNET_TIME_absolute_get_remaining (new_target);
- rel->retry_task =
- GNUNET_SCHEDULER_add_delayed (delay,
- &channel_retransmit_message,
- rel);
- }
+ ch->pending_messages--;
+ GSC_send_to_client (receiver->c,
+ env);
+ send_ack_to_client (ch,
+ ack_to_owner);
}
else
{
- /* Work was done but no task was pending? Shouldn't happen! */
- GNUNET_break (0);
+ struct CadetOutOfOrderMessage *oom;
+
+ oom = GNUNET_new (struct CadetOutOfOrderMessage);
+ oom->env = env;
+ GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
+ receiver->tail_recv,
+ oom);
+ receiver->num_recv++;
}
+ return GNUNET_OK;
}
+
+ /* Everything is correct, send the message. */
+ crm = GNUNET_malloc (sizeof (*crm));
+ crm->ch = ch;
+ crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
+ + buf_len);
+ crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
+ crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
+ ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
+ crm->data_message->mid = ch->mid_send;
+ crm->data_message->ctn = ch->ctn;
+ GNUNET_memcpy (&crm->data_message[1],
+ buf,
+ buf_len);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
+ ch->tail_sent,
+ crm);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending message %u from local client to %s with %u bytes\n",
+ ntohl (crm->data_message->mid.mid),
+ GCCH_2s (ch),
+ buf_len);
+ if (NULL != ch->retry_data_task)
+ {
+ GNUNET_SCHEDULER_cancel (ch->retry_data_task);
+ ch->retry_data_task = NULL;
+ }
+ crm->qe = GCT_send (ch->t,
+ &crm->data_message->header,
+ &data_sent_cb,
+ crm);
+ GNUNET_assert (NULL == ch->retry_data_task);
+ return GNUNET_OK;
}
/**
- * Handler for channel create messages.
- *
- * Does not have fwd parameter because it's always 'FWD': channel is incoming.
+ * Handle ACK from client on local channel. Means the client is ready
+ * for more data, see if we have any for it.
*
- * @param t Tunnel this channel will be in.
- * @param msg Channel crate message.
+ * @param ch channel to destroy
+ * @param client_ccn ccn of the client sending the ack
*/
-struct CadetChannel *
-GCCH_handle_create (struct CadetTunnel *t,
- const struct GNUNET_CADET_ChannelCreate *msg)
+void
+GCCH_handle_local_ack (struct CadetChannel *ch,
+ struct GNUNET_CADET_ClientChannelNumber client_ccn)
{
- CADET_ChannelNumber chid;
- struct CadetChannel *ch;
- struct CadetClient *c;
- int new_channel;
- int reaction;
-
- reaction = GNUNET_NO;
- chid = ntohl (msg->chid);
- ch = GCT_get_channel (t, chid);
- if (NULL == ch)
- {
- /* Create channel */
- ch = channel_new (t, NULL, 0);
- ch->gid = chid;
- channel_set_options (ch, ntohl (msg->opt));
- new_channel = GNUNET_YES;
- }
+ struct CadetChannelClient *ccc;
+ struct CadetOutOfOrderMessage *com;
+
+ if ( (NULL != ch->owner) &&
+ (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
+ ccc = ch->owner;
+ else if ( (NULL != ch->dest) &&
+ (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
+ ccc = ch->dest;
else
+ GNUNET_assert (0);
+ ccc->client_ready = GNUNET_YES;
+ com = ccc->head_recv;
+ if (NULL == com)
{
- new_channel = GNUNET_NO;
- }
-
- if (GNUNET_YES == new_channel || GCT_is_loopback (t))
- {
- /* Find a destination client */
- ch->port = ntohl (msg->port);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " port %u\n", ch->port);
- c = GML_client_get_by_port (ch->port);
- if (NULL == c)
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
+ GSC_2s (ccc->c),
+ ntohl (client_ccn.channel_of_client),
+ GCCH_2s (ch),
+ ntohl (ccc->ccn.channel_of_client),
+ ccc);
+ return; /* none pending */
+ }
+ if (GNUNET_YES == ch->is_loopback)
+ {
+ int to_owner;
+
+ /* Messages are always in-order, just send */
+ GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
+ ccc->tail_recv,
+ com);
+ ccc->num_recv--;
+ GSC_send_to_client (ccc->c,
+ com->env);
+ /* Notify sender that we can receive more */
+ if ( (NULL != ch->owner) &&
+ (ccc->ccn.channel_of_client ==
+ ch->owner->ccn.channel_of_client) )
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " no client has port registered\n");
- if (is_loopback (ch))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " loopback: destroy on handler\n");
- send_nack (ch);
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " not loopback: destroy now\n");
- send_nack (ch);
- GCCH_destroy (ch);
- }
- return NULL;
+ to_owner = GNUNET_NO;
}
else
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " client %p has port registered\n", c);
+ GNUNET_assert ( (NULL != ch->dest) &&
+ (ccc->ccn.channel_of_client ==
+ ch->dest->ccn.channel_of_client) );
+ to_owner = GNUNET_YES;
}
-
- add_destination (ch, c);
- if (GNUNET_YES == ch->reliable)
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Reliable\n");
- else
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Not Reliable\n");
-
- send_client_create (ch);
- ch->state = CADET_CHANNEL_SENT;
+ send_ack_to_client (ch,
+ to_owner);
+ GNUNET_free (com);
+ return;
}
- else
+
+ if ( (com->mid.mid != ch->mid_recv.mid) &&
+ (GNUNET_NO == ch->out_of_order) &&
+ (GNUNET_YES == ch->reliable) )
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " duplicate create channel\n");
- reaction = GNUNET_YES;
- if (GNUNET_SCHEDULER_NO_TASK != ch->dest_rel->retry_task)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " clearing retry task\n");
- /* we were waiting to re-send our 'SYNACK', wait no more! */
- GNUNET_SCHEDULER_cancel (ch->dest_rel->retry_task);
- ch->dest_rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
- }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
+ GSC_2s (ccc->c),
+ ntohl (ccc->ccn.channel_of_client),
+ ntohl (com->mid.mid),
+ ntohl (ch->mid_recv.mid));
+ return; /* missing next one in-order */
}
- send_ack (ch, GNUNET_YES, reaction);
-
- return ch;
-}
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got LOCAL_ACK, giving payload message %u to %s-%X on %s\n",
+ ntohl (com->mid.mid),
+ GSC_2s (ccc->c),
+ ntohl (ccc->ccn.channel_of_client),
+ GCCH_2s (ch));
-/**
- * Handler for channel NACK messages.
- *
- * NACK messages always go dest -> root, no need for 'fwd' or 'msg' parameter.
- *
- * @param ch Channel.
- */
-void
-GCCH_handle_nack (struct CadetChannel *ch)
-{
- send_client_nack (ch);
- GCCH_destroy (ch);
+ /* all good, pass next message to client */
+ GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
+ ccc->tail_recv,
+ com);
+ ccc->num_recv--;
+ /* FIXME: if unreliable, this is not aggressive
+ enough, as it would be OK to have lost some! */
+
+ ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
+ ch->mid_futures >>= 1; /* equivalent to division by 2 */
+ ccc->client_ready = GNUNET_NO;
+ GSC_send_to_client (ccc->c,
+ com->env);
+ GNUNET_free (com);
+ send_channel_data_ack (ch);
+ if (NULL != ccc->head_recv)
+ return;
+ if (GNUNET_NO == ch->destroy)
+ return;
+ GCT_send_channel_destroy (ch->t,
+ ch->ctn);
+ channel_destroy (ch);
}
-/**
- * Handler for channel ack messages.
- *
- * @param ch Channel.
- * @param msg Message.
- * @param fwd Is this message fwd? This only is meaningful in loopback channels.
- * #GNUNET_YES if message is FWD on the respective channel (loopback)
- * #GNUNET_NO if message is BCK on the respective channel (loopback)
- * #GNUNET_SYSERR if message on a one-ended channel (remote)
- */
-void
-GCCH_handle_ack (struct CadetChannel *ch,
- const struct GNUNET_CADET_ChannelManage *msg,
- int fwd)
-{
- /* If this is a remote (non-loopback) channel, find 'fwd'. */
- if (GNUNET_SYSERR == fwd)
- {
- if (is_loopback (ch))
- {
- /* It is a loopback channel after all... */
- GNUNET_break (0);
- return;
- }
- fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
- }
-
- channel_confirm (ch, !fwd);
-}
+#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
/**
- * Handler for channel destroy messages.
+ * Log channel info.
*
- * @param ch Channel to be destroyed of.
- * @param msg Message.
- * @param fwd Is this message fwd? This only is meaningful in loopback channels.
- * #GNUNET_YES if message is FWD on the respective channel (loopback)
- * #GNUNET_NO if message is BCK on the respective channel (loopback)
- * #GNUNET_SYSERR if message on a one-ended channel (remote)
+ * @param ch Channel.
+ * @param level Debug level to use.
*/
void
-GCCH_handle_destroy (struct CadetChannel *ch,
- const struct GNUNET_CADET_ChannelManage *msg,
- int fwd)
+GCCH_debug (struct CadetChannel *ch,
+ enum GNUNET_ErrorType level)
{
- struct CadetChannelReliability *rel;
+#if !defined(GNUNET_CULL_LOGGING)
+ int do_log;
- /* If this is a remote (non-loopback) channel, find 'fwd'. */
- if (GNUNET_SYSERR == fwd)
- {
- if (is_loopback (ch))
- {
- /* It is a loopback channel after all... */
- GNUNET_break (0);
- return;
- }
- fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
- }
-
- GCCH_debug (ch);
- if ( (fwd && NULL == ch->dest) || (!fwd && NULL == ch->root) )
- {
- /* Not for us (don't destroy twice a half-open loopback channel) */
+ do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
+ "cadet-chn",
+ __FILE__, __FUNCTION__, __LINE__);
+ if (0 == do_log)
return;
- }
-
- rel = fwd ? ch->dest_rel : ch->root_rel;
- if (0 == rel->n_recv)
- {
- send_destroy (ch, GNUNET_YES);
- GCCH_destroy (ch);
- }
- else
- {
- ch->destroy = GNUNET_YES;
- }
-}
-
-
-/**
- * Sends an already built message on a channel.
- *
- * If the channel is on a loopback tunnel, notifies the appropriate destination
- * client locally.
- *
- * On a normal channel passes the message to the tunnel for encryption and
- * sending on a connection.
- *
- * This function DOES NOT save the message for retransmission.
- *
- * @param message Message to send. Function makes a copy of it.
- * @param ch Channel on which this message is transmitted.
- * @param fwd Is this a fwd message?
- * @param existing_copy This is a retransmission, don't save a copy.
- */
-void
-GCCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
- struct CadetChannel *ch, int fwd,
- void *existing_copy)
-{
- struct CadetChannelQueue *chq;
- uint16_t type;
-
- type = ntohs (message->type);
- LOG (GNUNET_ERROR_TYPE_INFO, "===> %s %s on channel %s\n",
- GC_m2s (type), GC_f2s (fwd), GCCH_2s (ch));
- if (GCT_is_loopback (ch->t))
+ if (NULL == ch)
{
- handle_loopback (ch, message, fwd);
+ LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
return;
}
-
- switch (type)
+ LOG2 (level,
+ "CHN %s:%X (%p)\n",
+ GCT_2s (ch->t),
+ ch->ctn,
+ ch);
+ if (NULL != ch->owner)
{
- struct GNUNET_CADET_Data *payload;
- case GNUNET_MESSAGE_TYPE_CADET_DATA:
-
- payload = (struct GNUNET_CADET_Data *) message;
- LOG (GNUNET_ERROR_TYPE_INFO, "===> %s %u\n",
- GC_m2s (type), ntohl (payload->mid));
- if (GNUNET_YES == ch->reliable)
- {
- chq = GNUNET_new (struct CadetChannelQueue);
- chq->type = type;
- if (NULL == existing_copy)
- chq->copy = channel_save_copy (ch, message, fwd);
- else
- {
- chq->copy = (struct CadetReliableMessage *) existing_copy;
- if (NULL != chq->copy->chq)
- {
- /* Last retransmission was queued but not yet sent!
- * This retransmission was scheduled by a ch_message_sent which
- * followed a very fast RTT, so the tiny delay made the
- * retransmission function to execute before the previous
- * retransmitted message even had a chance to leave the peer.
- * Cancel this message and wait until the pending
- * retransmission leaves the peer and ch_message_sent starts
- * the timer for the next one.
- */
- GNUNET_free (chq);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- " exisitng copy not yet transmitted!\n");
- return;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- " using existing copy: %p {r:%p q:%p t:%u}\n",
- existing_copy,
- chq->copy->rel, chq->copy->chq, chq->copy->type);
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG, " new chq: %p\n", chq);
- chq->copy->chq = chq;
- chq->tq = GCT_send_prebuilt_message (message, ch->t, NULL,
- NULL != existing_copy,
- &ch_message_sent, chq);
- /* q itself is stored in copy */
- GNUNET_assert (NULL != chq->tq || GNUNET_NO != ch->destroy);
- }
- else
- {
- fire_and_forget (message, ch, GNUNET_NO);
- }
- break;
-
-
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK:
- if (GNUNET_YES == fwd || NULL != existing_copy)
- {
- /* BCK ACK (going FWD) is just a response for a SYNACK, don't keep*/
- fire_and_forget (message, ch, GNUNET_YES);
- return;
- }
- /* fall-trough */
- case GNUNET_MESSAGE_TYPE_CADET_DATA_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
- chq = GNUNET_new (struct CadetChannelQueue);
- chq->type = type;
- chq->rel = fwd ? ch->root_rel : ch->dest_rel;
- if (NULL != chq->rel->uniq)
- {
- if (NULL != chq->rel->uniq->tq)
- {
- GCT_cancel (chq->rel->uniq->tq);
- /* ch_message_sent is called, freeing and NULLing uniq */
- }
- else
- {
- GNUNET_break (0);
- GNUNET_free (chq->rel->uniq);
- }
- }
- chq->tq = GCT_send_prebuilt_message (message, ch->t, NULL, GNUNET_YES,
- &ch_message_sent, chq);
- if (NULL == chq->tq)
- {
- GNUNET_break (0);
- GCT_debug (ch->t, GNUNET_ERROR_TYPE_ERROR);
- GNUNET_free (chq);
- chq = NULL;
- return;
- }
- chq->rel->uniq = chq;
- break;
-
-
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK:
- fire_and_forget (message, ch, GNUNET_YES);
- break;
-
-
- default:
- GNUNET_break (0);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "type %s unknown!\n", GC_m2s (type));
- fire_and_forget (message, ch, GNUNET_YES);
+ LOG2 (level,
+ "CHN origin %s ready %s local-id: %u\n",
+ GSC_2s (ch->owner->c),
+ ch->owner->client_ready ? "YES" : "NO",
+ ntohl (ch->owner->ccn.channel_of_client));
}
+ if (NULL != ch->dest)
+ {
+ LOG2 (level,
+ "CHN destination %s ready %s local-id: %u\n",
+ GSC_2s (ch->dest->c),
+ ch->dest->client_ready ? "YES" : "NO",
+ ntohl (ch->dest->ccn.channel_of_client));
+ }
+ LOG2 (level,
+ "CHN Message IDs recv: %d (%LLX), send: %d\n",
+ ntohl (ch->mid_recv.mid),
+ (unsigned long long) ch->mid_futures,
+ ntohl (ch->mid_send.mid));
+#endif
}
-/**
- * Get the static string for identification of the channel.
- *
- * @param ch Channel.
- *
- * @return Static string with the channel IDs.
- */
-const char *
-GCCH_2s (const struct CadetChannel *ch)
-{
- static char buf[64];
-
- if (NULL == ch)
- return "(NULL Channel)";
-
- sprintf (buf, "%s:%u gid:%X (%X / %X)",
- GCT_2s (ch->t), ch->port, ch->gid, ch->lid_root, ch->lid_dest);
- return buf;
-}
+/* end of gnunet-service-cadet-new_channel.c */