/*
- This file is part of GNUnet.
- Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
+ This file is part of GNUnet.
+ Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
- GNUnet is free software: you can redistribute it and/or modify it
- under the terms of the GNU Affero General Public License as published
- by the Free Software Foundation, either version 3 of the License,
- or (at your option) any later version.
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Affero General Public License for more details.
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
SPDX-License-Identifier: AGPL3.0-or-later
*/
*
* TODO:
* Implement next:
- * - add (more) logging
- * - realize transport-to-transport flow control (needed in case
- * communicators do not offer flow control). Note that we may not
- * want to simply delay the ACKs as that may cause unnecessary
- * re-transmissions. => Introduce proper flow and congestion window(s)!
* - review retransmission logic, right now there is no smartness there!
- * => congestion control, flow control, etc [PERFORMANCE-BASICS]
+ * => congestion control, etc [PERFORMANCE-BASICS]
*
- * Optimizations:
+ * Optimizations-Statistics:
+ * - Track ACK losses based on ACK-counter [ROUTING]
+ * - Need to track total bandwidth per VirtualLink and adjust how frequently
+ * we send FC messages based on bandwidth-delay-product (and relation
+ * to the window size!). See OPTIMIZE-FC-BDP.
+ * - Consider more statistics in #check_connection_quality() [FIXME-CONQ-STATISTICS]
+ * - Adapt available_fc_window_size, using larger values for high-bandwidth
+ * and high-latency links *if* we have the RAM [GOODPUT / utilization / stalls]
+ * - Set last_window_consum_limit promise properly based on
+ * latency and bandwidth of the respective connection [GOODPUT / utilization / stalls]
+ *
+ * Optimizations-DV:
* - When forwarding DV learn messages, if a peer is reached that
* has a *bidirectional* link to the origin beyond 1st hop,
* do NOT forward it to peers _other_ than the origin, as
* there is clearly a better path directly from the origin to
* whatever else we could reach.
- * - AcknowledgementUUIDPs are overkill with 256 bits (128 would do)
- * => Need 128 bit hash map though! [BANDWIDTH, MEMORY]
- * - queue_send_msg and route_message both by API design have to make copies
- * of the payload, and route_message on top of that requires a malloc/free.
- * Change design to approximate "zero" copy better... [CPU]
- * - could avoid copying body of message into each fragment and keep
- * fragments as just pointers into the original message and only
- * fully build fragments just before transmission (optimization, should
- * reduce CPU and memory use) [CPU, MEMORY]
- * - if messages are below MTU, consider adding ACKs and other stuff
- * to the same transmission to avoid tiny messages (requires planning at
- * receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT]
* - When we passively learned DV (with unconfirmed freshness), we
* right now add the path to our list but with a zero path_valid_until
* time and only use it for unconfirmed routes. However, we could consider
* triggering an explicit validation mechansim ourselves, specifically routing
* a challenge-response message over the path [ROUTING]
- * - Track ACK losses based on ACK-counter [ROUTING]
+ * = if available, try to confirm unconfirmed DV paths when trying to establish
+ * virtual link for a `struct IncomingRequest`. (i.e. if DVH is
+ * unconfirmed, incoming requests cause us to try to validate a passively
+ * learned path (requires new message type!))
+ *
+ * Optimizations-Fragmentation:
+ * - Fragments send over a reliable channel could do without the
+ * AcknowledgementUUIDP altogether, as they won't be acked! [BANDWIDTH]
+ * (-> have 2nd type of acknowledgment message; low priority, as we
+ * do not have an MTU-limited *reliable* communicator) [FIXME-FRAG-REL-UUID]
+ * - if messages are below MTU, consider adding ACKs and other stuff
+ * to the same transmission to avoid tiny messages (requires planning at
+ * receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT]
*
- * Design realizations / discussion:
- * - communicators do flow control by calling MQ "notify sent"
- * when 'ready'. They determine flow implicitly (i.e. TCP blocking)
- * or explicitly via backchannel FC ACKs. As long as the
- * channel is not full, they may 'notify sent' even if the other
- * peer has not yet confirmed receipt. The other peer confirming
- * is _only_ for FC, not for more reliable transmission; reliable
- * transmission (i.e. of fragments) is left to _transport_.
- * - ACKs sent back in uni-directional communicators are done via
- * the background channel API; here transport _may_ initially
- * broadcast (with bounded # hops) if no path is known;
- * - transport should _integrate_ DV-routing and build a view of
- * the network; then background channel traffic can be
- * routed via DV as well as explicit "DV" traffic.
- * - background channel is also used for ACKs and NAT traversal support
- * - transport service is responsible for AEAD'ing the background
- * channel, timestamps and monotonic time are used against replay
- * of old messages -> peerstore needs to be supplied with
- * "latest timestamps seen" data
- * - if transport implements DV, we likely need a 3rd peermap
- * in addition to ephemerals and (direct) neighbours
- * ==> check if stuff needs to be moved out of "Neighbour"
- * - transport should encapsualte core-level messages and do its
- * own ACKing for RTT/goodput/loss measurements _and_ fragment
- * for retransmission
+ * Optimizations-internals:
+ * - queue_send_msg by API design has to make a copy
+ * of the payload, and route_message on top of that requires a malloc/free.
+ * Change design to approximate "zero" copy better... [CPU]
+ * - could avoid copying body of message into each fragment and keep
+ * fragments as just pointers into the original message and only
+ * fully build fragments just before transmission (optimization, should
+ * reduce CPU and memory use) [CPU, MEMORY]
*/
#include "platform.h"
#include "gnunet_util_lib.h"
*/
#define MAX_CUMMULATIVE_ACKS 64
+/**
+ * What is the 1:n chance that we send a Flow control response when
+ * receiving a flow control message that did not change anything for
+ * us? Basically, this is used in the case where both peers are stuck
+ * on flow control (no window changes), but one might continue sending
+ * flow control messages to the other peer as the first FC message
+ * when things stalled got lost, and then subsequently the other peer
+ * does *usually* not respond as nothing changed. So to ensure that
+ * eventually the FC messages stop, we do send with 1/8th probability
+ * an FC message even if nothing changed. That prevents one peer
+ * being stuck in sending (useless) FC messages "forever".
+ */
+#define FC_NO_CHANGE_REPLY_PROBABILITY 8
+
/**
* What is the size we assume for a read operation in the
* absence of an MTU for the purpose of flow control?
*/
#define GOODPUT_AGING_SLOTS 4
+/**
+ * How big is the flow control window size by default;
+ * limits per-neighbour RAM utilization.
+ */
+#define DEFAULT_WINDOW_SIZE (128 * 1024)
+
+/**
+ * For how many incoming connections do we try to create a
+ * virtual link for (at the same time!). This does NOT
+ * limit the number of incoming connections, just the number
+ * for which we are actively trying to find working addresses
+ * in the absence (!) of our own applications wanting the
+ * link to go up.
+ */
+#define MAX_INCOMING_REQUEST 16
+
/**
* Maximum number of peers we select for forwarding DVInit
* messages at the same time (excluding initiator).
#define DELAY_WARN_THRESHOLD \
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+/**
+ * If a DVBox could not be forwarded after this number of
+ * seconds we drop it.
+ */
+#define DV_FORWARD_TIMEOUT \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
+
/**
* We only consider queues as "quality" connections when
* suppressing the generation of DV initiation messages if
struct AcknowledgementUUIDP
{
/**
- * The UUID value. Not actually a hash, but a random value.
+ * The UUID value.
*/
- struct GNUNET_ShortHashCode value;
+ struct GNUNET_Uuid value;
};
*/
struct GNUNET_MessageHeader header;
- /**
- * Reserved, always zero.
- */
- uint32_t reserved GNUNET_PACKED;
-
- /**
- * Target's peer identity (as backchannels may be transmitted
- * indirectly, or even be broadcast).
- */
- struct GNUNET_PeerIdentity target;
-
- /**
- * Ephemeral key setup by the sender for @e target, used
- * to encrypt the payload.
- */
- struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
-
- /**
- * We use an IV here as the @e ephemeral_key is re-used for
- * #EPHEMERAL_VALIDITY time to avoid re-signing it all the time.
- */
- struct GNUNET_ShortHashCode iv;
-
- /**
- * HMAC over the ciphertext of the encrypted, variable-size
- * body that follows. Verified via DH of @e target and
- * @e ephemeral_key
- */
- struct GNUNET_HashCode hmac;
+ /* Followed by *another* message header which is the message to
+ the communicator */
- /* Followed by encrypted, variable-size payload */
+ /* Followed by a 0-terminated name of the communicator */
};
*/
struct EphemeralConfirmationPS
{
-
/**
* Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
*/
* Plaintext of the variable-size payload that is encrypted
* within a `struct TransportBackchannelEncapsulationMessage`
*/
-struct TransportBackchannelRequestPayloadP
+struct TransportDVBoxPayloadP
{
-
/**
* Sender's peer identity.
*/
struct GNUNET_TIME_AbsoluteNBO monotonic_time;
/* Followed by a `struct GNUNET_MessageHeader` with a message
- for a communicator */
-
- /* Followed by a 0-termianted string specifying the name of
- the communicator which is to receive the message */
+ for the target peer */
};
* shortcut.
*
* If a peer finds itself still on the list, it must drop the message.
+ *
+ * The payload of the box can only be decrypted and verified by the
+ * ultimate receiver. Intermediaries do not learn the sender's
+ * identity and the path the message has taken. However, the first
+ * hop does learn the sender as @e total_hops would be zero and thus
+ * the predecessor must be the origin (so this is not really useful
+ * for anonymization).
*/
struct TransportDVBoxMessage
{
/**
* Number of total hops this messages travelled. In NBO.
* @e origin sets this to zero, to be incremented at
- * each hop.
+ * each hop. Peers should limit the @e total_hops value
+ * they accept from other peers.
*/
uint16_t total_hops GNUNET_PACKED;
/**
- * Number of hops this messages includes. In NBO.
+ * Number of hops this messages includes. In NBO. Reduced by one
+ * or more at each hop. Peers should limit the @e num_hops value
+ * they accept from other peers.
*/
uint16_t num_hops GNUNET_PACKED;
/**
- * Identity of the peer that originated the message.
+ * Ephemeral key setup by the sender for target, used to encrypt the
+ * payload. Intermediaries must not change this value.
+ */
+ struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
+
+ /**
+ * We use an IV here as the @e ephemeral_key is re-used for
+ * #EPHEMERAL_VALIDITY time to avoid re-signing it all the time.
+ * Intermediaries must not change this value.
+ */
+ struct GNUNET_ShortHashCode iv;
+
+ /**
+ * HMAC over the ciphertext of the encrypted, variable-size body
+ * that follows. Verified via DH of target and @e ephemeral_key.
+ * Intermediaries must not change this value.
*/
- struct GNUNET_PeerIdentity origin;
+ struct GNUNET_HashCode hmac;
/* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
excluding the @e origin and the current peer, the last must be
the ultimate target; if @e num_hops is zero, the receiver of this
message is the ultimate target. */
- /* Followed by the actual message, which itself may be
- another box, but not a DV_LEARN or DV_BOX message! */
+ /* Followed by encrypted, variable-size payload, which
+ must begin with a `struct TransportDVBoxPayloadP` */
+
+ /* Followed by the actual message, which itself must not be a
+ a DV_LEARN or DV_BOX message! */
};
*/
struct TransportValidationChallengeMessage
{
-
/**
* Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE
*/
struct GNUNET_MessageHeader header;
/**
- * Zero.
+ * Always zero.
*/
uint32_t reserved GNUNET_PACKED;
struct ChallengeNonceP challenge;
/**
- * Timestamp of the sender, to be copied into the reply
- * to allow sender to calculate RTT.
+ * Timestamp of the sender, to be copied into the reply to allow
+ * sender to calculate RTT. Must be monotonically increasing!
*/
struct GNUNET_TIME_AbsoluteNBO sender_time;
};
*/
struct TransportValidationPS
{
-
/**
* Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE
*/
/**
- * Message send to a peer to respond to a
+ * Message send to a peer to respond to a
* #GNUNET_MESSAGE_TYPE_ADDRESS_VALIDATION_CHALLENGE
*/
struct TransportValidationResponseMessage
{
-
/**
* Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE
*/
struct GNUNET_MessageHeader header;
/**
- * Zero.
+ * Always zero.
*/
uint32_t reserved GNUNET_PACKED;
};
+/**
+ * Message for Transport-to-Transport Flow control. Specifies the size
+ * of the flow control window, including how much we believe to have
+ * consumed (at transmission time), how much we believe to be allowed
+ * (at transmission time), and how much the other peer is allowed to
+ * send to us, and how much data we already received from the other
+ * peer.
+ */
+struct TransportFlowControlMessage
+{
+ /**
+ * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Sequence number of the flow control message. Incremented by one
+ * for each message. Starts at zero when a virtual link goes up.
+ * Used to detect one-sided connection drops. On wrap-around, the
+ * flow control counters will be reset as if the connection had
+ * dropped.
+ */
+ uint32_t seq GNUNET_PACKED;
+
+ /**
+ * Flow control window size in bytes, in NBO.
+ * The receiver can send this many bytes at most.
+ */
+ uint64_t inbound_window_size GNUNET_PACKED;
+
+ /**
+ * How many bytes has the sender sent that count for flow control at
+ * this time. Used to allow the receiver to estimate the packet
+ * loss rate.
+ */
+ uint64_t outbound_sent GNUNET_PACKED;
+
+ /**
+ * Latest flow control window size we learned from the other peer,
+ * in bytes, in NBO. We are limited to sending at most this many
+ * bytes to the other peer. May help the other peer detect when
+ * flow control messages were lost and should thus be retransmitted.
+ * In particular, if the delta to @e outbound_sent is too small,
+ * this signals that we are stalled.
+ */
+ uint64_t outbound_window_size GNUNET_PACKED;
+
+ /**
+ * Timestamp of the sender. Must be monotonically increasing!
+ * Used to enable receiver to ignore out-of-order packets in
+ * combination with the @e seq. Note that @e seq will go down
+ * (back to zero) whenever either side believes the connection
+ * was dropped, allowing the peers to detect that they need to
+ * reset the counters for the number of bytes sent!
+ */
+ struct GNUNET_TIME_AbsoluteNBO sender_time;
+};
+
+
GNUNET_NETWORK_STRUCT_END
*/
struct LearnLaunchEntry
{
-
/**
* Kept (also) in a DLL sorted by launch time.
*/
};
-/**
- * Entry in our cache of ephemeral keys we currently use. This way, we only
- * sign an ephemeral once per @e target, and then can re-use it over multiple
- * #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION messages (as
- * signing is expensive and in some cases we may use backchannel messages a
- * lot).
- */
-struct EphemeralCacheEntry
-{
-
- /**
- * Target's peer identity (we don't re-use ephemerals
- * to limit linkability of messages).
- */
- struct GNUNET_PeerIdentity target;
-
- /**
- * Signature affirming @e ephemeral_key of type
- * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
- */
- struct GNUNET_CRYPTO_EddsaSignature sender_sig;
-
- /**
- * How long is @e sender_sig valid
- */
- struct GNUNET_TIME_Absolute ephemeral_validity;
-
- /**
- * What time was @e sender_sig created
- */
- struct GNUNET_TIME_Absolute monotime;
-
- /**
- * Our ephemeral key.
- */
- struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
-
- /**
- * Our private ephemeral key.
- */
- struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
-
- /**
- * Node in the ephemeral cache for this entry.
- * Used for expiration.
- */
- struct GNUNET_CONTAINER_HeapNode *hn;
-};
-
-
/**
* Information we keep per #GOODPUT_AGING_SLOTS about historic
* (or current) transmission performance.
*/
struct DistanceVectorHop;
+/**
+ * A virtual link is another reachable peer that is known to CORE. It
+ * can be either a `struct Neighbour` with at least one confirmed
+ * `struct Queue`, or a `struct DistanceVector` with at least one
+ * confirmed `struct DistanceVectorHop`. With a virtual link we track
+ * data that is per neighbour that is not specific to how the
+ * connectivity is established.
+ */
+struct VirtualLink;
+
/**
* Context from #handle_incoming_msg(). Closure for many
*/
struct CommunicatorMessageContext
{
-
/**
* Kept in a DLL of `struct VirtualLink` if waiting for CORE
* flow control to unchoke.
};
+/**
+ * Closure for #core_env_sent_cb.
+ */
+struct CoreSentContext
+{
+ /**
+ * Kept in a DLL to clear @e vl in case @e vl is lost.
+ */
+ struct CoreSentContext *next;
+
+ /**
+ * Kept in a DLL to clear @e vl in case @e vl is lost.
+ */
+ struct CoreSentContext *prev;
+
+ /**
+ * Virtual link this is about.
+ */
+ struct VirtualLink *vl;
+
+ /**
+ * How big was the message.
+ */
+ uint16_t size;
+
+ /**
+ * By how much should we increment @e vl's
+ * incoming_fc_window_size_used once we are done sending to CORE?
+ * Use to ensure we do not increment twice if there is more than one
+ * CORE client.
+ */
+ uint16_t isize;
+};
+
+
/**
* A virtual link is another reachable peer that is known to CORE. It
* can be either a `struct Neighbour` with at least one confirmed
*/
struct CommunicatorMessageContext *cmc_tail;
+ /**
+ * Head of list of messages pending for this VL.
+ */
+ struct PendingMessage *pending_msg_head;
+
+ /**
+ * Tail of list of messages pending for this VL.
+ */
+ struct PendingMessage *pending_msg_tail;
+
+ /**
+ * Kept in a DLL to clear @e vl in case @e vl is lost.
+ */
+ struct CoreSentContext *csc_tail;
+
+ /**
+ * Kept in a DLL to clear @e vl in case @e vl is lost.
+ */
+ struct CoreSentContext *csc_head;
+
/**
* Task scheduled to possibly notfiy core that this peer is no
* longer counting as confirmed. Runs the #core_visibility_check(),
*/
struct GNUNET_SCHEDULER_Task *visibility_task;
+ /**
+ * Task scheduled to periodically retransmit FC messages (in
+ * case one got lost).
+ */
+ struct GNUNET_SCHEDULER_Task *fc_retransmit_task;
+
/**
* Neighbour used by this virtual link, NULL if @e dv is used.
*/
struct DistanceVector *dv;
/**
- * How many more messages can we send to core before we exhaust
+ * Sender timestamp of @e n_challenge, used to generate out-of-order
+ * challenges (as sender's timestamps must be monotonically
+ * increasing). FIXME: where do we need this?
+ */
+ struct GNUNET_TIME_Absolute n_challenge_time;
+
+ /**
+ * When did we last send a
+ * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message?
+ * Used to determine whether it is time to re-transmit the message.
+ */
+ struct GNUNET_TIME_Absolute last_fc_transmission;
+
+ /**
+ * Sender timestamp of the last
+ * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message we have
+ * received. Note that we do not persist this monotonic time as we
+ * do not really have to worry about ancient flow control window
+ * sizes after restarts.
+ */
+ struct GNUNET_TIME_Absolute last_fc_timestamp;
+
+ /**
+ * Expected RTT from the last FC transmission. (Zero if the last
+ * attempt failed, but could theoretically be zero even on success.)
+ */
+ struct GNUNET_TIME_Relative last_fc_rtt;
+
+ /**
+ * Used to generate unique UUIDs for messages that are being
+ * fragmented.
+ */
+ uint64_t message_uuid_ctr;
+
+ /**
+ * Memory allocated for this virtual link. Expresses how much RAM
+ * we are willing to allocate to this virtual link. OPTIMIZE-ME:
+ * Can be adapted to dedicate more RAM to links that need it, while
+ * sticking to some overall RAM limit. For now, set to
+ * #DEFAULT_WINDOW_SIZE.
+ */
+ uint64_t available_fc_window_size;
+
+ /**
+ * Memory actually used to buffer packets on this virtual link.
+ * Expresses how much RAM we are currently using for virtual link.
+ * Note that once CORE is done with a packet, we decrement the value
+ * here.
+ */
+ uint64_t incoming_fc_window_size_ram;
+
+ /**
+ * Last flow control window size we provided to the other peer, in
+ * bytes. We are allowing the other peer to send this
+ * many bytes.
+ */
+ uint64_t incoming_fc_window_size;
+
+ /**
+ * How much of the window did the other peer successfully use (and
+ * we already passed it on to CORE)? Must be below @e
+ * incoming_fc_window_size. We should effectively signal the
+ * other peer that the window is this much bigger at the next
+ * opportunity / challenge.
+ */
+ uint64_t incoming_fc_window_size_used;
+
+ /**
+ * What is our current estimate on the message loss rate for the sender?
+ * Based on the difference between how much the sender sent according
+ * to the last #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message
+ * (@e outbound_sent field) and how much we actually received at that
+ * time (@e incoming_fc_window_size_used). This delta is then
+ * added onto the @e incoming_fc_window_size when determining the
+ * @e outbound_window_size we send to the other peer. Initially zero.
+ * May be negative if we (due to out-of-order delivery) actually received
+ * more than the sender claims to have sent in its last FC message.
+ */
+ int64_t incoming_fc_window_size_loss;
+
+ /**
+ * Our current flow control window size in bytes. We
+ * are allowed to transmit this many bytes to @a n.
+ */
+ uint64_t outbound_fc_window_size;
+
+ /**
+ * How much of our current flow control window size have we
+ * used (in bytes). Must be below
+ * @e outbound_fc_window_size.
+ */
+ uint64_t outbound_fc_window_size_used;
+
+ /**
+ * What is the most recent FC window the other peer sent us
+ * in `outbound_window_size`? This is basically the window
+ * size value the other peer has definitively received from
+ * us. If it matches @e incoming_fc_window_size, we should
+ * not send a FC message to increase the FC window. However,
+ * we may still send an FC message to notify the other peer
+ * that we received the other peer's FC message.
+ */
+ uint64_t last_outbound_window_size_received;
+
+ /**
+ * Generator for the sequence numbers of
+ * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL messages we send.
+ */
+ uint32_t fc_seq_gen;
+
+ /**
+ * Last sequence number of a
+ * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message we have
+ * received.
+ */
+ uint32_t last_fc_seq;
+
+ /**
+ * How many more messages can we send to CORE before we exhaust
* the receive window of CORE for this peer? If this hits zero,
* we must tell communicators to stop providing us more messages
* for this peer. In fact, the window can go negative as we
* have multiple communicators, so per communicator we can go
- * down by one into the negative range.
+ * down by one into the negative range. Furthermore, we count
+ * delivery per CORE client, so if we had multiple cores, that
+ * might also cause a negative window size here (as one message
+ * would decrement the window by one per CORE client).
*/
int core_recv_window;
};
*/
struct PendingAcknowledgement
{
-
/**
* If @e pm is non-NULL, this is the DLL in which this acknowledgement
* is kept in relation to its pending message.
*/
struct DistanceVectorHop
{
-
/**
* Kept in a MDLL, sorted by @e timeout.
*/
*/
struct DistanceVectorHop *prev_neighbour;
- /**
- * Head of MDLL of messages routed via this path.
- */
- struct PendingMessage *pending_msg_head;
-
- /**
- * Tail of MDLL of messages routed via this path.
- */
- struct PendingMessage *pending_msg_tail;
-
/**
* Head of DLL of PAs that used our @a path.
*/
*/
struct DistanceVector
{
-
/**
* To which peer is this a route?
*/
* Do we have a confirmed working queue and are thus visible to
* CORE? If so, this is the virtual link, otherwise NULL.
*/
- struct VirtualLink *link;
-};
-
+ struct VirtualLink *vl;
-/**
- * Entry identifying transmission in one of our `struct
- * Queue` which still awaits an ACK. This is used to
+ /**
+ * Signature affirming @e ephemeral_key of type
+ * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
+ */
+ struct GNUNET_CRYPTO_EddsaSignature sender_sig;
+
+ /**
+ * How long is @e sender_sig valid
+ */
+ struct GNUNET_TIME_Absolute ephemeral_validity;
+
+ /**
+ * What time was @e sender_sig created
+ */
+ struct GNUNET_TIME_Absolute monotime;
+
+ /**
+ * Our ephemeral key.
+ */
+ struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
+
+ /**
+ * Our private ephemeral key.
+ */
+ struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
+};
+
+
+/**
+ * Entry identifying transmission in one of our `struct
+ * Queue` which still awaits an ACK. This is used to
* ensure we do not overwhelm a communicator and limit the number of
* messages outstanding per communicator (say in case communicator is
* CPU bound) and per queue (in case bandwidth allocation exceeds
*/
struct QueueEntry
{
-
/**
* Kept as a DLL.
*/
* Connection status for this queue.
*/
enum GNUNET_TRANSPORT_ConnectionStatus cs;
+
+ /**
+ * Set to #GNUNET_YES if this queue is idle waiting for some
+ * virtual link to give it a pending message.
+ */
+ int idle;
};
*/
struct ReassemblyContext
{
-
/**
* Original message ID for of the message that all the fragments
* belong to.
*/
struct Neighbour
{
-
/**
* Which peer is this about?
*/
*/
struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
- /**
- * Head of list of messages pending for this neighbour.
- */
- struct PendingMessage *pending_msg_head;
-
- /**
- * Tail of list of messages pending for this neighbour.
- */
- struct PendingMessage *pending_msg_tail;
-
/**
* Head of MDLL of DV hops that have this neighbour as next hop. Must be
* purged if this neighbour goes down.
* Do we have a confirmed working queue and are thus visible to
* CORE? If so, this is the virtual link, otherwise NULL.
*/
- struct VirtualLink *link;
+ struct VirtualLink *vl;
/**
* Latest DVLearn monotonic time seen from this peer. Initialized only
*/
struct GNUNET_TIME_Absolute last_dv_learn_monotime;
- /**
- * Used to generate unique UUIDs for messages that are being
- * fragmented.
- */
- uint64_t message_uuid_ctr;
-
/**
* Do we have the lastest value for @e last_dv_learn_monotime from
* PEERSTORE yet, or are we still waiting for a reply of PEERSTORE?
};
+/**
+ * Another peer attempted to talk to us, we should try to establish
+ * a connection in the other direction.
+ */
+struct IncomingRequest
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct IncomingRequest *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct IncomingRequest *prev;
+
+ /**
+ * Handle for watching the peerstore for HELLOs for this peer.
+ */
+ struct GNUNET_PEERSTORE_WatchContext *wc;
+
+ /**
+ * Which peer is this about?
+ */
+ struct GNUNET_PeerIdentity pid;
+};
+
+
/**
* A peer that an application (client) would like us to talk to directly.
*/
struct PeerRequest
{
-
/**
* Which peer is this about?
*/
*/
enum PendingMessageType
{
-
/**
* Ordinary message received from the CORE service.
*/
PMT_RELIABILITY_BOX = 2,
/**
- * Any type of acknowledgement.
- */
- PMT_ACKNOWLEDGEMENT = 3,
-
- /**
- * Control traffic generated by the TRANSPORT service itself.
+ * Pending message created during #forward_dv_box().
*/
- PMT_CONTROL = 4
-
+ PMT_DV_BOX = 3
};
struct PendingMessage
{
/**
- * Kept in a MDLL of messages for this @a target.
+ * Kept in a MDLL of messages for this @a vl.
*/
- struct PendingMessage *next_neighbour;
+ struct PendingMessage *next_vl;
/**
- * Kept in a MDLL of messages for this @a target.
+ * Kept in a MDLL of messages for this @a vl.
*/
- struct PendingMessage *prev_neighbour;
+ struct PendingMessage *prev_vl;
/**
* Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
*/
struct PendingMessage *prev_frag;
- /**
- * Kept in a MDLL of messages using this @a dvh (if @e dvh is
- * non-NULL).
- */
- struct PendingMessage *next_dvh;
-
- /**
- * Kept in a MDLL of messages using this @a dvh (if @e dvh is
- * non-NULL).
- */
- struct PendingMessage *prev_dvh;
-
/**
* Head of DLL of PAs for this pending message.
*/
struct PendingAcknowledgement *pa_tail;
/**
- * This message, reliability boxed. Only possibly available if @e pmt is
- * #PMT_CORE.
+ * This message, reliability *or* DV-boxed. Only possibly available
+ * if @e pmt is #PMT_CORE.
*/
struct PendingMessage *bpm;
/**
- * Target of the request (for transmission, may not be ultimate
- * destination!).
- */
- struct Neighbour *target;
-
- /**
- * Distance vector path selected for this message, or
- * NULL if transmitted directly.
+ * Target of the request (always the ultimate destination!).
*/
- struct DistanceVectorHop *dvh;
+ struct VirtualLink *vl;
/**
* Set to non-NULL value if this message is currently being given to a
*/
struct MessageUUIDP msg_uuid;
+ /**
+ * UUID we use to identify this message in our logs.
+ * Generated by incrementing the "logging_uuid_gen".
+ */
+ unsigned long long logging_uuid;
+
/**
* Type of the pending message.
*/
*/
struct AddressListEntry
{
-
/**
* Kept in a DLL.
*/
*/
struct TransportClient
{
-
/**
* Kept in a DLL.
*/
union
{
-
/**
* Information for @e type #CT_CORE.
*/
struct
{
-
/**
* Head of list of messages pending for this client, sorted by
* transmission time ("next_attempt" + possibly internal prioritization).
* Tail of list of messages pending for this client.
*/
struct PendingMessage *pending_msg_tail;
-
} core;
/**
*/
struct
{
-
/**
* Peer identity to monitor the addresses of.
* Zero to monitor all neighbours. Valid if
* Is this a one-shot monitor?
*/
int one_shot;
-
} monitor;
* Characteristics of this communicator.
*/
enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
-
} communicator;
/**
*/
struct
{
-
/**
* Map of requests for peers the given client application would like to
* see connections for. Maps from PIDs to `struct PeerRequest`.
*/
struct GNUNET_CONTAINER_MultiPeerMap *requests;
-
} application;
-
} details;
};
*/
struct ValidationState
{
-
/**
* For which peer is @a address to be validated (or possibly valid)?
* Serves as key in the #validation_map.
*/
struct GNUNET_PEERSTORE_StoreContext *sc;
+ /**
+ * Self-imposed limit on the previous flow control window. (May be zero,
+ * if we never used data from the previous window or are establishing the
+ * connection for the first time).
+ */
+ uint32_t last_window_consum_limit;
+
/**
* We are technically ready to send the challenge, but we are waiting for
* the respective queue to become available for transmission.
* Map of pending acknowledgements, mapping `struct AcknowledgementUUID` to
* a `struct PendingAcknowledgement`.
*/
-static struct GNUNET_CONTAINER_MultiShortmap *pending_acks;
+static struct GNUNET_CONTAINER_MultiUuidmap *pending_acks;
/**
* Map from PIDs to `struct DistanceVector` entries describing
*/
static struct GNUNET_PEERSTORE_Handle *peerstore;
-/**
- * Heap sorting `struct EphemeralCacheEntry` by their
- * key/signature validity.
- */
-static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
-
-/**
- * Hash map for looking up `struct EphemeralCacheEntry`s
- * by peer identity. (We may have ephemerals in our
- * cache for which we do not have a neighbour entry,
- * and similar many neighbours may not need ephemerals,
- * so we use a second map.)
- */
-static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
-
-/**
- * Task to free expired ephemerals.
- */
-static struct GNUNET_SCHEDULER_Task *ephemeral_task;
-
/**
* Task run to initiate DV learning.
*/
*/
static struct PendingAcknowledgement *pa_tail;
+/**
+ * List of incomming connections where we are trying
+ * to get a connection back established. Length
+ * kept in #ir_total.
+ */
+static struct IncomingRequest *ir_head;
+
+/**
+ * Tail of DLL starting at #ir_head.
+ */
+static struct IncomingRequest *ir_tail;
+
+/**
+ * Length of the DLL starting at #ir_head.
+ */
+static unsigned int ir_total;
+
+/**
+ * Generator of `logging_uuid` in `struct PendingMessage`.
+ */
+static unsigned long long logging_uuid_gen;
+
/**
* Number of entries in the #pa_head/#pa_tail DLL. Used to
* limit the size of the data structure.
}
+/**
+ * Release @a ir data structure.
+ *
+ * @param ir data structure to release
+ */
+static void
+free_incoming_request (struct IncomingRequest *ir)
+{
+ GNUNET_CONTAINER_DLL_remove (ir_head, ir_tail, ir);
+ GNUNET_assert (ir_total > 0);
+ ir_total--;
+ GNUNET_PEERSTORE_watch_cancel (ir->wc);
+ GNUNET_free (ir);
+}
+
+
/**
* Release @a pa data structure.
*
pa->queue = NULL;
}
GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multishortmap_remove (pending_acks,
- &pa->ack_uuid.value,
- pa));
+ GNUNET_CONTAINER_multiuuidmap_remove (pending_acks,
+ &pa->ack_uuid.value,
+ pa));
GNUNET_free (pa);
}
/**
- * Free cached ephemeral key.
+ * Free fragment tree below @e root, excluding @e root itself.
+ * FIXME: this does NOT seem to have the intended semantics
+ * based on how this is called. Seems we generally DO expect
+ * @a root to be free'ed itself as well!
+ *
+ * @param root root of the tree to free
+ */
+static void
+free_fragment_tree (struct PendingMessage *root)
+{
+ struct PendingMessage *frag;
+
+ while (NULL != (frag = root->head_frag))
+ {
+ struct PendingAcknowledgement *pa;
+
+ free_fragment_tree (frag);
+ while (NULL != (pa = frag->pa_head))
+ {
+ GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
+ pa->pm = NULL;
+ }
+ GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
+ GNUNET_free (frag);
+ }
+}
+
+
+/**
+ * Release memory associated with @a pm and remove @a pm from associated
+ * data structures. @a pm must be a top-level pending message and not
+ * a fragment in the tree. The entire tree is freed (if applicable).
*
- * @param ece cached signature to free
+ * @param pm the pending message to free
*/
static void
-free_ephemeral (struct EphemeralCacheEntry *ece)
+free_pending_message (struct PendingMessage *pm)
{
- GNUNET_CONTAINER_multipeermap_remove (ephemeral_map, &ece->target, ece);
- GNUNET_CONTAINER_heap_remove_node (ece->hn);
- GNUNET_free (ece);
+ struct TransportClient *tc = pm->client;
+ struct VirtualLink *vl = pm->vl;
+ struct PendingAcknowledgement *pa;
+
+ if (NULL != tc)
+ {
+ GNUNET_CONTAINER_MDLL_remove (client,
+ tc->details.core.pending_msg_head,
+ tc->details.core.pending_msg_tail,
+ pm);
+ }
+ if (NULL != vl)
+ {
+ GNUNET_CONTAINER_MDLL_remove (vl,
+ vl->pending_msg_head,
+ vl->pending_msg_tail,
+ pm);
+ }
+ while (NULL != (pa = pm->pa_head))
+ {
+ GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
+ pa->pm = NULL;
+ }
+
+ free_fragment_tree (pm);
+ if (NULL != pm->qe)
+ {
+ GNUNET_assert (pm == pm->qe->pm);
+ pm->qe->pm = NULL;
+ }
+ if (NULL != pm->bpm)
+ {
+ free_fragment_tree (pm->bpm);
+ GNUNET_free (pm->bpm);
+ }
+ GNUNET_free (pm);
}
static void
free_virtual_link (struct VirtualLink *vl)
{
- GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl);
+ struct PendingMessage *pm;
+ struct CoreSentContext *csc;
+
+ while (NULL != (pm = vl->pending_msg_head))
+ free_pending_message (pm);
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl));
if (NULL != vl->visibility_task)
{
GNUNET_SCHEDULER_cancel (vl->visibility_task);
vl->visibility_task = NULL;
}
+ if (NULL != vl->fc_retransmit_task)
+ {
+ GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
+ vl->fc_retransmit_task = NULL;
+ }
+ while (NULL != (csc = vl->csc_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, csc);
+ GNUNET_assert (vl == csc->vl);
+ csc->vl = NULL;
+ }
GNUNET_break (NULL == vl->n);
GNUNET_break (NULL == vl->dv);
GNUNET_free (vl);
static void
free_validation_state (struct ValidationState *vs)
{
- GNUNET_CONTAINER_multipeermap_remove (validation_map, &vs->pid, vs);
+ GNUNET_assert (
+ GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (validation_map, &vs->pid, vs));
GNUNET_CONTAINER_heap_remove_node (vs->hn);
vs->hn = NULL;
if (NULL != vs->sc)
/**
- * Lookup neighbour record for peer @a pid.
+ * Lookup neighbour for peer @a pid.
*
* @param pid neighbour to look for
* @return NULL if we do not have this peer as a neighbour
}
+/**
+ * Lookup virtual link for peer @a pid.
+ *
+ * @param pid virtual link to look for
+ * @return NULL if we do not have this peer as a virtual link
+ */
+static struct VirtualLink *
+lookup_virtual_link (const struct GNUNET_PeerIdentity *pid)
+{
+ return GNUNET_CONTAINER_multipeermap_get (links, pid);
+}
+
+
/**
* Details about what to notify monitors about.
*/
struct Neighbour *n = dvh->next_hop;
struct DistanceVector *dv = dvh->dv;
struct PendingAcknowledgement *pa;
- struct PendingMessage *pm;
- while (NULL != (pm = dvh->pending_msg_head))
- {
- GNUNET_CONTAINER_MDLL_remove (dvh,
- dvh->pending_msg_head,
- dvh->pending_msg_tail,
- pm);
- pm->dvh = NULL;
- }
while (NULL != (pa = dvh->pa_head))
{
GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
/**
- * Free entry in #dv_routes. First frees all hops to the target, and
- * if there are no entries left, frees @a dv as well.
+ * Task run to check whether the hops of the @a cls still
+ * are validated, or if we need to core about disconnection.
*
- * @param dv route to free
+ * @param cls a `struct VirtualLink`
*/
static void
-free_dv_route (struct DistanceVector *dv)
-{
- struct DistanceVectorHop *dvh;
-
- while (NULL != (dvh = dv->dv_head))
- free_distance_vector_hop (dvh);
- if (NULL == dv->dv_head)
- {
- GNUNET_assert (
- GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
- if (NULL != dv->timeout_task)
- GNUNET_SCHEDULER_cancel (dv->timeout_task);
- GNUNET_free (dv);
- }
-}
+check_link_down (void *cls);
/**
- * Notify monitor @a tc about an event. That @a tc
- * cares about the event has already been checked.
- *
- * Send @a tc information in @a me about a @a peer's status with
- * respect to some @a address to all monitors that care.
+ * Send message to CORE clients that we lost a connection.
*
- * @param tc monitor to inform
- * @param peer peer the information is about
- * @param address address the information is about
- * @param nt network type associated with @a address
+ * @param pid peer the connection was for
+ */
+static void
+cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Informing CORE clients about disconnect from %s\n",
+ GNUNET_i2s (pid));
+ for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
+ {
+ struct GNUNET_MQ_Envelope *env;
+ struct DisconnectInfoMessage *dim;
+
+ if (CT_CORE != tc->type)
+ continue;
+ env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
+ dim->peer = *pid;
+ GNUNET_MQ_send (tc->mq, env);
+ }
+}
+
+
+/**
+ * Free entry in #dv_routes. First frees all hops to the target, and
+ * if there are no entries left, frees @a dv as well.
+ *
+ * @param dv route to free
+ */
+static void
+free_dv_route (struct DistanceVector *dv)
+{
+ struct DistanceVectorHop *dvh;
+
+ while (NULL != (dvh = dv->dv_head))
+ free_distance_vector_hop (dvh);
+ if (NULL == dv->dv_head)
+ {
+ struct VirtualLink *vl;
+
+ GNUNET_assert (
+ GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
+ if (NULL != (vl = dv->vl))
+ {
+ GNUNET_assert (dv == vl->dv);
+ vl->dv = NULL;
+ if (NULL == vl->n)
+ {
+ cores_send_disconnect_info (&dv->target);
+ free_virtual_link (vl);
+ }
+ else
+ {
+ GNUNET_SCHEDULER_cancel (vl->visibility_task);
+ vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl);
+ }
+ dv->vl = NULL;
+ }
+
+ if (NULL != dv->timeout_task)
+ {
+ GNUNET_SCHEDULER_cancel (dv->timeout_task);
+ dv->timeout_task = NULL;
+ }
+ GNUNET_free (dv);
+ }
+}
+
+
+/**
+ * Notify monitor @a tc about an event. That @a tc
+ * cares about the event has already been checked.
+ *
+ * Send @a tc information in @a me about a @a peer's status with
+ * respect to some @a address to all monitors that care.
+ *
+ * @param tc monitor to inform
+ * @param peer peer the information is about
+ * @param address address the information is about
+ * @param nt network type associated with @a address
* @param me detailed information to transmit
*/
static void
while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
{
if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout)
- .rel_value_us)
+ .rel_value_us)
{
free_reassembly_context (rc);
continue;
free_neighbour (struct Neighbour *neighbour)
{
struct DistanceVectorHop *dvh;
+ struct VirtualLink *vl;
GNUNET_assert (NULL == neighbour->queue_head);
GNUNET_assert (GNUNET_YES ==
GNUNET_PEERSTORE_store_cancel (neighbour->sc);
neighbour->sc = NULL;
}
+ if (NULL != (vl = neighbour->vl))
+ {
+ GNUNET_assert (neighbour == vl->n);
+ vl->n = NULL;
+ if (NULL == vl->dv)
+ {
+ cores_send_disconnect_info (&vl->target);
+ free_virtual_link (vl);
+ }
+ else
+ {
+ GNUNET_SCHEDULER_cancel (vl->visibility_task);
+ vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl);
+ }
+ neighbour->vl = NULL;
+ }
GNUNET_free (neighbour);
}
}
-/**
- * Send message to CORE clients that we lost a connection.
- *
- * @param pid peer the connection was for
- */
-static void
-cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
-{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Informing CORE clients about disconnect from %s\n",
- GNUNET_i2s (pid));
- for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
- {
- struct GNUNET_MQ_Envelope *env;
- struct DisconnectInfoMessage *dim;
-
- if (CT_CORE != tc->type)
- continue;
- env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
- dim->peer = *pid;
- GNUNET_MQ_send (tc->mq, env);
- }
-}
-
-
/**
* We believe we are ready to transmit a message on a queue. Gives the
* message to the communicator for transmission (updating the tracker,
/**
- * Schedule next run of #transmit_on_queue(). Does NOTHING if
- * we should run immediately or if the message queue is empty.
- * Test for no task being added AND queue not being empty to
- * transmit immediately afterwards! This function must only
- * be called if the message queue is non-empty!
+ * Called whenever something changed that might effect when we
+ * try to do the next transmission on @a queue using #transmit_on_queue().
*
* @param queue the queue to do scheduling for
- * @param inside_job set to #GNUNET_YES if called from
- * #transmit_on_queue() itself and NOT setting
- * the task means running immediately
+ * @param p task priority to use, if @a queue is scheduled
*/
static void
-schedule_transmit_on_queue (struct Queue *queue, int inside_job)
+schedule_transmit_on_queue (struct Queue *queue,
+ enum GNUNET_SCHEDULER_Priority p)
{
- struct Neighbour *n = queue->neighbour;
- struct PendingMessage *pm = n->pending_msg_head;
- struct GNUNET_TIME_Relative out_delay;
-
- GNUNET_assert (NULL != pm);
if (queue->tc->details.communicator.total_queue_length >=
COMMUNICATOR_TOTAL_QUEUE_LIMIT)
{
"# Transmission throttled due to communicator queue limit",
1,
GNUNET_NO);
+ queue->idle = GNUNET_NO;
return;
}
if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
"# Transmission throttled due to queue queue limit",
1,
GNUNET_NO);
+ queue->idle = GNUNET_NO;
return;
}
-
- out_delay = GNUNET_TIME_absolute_get_remaining (pm->next_attempt);
- if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us))
- {
- GNUNET_log (
- GNUNET_ERROR_TYPE_DEBUG,
- "Schedule transmission on queue %llu of %s decides to run immediately\n",
- (unsigned long long) queue->qid,
- GNUNET_i2s (&n->pid));
- return; /* we should run immediately! */
- }
- /* queue has changed since we were scheduled, reschedule again */
+ /* queue might indeed be ready, schedule it */
+ if (NULL != queue->transmit_task)
+ GNUNET_SCHEDULER_cancel (queue->transmit_task);
queue->transmit_task =
- GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue);
- if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Next transmission on queue `%s' in %s (high delay)\n",
- queue->address,
- GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES));
- else
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Next transmission on queue `%s' in %s\n",
- queue->address,
- GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES));
+ GNUNET_SCHEDULER_add_with_priority (p, &transmit_on_queue, queue);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Considering transmission on queue `%s' to %s\n",
+ queue->address,
+ GNUNET_i2s (&queue->neighbour->pid));
}
pos = pos->next_dv)
dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+ {
+ vl->dv->vl = NULL;
vl->dv = NULL;
+ }
q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
- if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+ if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
+ {
+ vl->n->vl = NULL;
vl->n = NULL;
+ }
if ((NULL == vl->n) && (NULL == vl->dv))
{
- cores_send_disconnect_info (&dv->target);
+ cores_send_disconnect_info (&vl->target);
free_virtual_link (vl);
return;
}
{
struct Neighbour *neighbour = queue->neighbour;
struct TransportClient *tc = queue->tc;
- struct MonitorEvent me = {.cs = GNUNET_TRANSPORT_CS_DOWN,
- .rtt = GNUNET_TIME_UNIT_FOREVER_REL};
+ struct MonitorEvent me = { .cs = GNUNET_TRANSPORT_CS_DOWN,
+ .rtt = GNUNET_TIME_UNIT_FOREVER_REL };
struct QueueEntry *qe;
int maxxed;
struct PendingAcknowledgement *pa;
if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT <
tc->details.communicator.total_queue_length))
{
- /* Communicator dropped below threshold, resume all queues */
+ /* Communicator dropped below threshold, resume all _other_ queues */
GNUNET_STATISTICS_update (
GST_stats,
"# Transmission throttled due to communicator queue limit",
GNUNET_NO);
for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
s = s->next_client)
- schedule_transmit_on_queue (s, GNUNET_NO);
+ schedule_transmit_on_queue (s, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
}
notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
GNUNET_free (queue);
- vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid);
+ vl = lookup_virtual_link (&neighbour->pid);
if ((NULL != vl) && (neighbour == vl->n))
{
GNUNET_SCHEDULER_cancel (vl->visibility_task);
{
case CT_NONE:
break;
+
case CT_CORE: {
- struct PendingMessage *pm;
+ struct PendingMessage *pm;
- while (NULL != (pm = tc->details.core.pending_msg_head))
- {
- GNUNET_CONTAINER_MDLL_remove (client,
- tc->details.core.pending_msg_head,
- tc->details.core.pending_msg_tail,
- pm);
- pm->client = NULL;
+ while (NULL != (pm = tc->details.core.pending_msg_head))
+ {
+ GNUNET_CONTAINER_MDLL_remove (client,
+ tc->details.core.pending_msg_head,
+ tc->details.core.pending_msg_tail,
+ pm);
+ pm->client = NULL;
+ }
}
- }
- break;
+ break;
+
case CT_MONITOR:
break;
+
case CT_COMMUNICATOR: {
- struct Queue *q;
- struct AddressListEntry *ale;
+ struct Queue *q;
+ struct AddressListEntry *ale;
+
+ while (NULL != (q = tc->details.communicator.queue_head))
+ free_queue (q);
+ while (NULL != (ale = tc->details.communicator.addr_head))
+ free_address_list_entry (ale);
+ GNUNET_free (tc->details.communicator.address_prefix);
+ }
+ break;
- while (NULL != (q = tc->details.communicator.queue_head))
- free_queue (q);
- while (NULL != (ale = tc->details.communicator.addr_head))
- free_address_list_entry (ale);
- GNUNET_free (tc->details.communicator.address_prefix);
- }
- break;
case CT_APPLICATION:
GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
&stop_peer_request,
GNUNET_break (0);
return GNUNET_SYSERR;
}
- size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
- if (size < sizeof (struct GNUNET_MessageHeader))
+ size = ntohs (obm->header.size) - sizeof(struct OutboundMessage);
+ if (size < sizeof(struct GNUNET_MessageHeader))
{
GNUNET_break (0);
return GNUNET_SYSERR;
}
-/**
- * Free fragment tree below @e root, excluding @e root itself.
- * FIXME: this does NOT seem to have the intended semantics
- * based on how this is called. Seems we generally DO expect
- * @a root to be free'ed itself as well!
- *
- * @param root root of the tree to free
- */
-static void
-free_fragment_tree (struct PendingMessage *root)
-{
- struct PendingMessage *frag;
-
- while (NULL != (frag = root->head_frag))
- {
- struct PendingAcknowledgement *pa;
-
- free_fragment_tree (frag);
- while (NULL != (pa = frag->pa_head))
- {
- GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
- pa->pm = NULL;
- }
- GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
- GNUNET_free (frag);
- }
-}
-
-
-/**
- * Release memory associated with @a pm and remove @a pm from associated
- * data structures. @a pm must be a top-level pending message and not
- * a fragment in the tree. The entire tree is freed (if applicable).
- *
- * @param pm the pending message to free
- */
-static void
-free_pending_message (struct PendingMessage *pm)
-{
- struct TransportClient *tc = pm->client;
- struct Neighbour *target = pm->target;
- struct DistanceVectorHop *dvh = pm->dvh;
- struct PendingAcknowledgement *pa;
-
- if (NULL != tc)
- {
- GNUNET_CONTAINER_MDLL_remove (client,
- tc->details.core.pending_msg_head,
- tc->details.core.pending_msg_tail,
- pm);
- }
- if (NULL != dvh)
- {
- GNUNET_CONTAINER_MDLL_remove (dvh,
- dvh->pending_msg_head,
- dvh->pending_msg_tail,
- pm);
- }
- GNUNET_CONTAINER_MDLL_remove (neighbour,
- target->pending_msg_head,
- target->pending_msg_tail,
- pm);
- while (NULL != (pa = pm->pa_head))
- {
- GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
- pa->pm = NULL;
- }
-
- free_fragment_tree (pm);
- if (NULL != pm->qe)
- {
- GNUNET_assert (pm == pm->qe->pm);
- pm->qe->pm = NULL;
- }
- GNUNET_free_non_null (pm->bpm);
- GNUNET_free (pm);
-}
-
-
/**
* Send a response to the @a pm that we have processed a "send"
* request. Sends a confirmation to the "core" client responsible for
client_send_response (struct PendingMessage *pm)
{
struct TransportClient *tc = pm->client;
- struct Neighbour *target = pm->target;
- struct GNUNET_MQ_Envelope *env;
- struct SendOkMessage *som;
+ struct VirtualLink *vl = pm->vl;
if (NULL != tc)
{
+ struct GNUNET_MQ_Envelope *env;
+ struct SendOkMessage *som;
+
env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
- som->peer = target->pid;
+ som->peer = vl->target;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Confirming transmission to %s\n",
- GNUNET_i2s (&pm->target->pid));
+ "Confirming transmission of <%llu> to %s\n",
+ pm->logging_uuid,
+ GNUNET_i2s (&vl->target));
GNUNET_MQ_send (tc->mq, env);
}
free_pending_message (pm);
}
-/**
- * Create a DV Box message.
- *
- * @param total_hops how many hops did the message take so far
- * @param num_hops length of the @a hops array
- * @param origin origin of the message
- * @param hops next peer(s) to the destination, including destination
- * @param payload payload of the box
- * @param payload_size number of bytes in @a payload
- * @return boxed message (caller must #GNUNET_free() it).
- */
-static struct TransportDVBoxMessage *
-create_dv_box (uint16_t total_hops,
- const struct GNUNET_PeerIdentity *origin,
- const struct GNUNET_PeerIdentity *target,
- uint16_t num_hops,
- const struct GNUNET_PeerIdentity *hops,
- const void *payload,
- uint16_t payload_size)
-{
- struct TransportDVBoxMessage *dvb;
- struct GNUNET_PeerIdentity *dhops;
-
- GNUNET_assert (UINT16_MAX <
- sizeof (struct TransportDVBoxMessage) +
- sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) +
- payload_size);
- dvb = GNUNET_malloc (sizeof (struct TransportDVBoxMessage) +
- sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) +
- payload_size);
- dvb->header.size =
- htons (sizeof (struct TransportDVBoxMessage) +
- sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) + payload_size);
- dvb->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
- dvb->total_hops = htons (total_hops);
- dvb->num_hops = htons (num_hops + 1);
- dvb->origin = *origin;
- dhops = (struct GNUNET_PeerIdentity *) &dvb[1];
- memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity));
- dhops[num_hops] = *target;
- memcpy (&dhops[num_hops + 1], payload, payload_size);
-
- if (GNUNET_EXTRA_LOGGING > 0)
- {
- char *path;
-
- path = GNUNET_strdup (GNUNET_i2s (&dvb->origin));
- for (unsigned int i = 0; i <= num_hops; i++)
- {
- char *tmp;
-
- GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[i]));
- GNUNET_free (path);
- path = tmp;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating DVBox for %u bytes of payload via %s\n",
- (unsigned int) payload_size,
- path);
- GNUNET_free (path);
- }
-
- return dvb;
-}
-
-
/**
* Pick @a hops_array_length random DV paths satisfying @a options
*
{
if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
(GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
- .rel_value_us == 0))
- continue; /* pos unconfirmed and confirmed required */
+ .rel_value_us == 0))
+ continue; /* pos unconfirmed and confirmed required */
num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
dv_count++;
}
if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
(GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
- .rel_value_us == 0))
- continue; /* pos unconfirmed and confirmed required */
+ .rel_value_us == 0))
+ continue; /* pos unconfirmed and confirmed required */
for (unsigned int i = 0; i < hops_array_length; i++)
if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
hops_array[dv_count++] = pos;
/**
- * Client asked for transmission to a peer. Process the request.
+ * Communicator started. Test message is well-formed.
*
* @param cls the client
- * @param obm the send message that was sent
+ * @param cam the send message that was sent
*/
-static void
-handle_client_send (void *cls, const struct OutboundMessage *obm)
+static int
+check_communicator_available (
+ void *cls,
+ const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
{
struct TransportClient *tc = cls;
- struct PendingMessage *pm;
- const struct GNUNET_MessageHeader *obmm;
- struct Neighbour *target;
- struct DistanceVector *dv;
- struct DistanceVectorHop *dvh;
- uint32_t bytes_msg;
- int was_empty;
- const void *payload;
- size_t payload_size;
- struct TransportDVBoxMessage *dvb;
- struct VirtualLink *vl;
- enum GNUNET_MQ_PriorityPreferences pp;
-
- GNUNET_assert (CT_CORE == tc->type);
- obmm = (const struct GNUNET_MessageHeader *) &obm[1];
- bytes_msg = ntohs (obmm->size);
- pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
- vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer);
- if (NULL == vl)
- {
- /* Failure: don't have this peer as a neighbour (anymore).
- Might have gone down asynchronously, so this is NOT
- a protocol violation by CORE. Still count the event,
- as this should be rare. */
- GNUNET_SERVICE_client_continue (tc->client);
- GNUNET_STATISTICS_update (GST_stats,
- "# messages dropped (neighbour unknown)",
- 1,
- GNUNET_NO);
- return;
- }
- target = lookup_neighbour (&obm->peer);
- if (NULL == target)
- dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer);
- else
- dv = NULL;
- GNUNET_assert ((NULL != target) || (NULL != dv));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending %u bytes to %s using %s\n",
- bytes_msg,
- GNUNET_i2s (&obm->peer),
- (NULL == target) ? "distance vector path" : "direct queue");
- if (NULL == target)
- {
- unsigned int res;
- struct DistanceVectorHop *dvh;
-
- res = pick_random_dv_hops (dv, RMO_NONE, &dvh, 1);
- GNUNET_assert (1 == res);
- target = dvh->next_hop;
- dvb = create_dv_box (0,
- &GST_my_identity,
- &obm->peer,
- dvh->distance,
- dvh->path,
- &obm[1],
- bytes_msg);
- payload = dvb;
- payload_size = ntohs (dvb->header.size);
- }
- else
- {
- dvh = NULL;
- dvb = NULL;
- payload = &obm[1];
- payload_size = bytes_msg;
- }
-
- was_empty = (NULL == target->pending_msg_head);
- pm = GNUNET_malloc (sizeof (struct PendingMessage) + payload_size);
- pm->prefs = pp;
- pm->client = tc;
- pm->target = target;
- pm->bytes_msg = payload_size;
- memcpy (&pm[1], payload, payload_size);
- GNUNET_free_non_null (dvb);
- dvb = NULL;
- pm->dvh = dvh;
- if (NULL != dvh)
- {
- GNUNET_CONTAINER_MDLL_insert (dvh,
- dvh->pending_msg_head,
- dvh->pending_msg_tail,
- pm);
- }
- GNUNET_CONTAINER_MDLL_insert (neighbour,
- target->pending_msg_head,
- target->pending_msg_tail,
- pm);
- GNUNET_CONTAINER_MDLL_insert (client,
- tc->details.core.pending_msg_head,
- tc->details.core.pending_msg_tail,
- pm);
- if (! was_empty)
- return; /* all queues must already be busy */
- for (struct Queue *queue = target->queue_head; NULL != queue;
- queue = queue->next_neighbour)
- {
- /* try transmission on any queue that is idle */
- if (NULL == queue->transmit_task)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Queue %llu to %s is idle, triggering transmission\n",
- (unsigned long long) queue->qid,
- GNUNET_i2s (&queue->neighbour->pid));
- queue->transmit_task =
- GNUNET_SCHEDULER_add_now (&transmit_on_queue, queue);
- }
- }
-}
-
-
-/**
- * Communicator started. Test message is well-formed.
- *
- * @param cls the client
- * @param cam the send message that was sent
- */
-static int
-check_communicator_available (
- void *cls,
- const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
-{
- struct TransportClient *tc = cls;
- uint16_t size;
+ uint16_t size;
if (CT_NONE != tc->type)
{
return GNUNET_SYSERR;
}
tc->type = CT_COMMUNICATOR;
- size = ntohs (cam->header.size) - sizeof (*cam);
+ size = ntohs (cam->header.size) - sizeof(*cam);
if (0 == size)
return GNUNET_OK; /* receive-only communicator */
GNUNET_MQ_check_zero_termination (cam);
GNUNET_SERVICE_client_drop (tc->client);
return;
}
- vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer);
+ vl = lookup_virtual_link (&rom->peer);
if (NULL == vl)
{
GNUNET_STATISTICS_update (GST_stats,
struct TransportClient *tc = cls;
uint16_t size;
- size = ntohs (cam->header.size) - sizeof (*cam);
+ size = ntohs (cam->header.size) - sizeof(*cam);
if (0 == size)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Receive-only communicator connected\n");
- return; /* receive-only communicator */
+ return; /* receive-only communicator */
}
tc->details.communicator.address_prefix =
GNUNET_strdup ((const char *) &cam[1]);
uint16_t isize;
(void) cls;
- msize = ntohs (cb->header.size) - sizeof (*cb);
- if (((size_t) (UINT16_MAX - msize)) >
- sizeof (struct TransportBackchannelEncapsulationMessage) +
- sizeof (struct TransportBackchannelRequestPayloadP))
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
+ msize = ntohs (cb->header.size) - sizeof(*cb);
inbox = (const struct GNUNET_MessageHeader *) &cb[1];
isize = ntohs (inbox->size);
if (isize >= msize)
is = (const char *) inbox;
is += isize;
msize -= isize;
- GNUNET_assert (msize > 0);
+ GNUNET_assert (0 < msize);
if ('\0' != is[msize - 1])
{
GNUNET_break (0);
/**
- * Remove memory used by expired ephemeral keys.
+ * Ensure ephemeral keys in our @a dv are current. If no current one exists,
+ * set it up.
*
- * @param cls NULL
- */
-static void
-expire_ephemerals (void *cls)
-{
- struct EphemeralCacheEntry *ece;
-
- (void) cls;
- ephemeral_task = NULL;
- while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
- {
- if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity)
- .rel_value_us)
- {
- free_ephemeral (ece);
- continue;
- }
- ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
- &expire_ephemerals,
- NULL);
- return;
- }
-}
-
-
-/**
- * Lookup ephemeral key in our #ephemeral_map. If no valid one exists,
- * generate one, cache it and return it.
- *
- * @param pid peer to look up ephemeral for
- * @param private_key[out] set to the private key
- * @param ephemeral_key[out] set to the key
- * @param ephemeral_sender_sig[out] set to the signature
- * @param monotime[out] set to the monotime used for the signature
+ * @param dv[in,out] virtual link to update ephemeral for
*/
static void
-lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
- struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
- struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
- struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
- struct GNUNET_TIME_Absolute *monotime)
+update_ephemeral (struct DistanceVector *dv)
{
- struct EphemeralCacheEntry *ece;
struct EphemeralConfirmationPS ec;
- ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map, pid);
- if ((NULL != ece) &&
- (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity)
- .rel_value_us))
- {
- free_ephemeral (ece);
- ece = NULL;
- }
- if (NULL == ece)
- {
- ece = GNUNET_new (struct EphemeralCacheEntry);
- ece->target = *pid;
- ece->monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
- ece->ephemeral_validity =
- GNUNET_TIME_absolute_add (ece->monotime, EPHEMERAL_VALIDITY);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
- GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key, &ece->ephemeral_key);
- ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
- ec.purpose.size = htonl (sizeof (ec));
- ec.target = *pid;
- ec.ephemeral_key = ece->ephemeral_key;
- GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
- &ec.purpose,
- &ece->sender_sig));
- ece->hn =
- GNUNET_CONTAINER_heap_insert (ephemeral_heap,
- ece,
- ece->ephemeral_validity.abs_value_us);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_put (
- ephemeral_map,
- &ece->target,
- ece,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- if (NULL == ephemeral_task)
- ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
- &expire_ephemerals,
- NULL);
- }
- *private_key = ece->private_key;
- *ephemeral_key = ece->ephemeral_key;
- *ephemeral_sender_sig = ece->sender_sig;
- *monotime = ece->monotime;
+ if (0 !=
+ GNUNET_TIME_absolute_get_remaining (dv->ephemeral_validity).rel_value_us)
+ return;
+ dv->monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
+ dv->ephemeral_validity =
+ GNUNET_TIME_absolute_add (dv->monotime, EPHEMERAL_VALIDITY);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CRYPTO_ecdhe_key_create2 (&dv->private_key));
+ GNUNET_CRYPTO_ecdhe_key_get_public (&dv->private_key, &dv->ephemeral_key);
+ ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
+ ec.purpose.size = htonl (sizeof(ec));
+ ec.target = dv->target;
+ ec.ephemeral_key = dv->ephemeral_key;
+ GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+ &ec,
+ &dv->sender_sig);
}
/**
- * Send the control message @a payload on @a queue.
+ * Send the message @a payload on @a queue.
*
* @param queue the queue to use for transmission
* @param pm pending message to update once transmission is done, may be NULL!
struct GNUNET_TRANSPORT_SendMessageTo *smt;
struct GNUNET_MQ_Envelope *env;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing %u bytes of payload for transmission on queue %llu to %s\n",
- (unsigned int) payload_size,
- (unsigned long long) queue->qid,
- GNUNET_i2s (&queue->neighbour->pid));
+ queue->idle = GNUNET_NO;
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_DEBUG,
+ "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n",
+ (unsigned int) payload_size,
+ (NULL == pm) ? 0 : pm->logging_uuid,
+ (unsigned long long) queue->qid,
+ GNUNET_i2s (&queue->neighbour->pid));
env = GNUNET_MQ_msg_extra (smt,
payload_size,
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
queue->queue_length++;
queue->tc->details.communicator.total_queue_length++;
+ if (COMMUNICATOR_TOTAL_QUEUE_LIMIT ==
+ queue->tc->details.communicator.total_queue_length)
+ queue->idle = GNUNET_NO;
+ if (QUEUE_LENGTH_LIMIT == queue->queue_length)
+ queue->idle = GNUNET_NO;
GNUNET_MQ_send (queue->tc->mq, env);
}
}
* @param hdr message to send as payload
* @param options whether queues must be confirmed or not,
* and whether we may pick multiple (2) queues
+ * @return expected RTT for transmission, #GNUNET_TIME_UNIT_FOREVER_REL if sending failed
*/
-static void
+static struct GNUNET_TIME_Relative
route_via_neighbour (const struct Neighbour *n,
const struct GNUNET_MessageHeader *hdr,
enum RouteMessageOptions options)
unsigned int candidates;
unsigned int sel1;
unsigned int sel2;
+ struct GNUNET_TIME_Relative rtt;
/* Pick one or two 'random' queues from n (under constraints of options) */
now = GNUNET_TIME_absolute_get ();
"# route selection failed (all no valid queue)",
1,
GNUNET_NO);
- return;
+ return GNUNET_TIME_UNIT_FOREVER_REL;
}
+
+ rtt = GNUNET_TIME_UNIT_FOREVER_REL;
sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates);
if (0 == (options & RMO_REDUNDANT))
sel2 = candidates; /* picks none! */
GNUNET_i2s (&n->pid),
pos->address,
(sel1 == candidates) ? 1 : 2);
+ rtt = GNUNET_TIME_relative_min (rtt, pos->pd.aged_rtt);
queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
}
candidates++;
}
}
-}
-
-
-/**
- * Given a distance vector path @a dvh route @a payload to
- * the ultimate destination respecting @a options.
- * Sets up the boxed message and queues it at the next hop.
- *
- * @param dvh choice of the path for the message
- * @param payload body to transmit
- * @param options options to use for control
- */
-static void
-forward_via_dvh (const struct DistanceVectorHop *dvh,
- const struct GNUNET_MessageHeader *payload,
- enum RouteMessageOptions options)
-{
- struct TransportDVBoxMessage *dvb;
-
- dvb = create_dv_box (0,
- &GST_my_identity,
- &dvh->dv->target,
- dvh->distance,
- dvh->path,
- payload,
- ntohs (payload->size));
- route_via_neighbour (dvh->next_hop, &dvb->header, options);
- GNUNET_free (dvb);
-}
-
-
-/**
- * Pick a path of @a dv under constraints @a options and schedule
- * transmission of @a hdr.
- *
- * @param n neighbour to send to
- * @param hdr message to send as payload
- * @param options whether path must be confirmed or not
- * and whether we may pick multiple (2) paths
- */
-static void
-route_via_dv (const struct DistanceVector *dv,
- const struct GNUNET_MessageHeader *hdr,
- enum RouteMessageOptions options)
-{
- struct DistanceVectorHop *hops[2];
- unsigned int res;
-
- res = pick_random_dv_hops (dv,
- options,
- hops,
- (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
- for (unsigned int i = 0; i < res; i++)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Routing message of type %u to %s using DV (#%u/%u)\n",
- ntohs (hdr->type),
- GNUNET_i2s (&dv->target),
- i + 1,
- res + 1);
- forward_via_dvh (hops[i], hdr, options & (~RMO_REDUNDANT));
- }
-}
-
-
-/**
- * We need to transmit @a hdr to @a target. If necessary, this may
- * involve DV routing.
- *
- * @param target peer to receive @a hdr
- * @param hdr header of the message to route and #GNUNET_free()
- * @param options which transmission channels are allowed
- */
-static void
-route_message (const struct GNUNET_PeerIdentity *target,
- struct GNUNET_MessageHeader *hdr,
- enum RouteMessageOptions options)
-{
- struct VirtualLink *vl;
- struct Neighbour *n;
- struct DistanceVector *dv;
-
- vl = GNUNET_CONTAINER_multipeermap_get (links, target);
- n = vl->n;
- dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
- if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
- {
- /* if confirmed is required, and we do not have anything
- confirmed, drop respective options */
- if (NULL == n)
- n = lookup_neighbour (target);
- if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
- dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target);
- }
- if ((NULL == n) && (NULL == dv))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Cannot route message of type %u to %s: no route\n",
- ntohs (hdr->type),
- GNUNET_i2s (target));
- GNUNET_STATISTICS_update (GST_stats,
- "# Messages dropped in routing: no acceptable method",
- 1,
- GNUNET_NO);
- GNUNET_free (hdr);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Routing message of type %u to %s with options %X\n",
- ntohs (hdr->type),
- GNUNET_i2s (target),
- (unsigned int) options);
- /* If both dv and n are possible and we must choose:
- flip a coin for the choice between the two; for now 50/50 */
- if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
- {
- if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 2))
- n = NULL;
- else
- dv = NULL;
- }
- if ((NULL != n) && (NULL != dv))
- options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
- enough for redunancy, so clear the flag. */
- if (NULL != n)
- {
- route_via_neighbour (n, hdr, options);
- }
- if (NULL != dv)
- {
- route_via_dv (dv, hdr, options);
- }
- GNUNET_free (hdr);
+ return rtt;
}
/**
* Structure of the key material used to encrypt backchannel messages.
*/
-struct BackchannelKeyState
+struct DVKeyState
{
/**
* State of our block cipher.
*/
struct
{
-
/**
* Key used for HMAC calculations (via #GNUNET_CRYPTO_hmac()).
*/
* Counter value to use during setup.
*/
char aes_ctr[128 / 8];
-
} material;
};
* @param key[out] symmetric cipher and HMAC state to generate
*/
static void
-bc_setup_key_state_from_km (const struct GNUNET_HashCode *km,
+dv_setup_key_state_from_km (const struct GNUNET_HashCode *km,
const struct GNUNET_ShortHashCode *iv,
- struct BackchannelKeyState *key)
+ struct DVKeyState *key)
{
/* must match #dh_key_derive_eph_pub */
GNUNET_assert (GNUNET_YES ==
GNUNET_CRYPTO_kdf (&key->material,
- sizeof (key->material),
+ sizeof(key->material),
"transport-backchannel-key",
strlen ("transport-backchannel-key"),
&km,
- sizeof (km),
+ sizeof(km),
iv,
- sizeof (*iv)));
+ sizeof(*iv)));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Deriving backchannel key based on KM %s and IV %s\n",
GNUNET_h2s (km),
0 /* flags */);
gcry_cipher_setkey (key->cipher,
&key->material.aes_key,
- sizeof (key->material.aes_key));
+ sizeof(key->material.aes_key));
gcry_cipher_setctr (key->cipher,
&key->material.aes_ctr,
- sizeof (key->material.aes_ctr));
+ sizeof(key->material.aes_ctr));
}
const struct GNUNET_CRYPTO_EcdhePrivateKey *priv_ephemeral,
const struct GNUNET_PeerIdentity *target,
const struct GNUNET_ShortHashCode *iv,
- struct BackchannelKeyState *key)
+ struct DVKeyState *key)
{
struct GNUNET_HashCode km;
GNUNET_assert (GNUNET_YES == GNUNET_CRYPTO_ecdh_eddsa (priv_ephemeral,
&target->public_key,
&km));
- bc_setup_key_state_from_km (&km, iv, key);
+ dv_setup_key_state_from_km (&km, iv, key);
}
static void
dh_key_derive_eph_pub (const struct GNUNET_CRYPTO_EcdhePublicKey *pub_ephemeral,
const struct GNUNET_ShortHashCode *iv,
- struct BackchannelKeyState *key)
+ struct DVKeyState *key)
{
struct GNUNET_HashCode km;
GNUNET_assert (GNUNET_YES == GNUNET_CRYPTO_eddsa_ecdh (GST_my_private_key,
pub_ephemeral,
&km));
- bc_setup_key_state_from_km (&km, iv, key);
+ dv_setup_key_state_from_km (&km, iv, key);
}
* @param data_size number of bytes in @a data
*/
static void
-bc_hmac (const struct BackchannelKeyState *key,
+dv_hmac (const struct DVKeyState *key,
struct GNUNET_HashCode *hmac,
const void *data,
size_t data_size)
* @param in_size number of bytes of input in @a in and available at @a dst
*/
static void
-bc_encrypt (struct BackchannelKeyState *key,
- const void *in,
- void *dst,
- size_t in_size)
+dv_encrypt (struct DVKeyState *key, const void *in, void *dst, size_t in_size)
{
GNUNET_assert (0 ==
gcry_cipher_encrypt (key->cipher, dst, in_size, in, in_size));
* @param out_size number of bytes of input in @a ciph and available in @a out
*/
static void
-bc_decrypt (struct BackchannelKeyState *key,
+dv_decrypt (struct DVKeyState *key,
void *out,
const void *ciph,
size_t out_size)
* @param key key material to clean up (memory must not be free'd!)
*/
static void
-bc_key_clean (struct BackchannelKeyState *key)
+dv_key_clean (struct DVKeyState *key)
{
gcry_cipher_close (key->cipher);
- GNUNET_CRYPTO_zero_keys (&key->material, sizeof (key->material));
+ GNUNET_CRYPTO_zero_keys (&key->material, sizeof(key->material));
}
/**
- * Communicator requests backchannel transmission. Process the request.
+ * Function to call to further operate on the now DV encapsulated
+ * message @a hdr, forwarding it via @a next_hop under respect of
+ * @a options.
*
- * @param cls the client
- * @param cb the send message that was sent
+ * @param cls closure
+ * @param next_hop next hop of the DV path
+ * @param hdr encapsulated message, technically a `struct TransportDFBoxMessage`
+ * @param options options of the original message
*/
-static void
-handle_communicator_backchannel (
- void *cls,
- const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
-{
- struct TransportClient *tc = cls;
- struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
- struct GNUNET_TIME_Absolute monotime;
- struct TransportBackchannelEncapsulationMessage *enc;
- struct TransportBackchannelRequestPayloadP ppay;
- struct BackchannelKeyState key;
- char *mpos;
- uint16_t msize;
+typedef void (*DVMessageHandler) (void *cls,
+ struct Neighbour *next_hop,
+ const struct GNUNET_MessageHeader *hdr,
+ enum RouteMessageOptions options);
- {
- const struct GNUNET_MessageHeader *inbox;
- const char *is;
+/**
+ * Pick a path of @a dv under constraints @a options and schedule
+ * transmission of @a hdr.
+ *
+ * @param target neighbour to ultimately send to
+ * @param num_dvhs length of the @a dvhs array
+ * @param dvhs array of hops to send the message to
+ * @param hdr message to send as payload
+ * @param use function to call with the encapsulated message
+ * @param use_cls closure for @a use
+ * @param options whether path must be confirmed or not, to be passed to @a use
+ * @return expected RTT for transmission, #GNUNET_TIME_UNIT_FOREVER_REL if sending failed
+ */
+static struct GNUNET_TIME_Relative
+encapsulate_for_dv (struct DistanceVector *dv,
+ unsigned int num_dvhs,
+ struct DistanceVectorHop **dvhs,
+ const struct GNUNET_MessageHeader *hdr,
+ DVMessageHandler use,
+ void *use_cls,
+ enum RouteMessageOptions options)
+{
+ struct TransportDVBoxMessage box_hdr;
+ struct TransportDVBoxPayloadP payload_hdr;
+ uint16_t enc_body_size = ntohs (hdr->size);
+ char enc[sizeof(struct TransportDVBoxPayloadP) + enc_body_size] GNUNET_ALIGN;
+ struct TransportDVBoxPayloadP *enc_payload_hdr =
+ (struct TransportDVBoxPayloadP *) enc;
+ struct DVKeyState key;
+ struct GNUNET_TIME_Relative rtt;
- inbox = (const struct GNUNET_MessageHeader *) &cb[1];
- /* 0-termination of 'is' was checked already in
- #check_communicator_backchannel() */
- is = (const char *) &cb[1];
- is += ntohs (inbox->size);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Preparing backchannel transmission to %s:%s of type %u\n",
- GNUNET_i2s (&cb->pid),
- is,
- ntohs (inbox->size));
- }
- /* encapsulate and encrypt message */
- msize = ntohs (cb->header.size) - sizeof (*cb) +
- sizeof (struct TransportBackchannelRequestPayloadP);
- enc = GNUNET_malloc (sizeof (*enc) + msize);
- enc->header.type =
- htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
- enc->header.size = htons (sizeof (*enc) + msize);
- enc->target = cb->pid;
- lookup_ephemeral (&cb->pid,
- &private_key,
- &enc->ephemeral_key,
- &ppay.sender_sig,
- &monotime);
+ /* Encrypt payload */
+ box_hdr.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
+ box_hdr.total_hops = htons (0);
+ update_ephemeral (dv);
+ box_hdr.ephemeral_key = dv->ephemeral_key;
+ payload_hdr.sender_sig = dv->sender_sig;
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
- &enc->iv,
- sizeof (enc->iv));
- dh_key_derive_eph_pid (&private_key, &cb->pid, &enc->iv, &key);
- ppay.monotonic_time = GNUNET_TIME_absolute_hton (monotime);
- mpos = (char *) &enc[1];
- bc_encrypt (&key, &ppay, mpos, sizeof (ppay));
- bc_encrypt (&key,
- &cb[1],
- &mpos[sizeof (ppay)],
- ntohs (cb->header.size) - sizeof (*cb));
- bc_hmac (&key,
- &enc->hmac,
- mpos,
- sizeof (ppay) + ntohs (cb->header.size) - sizeof (*cb));
- bc_key_clean (&key);
- route_message (&cb->pid, &enc->header, RMO_DV_ALLOWED);
- GNUNET_SERVICE_client_continue (tc->client);
-}
-
+ &box_hdr.iv,
+ sizeof(box_hdr.iv));
+ dh_key_derive_eph_pid (&dv->private_key, &dv->target, &box_hdr.iv, &key);
+ payload_hdr.sender = GST_my_identity;
+ payload_hdr.monotonic_time = GNUNET_TIME_absolute_hton (dv->monotime);
+ dv_encrypt (&key, &payload_hdr, enc_payload_hdr, sizeof(payload_hdr));
+ dv_encrypt (&key,
+ hdr,
+ &enc[sizeof(struct TransportDVBoxPayloadP)],
+ enc_body_size);
+ dv_hmac (&key, &box_hdr.hmac, enc, sizeof(enc));
+ dv_key_clean (&key);
+ rtt = GNUNET_TIME_UNIT_FOREVER_REL;
+ /* For each selected path, take the pre-computed header and body
+ and add the path in the middle of the message; then send it. */
+ for (unsigned int i = 0; i < num_dvhs; i++)
+ {
+ struct DistanceVectorHop *dvh = dvhs[i];
+ unsigned int num_hops = dvh->distance + 1;
+ char buf[sizeof(struct TransportDVBoxMessage)
+ + sizeof(struct GNUNET_PeerIdentity) * num_hops
+ + sizeof(struct TransportDVBoxPayloadP)
+ + enc_body_size] GNUNET_ALIGN;
+ struct GNUNET_PeerIdentity *dhops;
+
+ box_hdr.header.size = htons (sizeof(buf));
+ box_hdr.num_hops = htons (num_hops);
+ memcpy (buf, &box_hdr, sizeof(box_hdr));
+ dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)];
+ memcpy (dhops,
+ dvh->path,
+ dvh->distance * sizeof(struct GNUNET_PeerIdentity));
+ dhops[dvh->distance] = dv->target;
+ if (GNUNET_EXTRA_LOGGING > 0)
+ {
+ char *path;
-/**
- * Address of our peer added. Test message is well-formed.
- *
- * @param cls the client
- * @param aam the send message that was sent
- * @return #GNUNET_OK if message is well-formed
- */
-static int
-check_add_address (void *cls,
- const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
-{
- struct TransportClient *tc = cls;
+ path = GNUNET_strdup (GNUNET_i2s (&GST_my_identity));
+ for (unsigned int j = 0; j <= num_hops; j++)
+ {
+ char *tmp;
- if (CT_COMMUNICATOR != tc->type)
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
+ GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[j]));
+ GNUNET_free (path);
+ path = tmp;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Routing message of type %u to %s using DV (#%u/%u) via %s\n",
+ ntohs (hdr->type),
+ GNUNET_i2s (&dv->target),
+ i + 1,
+ num_dvhs + 1,
+ path);
+ GNUNET_free (path);
+ }
+ rtt = GNUNET_TIME_relative_min (rtt, dvh->pd.aged_rtt);
+ memcpy (&dhops[num_hops], enc, sizeof(enc));
+ use (use_cls,
+ dvh->next_hop,
+ (const struct GNUNET_MessageHeader *) buf,
+ options);
}
- GNUNET_MQ_check_zero_termination (aam);
- return GNUNET_OK;
+ return rtt;
}
/**
- * Ask peerstore to store our address.
+ * Wrapper around #route_via_neighbour() that matches the
+ * #DVMessageHandler structure.
*
- * @param cls an `struct AddressListEntry *`
+ * @param cls unused
+ * @param next_hop where to send next
+ * @param hdr header of the message to send
+ * @param options message options for queue selection
*/
static void
-store_pi (void *cls);
+send_dv_to_neighbour (void *cls,
+ struct Neighbour *next_hop,
+ const struct GNUNET_MessageHeader *hdr,
+ enum RouteMessageOptions options)
+{
+ (void) cls;
+ (void) route_via_neighbour (next_hop, hdr, options);
+}
/**
- * Function called when peerstore is done storing our address.
+ * We need to transmit @a hdr to @a target. If necessary, this may
+ * involve DV routing. This function routes without applying flow
+ * control or congestion control and should only be used for control
+ * traffic.
*
- * @param cls a `struct AddressListEntry`
- * @param success #GNUNET_YES if peerstore was successful
+ * @param target peer to receive @a hdr
+ * @param hdr header of the message to route and #GNUNET_free()
+ * @param options which transmission channels are allowed
+ * @return expected RTT for transmission, #GNUNET_TIME_UNIT_FOREVER_REL if sending failed
*/
-static void
-peerstore_store_own_cb (void *cls, int success)
+static struct GNUNET_TIME_Relative
+route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
+ const struct GNUNET_MessageHeader *hdr,
+ enum RouteMessageOptions options)
{
- struct AddressListEntry *ale = cls;
-
+ struct VirtualLink *vl;
+ struct Neighbour *n;
+ struct DistanceVector *dv;
+ struct GNUNET_TIME_Relative rtt1;
+ struct GNUNET_TIME_Relative rtt2;
+
+ vl = lookup_virtual_link (target);
+ GNUNET_assert (NULL != vl);
+ n = vl->n;
+ dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
+ if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
+ {
+ /* if confirmed is required, and we do not have anything
+ confirmed, drop respective options */
+ if (NULL == n)
+ n = lookup_neighbour (target);
+ if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
+ dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target);
+ }
+ if ((NULL == n) && (NULL == dv))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Cannot route message of type %u to %s: no route\n",
+ ntohs (hdr->type),
+ GNUNET_i2s (target));
+ GNUNET_STATISTICS_update (GST_stats,
+ "# Messages dropped in routing: no acceptable method",
+ 1,
+ GNUNET_NO);
+ return GNUNET_TIME_UNIT_FOREVER_REL;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Routing message of type %u to %s with options %X\n",
+ ntohs (hdr->type),
+ GNUNET_i2s (target),
+ (unsigned int) options);
+ /* If both dv and n are possible and we must choose:
+ flip a coin for the choice between the two; for now 50/50 */
+ if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
+ {
+ if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 2))
+ n = NULL;
+ else
+ dv = NULL;
+ }
+ if ((NULL != n) && (NULL != dv))
+ options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
+ enough for redunancy, so clear the flag. */
+ rtt1 = GNUNET_TIME_UNIT_FOREVER_REL;
+ rtt2 = GNUNET_TIME_UNIT_FOREVER_REL;
+ if (NULL != n)
+ {
+ rtt1 = route_via_neighbour (n, hdr, options);
+ }
+ if (NULL != dv)
+ {
+ struct DistanceVectorHop *hops[2];
+ unsigned int res;
+
+ res = pick_random_dv_hops (dv,
+ options,
+ hops,
+ (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
+ if (0 == res)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Failed to route message, could not determine DV path\n");
+ return rtt1;
+ }
+ rtt2 = encapsulate_for_dv (dv,
+ res,
+ hops,
+ hdr,
+ &send_dv_to_neighbour,
+ NULL,
+ options & (~RMO_REDUNDANT));
+ }
+ return GNUNET_TIME_relative_min (rtt1, rtt2);
+}
+
+
+/**
+ * Something changed on the virtual link with respect to flow
+ * control. Consider retransmitting the FC window size.
+ *
+ * @param cls a `struct VirtualLink` to work with
+ */
+static void
+consider_sending_fc (void *cls)
+{
+ struct VirtualLink *vl = cls;
+ struct GNUNET_TIME_Absolute monotime;
+ struct TransportFlowControlMessage fc;
+ struct GNUNET_TIME_Relative duration;
+ struct GNUNET_TIME_Relative rtt;
+
+ duration = GNUNET_TIME_absolute_get_duration (vl->last_fc_transmission);
+ /* OPTIMIZE-FC-BDP: decide sane criteria on when to do this, instead of doing
+ it always! */
+ /* For example, we should probably ONLY do this if a bit more than
+ an RTT has passed, or if the window changed "significantly" since
+ then. See vl->last_fc_rtt! NOTE: to do this properly, we also
+ need an estimate for the bandwidth-delay-product for the entire
+ VL, as that determines "significantly". We have the delay, but
+ the bandwidth statistics need to be added for the VL!*/(void) duration;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending FC seq %u to %s with new window %llu\n",
+ (unsigned int) vl->fc_seq_gen,
+ GNUNET_i2s (&vl->target),
+ (unsigned long long) vl->incoming_fc_window_size);
+ monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
+ vl->last_fc_transmission = monotime;
+ fc.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL);
+ fc.header.size = htons (sizeof(fc));
+ fc.seq = htonl (vl->fc_seq_gen++);
+ fc.inbound_window_size = GNUNET_htonll (vl->incoming_fc_window_size);
+ fc.outbound_sent = GNUNET_htonll (vl->outbound_fc_window_size_used);
+ fc.outbound_window_size = GNUNET_htonll (vl->outbound_fc_window_size);
+ fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
+ rtt = route_control_message_without_fc (&vl->target, &fc.header, RMO_NONE);
+ if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
+ {
+ rtt = GNUNET_TIME_UNIT_SECONDS;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "FC retransmission to %s failed, will retry in %s\n",
+ GNUNET_i2s (&vl->target),
+ GNUNET_STRINGS_relative_time_to_string (rtt, GNUNET_YES));
+ vl->last_fc_rtt = GNUNET_TIME_UNIT_ZERO;
+ }
+ else
+ {
+ /* OPTIMIZE-FC-BDP: rtt is not ideal, we can do better! */
+ vl->last_fc_rtt = rtt;
+ }
+ if (NULL != vl->fc_retransmit_task)
+ GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
+ vl->fc_retransmit_task =
+ GNUNET_SCHEDULER_add_delayed (rtt, &consider_sending_fc, vl);
+}
+
+
+/**
+ * There is a message at the head of the pending messages for @a vl
+ * which may be ready for transmission. Check if a queue is ready to
+ * take it.
+ *
+ * This function must (1) check for flow control to ensure that we can
+ * right now send to @a vl, (2) check that the pending message in the
+ * queue is actually eligible, (3) determine if any applicable queue
+ * (direct neighbour or DVH path) is ready to accept messages, and
+ * (4) prioritize based on the preferences associated with the
+ * pending message.
+ *
+ * So yeah, easy.
+ *
+ * @param vl virtual link where we should check for transmission
+ */
+static void
+check_vl_transmission (struct VirtualLink *vl)
+{
+ struct Neighbour *n = vl->n;
+ struct DistanceVector *dv = vl->dv;
+ struct GNUNET_TIME_Absolute now;
+ int elig;
+
+ /* Check that we have an eligible pending message!
+ (cheaper than having #transmit_on_queue() find out!) */
+ elig = GNUNET_NO;
+ for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
+ pm = pm->next_vl)
+ {
+ if (NULL != pm->qe)
+ continue; /* not eligible, is in a queue! */
+ if (pm->bytes_msg + vl->outbound_fc_window_size_used >
+ vl->outbound_fc_window_size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Stalled transmision on VL %s due to flow control: %llu < %llu\n",
+ GNUNET_i2s (&vl->target),
+ (unsigned long long) vl->outbound_fc_window_size,
+ (unsigned long long) (pm->bytes_msg
+ + vl->outbound_fc_window_size_used));
+ consider_sending_fc (vl);
+ return; /* We have a message, but flow control says "nope" */
+ }
+ elig = GNUNET_YES;
+ break;
+ }
+ if (GNUNET_NO == elig)
+ return;
+
+ /* Notify queues at direct neighbours that we are interested */
+ now = GNUNET_TIME_absolute_get ();
+ if (NULL != n)
+ {
+ for (struct Queue *queue = n->queue_head; NULL != queue;
+ queue = queue->next_neighbour)
+ if ((GNUNET_YES == queue->idle) &&
+ (queue->validated_until.abs_value_us > now.abs_value_us))
+ schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ }
+ /* Notify queues via DV that we are interested */
+ if (NULL != dv)
+ {
+ /* Do DV with lower scheduler priority, which effectively means that
+ IF a neighbour exists and is available, we prefer it. */
+ for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+ pos = pos->next_dv)
+ {
+ struct Neighbour *nh = pos->next_hop;
+
+ if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
+ continue; /* skip this one: path not validated */
+ for (struct Queue *queue = nh->queue_head; NULL != queue;
+ queue = queue->next_neighbour)
+ if ((GNUNET_YES == queue->idle) &&
+ (queue->validated_until.abs_value_us > now.abs_value_us))
+ schedule_transmit_on_queue (queue,
+ GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
+ }
+ }
+}
+
+
+/**
+ * Client asked for transmission to a peer. Process the request.
+ *
+ * @param cls the client
+ * @param obm the send message that was sent
+ */
+static void
+handle_client_send (void *cls, const struct OutboundMessage *obm)
+{
+ struct TransportClient *tc = cls;
+ struct PendingMessage *pm;
+ const struct GNUNET_MessageHeader *obmm;
+ uint32_t bytes_msg;
+ struct VirtualLink *vl;
+ enum GNUNET_MQ_PriorityPreferences pp;
+
+ GNUNET_assert (CT_CORE == tc->type);
+ obmm = (const struct GNUNET_MessageHeader *) &obm[1];
+ bytes_msg = ntohs (obmm->size);
+ pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
+ vl = lookup_virtual_link (&obm->peer);
+ if (NULL == vl)
+ {
+ /* Failure: don't have this peer as a neighbour (anymore).
+ Might have gone down asynchronously, so this is NOT
+ a protocol violation by CORE. Still count the event,
+ as this should be rare. */
+ GNUNET_SERVICE_client_continue (tc->client);
+ GNUNET_STATISTICS_update (GST_stats,
+ "# messages dropped (neighbour unknown)",
+ 1,
+ GNUNET_NO);
+ return;
+ }
+
+ pm = GNUNET_malloc (sizeof(struct PendingMessage) + bytes_msg);
+ pm->logging_uuid = logging_uuid_gen++;
+ pm->prefs = pp;
+ pm->client = tc;
+ pm->vl = vl;
+ pm->bytes_msg = bytes_msg;
+ memcpy (&pm[1], obmm, bytes_msg);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending %u bytes as <%llu> to %s\n",
+ bytes_msg,
+ pm->logging_uuid,
+ GNUNET_i2s (&obm->peer));
+ GNUNET_CONTAINER_MDLL_insert (client,
+ tc->details.core.pending_msg_head,
+ tc->details.core.pending_msg_tail,
+ pm);
+ GNUNET_CONTAINER_MDLL_insert (vl,
+ vl->pending_msg_head,
+ vl->pending_msg_tail,
+ pm);
+ check_vl_transmission (vl);
+}
+
+
+/**
+ * Communicator requests backchannel transmission. Process the request.
+ * Just repacks it into our `struct TransportBackchannelEncapsulationMessage *`
+ * (which for now has exactly the same format, only a different message type)
+ * and passes it on for routing.
+ *
+ * @param cls the client
+ * @param cb the send message that was sent
+ */
+static void
+handle_communicator_backchannel (
+ void *cls,
+ const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
+{
+ struct TransportClient *tc = cls;
+ const struct GNUNET_MessageHeader *inbox =
+ (const struct GNUNET_MessageHeader *) &cb[1];
+ uint16_t isize = ntohs (inbox->size);
+ const char *is = ((const char *) &cb[1]) + isize;
+ char
+ mbuf[isize
+ + sizeof(struct
+ TransportBackchannelEncapsulationMessage)] GNUNET_ALIGN;
+ struct TransportBackchannelEncapsulationMessage *be =
+ (struct TransportBackchannelEncapsulationMessage *) mbuf;
+
+ /* 0-termination of 'is' was checked already in
+ #check_communicator_backchannel() */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Preparing backchannel transmission to %s:%s of type %u\n",
+ GNUNET_i2s (&cb->pid),
+ is,
+ ntohs (inbox->size));
+ /* encapsulate and encrypt message */
+ be->header.type =
+ htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
+ be->header.size = htons (sizeof(mbuf));
+ memcpy (&be[1], inbox, isize);
+ memcpy (&mbuf[sizeof(struct TransportBackchannelEncapsulationMessage)
+ + isize],
+ is,
+ strlen (is) + 1);
+ route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
+ GNUNET_SERVICE_client_continue (tc->client);
+}
+
+
+/**
+ * Address of our peer added. Test message is well-formed.
+ *
+ * @param cls the client
+ * @param aam the send message that was sent
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_add_address (void *cls,
+ const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
+{
+ struct TransportClient *tc = cls;
+
+ if (CT_COMMUNICATOR != tc->type)
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ GNUNET_MQ_check_zero_termination (aam);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Ask peerstore to store our address.
+ *
+ * @param cls an `struct AddressListEntry *`
+ */
+static void
+store_pi (void *cls);
+
+
+/**
+ * Function called when peerstore is done storing our address.
+ *
+ * @param cls a `struct AddressListEntry`
+ * @param success #GNUNET_YES if peerstore was successful
+ */
+static void
+peerstore_store_own_cb (void *cls, int success)
+{
+ struct AddressListEntry *ale = cls;
+
ale->sc = NULL;
if (GNUNET_YES != success)
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Communicator added address `%s'!\n",
(const char *) &aam[1]);
- slen = ntohs (aam->header.size) - sizeof (*aam);
- ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
+ slen = ntohs (aam->header.size) - sizeof(*aam);
+ ale = GNUNET_malloc (sizeof(struct AddressListEntry) + slen);
ale->tc = tc;
ale->address = (const char *) &ale[1];
ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
{
struct TransportClient *tc = cls;
+ struct AddressListEntry *alen;
if (CT_COMMUNICATOR != tc->type)
{
}
for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
NULL != ale;
- ale = ale->next)
+ ale = alen)
{
+ alen = ale->next;
if (dam->aid != ale->aid)
continue;
GNUNET_assert (ale->tc == tc);
const struct GNUNET_MessageHeader *msg);
+/**
+ * Function called when we are done giving a message of a certain
+ * size to CORE and should thus decrement the number of bytes of
+ * RAM reserved for that peer's MQ.
+ *
+ * @param cls a `struct CoreSentContext`
+ */
+static void
+core_env_sent_cb (void *cls)
+{
+ struct CoreSentContext *ctx = cls;
+ struct VirtualLink *vl = ctx->vl;
+
+ if (NULL == vl)
+ {
+ /* lost the link in the meantime, ignore */
+ GNUNET_free (ctx);
+ return;
+ }
+ GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, ctx);
+ GNUNET_assert (vl->incoming_fc_window_size_ram >= ctx->size);
+ vl->incoming_fc_window_size_ram -= ctx->size;
+ vl->incoming_fc_window_size_used += ctx->isize;
+ consider_sending_fc (vl);
+ GNUNET_free (ctx);
+}
+
+
/**
* Communicator gave us an unencapsulated message to pass as-is to
* CORE. Process the request.
uint16_t size = ntohs (mh->size);
int have_core;
- if ((size > UINT16_MAX - sizeof (struct InboundMessage)) ||
- (size < sizeof (struct GNUNET_MessageHeader)))
+ if ((size > UINT16_MAX - sizeof(struct InboundMessage)) ||
+ (size < sizeof(struct GNUNET_MessageHeader)))
{
struct GNUNET_SERVICE_Client *client = cmc->tc->client;
GNUNET_SERVICE_client_drop (client);
return;
}
- vl = GNUNET_CONTAINER_multipeermap_get (links, &cmc->im.sender);
+ vl = lookup_virtual_link (&cmc->im.sender);
if (NULL == vl)
{
/* FIXME: sender is giving us messages for CORE but we don't have
we pass this on, CORE would be confused (link down, messages
arrive). We should investigate more if this happens often,
or in a persistent manner, and possibly do "something" about
- it. Thus logging as error for now. */
- GNUNET_break_op (0);
+ it. Thus logging as error for now. */GNUNET_break_op (0);
GNUNET_STATISTICS_update (GST_stats,
"# CORE messages droped (virtual link still down)",
1,
finish_cmc_handling (cmc);
return;
}
+ if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# CORE messages droped (FC arithmetic overflow)",
+ 1,
+ GNUNET_NO);
+
+ finish_cmc_handling (cmc);
+ return;
+ }
+ if (vl->incoming_fc_window_size_ram + size > vl->available_fc_window_size)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# CORE messages droped (FC window overflow)",
+ 1,
+ GNUNET_NO);
+ finish_cmc_handling (cmc);
+ return;
+ }
+
/* Forward to all CORE clients */
have_core = GNUNET_NO;
for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
{
struct GNUNET_MQ_Envelope *env;
struct InboundMessage *im;
+ struct CoreSentContext *ctx;
if (CT_CORE != tc->type)
continue;
- have_core = GNUNET_YES;
+ vl->incoming_fc_window_size_ram += size;
env = GNUNET_MQ_msg_extra (im, size, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+ ctx = GNUNET_new (struct CoreSentContext);
+ ctx->vl = vl;
+ ctx->size = size;
+ ctx->isize = (GNUNET_NO == have_core) ? size : 0;
+ have_core = GNUNET_YES;
+ GNUNET_CONTAINER_DLL_insert (vl->csc_head, vl->csc_tail, ctx);
+ GNUNET_MQ_notify_sent (env, &core_env_sent_cb, ctx);
im->peer = cmc->im.sender;
memcpy (&im[1], mh, size);
GNUNET_MQ_send (tc->mq, env);
+ vl->core_recv_window--;
}
- vl->core_recv_window--;
if (GNUNET_NO == have_core)
+ {
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Dropped message to CORE: no CORE client connected!\n");
- else
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Delivered message from %s of type %u to CORE\n",
- GNUNET_i2s (&cmc->im.sender),
- ntohs (mh->type));
- if (vl->core_recv_window > 0)
- {
+ /* Nevertheless, count window as used, as it is from the
+ perspective of the other peer! */
+ vl->incoming_fc_window_size_used += size;
+ /* TODO-M1 */
+ finish_cmc_handling (cmc);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Delivered message from %s of type %u to CORE\n",
+ GNUNET_i2s (&cmc->im.sender),
+ ntohs (mh->type));
+ if (vl->core_recv_window > 0)
+ {
finish_cmc_handling (cmc);
return;
}
check_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
{
uint16_t size = ntohs (fb->header.size);
- uint16_t bsize = size - sizeof (*fb);
+ uint16_t bsize = size - sizeof(*fb);
(void) cls;
if (0 == bsize)
transmit_cummulative_ack_cb (void *cls)
{
struct AcknowledgementCummulator *ac = cls;
- struct TransportReliabilityAckMessage *ack;
+ char buf[sizeof(struct TransportReliabilityAckMessage)
+ + ac->ack_counter
+ * sizeof(struct TransportCummulativeAckPayloadP)] GNUNET_ALIGN;
+ struct TransportReliabilityAckMessage *ack =
+ (struct TransportReliabilityAckMessage *) buf;
struct TransportCummulativeAckPayloadP *ap;
ac->task = NULL;
ac->ack_counter,
GNUNET_i2s (&ac->target));
GNUNET_assert (0 < ac->ack_counter);
- ack = GNUNET_malloc (sizeof (*ack) +
- ac->ack_counter *
- sizeof (struct TransportCummulativeAckPayloadP));
ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
ack->header.size =
- htons (sizeof (*ack) +
- ac->ack_counter * sizeof (struct TransportCummulativeAckPayloadP));
+ htons (sizeof(*ack)
+ + ac->ack_counter * sizeof(struct TransportCummulativeAckPayloadP));
ack->ack_counter = htonl (ac->ack_counter++);
ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
for (unsigned int i = 0; i < ac->ack_counter; i++)
ap[i].ack_delay = GNUNET_TIME_relative_hton (
GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
}
- route_message (&ac->target, &ack->header, RMO_DV_ALLOWED);
+ route_control_message_without_fc (&ac->target, &ack->header, RMO_DV_ALLOWED);
ac->num_acks = 0;
ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT,
&destroy_ack_cummulator,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Scheduling ACK %s for transmission to %s\n",
- GNUNET_sh2s (&ack_uuid->value),
+ GNUNET_uuid2s (&ack_uuid->value),
GNUNET_i2s (pid));
ac = GNUNET_CONTAINER_multipeermap_get (ack_cummulators, pid);
if (NULL == ac)
msize = ntohs (fb->msg_size);
fc.message_uuid = fb->msg_uuid;
fc.rc = NULL;
- GNUNET_CONTAINER_multihashmap32_get_multiple (n->reassembly_map,
- fb->msg_uuid.uuid,
- &find_by_message_uuid,
- &fc);
+ (void) GNUNET_CONTAINER_multihashmap32_get_multiple (n->reassembly_map,
+ fb->msg_uuid.uuid,
+ &find_by_message_uuid,
+ &fc);
if (NULL == (rc = fc.rc))
{
- rc = GNUNET_malloc (sizeof (*rc) + msize + /* reassembly payload buffer */
- (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
+ rc = GNUNET_malloc (sizeof(*rc) + msize /* reassembly payload buffer */
+ + (msize + 7) / 8 * sizeof(uint8_t) /* bitfield */);
rc->msg_uuid = fb->msg_uuid;
rc->neighbour = n;
rc->msg_size = msize;
}
/* reassemble */
- fsize = ntohs (fb->header.size) - sizeof (*fb);
+ fsize = ntohs (fb->header.size) - sizeof(*fb);
if (0 == fsize)
{
GNUNET_break (0);
return;
}
frag_off = ntohs (fb->frag_off);
+ if (frag_off + fsize > msize)
+ {
+ /* Fragment (plus fragment size) exceeds message size! */
+ GNUNET_break_op (0);
+ finish_cmc_handling (cmc);
+ return;
+ }
memcpy (&target[frag_off], &fb[1], fsize);
/* update bitfield and msg_missing */
for (unsigned int i = frag_off; i < frag_off + fsize; i++)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Fragment reassembly complete for message %u\n",
(unsigned int) fb->msg_uuid.uuid);
-
+ /* FIXME: check that the resulting msg is NOT a
+ DV Box or Reliability Box, as that is NOT allowed! */
demultiplex_with_cmc (cmc, msg);
/* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
en-route and we forget that we finished this reassembly immediately!
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received reliability box from %s with UUID %s of type %u\n",
GNUNET_i2s (&cmc->im.sender),
- GNUNET_sh2s (&rb->ack_uuid.value),
+ GNUNET_uuid2s (&rb->ack_uuid.value),
(unsigned int) ntohs (inbox->type));
rtt = GNUNET_TIME_UNIT_SECONDS; /* FIXME: should base this on "RTT", but we
do not really have an RTT for the
- *incoming* queue (should we have
+ * incoming* queue (should we have
the sender add it to the rb message?) */
cummulative_ack (
&cmc->im.sender,
&rb->ack_uuid,
(0 == ntohl (rb->ack_countdown))
- ? GNUNET_TIME_UNIT_ZERO_ABS
- : GNUNET_TIME_relative_to_absolute (
- GNUNET_TIME_relative_divide (rtt, 8 /* FIXME: magic constant */)));
+ ? GNUNET_TIME_UNIT_ZERO_ABS
+ : GNUNET_TIME_relative_to_absolute (
+ GNUNET_TIME_relative_divide (rtt, 8 /* FIXME: magic constant */)));
/* continue with inner message */
+ /* FIXME: check that inbox is NOT a DV Box, fragment or another
+ reliability box (not allowed!) */
demultiplex_with_cmc (cmc, inbox);
}
}
+/**
+ * We have completed transmission of @a pm, remove it from
+ * the transmission queues (and if it is a fragment, continue
+ * up the tree as necessary).
+ *
+ * @param pm pending message that was transmitted
+ */
+static void
+completed_pending_message (struct PendingMessage *pm)
+{
+ struct PendingMessage *pos;
+
+ switch (pm->pmt)
+ {
+ case PMT_CORE:
+ case PMT_RELIABILITY_BOX:
+ /* Full message sent, we are done */
+ client_send_response (pm);
+ return;
+
+ case PMT_FRAGMENT_BOX:
+ /* Fragment sent over reliabile channel */
+ free_fragment_tree (pm);
+ pos = pm->frag_parent;
+ GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
+ GNUNET_free (pm);
+ /* check if subtree is done */
+ while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
+ (pos != pm))
+ {
+ pm = pos;
+ pos = pm->frag_parent;
+ GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
+ GNUNET_free (pm);
+ }
+
+ /* Was this the last applicable fragmment? */
+ if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
+ (pos->frag_off == pos->bytes_msg))
+ client_send_response (pos);
+ return;
+
+ case PMT_DV_BOX:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Completed transmission of message %llu (DV Box)\n",
+ pm->logging_uuid);
+ free_pending_message (pm);
+ return;
+ }
+}
+
+
/**
* The @a pa was acknowledged, process the acknowledgement.
*
handle_acknowledged (struct PendingAcknowledgement *pa,
struct GNUNET_TIME_Relative ack_delay)
{
- struct PendingMessage *pm = pa->pm;
struct GNUNET_TIME_Relative delay;
delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time);
update_queue_performance (pa->queue, delay, pa->message_size);
if (NULL != pa->dvh)
update_dvh_performance (pa->dvh, delay, pa->message_size);
- if (NULL != pm)
- {
- if (NULL != pm->frag_parent)
- {
- pm = pm->frag_parent;
- free_fragment_tree (pa->pm);
- }
- while ((NULL != pm->frag_parent) && (NULL == pm->head_frag))
- {
- struct PendingMessage *parent = pm->frag_parent;
-
- free_fragment_tree (pm);
- pm = parent;
- }
- if (NULL != pm->head_frag)
- pm = NULL; /* we are done, otherwise free 'pm' below */
- }
- if (NULL != pm)
- free_pending_message (pm);
+ if (NULL != pa->pm)
+ completed_pending_message (pa->pm);
free_pending_acknowledgement (pa);
}
unsigned int n_acks;
(void) cls;
- n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
- sizeof (struct TransportCummulativeAckPayloadP);
+ n_acks = (ntohs (ra->header.size) - sizeof(*ra))
+ / sizeof(struct TransportCummulativeAckPayloadP);
if (0 == n_acks)
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
- if ((ntohs (ra->header.size) - sizeof (*ra)) !=
- n_acks * sizeof (struct TransportCummulativeAckPayloadP))
+ if ((ntohs (ra->header.size) - sizeof(*ra)) !=
+ n_acks * sizeof(struct TransportCummulativeAckPayloadP))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
{
struct CommunicatorMessageContext *cmc = cls;
const struct TransportCummulativeAckPayloadP *ack;
- struct PendingAcknowledgement *pa;
unsigned int n_acks;
uint32_t ack_counter;
- n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
- sizeof (struct TransportCummulativeAckPayloadP);
+ n_acks = (ntohs (ra->header.size) - sizeof(*ra))
+ / sizeof(struct TransportCummulativeAckPayloadP);
ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
for (unsigned int i = 0; i < n_acks; i++)
{
- pa =
- GNUNET_CONTAINER_multishortmap_get (pending_acks, &ack[i].ack_uuid.value);
+ struct PendingAcknowledgement *pa =
+ GNUNET_CONTAINER_multiuuidmap_get (pending_acks, &ack[i].ack_uuid.value);
if (NULL == pa)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Received ACK from %s with UUID %s which is unknown to us!\n",
GNUNET_i2s (&cmc->im.sender),
- GNUNET_sh2s (&ack[i].ack_uuid.value));
+ GNUNET_uuid2s (&ack[i].ack_uuid.value));
GNUNET_STATISTICS_update (
GST_stats,
"# FRAGMENT_ACKS dropped, no matching pending message",
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received ACK from %s with UUID %s\n",
GNUNET_i2s (&cmc->im.sender),
- GNUNET_sh2s (&ack[i].ack_uuid.value));
+ GNUNET_uuid2s (&ack[i].ack_uuid.value));
handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
}
ack_counter = htonl (ra->ack_counter);
- (void) ack_counter; /* silence compiler warning for now */
+ (void) ack_counter; /* silence compiler warning for now */
// FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
// (DV and/or Neighbour?)
finish_cmc_handling (cmc);
void *cls,
const struct TransportBackchannelEncapsulationMessage *be)
{
- uint16_t size = ntohs (be->header.size);
+ uint16_t size = ntohs (be->header.size) - sizeof(*be);
+ const struct GNUNET_MessageHeader *inbox =
+ (const struct GNUNET_MessageHeader *) &be[1];
+ const char *is;
+ uint16_t isize;
(void) cls;
- if ((size - sizeof (*be)) <
- (sizeof (struct TransportBackchannelRequestPayloadP) +
- sizeof (struct GNUNET_MessageHeader)))
+ if (ntohs (inbox->size) >= size)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ isize = ntohs (inbox->size);
+ is = ((const char *) inbox) + isize;
+ size -= isize;
+ if ('\0' != is[size - 1])
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
/**
- * We received the plaintext @a msg from backtalker @a b. Forward
- * it to the respective communicator.
+ * Communicator gave us a backchannel encapsulation. Process the request.
+ * (We are the destination of the backchannel here.)
*
- * @param b a backtalker
- * @param msg a message, consisting of a `struct GNUNET_MessageHeader`
- * followed by the target name of the communicator
- * @param msg_size number of bytes in @a msg
+ * @param cls a `struct CommunicatorMessageContext` (must call
+ * #finish_cmc_handling() when done)
+ * @param be the message that was received
*/
static void
-forward_backchannel_payload (struct Backtalker *b,
- const void *msg,
- size_t msg_size)
+handle_backchannel_encapsulation (
+ void *cls,
+ const struct TransportBackchannelEncapsulationMessage *be)
{
+ struct CommunicatorMessageContext *cmc = cls;
struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
struct GNUNET_MQ_Envelope *env;
struct TransportClient *tc;
- const struct GNUNET_MessageHeader *mh;
- const char *target_communicator;
- uint16_t mhs;
+ const struct GNUNET_MessageHeader *inbox =
+ (const struct GNUNET_MessageHeader *) &be[1];
+ uint16_t isize = ntohs (inbox->size);
+ const char *target_communicator = ((const char *) inbox) + isize;
- /* Determine target_communicator and check @a msg is well-formed */
- mh = msg;
- mhs = ntohs (mh->size);
- if (mhs <= msg_size)
- {
- GNUNET_break_op (0);
- return;
- }
- target_communicator = &((const char *) msg)[ntohs (mh->size)];
- if ('\0' != target_communicator[msg_size - mhs - 1])
- {
- GNUNET_break_op (0);
- return;
- }
/* Find client providing this communicator */
for (tc = clients_head; NULL != tc; tc = tc->next)
if ((CT_COMMUNICATOR == tc->type) &&
/* Finally, deliver backchannel message to communicator */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Delivering backchannel message from %s of type %u to %s\n",
- GNUNET_i2s (&b->pid),
- ntohs (mh->type),
+ GNUNET_i2s (&cmc->im.sender),
+ ntohs (inbox->type),
target_communicator);
env = GNUNET_MQ_msg_extra (
cbi,
- msg_size,
+ isize,
GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING);
- cbi->pid = b->pid;
- memcpy (&cbi[1], msg, msg_size);
+ cbi->pid = cmc->im.sender;
+ memcpy (&cbi[1], inbox, isize);
GNUNET_MQ_send (tc->mq, env);
}
/**
- * Free data structures associated with @a b.
+ * Task called when we should check if any of the DV paths
+ * we have learned to a target are due for garbage collection.
*
- * @param b data structure to release
+ * Collects stale paths, and possibly frees the entire DV
+ * entry if no paths are left. Otherwise re-schedules itself.
+ *
+ * @param cls a `struct DistanceVector`
*/
static void
-free_backtalker (struct Backtalker *b)
+path_cleanup_cb (void *cls)
{
- if (NULL != b->get)
- {
- GNUNET_PEERSTORE_iterate_cancel (b->get);
- b->get = NULL;
- GNUNET_assert (NULL != b->cmc);
- finish_cmc_handling (b->cmc);
- b->cmc = NULL;
- }
- if (NULL != b->task)
+ struct DistanceVector *dv = cls;
+ struct DistanceVectorHop *pos;
+
+ dv->timeout_task = NULL;
+ while (NULL != (pos = dv->dv_head))
{
- GNUNET_SCHEDULER_cancel (b->task);
- b->task = NULL;
+ GNUNET_assert (dv == pos->dv);
+ if (GNUNET_TIME_absolute_get_remaining (pos->timeout).rel_value_us > 0)
+ break;
+ free_distance_vector_hop (pos);
}
- if (NULL != b->sc)
+ if (NULL == pos)
{
- GNUNET_PEERSTORE_store_cancel (b->sc);
- b->sc = NULL;
+ free_dv_route (dv);
+ return;
}
- GNUNET_assert (
- GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_remove (backtalkers, &b->pid, b));
- GNUNET_free (b);
-}
-
-
-/**
- * Callback to free backtalker records.
- *
- * @param cls NULL
- * @param pid unused
- * @param value a `struct Backtalker`
- * @return #GNUNET_OK (always)
- */
-static int
-free_backtalker_cb (void *cls,
- const struct GNUNET_PeerIdentity *pid,
- void *value)
-{
- struct Backtalker *b = value;
-
- (void) cls;
- (void) pid;
- free_backtalker (b);
- return GNUNET_OK;
+ dv->timeout_task =
+ GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv);
}
/**
- * Function called when it is time to clean up a backtalker.
+ * The @a hop is a validated path to the respective target
+ * peer and we should tell core about it -- and schedule
+ * a job to revoke the state.
*
- * @param cls a `struct Backtalker`
+ * @param hop a path to some peer that is the reason for activation
*/
static void
-backtalker_timeout_cb (void *cls)
+activate_core_visible_dv_path (struct DistanceVectorHop *hop)
{
- struct Backtalker *b = cls;
+ struct DistanceVector *dv = hop->dv;
+ struct VirtualLink *vl;
- b->task = NULL;
- if (0 != GNUNET_TIME_absolute_get_remaining (b->timeout).rel_value_us)
+ vl = lookup_virtual_link (&dv->target);
+ if (NULL != vl)
{
- b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+ /* Link was already up, remember dv is also now available and we are done */
+ vl->dv = dv;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Virtual link to %s could now also use DV!\n",
+ GNUNET_i2s (&dv->target));
return;
}
- GNUNET_assert (NULL == b->sc);
- free_backtalker (b);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating new virtual link to %s using DV!\n",
+ GNUNET_i2s (&dv->target));
+ vl = GNUNET_new (struct VirtualLink);
+ vl->message_uuid_ctr =
+ GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
+ vl->target = dv->target;
+ vl->dv = dv;
+ dv->vl = vl;
+ vl->core_recv_window = RECV_WINDOW_SIZE;
+ vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ links,
+ &vl->target,
+ vl,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ consider_sending_fc (vl);
+ /* We lacked a confirmed connection to the target
+ before, so tell CORE about it (finally!) */
+ cores_send_connect_info (&dv->target);
}
/**
- * Function called with the monotonic time of a backtalker
- * by PEERSTORE. Updates the time and continues processing.
+ * We have learned a @a path through the network to some other peer, add it to
+ * our DV data structure (returning #GNUNET_YES on success).
*
- * @param cls a `struct Backtalker`
- * @param record the information found, NULL for the last call
- * @param emsg error message
+ * We do not add paths if we have a sufficient number of shorter
+ * paths to this target already (returning #GNUNET_NO).
+ *
+ * We also do not add problematic paths, like those where we lack the first
+ * hop in our neighbour list (i.e. due to a topology change) or where some
+ * non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
+ *
+ * @param path the path we learned, path[0] should be us,
+ * and then path contains a valid path from us to
+ * `path[path_len-1]` path[1] should be a direct neighbour (we should check!)
+ * @param path_len number of entries on the @a path, at least three!
+ * @param network_latency how long does the message take from us to
+ * `path[path_len-1]`? set to "forever" if unknown
+ * @param path_valid_until how long is this path considered validated? Maybe
+ * be zero.
+ * @return #GNUNET_YES on success,
+ * #GNUNET_NO if we have better path(s) to the target
+ * #GNUNET_SYSERR if the path is useless and/or invalid
+ * (i.e. path[1] not a direct neighbour
+ * or path[i+1] is a direct neighbour for i>0)
*/
-static void
-backtalker_monotime_cb (void *cls,
- const struct GNUNET_PEERSTORE_Record *record,
- const char *emsg)
+static int
+learn_dv_path (const struct GNUNET_PeerIdentity *path,
+ unsigned int path_len,
+ struct GNUNET_TIME_Relative network_latency,
+ struct GNUNET_TIME_Absolute path_valid_until)
{
- struct Backtalker *b = cls;
- struct GNUNET_TIME_AbsoluteNBO *mtbe;
- struct GNUNET_TIME_Absolute mt;
+ struct DistanceVectorHop *hop;
+ struct DistanceVector *dv;
+ struct Neighbour *next_hop;
+ unsigned int shorter_distance;
- (void) emsg;
- if (NULL == record)
+ if (path_len < 3)
{
- /* we're done with #backtalker_monotime_cb() invocations,
- continue normal processing */
- b->get = NULL;
- GNUNET_assert (NULL != b->cmc);
- finish_cmc_handling (b->cmc);
- b->cmc = NULL;
- if (0 != b->body_size)
- forward_backchannel_payload (b, &b[1], b->body_size);
- return;
+ /* what a boring path! not allowed! */
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
}
- if (sizeof (*mtbe) != record->value_size)
+ GNUNET_assert (0 == GNUNET_memcmp (&GST_my_identity, &path[0]));
+ next_hop = lookup_neighbour (&path[1]);
+ if (NULL == next_hop)
{
+ /* next hop must be a neighbour, otherwise this whole thing is useless! */
GNUNET_break (0);
- return;
+ return GNUNET_SYSERR;
}
- mtbe = record->value;
- mt = GNUNET_TIME_absolute_ntoh (*mtbe);
- if (mt.abs_value_us > b->monotonic_time.abs_value_us)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Backtalker message from %s dropped, monotime in the past\n",
- GNUNET_i2s (&b->pid));
- GNUNET_STATISTICS_update (
- GST_stats,
- "# Backchannel messages dropped: monotonic time not increasing",
- 1,
- GNUNET_NO);
- b->monotonic_time = mt;
- /* Setting body_size to 0 prevents call to #forward_backchannel_payload()
- */
- b->body_size = 0;
- return;
+ for (unsigned int i = 2; i < path_len; i++)
+ if (NULL != lookup_neighbour (&path[i]))
+ {
+ /* Useless path: we have a direct connection to some hop
+ in the middle of the path, so this one is not even
+ terribly useful for redundancy */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Path of %u hops useless: directly link to hop %u (%s)\n",
+ path_len,
+ i,
+ GNUNET_i2s (&path[i]));
+ GNUNET_STATISTICS_update (GST_stats,
+ "# Useless DV path ignored: hop is neighbour",
+ 1,
+ GNUNET_NO);
+ return GNUNET_SYSERR;
+ }
+ dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &path[path_len - 1]);
+ if (NULL == dv)
+ {
+ dv = GNUNET_new (struct DistanceVector);
+ dv->target = path[path_len - 1];
+ dv->timeout_task = GNUNET_SCHEDULER_add_delayed (DV_PATH_VALIDITY_TIMEOUT,
+ &path_cleanup_cb,
+ dv);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_put (
+ dv_routes,
+ &dv->target,
+ dv,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ }
+ /* Check if we have this path already! */
+ shorter_distance = 0;
+ for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+ pos = pos->next_dv)
+ {
+ if (pos->distance < path_len - 2)
+ shorter_distance++;
+ /* Note that the distances in 'pos' excludes us (path[0]) and
+ the next_hop (path[1]), so we need to subtract two
+ and check next_hop explicitly */
+ if ((pos->distance == path_len - 2) && (pos->next_hop == next_hop))
+ {
+ int match = GNUNET_YES;
+
+ for (unsigned int i = 0; i < pos->distance; i++)
+ {
+ if (0 != GNUNET_memcmp (&pos->path[i], &path[i + 2]))
+ {
+ match = GNUNET_NO;
+ break;
+ }
+ }
+ if (GNUNET_YES == match)
+ {
+ struct GNUNET_TIME_Relative last_timeout;
+
+ /* Re-discovered known path, update timeout */
+ GNUNET_STATISTICS_update (GST_stats,
+ "# Known DV path refreshed",
+ 1,
+ GNUNET_NO);
+ last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
+ pos->timeout =
+ GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
+ pos->path_valid_until =
+ GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
+ GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
+ GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
+ if (0 <
+ GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
+ activate_core_visible_dv_path (pos);
+ if (last_timeout.rel_value_us <
+ GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
+ DV_PATH_DISCOVERY_FREQUENCY)
+ .rel_value_us)
+ {
+ /* Some peer send DV learn messages too often, we are learning
+ the same path faster than it would be useful; do not forward! */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Rediscovered path too quickly, not forwarding further\n");
+ return GNUNET_NO;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Refreshed known path to %s, forwarding further\n",
+ GNUNET_i2s (&dv->target));
+ return GNUNET_YES;
+ }
+ }
+ }
+ /* Count how many shorter paths we have (incl. direct
+ neighbours) before simply giving up on this one! */
+ if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
+ {
+ /* We have a shorter path already! */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Have many shorter DV paths %s, not forwarding further\n",
+ GNUNET_i2s (&dv->target));
+ return GNUNET_NO;
}
+ /* create new DV path entry */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Discovered new DV path to %s\n",
+ GNUNET_i2s (&dv->target));
+ hop = GNUNET_malloc (sizeof(struct DistanceVectorHop)
+ + sizeof(struct GNUNET_PeerIdentity) * (path_len - 2));
+ hop->next_hop = next_hop;
+ hop->dv = dv;
+ hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
+ memcpy (&hop[1],
+ &path[2],
+ sizeof(struct GNUNET_PeerIdentity) * (path_len - 2));
+ hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
+ hop->path_valid_until = path_valid_until;
+ hop->distance = path_len - 2;
+ hop->pd.aged_rtt = network_latency;
+ GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop);
+ GNUNET_CONTAINER_MDLL_insert (neighbour,
+ next_hop->dv_head,
+ next_hop->dv_tail,
+ hop);
+ if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
+ activate_core_visible_dv_path (hop);
+ return GNUNET_YES;
}
/**
- * Function called by PEERSTORE when the store operation of
- * a backtalker's monotonic time is complete.
+ * Communicator gave us a DV learn message. Check the message.
*
- * @param cls the `struct Backtalker`
- * @param success #GNUNET_OK on success
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param dvl the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
*/
-static void
-backtalker_monotime_store_cb (void *cls, int success)
+static int
+check_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
{
- struct Backtalker *b = cls;
+ uint16_t size = ntohs (dvl->header.size);
+ uint16_t num_hops = ntohs (dvl->num_hops);
+ const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
- if (GNUNET_OK != success)
+ (void) cls;
+ if (size != sizeof(*dvl) + num_hops * sizeof(struct DVPathEntryP))
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to store backtalker's monotonic time in PEERSTORE!\n");
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- b->sc = NULL;
- b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+ if (num_hops > MAX_DV_HOPS_ALLOWED)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ for (unsigned int i = 0; i < num_hops; i++)
+ {
+ if (0 == GNUNET_memcmp (&dvl->initiator, &hops[i].hop))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if (0 == GNUNET_memcmp (&GST_my_identity, &hops[i].hop))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ }
+ return GNUNET_YES;
}
/**
- * The backtalker @a b monotonic time changed. Update PEERSTORE.
+ * Build and forward a DV learn message to @a next_hop.
*
- * @param b a backtalker with updated monotonic time
+ * @param next_hop peer to send the message to
+ * @param msg message received
+ * @param bi_history bitmask specifying hops on path that were bidirectional
+ * @param nhops length of the @a hops array
+ * @param hops path the message traversed so far
+ * @param in_time when did we receive the message, used to calculate network
+ * delay
*/
static void
-update_backtalker_monotime (struct Backtalker *b)
+forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
+ const struct TransportDVLearnMessage *msg,
+ uint16_t bi_history,
+ uint16_t nhops,
+ const struct DVPathEntryP *hops,
+ struct GNUNET_TIME_Absolute in_time)
{
- struct GNUNET_TIME_AbsoluteNBO mtbe;
+ struct DVPathEntryP *dhops;
+ char buf[sizeof(struct TransportDVLearnMessage)
+ + (nhops + 1) * sizeof(struct DVPathEntryP)] GNUNET_ALIGN;
+ struct TransportDVLearnMessage *fwd = (struct TransportDVLearnMessage *) buf;
+ struct GNUNET_TIME_Relative nnd;
- if (NULL != b->sc)
- {
- GNUNET_PEERSTORE_store_cancel (b->sc);
- b->sc = NULL;
- }
- else
+ /* compute message for forwarding */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Forwarding DV learn message originating from %s to %s\n",
+ GNUNET_i2s (&msg->initiator),
+ GNUNET_i2s2 (next_hop));
+ GNUNET_assert (nhops < MAX_DV_HOPS_ALLOWED);
+ fwd->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
+ fwd->header.size = htons (sizeof(struct TransportDVLearnMessage)
+ + (nhops + 1) * sizeof(struct DVPathEntryP));
+ fwd->num_hops = htons (nhops + 1);
+ fwd->bidirectional = htons (bi_history);
+ nnd = GNUNET_TIME_relative_add (GNUNET_TIME_absolute_get_duration (in_time),
+ GNUNET_TIME_relative_ntoh (
+ msg->non_network_delay));
+ fwd->non_network_delay = GNUNET_TIME_relative_hton (nnd);
+ fwd->init_sig = msg->init_sig;
+ fwd->initiator = msg->initiator;
+ fwd->challenge = msg->challenge;
+ dhops = (struct DVPathEntryP *) &fwd[1];
+ GNUNET_memcpy (dhops, hops, sizeof(struct DVPathEntryP) * nhops);
+ dhops[nhops].hop = GST_my_identity;
{
- GNUNET_SCHEDULER_cancel (b->task);
- b->task = NULL;
+ struct DvHopPS dhp = {
+ .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
+ .purpose.size = htonl (sizeof(dhp)),
+ .pred = dhops[nhops - 1].hop,
+ .succ = *next_hop,
+ .challenge = msg->challenge
+ };
+
+ GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+ &dhp,
+ &dhops[nhops].hop_sig);
}
- mtbe = GNUNET_TIME_absolute_hton (b->monotonic_time);
- b->sc =
- GNUNET_PEERSTORE_store (peerstore,
- "transport",
- &b->pid,
- GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
- &mtbe,
- sizeof (mtbe),
- GNUNET_TIME_UNIT_FOREVER_ABS,
- GNUNET_PEERSTORE_STOREOPTION_REPLACE,
- &backtalker_monotime_store_cb,
- b);
+ route_control_message_without_fc (next_hop,
+ &fwd->header,
+ RMO_UNCONFIRMED_ALLOWED);
}
/**
- * Communicator gave us a backchannel encapsulation. Process the request.
- * (We are not the origin of the backchannel here, the communicator simply
- * received a backchannel message and we are expected to forward it.)
+ * Check signature of type #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
*
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
- * @param be the message that was received
+ * @param sender_monotonic_time monotonic time of the initiator
+ * @param init the signer
+ * @param challenge the challenge that was signed
+ * @param init_sig signature presumably by @a init
+ * @return #GNUNET_OK if the signature is valid
*/
-static void
-handle_backchannel_encapsulation (
- void *cls,
- const struct TransportBackchannelEncapsulationMessage *be)
+static int
+validate_dv_initiator_signature (
+ struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time,
+ const struct GNUNET_PeerIdentity *init,
+ const struct ChallengeNonceP *challenge,
+ const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
{
- struct CommunicatorMessageContext *cmc = cls;
- struct BackchannelKeyState key;
- struct GNUNET_HashCode hmac;
- const char *hdr;
- size_t hdr_len;
+ struct DvInitPS ip = { .purpose.purpose = htonl (
+ GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
+ .purpose.size = htonl (sizeof(ip)),
+ .monotonic_time = sender_monotonic_time,
+ .challenge = *challenge };
- if (0 != GNUNET_memcmp (&be->target, &GST_my_identity))
+ if (
+ GNUNET_OK !=
+ GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
+ &ip,
+ init_sig,
+ &init->public_key))
{
- /* not for me, try to route to target */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Forwarding backtalk to %s\n",
- GNUNET_i2s (&be->target));
- route_message (&be->target,
- GNUNET_copy_message (&be->header),
- RMO_DV_ALLOWED);
- finish_cmc_handling (cmc);
- return;
- }
- dh_key_derive_eph_pub (&be->ephemeral_key, &be->iv, &key);
- hdr = (const char *) &be[1];
- hdr_len = ntohs (be->header.size) - sizeof (*be);
- bc_hmac (&key, &hmac, hdr, hdr_len);
- if (0 != GNUNET_memcmp (&hmac, &be->hmac))
- {
- /* HMAC missmatch, disard! */
GNUNET_break_op (0);
- finish_cmc_handling (cmc);
- return;
+ return GNUNET_SYSERR;
}
- /* begin actual decryption */
- {
- struct Backtalker *b;
- struct GNUNET_TIME_Absolute monotime;
- struct TransportBackchannelRequestPayloadP ppay;
- char body[hdr_len - sizeof (ppay)];
+ return GNUNET_OK;
+}
- GNUNET_assert (hdr_len >=
- sizeof (ppay) + sizeof (struct GNUNET_MessageHeader));
- bc_decrypt (&key, &ppay, hdr, sizeof (ppay));
- bc_decrypt (&key, &body, &hdr[sizeof (ppay)], hdr_len - sizeof (ppay));
- bc_key_clean (&key);
- monotime = GNUNET_TIME_absolute_ntoh (ppay.monotonic_time);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Decrypted backtalk from %s\n",
- GNUNET_i2s (&ppay.sender));
- b = GNUNET_CONTAINER_multipeermap_get (backtalkers, &ppay.sender);
- if ((NULL != b) && (monotime.abs_value_us < b->monotonic_time.abs_value_us))
- {
- GNUNET_STATISTICS_update (
- GST_stats,
- "# Backchannel messages dropped: monotonic time not increasing",
- 1,
- GNUNET_NO);
- finish_cmc_handling (cmc);
- return;
- }
- if ((NULL == b) ||
- (0 != GNUNET_memcmp (&b->last_ephemeral, &be->ephemeral_key)))
- {
- /* Check signature */
- struct EphemeralConfirmationPS ec;
- ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
- ec.purpose.size = htonl (sizeof (ec));
- ec.target = GST_my_identity;
- ec.ephemeral_key = be->ephemeral_key;
- if (
- GNUNET_OK !=
- GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL,
- &ec.purpose,
- &ppay.sender_sig,
- &ppay.sender.public_key))
- {
- /* Signature invalid, disard! */
- GNUNET_break_op (0);
- finish_cmc_handling (cmc);
- return;
- }
- }
- if (NULL != b)
- {
- /* update key cache and mono time */
- b->last_ephemeral = be->ephemeral_key;
- b->monotonic_time = monotime;
- update_backtalker_monotime (b);
- forward_backchannel_payload (b, body, sizeof (body));
- b->timeout =
- GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
- finish_cmc_handling (cmc);
- return;
- }
- /* setup data structure to cache signature AND check
- monotonic time with PEERSTORE before forwarding backchannel payload */
- b = GNUNET_malloc (sizeof (struct Backtalker) + sizeof (body));
- b->pid = ppay.sender;
- b->body_size = sizeof (body);
- memcpy (&b[1], body, sizeof (body));
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_put (
- backtalkers,
- &b->pid,
- b,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- b->monotonic_time = monotime; /* NOTE: to be checked still! */
- b->cmc = cmc;
- b->timeout =
- GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
- b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
- b->get =
- GNUNET_PEERSTORE_iterate (peerstore,
- "transport",
- &b->pid,
- GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
- &backtalker_monotime_cb,
- b);
- }
-}
+/**
+ * Closure for #dv_neighbour_selection and #dv_neighbour_transmission.
+ */
+struct NeighbourSelectionContext
+{
+ /**
+ * Original message we received.
+ */
+ const struct TransportDVLearnMessage *dvl;
+
+ /**
+ * The hops taken.
+ */
+ const struct DVPathEntryP *hops;
+
+ /**
+ * Time we received the message.
+ */
+ struct GNUNET_TIME_Absolute in_time;
+
+ /**
+ * Offsets of the selected peers.
+ */
+ uint32_t selections[MAX_DV_DISCOVERY_SELECTION];
+
+ /**
+ * Number of peers eligible for selection.
+ */
+ unsigned int num_eligible;
+
+ /**
+ * Number of peers that were selected for forwarding.
+ */
+ unsigned int num_selections;
+
+ /**
+ * Number of hops in @e hops
+ */
+ uint16_t nhops;
+
+ /**
+ * Bitmap of bidirectional connections encountered.
+ */
+ uint16_t bi_history;
+};
/**
- * Task called when we should check if any of the DV paths
- * we have learned to a target are due for garbage collection.
- *
- * Collects stale paths, and possibly frees the entire DV
- * entry if no paths are left. Otherwise re-schedules itself.
+ * Function called for each neighbour during #handle_dv_learn.
*
- * @param cls a `struct DistanceVector`
+ * @param cls a `struct NeighbourSelectionContext *`
+ * @param pid identity of the peer
+ * @param value a `struct Neighbour`
+ * @return #GNUNET_YES (always)
*/
-static void
-path_cleanup_cb (void *cls)
+static int
+dv_neighbour_selection (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
{
- struct DistanceVector *dv = cls;
- struct DistanceVectorHop *pos;
+ struct NeighbourSelectionContext *nsc = cls;
- dv->timeout_task = NULL;
- while (NULL != (pos = dv->dv_head))
- {
- GNUNET_assert (dv == pos->dv);
- if (GNUNET_TIME_absolute_get_remaining (pos->timeout).rel_value_us > 0)
- break;
- free_distance_vector_hop (pos);
- }
- if (NULL == pos)
- {
- free_dv_route (dv);
- return;
- }
- dv->timeout_task =
- GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv);
+ (void) value;
+ if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
+ return GNUNET_YES; /* skip initiator */
+ for (unsigned int i = 0; i < nsc->nhops; i++)
+ if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
+ return GNUNET_YES;
+ /* skip peers on path */
+ nsc->num_eligible++;
+ return GNUNET_YES;
}
/**
- * The @a hop is a validated path to the respective target
- * peer and we should tell core about it -- and schedule
- * a job to revoke the state.
+ * Function called for each neighbour during #handle_dv_learn.
+ * We call #forward_dv_learn() on the neighbour(s) selected
+ * during #dv_neighbour_selection().
*
- * @param hop a path to some peer that is the reason for activation
+ * @param cls a `struct NeighbourSelectionContext *`
+ * @param pid identity of the peer
+ * @param value a `struct Neighbour`
+ * @return #GNUNET_YES (always)
*/
-static void
-activate_core_visible_dv_path (struct DistanceVectorHop *hop)
+static int
+dv_neighbour_transmission (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
{
- struct DistanceVector *dv = hop->dv;
- struct VirtualLink *vl;
+ struct NeighbourSelectionContext *nsc = cls;
- vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target);
- if (NULL != vl)
+ (void) value;
+ if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
+ return GNUNET_YES; /* skip initiator */
+ for (unsigned int i = 0; i < nsc->nhops; i++)
+ if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
+ return GNUNET_YES;
+ /* skip peers on path */
+ for (unsigned int i = 0; i < nsc->num_selections; i++)
{
- /* Link was already up, remember dv is also now available and we are done */
- vl->dv = dv;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Virtual link to %s could now also use DV!\n",
- GNUNET_i2s (&dv->target));
- return;
+ if (nsc->selections[i] == nsc->num_eligible)
+ {
+ forward_dv_learn (pid,
+ nsc->dvl,
+ nsc->bi_history,
+ nsc->nhops,
+ nsc->hops,
+ nsc->in_time);
+ break;
+ }
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating new virtual link to %s using DV!\n",
- GNUNET_i2s (&dv->target));
- vl = GNUNET_new (struct VirtualLink);
- vl->target = dv->target;
- vl->dv = dv;
- vl->core_recv_window = RECV_WINDOW_SIZE;
- vl->visibility_task =
- GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_put (
- links,
- &vl->target,
- vl,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- /* We lacked a confirmed connection to the target
- before, so tell CORE about it (finally!) */
- cores_send_connect_info (&dv->target);
+ nsc->num_eligible++;
+ return GNUNET_YES;
}
/**
- * We have learned a @a path through the network to some other peer, add it to
- * our DV data structure (returning #GNUNET_YES on success).
+ * Computes the number of neighbours we should forward a DVInit
+ * message to given that it has so far taken @a hops_taken hops
+ * though the network and that the number of neighbours we have
+ * in total is @a neighbour_count, out of which @a eligible_count
+ * are not yet on the path.
*
- * We do not add paths if we have a sufficient number of shorter
- * paths to this target already (returning #GNUNET_NO).
+ * NOTE: technically we might want to include NSE in the formula to
+ * get a better grip on the overall network size. However, for now
+ * using NSE here would create a dependency issue in the build system.
+ * => Left for later, hardcoded to 50 for now.
*
- * We also do not add problematic paths, like those where we lack the first
- * hop in our neighbour list (i.e. due to a topology change) or where some
- * non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
+ * The goal of the fomula is that we want to reach a total of LOG(NSE)
+ * peers via DV (`target_total`). We want the reach to be spread out
+ * over various distances to the origin, with a bias towards shorter
+ * distances.
*
- * @param path the path we learned, path[0] should be us,
- * and then path contains a valid path from us to
- * `path[path_len-1]` path[1] should be a direct neighbour (we should check!)
- * @param path_len number of entries on the @a path, at least three!
- * @param network_latency how long does the message take from us to
- * `path[path_len-1]`? set to "forever" if unknown
- * @param path_valid_until how long is this path considered validated? Maybe
- * be zero.
- * @return #GNUNET_YES on success,
- * #GNUNET_NO if we have better path(s) to the target
- * #GNUNET_SYSERR if the path is useless and/or invalid
- * (i.e. path[1] not a direct neighbour
- * or path[i+1] is a direct neighbour for i>0)
+ * We make the strong assumption that the network topology looks
+ * "similar" at other hops, in particular the @a neighbour_count
+ * should be comparable at other hops.
+ *
+ * If the local neighbourhood is densely connected, we expect that @a
+ * eligible_count is close to @a neighbour_count minus @a hops_taken
+ * as a lot of the path is already known. In that case, we should
+ * forward to few(er) peers to try to find a path out of the
+ * neighbourhood. OTOH, if @a eligible_count is close to @a
+ * neighbour_count, we should forward to many peers as we are either
+ * still close to the origin (i.e. @a hops_taken is small) or because
+ * we managed to get beyond a local cluster. We express this as
+ * the `boost_factor` using the square of the fraction of eligible
+ * neighbours (so if only 50% are eligible, we boost by 1/4, but if
+ * 99% are eligible, the 'boost' will be almost 1).
+ *
+ * Second, the more hops we have taken, the larger the problem of an
+ * exponential traffic explosion gets. So we take the `target_total`,
+ * and compute our degree such that at each distance d 2^{-d} peers
+ * are selected (corrected by the `boost_factor`).
+ *
+ * @param hops_taken number of hops DVInit has travelled so far
+ * @param neighbour_count number of neighbours we have in total
+ * @param eligible_count number of neighbours we could in
+ * theory forward to
*/
-static int
-learn_dv_path (const struct GNUNET_PeerIdentity *path,
- unsigned int path_len,
- struct GNUNET_TIME_Relative network_latency,
- struct GNUNET_TIME_Absolute path_valid_until)
+static unsigned int
+calculate_fork_degree (unsigned int hops_taken,
+ unsigned int neighbour_count,
+ unsigned int eligible_count)
{
- struct DistanceVectorHop *hop;
- struct DistanceVector *dv;
- struct Neighbour *next_hop;
- unsigned int shorter_distance;
+ double target_total = 50.0; /* FIXME: use LOG(NSE)? */
+ double eligible_ratio =
+ ((double) eligible_count) / ((double) neighbour_count);
+ double boost_factor = eligible_ratio * eligible_ratio;
+ unsigned int rnd;
+ double left;
- if (path_len < 3)
+ if (hops_taken >= 64)
{
- /* what a boring path! not allowed! */
GNUNET_break (0);
- return GNUNET_SYSERR;
+ return 0; /* precaution given bitshift below */
}
- GNUNET_assert (0 == GNUNET_memcmp (&GST_my_identity, &path[0]));
- next_hop = lookup_neighbour (&path[1]);
- if (NULL == next_hop)
+ for (unsigned int i = 1; i < hops_taken; i++)
{
- /* next hop must be a neighbour, otherwise this whole thing is useless! */
- GNUNET_break (0);
- return GNUNET_SYSERR;
+ /* For each hop, subtract the expected number of targets
+ reached at distance d (so what remains divided by 2^d) */
+ target_total -= (target_total * boost_factor / (1LLU << i));
}
- for (unsigned int i = 2; i < path_len; i++)
- if (NULL != lookup_neighbour (&path[i]))
+ rnd =
+ (unsigned int) floor (target_total * boost_factor / (1LLU << hops_taken));
+ /* round up or down probabilistically depending on how close we were
+ when floor()ing to rnd */
+ left = target_total - (double) rnd;
+ if (UINT32_MAX * left >
+ GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX))
+ rnd++; /* round up */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Forwarding DV learn message of %u hops %u(/%u/%u) times\n",
+ hops_taken,
+ rnd,
+ eligible_count,
+ neighbour_count);
+ return rnd;
+}
+
+
+/**
+ * Function called when peerstore is done storing a DV monotonic time.
+ *
+ * @param cls a `struct Neighbour`
+ * @param success #GNUNET_YES if peerstore was successful
+ */
+static void
+neighbour_store_dvmono_cb (void *cls, int success)
+{
+ struct Neighbour *n = cls;
+
+ n->sc = NULL;
+ if (GNUNET_YES != success)
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to store other peer's monotonic time in peerstore!\n");
+}
+
+
+/**
+ * Communicator gave us a DV learn message. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call
+ * #finish_cmc_handling() when done)
+ * @param dvl the message that was received
+ */
+static void
+handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+ enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
+ int bi_hop;
+ uint16_t nhops;
+ uint16_t bi_history;
+ const struct DVPathEntryP *hops;
+ int do_fwd;
+ int did_initiator;
+ struct GNUNET_TIME_Absolute in_time;
+ struct Neighbour *n;
+
+ nhops = ntohs (dvl->bidirectional); /* 0 = sender is initiator */
+ bi_history = ntohs (dvl->bidirectional);
+ hops = (const struct DVPathEntryP *) &dvl[1];
+ if (0 == nhops)
+ {
+ /* sanity check */
+ if (0 != GNUNET_memcmp (&dvl->initiator, &cmc->im.sender))
+ {
+ GNUNET_break (0);
+ finish_cmc_handling (cmc);
+ return;
+ }
+ }
+ else
+ {
+ /* sanity check */
+ if (0 != GNUNET_memcmp (&hops[nhops - 1].hop, &cmc->im.sender))
+ {
+ GNUNET_break (0);
+ finish_cmc_handling (cmc);
+ return;
+ }
+ }
+
+ GNUNET_assert (CT_COMMUNICATOR == cmc->tc->type);
+ cc = cmc->tc->details.communicator.cc;
+ bi_hop = (GNUNET_TRANSPORT_CC_RELIABLE ==
+ cc); // FIXME: add bi-directional flag to cc?
+ in_time = GNUNET_TIME_absolute_get ();
+
+ /* continue communicator here, everything else can happen asynchronous! */
+ finish_cmc_handling (cmc);
+
+ n = lookup_neighbour (&dvl->initiator);
+ if (NULL != n)
+ {
+ if ((n->dv_monotime_available == GNUNET_YES) &&
+ (GNUNET_TIME_absolute_ntoh (dvl->monotonic_time).abs_value_us <
+ n->last_dv_learn_monotime.abs_value_us))
{
- /* Useless path: we have a direct connection to some hop
- in the middle of the path, so this one is not even
- terribly useful for redundancy */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Path of %u hops useless: directly link to hop %u (%s)\n",
- path_len,
- i,
- GNUNET_i2s (&path[i]));
GNUNET_STATISTICS_update (GST_stats,
- "# Useless DV path ignored: hop is neighbour",
+ "# DV learn discarded due to time travel",
1,
GNUNET_NO);
- return GNUNET_SYSERR;
+ return;
}
- dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &path[path_len - 1]);
- if (NULL == dv)
+ if (GNUNET_OK != validate_dv_initiator_signature (dvl->monotonic_time,
+ &dvl->initiator,
+ &dvl->challenge,
+ &dvl->init_sig))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ n->last_dv_learn_monotime = GNUNET_TIME_absolute_ntoh (dvl->monotonic_time);
+ if (GNUNET_YES == n->dv_monotime_available)
+ {
+ if (NULL != n->sc)
+ GNUNET_PEERSTORE_store_cancel (n->sc);
+ n->sc =
+ GNUNET_PEERSTORE_store (peerstore,
+ "transport",
+ &dvl->initiator,
+ GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
+ &dvl->monotonic_time,
+ sizeof(dvl->monotonic_time),
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+ &neighbour_store_dvmono_cb,
+ n);
+ }
+ }
+ /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
+ If signature verification load too high, implement random drop strategy */
+ for (unsigned int i = 0; i < nhops; i++)
{
- dv = GNUNET_new (struct DistanceVector);
- dv->target = path[path_len - 1];
- dv->timeout_task = GNUNET_SCHEDULER_add_delayed (DV_PATH_VALIDITY_TIMEOUT,
- &path_cleanup_cb,
- dv);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_put (
- dv_routes,
- &dv->target,
- dv,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ struct DvHopPS dhp = { .purpose.purpose =
+ htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
+ .purpose.size = htonl (sizeof(dhp)),
+ .pred = (0 == i) ? dvl->initiator : hops[i - 1].hop,
+ .succ = (nhops == i + 1) ? GST_my_identity
+ : hops[i + 1].hop,
+ .challenge = dvl->challenge };
+
+ if (GNUNET_OK !=
+ GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP,
+ &dhp,
+ &hops[i].hop_sig,
+ &hops[i].hop.public_key))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
}
- /* Check if we have this path already! */
- shorter_distance = 0;
- for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
- pos = pos->next_dv)
+
+ if (GNUNET_EXTRA_LOGGING > 0)
{
- if (pos->distance < path_len - 2)
- shorter_distance++;
- /* Note that the distances in 'pos' excludes us (path[0]) and
- the next_hop (path[1]), so we need to subtract two
- and check next_hop explicitly */
- if ((pos->distance == path_len - 2) && (pos->next_hop == next_hop))
+ char *path;
+
+ path = GNUNET_strdup (GNUNET_i2s (&dvl->initiator));
+ for (unsigned int i = 0; i < nhops; i++)
{
- int match = GNUNET_YES;
+ char *tmp;
- for (unsigned int i = 0; i < pos->distance; i++)
+ GNUNET_asprintf (&tmp,
+ "%s%s%s",
+ path,
+ (bi_history & (1 << (nhops - i))) ? "<->" : "-->",
+ GNUNET_i2s (&hops[i].hop));
+ GNUNET_free (path);
+ path = tmp;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received DVInit via %s%s%s\n",
+ path,
+ bi_hop ? "<->" : "-->",
+ GNUNET_i2s (&GST_my_identity));
+ GNUNET_free (path);
+ }
+
+ do_fwd = GNUNET_YES;
+ if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
+ {
+ struct GNUNET_PeerIdentity path[nhops + 1];
+ struct GNUNET_TIME_Relative host_latency_sum;
+ struct GNUNET_TIME_Relative latency;
+ struct GNUNET_TIME_Relative network_latency;
+
+ /* We initiated this, learn the forward path! */
+ path[0] = GST_my_identity;
+ path[1] = hops[0].hop;
+ host_latency_sum = GNUNET_TIME_relative_ntoh (dvl->non_network_delay);
+
+ // Need also something to lookup initiation time
+ // to compute RTT! -> add RTT argument here?
+ latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
+ // (based on dvl->challenge, we can identify time of origin!)
+
+ network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum);
+ /* assumption: latency on all links is the same */
+ network_latency = GNUNET_TIME_relative_divide (network_latency, nhops);
+
+ for (unsigned int i = 2; i <= nhops; i++)
+ {
+ struct GNUNET_TIME_Relative ilat;
+
+ /* assumption: linear latency increase per hop */
+ ilat = GNUNET_TIME_relative_multiply (network_latency, i);
+ path[i] = hops[i - 1].hop;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Learned path with %u hops to %s with latency %s\n",
+ i,
+ GNUNET_i2s (&path[i]),
+ GNUNET_STRINGS_relative_time_to_string (ilat, GNUNET_YES));
+ learn_dv_path (path,
+ i,
+ ilat,
+ GNUNET_TIME_relative_to_absolute (
+ ADDRESS_VALIDATION_LIFETIME));
+ }
+ /* as we initiated, do not forward again (would be circular!) */
+ do_fwd = GNUNET_NO;
+ return;
+ }
+ if (bi_hop)
+ {
+ /* last hop was bi-directional, we could learn something here! */
+ struct GNUNET_PeerIdentity path[nhops + 2];
+
+ path[0] = GST_my_identity;
+ path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
+ for (unsigned int i = 0; i < nhops; i++)
+ {
+ int iret;
+
+ if (0 == (bi_history & (1 << i)))
+ break; /* i-th hop not bi-directional, stop learning! */
+ if (i == nhops - 1)
{
- if (0 != GNUNET_memcmp (&pos->path[i], &path[i + 2]))
- {
- match = GNUNET_NO;
- break;
- }
+ path[i + 2] = dvl->initiator;
}
- if (GNUNET_YES == match)
+ else
{
- struct GNUNET_TIME_Relative last_timeout;
+ path[i + 2] = hops[nhops - i - 2].hop;
+ }
- /* Re-discovered known path, update timeout */
- GNUNET_STATISTICS_update (GST_stats,
- "# Known DV path refreshed",
- 1,
- GNUNET_NO);
- last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
- pos->timeout =
- GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
- pos->path_valid_until =
- GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
- GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
- GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
- if (0 <
- GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
- activate_core_visible_dv_path (pos);
- if (last_timeout.rel_value_us <
- GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
- DV_PATH_DISCOVERY_FREQUENCY)
- .rel_value_us)
- {
- /* Some peer send DV learn messages too often, we are learning
- the same path faster than it would be useful; do not forward! */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Rediscovered path too quickly, not forwarding further\n");
- return GNUNET_NO;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Refreshed known path to %s, forwarding further\n",
- GNUNET_i2s (&dv->target));
- return GNUNET_YES;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Learned inverse path with %u hops to %s\n",
+ i + 1,
+ GNUNET_i2s (&path[i + 2]));
+ iret = learn_dv_path (path,
+ i + 2,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_TIME_UNIT_ZERO_ABS);
+ if (GNUNET_SYSERR == iret)
+ {
+ /* path invalid or too long to be interesting for US, thus should also
+ not be interesting to our neighbours, cut path when forwarding to
+ 'i' hops, except of course for the one that goes back to the
+ initiator */
+ GNUNET_STATISTICS_update (GST_stats,
+ "# DV learn not forwarded due invalidity of path",
+ 1,
+ GNUNET_NO);
+ do_fwd = GNUNET_NO;
+ break;
+ }
+ if ((GNUNET_NO == iret) && (nhops == i + 1))
+ {
+ /* we have better paths, and this is the longest target,
+ so there cannot be anything interesting later */
+ GNUNET_STATISTICS_update (GST_stats,
+ "# DV learn not forwarded, got better paths",
+ 1,
+ GNUNET_NO);
+ do_fwd = GNUNET_NO;
+ break;
}
}
}
- /* Count how many shorter paths we have (incl. direct
- neighbours) before simply giving up on this one! */
- if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
+
+ if (MAX_DV_HOPS_ALLOWED == nhops)
{
- /* We have a shorter path already! */
+ /* At limit, we're out of here! */
+ finish_cmc_handling (cmc);
+ return;
+ }
+
+ /* Forward to initiator, if path non-trivial and possible */
+ bi_history = (bi_history << 1) | (bi_hop ? 1 : 0);
+ did_initiator = GNUNET_NO;
+ if ((1 < nhops) &&
+ (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_contains (neighbours, &dvl->initiator)))
+ {
+ /* send back to origin! */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Have many shorter DV paths %s, not forwarding further\n",
- GNUNET_i2s (&dv->target));
- return GNUNET_NO;
+ "Sending DVL back to initiator %s\n",
+ GNUNET_i2s (&dvl->initiator));
+ forward_dv_learn (&dvl->initiator, dvl, bi_history, nhops, hops, in_time);
+ did_initiator = GNUNET_YES;
+ }
+ /* We forward under two conditions: either we still learned something
+ ourselves (do_fwd), or the path was darn short and thus the initiator is
+ likely to still be very interested in this (and we did NOT already
+ send it back to the initiator) */
+ if ((do_fwd) || ((nhops < MIN_DV_PATH_LENGTH_FOR_INITIATOR) &&
+ (GNUNET_NO == did_initiator)))
+ {
+ /* Pick random neighbours that are not yet on the path */
+ struct NeighbourSelectionContext nsc;
+ unsigned int n_cnt;
+
+ n_cnt = GNUNET_CONTAINER_multipeermap_size (neighbours);
+ nsc.nhops = nhops;
+ nsc.dvl = dvl;
+ nsc.bi_history = bi_history;
+ nsc.hops = hops;
+ nsc.in_time = in_time;
+ nsc.num_eligible = 0;
+ GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+ &dv_neighbour_selection,
+ &nsc);
+ if (0 == nsc.num_eligible)
+ return; /* done here, cannot forward to anyone else */
+ nsc.num_selections = calculate_fork_degree (nhops, n_cnt, nsc.num_eligible);
+ nsc.num_selections =
+ GNUNET_MIN (MAX_DV_DISCOVERY_SELECTION, nsc.num_selections);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Forwarding DVL to %u other peers\n",
+ nsc.num_selections);
+ for (unsigned int i = 0; i < nsc.num_selections; i++)
+ nsc.selections[i] =
+ (nsc.num_selections == n_cnt)
+ ? i /* all were selected, avoid collisions by chance */
+ : GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, n_cnt);
+ nsc.num_eligible = 0;
+ GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+ &dv_neighbour_transmission,
+ &nsc);
}
- /* create new DV path entry */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Discovered new DV path to %s\n",
- GNUNET_i2s (&dv->target));
- hop = GNUNET_malloc (sizeof (struct DistanceVectorHop) +
- sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
- hop->next_hop = next_hop;
- hop->dv = dv;
- hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
- memcpy (&hop[1],
- &path[2],
- sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
- hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
- hop->path_valid_until = path_valid_until;
- hop->distance = path_len - 2;
- hop->pd.aged_rtt = network_latency;
- GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop);
- GNUNET_CONTAINER_MDLL_insert (neighbour,
- next_hop->dv_head,
- next_hop->dv_tail,
- hop);
- if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
- activate_core_visible_dv_path (hop);
- return GNUNET_YES;
}
/**
- * Communicator gave us a DV learn message. Check the message.
+ * Communicator gave us a DV box. Check the message.
*
* @param cls a `struct CommunicatorMessageContext`
- * @param dvl the send message that was sent
+ * @param dvb the send message that was sent
* @return #GNUNET_YES if message is well-formed
*/
static int
-check_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
+check_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
{
- uint16_t size = ntohs (dvl->header.size);
- uint16_t num_hops = ntohs (dvl->num_hops);
- const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
+ uint16_t size = ntohs (dvb->header.size);
+ uint16_t num_hops = ntohs (dvb->num_hops);
+ const struct GNUNET_PeerIdentity *hops =
+ (const struct GNUNET_PeerIdentity *) &dvb[1];
(void) cls;
- if (size != sizeof (*dvl) + num_hops * sizeof (struct DVPathEntryP))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (num_hops > MAX_DV_HOPS_ALLOWED)
+ if (size < sizeof(*dvb) + num_hops * sizeof(struct GNUNET_PeerIdentity)
+ + sizeof(struct GNUNET_MessageHeader))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
+ /* This peer must not be on the path */
for (unsigned int i = 0; i < num_hops; i++)
- {
- if (0 == GNUNET_memcmp (&dvl->initiator, &hops[i].hop))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (0 == GNUNET_memcmp (&GST_my_identity, &hops[i].hop))
+ if (0 == GNUNET_memcmp (&hops[i], &GST_my_identity))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
- }
return GNUNET_YES;
}
/**
- * Build and forward a DV learn message to @a next_hop.
+ * Create a DV Box message and queue it for transmission to
+ * @ea next_hop.
*
- * @param next_hop peer to send the message to
- * @param msg message received
- * @param bi_history bitmask specifying hops on path that were bidirectional
- * @param nhops length of the @a hops array
- * @param hops path the message traversed so far
- * @param in_time when did we receive the message, used to calculate network
- * delay
+ * @param next_hop peer to receive the message next
+ * @param total_hops how many hops did the message take so far
+ * @param num_hops length of the @a hops array
+ * @param origin origin of the message
+ * @param hops next peer(s) to the destination, including destination
+ * @param payload payload of the box
+ * @param payload_size number of bytes in @a payload
*/
static void
-forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
- const struct TransportDVLearnMessage *msg,
- uint16_t bi_history,
- uint16_t nhops,
- const struct DVPathEntryP *hops,
- struct GNUNET_TIME_Absolute in_time)
+forward_dv_box (struct Neighbour *next_hop,
+ const struct TransportDVBoxMessage *hdr,
+ uint16_t total_hops,
+ uint16_t num_hops,
+ const struct GNUNET_PeerIdentity *hops,
+ const void *enc_payload,
+ uint16_t enc_payload_size)
{
- struct DVPathEntryP *dhops;
- struct TransportDVLearnMessage *fwd;
- struct GNUNET_TIME_Relative nnd;
+ struct VirtualLink *vl = next_hop->vl;
+ struct PendingMessage *pm;
+ size_t msg_size;
+ char *buf;
+ struct GNUNET_PeerIdentity *dhops;
- /* compute message for forwarding */
+ GNUNET_assert (NULL != vl);
+ msg_size = sizeof(struct TransportDVBoxMessage)
+ + num_hops * sizeof(struct GNUNET_PeerIdentity) + enc_payload_size;
+ pm = GNUNET_malloc (sizeof(struct PendingMessage) + msg_size);
+ pm->pmt = PMT_DV_BOX;
+ pm->vl = vl;
+ pm->timeout = GNUNET_TIME_relative_to_absolute (DV_FORWARD_TIMEOUT);
+ pm->logging_uuid = logging_uuid_gen++;
+ pm->prefs = GNUNET_MQ_PRIO_BACKGROUND;
+ pm->bytes_msg = msg_size;
+ buf = (char *) &pm[1];
+ memcpy (buf, hdr, sizeof(*hdr));
+ dhops =
+ (struct GNUNET_PeerIdentity *) &buf[sizeof(struct TransportDVBoxMessage)];
+ memcpy (dhops, hops, num_hops * sizeof(struct GNUNET_PeerIdentity));
+ memcpy (&dhops[num_hops], enc_payload, enc_payload_size);
+ GNUNET_CONTAINER_MDLL_insert (vl,
+ vl->pending_msg_head,
+ vl->pending_msg_tail,
+ pm);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Forwarding DV learn message originating from %s to %s\n",
- GNUNET_i2s (&msg->initiator),
- GNUNET_i2s2 (next_hop));
- GNUNET_assert (nhops < MAX_DV_HOPS_ALLOWED);
- fwd = GNUNET_malloc (sizeof (struct TransportDVLearnMessage) +
- (nhops + 1) * sizeof (struct DVPathEntryP));
- fwd->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
- fwd->header.size = htons (sizeof (struct TransportDVLearnMessage) +
- (nhops + 1) * sizeof (struct DVPathEntryP));
- fwd->num_hops = htons (nhops + 1);
- fwd->bidirectional = htons (bi_history);
- nnd = GNUNET_TIME_relative_add (GNUNET_TIME_absolute_get_duration (in_time),
- GNUNET_TIME_relative_ntoh (
- msg->non_network_delay));
- fwd->non_network_delay = GNUNET_TIME_relative_hton (nnd);
- fwd->init_sig = msg->init_sig;
- fwd->initiator = msg->initiator;
- fwd->challenge = msg->challenge;
- dhops = (struct DVPathEntryP *) &fwd[1];
- GNUNET_memcpy (dhops, hops, sizeof (struct DVPathEntryP) * nhops);
- dhops[nhops].hop = GST_my_identity;
- {
- struct DvHopPS dhp = {.purpose.purpose =
- htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
- .purpose.size = htonl (sizeof (dhp)),
- .pred = dhops[nhops - 1].hop,
- .succ = *next_hop,
- .challenge = msg->challenge};
-
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
- &dhp.purpose,
- &dhops[nhops].hop_sig));
- }
- route_message (next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED);
+ "Created pending message %llu for DV Box with next hop %s (%u/%u)\n",
+ pm->logging_uuid,
+ GNUNET_i2s (&next_hop->pid),
+ (unsigned int) num_hops,
+ (unsigned int) total_hops);
+ check_vl_transmission (vl);
}
/**
- * Check signature of type #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
+ * Free data structures associated with @a b.
*
- * @param sender_monotonic_time monotonic time of the initiator
- * @param init the signer
- * @param challenge the challenge that was signed
- * @param init_sig signature presumably by @a init
- * @return #GNUNET_OK if the signature is valid
- */
-static int
-validate_dv_initiator_signature (
- struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time,
- const struct GNUNET_PeerIdentity *init,
- const struct ChallengeNonceP *challenge,
- const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
-{
- struct DvInitPS ip = {.purpose.purpose = htonl (
- GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
- .purpose.size = htonl (sizeof (ip)),
- .monotonic_time = sender_monotonic_time,
- .challenge = *challenge};
-
- if (
- GNUNET_OK !=
- GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
- &ip.purpose,
- init_sig,
- &init->public_key))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- return GNUNET_OK;
-}
-
-
-/**
- * Closure for #dv_neighbour_selection and #dv_neighbour_transmission.
+ * @param b data structure to release
*/
-struct NeighbourSelectionContext
+static void
+free_backtalker (struct Backtalker *b)
{
- /**
- * Original message we received.
- */
- const struct TransportDVLearnMessage *dvl;
-
- /**
- * The hops taken.
- */
- const struct DVPathEntryP *hops;
-
- /**
- * Time we received the message.
- */
- struct GNUNET_TIME_Absolute in_time;
-
- /**
- * Offsets of the selected peers.
- */
- uint32_t selections[MAX_DV_DISCOVERY_SELECTION];
-
- /**
- * Number of peers eligible for selection.
- */
- unsigned int num_eligible;
-
- /**
- * Number of peers that were selected for forwarding.
- */
- unsigned int num_selections;
-
- /**
- * Number of hops in @e hops
- */
- uint16_t nhops;
-
- /**
- * Bitmap of bidirectional connections encountered.
- */
- uint16_t bi_history;
-};
+ if (NULL != b->get)
+ {
+ GNUNET_PEERSTORE_iterate_cancel (b->get);
+ b->get = NULL;
+ GNUNET_assert (NULL != b->cmc);
+ finish_cmc_handling (b->cmc);
+ b->cmc = NULL;
+ }
+ if (NULL != b->task)
+ {
+ GNUNET_SCHEDULER_cancel (b->task);
+ b->task = NULL;
+ }
+ if (NULL != b->sc)
+ {
+ GNUNET_PEERSTORE_store_cancel (b->sc);
+ b->sc = NULL;
+ }
+ GNUNET_assert (
+ GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (backtalkers, &b->pid, b));
+ GNUNET_free (b);
+}
/**
- * Function called for each neighbour during #handle_dv_learn.
+ * Callback to free backtalker records.
*
- * @param cls a `struct NeighbourSelectionContext *`
- * @param pid identity of the peer
- * @param value a `struct Neighbour`
- * @return #GNUNET_YES (always)
+ * @param cls NULL
+ * @param pid unused
+ * @param value a `struct Backtalker`
+ * @return #GNUNET_OK (always)
*/
static int
-dv_neighbour_selection (void *cls,
- const struct GNUNET_PeerIdentity *pid,
- void *value)
+free_backtalker_cb (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
{
- struct NeighbourSelectionContext *nsc = cls;
+ struct Backtalker *b = value;
- (void) value;
- if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
- return GNUNET_YES; /* skip initiator */
- for (unsigned int i = 0; i < nsc->nhops; i++)
- if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
- return GNUNET_YES; /* skip peers on path */
- nsc->num_eligible++;
- return GNUNET_YES;
+ (void) cls;
+ (void) pid;
+ free_backtalker (b);
+ return GNUNET_OK;
}
/**
- * Function called for each neighbour during #handle_dv_learn.
- * We call #forward_dv_learn() on the neighbour(s) selected
- * during #dv_neighbour_selection().
+ * Function called when it is time to clean up a backtalker.
*
- * @param cls a `struct NeighbourSelectionContext *`
- * @param pid identity of the peer
- * @param value a `struct Neighbour`
- * @return #GNUNET_YES (always)
+ * @param cls a `struct Backtalker`
*/
-static int
-dv_neighbour_transmission (void *cls,
- const struct GNUNET_PeerIdentity *pid,
- void *value)
+static void
+backtalker_timeout_cb (void *cls)
{
- struct NeighbourSelectionContext *nsc = cls;
+ struct Backtalker *b = cls;
- (void) value;
- if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
- return GNUNET_YES; /* skip initiator */
- for (unsigned int i = 0; i < nsc->nhops; i++)
- if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
- return GNUNET_YES; /* skip peers on path */
- for (unsigned int i = 0; i < nsc->num_selections; i++)
+ b->task = NULL;
+ if (0 != GNUNET_TIME_absolute_get_remaining (b->timeout).rel_value_us)
{
- if (nsc->selections[i] == nsc->num_eligible)
- {
- forward_dv_learn (pid,
- nsc->dvl,
- nsc->bi_history,
- nsc->nhops,
- nsc->hops,
- nsc->in_time);
- break;
- }
+ b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+ return;
}
- nsc->num_eligible++;
- return GNUNET_YES;
+ GNUNET_assert (NULL == b->sc);
+ free_backtalker (b);
}
/**
- * Computes the number of neighbours we should forward a DVInit
- * message to given that it has so far taken @a hops_taken hops
- * though the network and that the number of neighbours we have
- * in total is @a neighbour_count, out of which @a eligible_count
- * are not yet on the path.
- *
- * NOTE: technically we might want to include NSE in the formula to
- * get a better grip on the overall network size. However, for now
- * using NSE here would create a dependency issue in the build system.
- * => Left for later, hardcoded to 50 for now.
- *
- * The goal of the fomula is that we want to reach a total of LOG(NSE)
- * peers via DV (`target_total`). We want the reach to be spread out
- * over various distances to the origin, with a bias towards shorter
- * distances.
- *
- * We make the strong assumption that the network topology looks
- * "similar" at other hops, in particular the @a neighbour_count
- * should be comparable at other hops.
- *
- * If the local neighbourhood is densely connected, we expect that @a
- * eligible_count is close to @a neighbour_count minus @a hops_taken
- * as a lot of the path is already known. In that case, we should
- * forward to few(er) peers to try to find a path out of the
- * neighbourhood. OTOH, if @a eligible_count is close to @a
- * neighbour_count, we should forward to many peers as we are either
- * still close to the origin (i.e. @a hops_taken is small) or because
- * we managed to get beyond a local cluster. We express this as
- * the `boost_factor` using the square of the fraction of eligible
- * neighbours (so if only 50% are eligible, we boost by 1/4, but if
- * 99% are eligible, the 'boost' will be almost 1).
- *
- * Second, the more hops we have taken, the larger the problem of an
- * exponential traffic explosion gets. So we take the `target_total`,
- * and compute our degree such that at each distance d 2^{-d} peers
- * are selected (corrected by the `boost_factor`).
+ * Function called with the monotonic time of a backtalker
+ * by PEERSTORE. Updates the time and continues processing.
*
- * @param hops_taken number of hops DVInit has travelled so far
- * @param neighbour_count number of neighbours we have in total
- * @param eligible_count number of neighbours we could in
- * theory forward to
+ * @param cls a `struct Backtalker`
+ * @param record the information found, NULL for the last call
+ * @param emsg error message
*/
-static unsigned int
-calculate_fork_degree (unsigned int hops_taken,
- unsigned int neighbour_count,
- unsigned int eligible_count)
+static void
+backtalker_monotime_cb (void *cls,
+ const struct GNUNET_PEERSTORE_Record *record,
+ const char *emsg)
{
- double target_total = 50.0; /* FIXME: use LOG(NSE)? */
- double eligible_ratio =
- ((double) eligible_count) / ((double) neighbour_count);
- double boost_factor = eligible_ratio * eligible_ratio;
- unsigned int rnd;
- double left;
+ struct Backtalker *b = cls;
+ struct GNUNET_TIME_AbsoluteNBO *mtbe;
+ struct GNUNET_TIME_Absolute mt;
- if (hops_taken >= 64)
+ (void) emsg;
+ if (NULL == record)
+ {
+ /* we're done with #backtalker_monotime_cb() invocations,
+ continue normal processing */
+ b->get = NULL;
+ GNUNET_assert (NULL != b->cmc);
+ if (0 != b->body_size)
+ demultiplex_with_cmc (b->cmc,
+ (const struct GNUNET_MessageHeader *) &b[1]);
+ else
+ finish_cmc_handling (b->cmc);
+ b->cmc = NULL;
+ return;
+ }
+ if (sizeof(*mtbe) != record->value_size)
{
GNUNET_break (0);
- return 0; /* precaution given bitshift below */
+ return;
}
- for (unsigned int i = 1; i < hops_taken; i++)
+ mtbe = record->value;
+ mt = GNUNET_TIME_absolute_ntoh (*mtbe);
+ if (mt.abs_value_us > b->monotonic_time.abs_value_us)
{
- /* For each hop, subtract the expected number of targets
- reached at distance d (so what remains divided by 2^d) */
- target_total -= (target_total * boost_factor / (1LLU << i));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Backtalker message from %s dropped, monotime in the past\n",
+ GNUNET_i2s (&b->pid));
+ GNUNET_STATISTICS_update (
+ GST_stats,
+ "# Backchannel messages dropped: monotonic time not increasing",
+ 1,
+ GNUNET_NO);
+ b->monotonic_time = mt;
+ /* Setting body_size to 0 prevents call to #forward_backchannel_payload()
+ */
+ b->body_size = 0;
+ return;
}
- rnd =
- (unsigned int) floor (target_total * boost_factor / (1LLU << hops_taken));
- /* round up or down probabilistically depending on how close we were
- when floor()ing to rnd */
- left = target_total - (double) rnd;
- if (UINT32_MAX * left >
- GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX))
- rnd++; /* round up */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Forwarding DV learn message of %u hops %u(/%u/%u) times\n",
- hops_taken,
- rnd,
- eligible_count,
- neighbour_count);
- return rnd;
}
/**
- * Function called when peerstore is done storing a DV monotonic time.
+ * Function called by PEERSTORE when the store operation of
+ * a backtalker's monotonic time is complete.
*
- * @param cls a `struct Neighbour`
- * @param success #GNUNET_YES if peerstore was successful
+ * @param cls the `struct Backtalker`
+ * @param success #GNUNET_OK on success
*/
static void
-neighbour_store_dvmono_cb (void *cls, int success)
+backtalker_monotime_store_cb (void *cls, int success)
{
- struct Neighbour *n = cls;
+ struct Backtalker *b = cls;
- n->sc = NULL;
- if (GNUNET_YES != success)
+ if (GNUNET_OK != success)
+ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to store other peer's monotonic time in peerstore!\n");
+ "Failed to store backtalker's monotonic time in PEERSTORE!\n");
+ }
+ b->sc = NULL;
+ b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
}
-// FIXME: add logging logic from here!
+/**
+ * The backtalker @a b monotonic time changed. Update PEERSTORE.
+ *
+ * @param b a backtalker with updated monotonic time
+ */
+static void
+update_backtalker_monotime (struct Backtalker *b)
+{
+ struct GNUNET_TIME_AbsoluteNBO mtbe;
+
+ if (NULL != b->sc)
+ {
+ GNUNET_PEERSTORE_store_cancel (b->sc);
+ b->sc = NULL;
+ }
+ else
+ {
+ GNUNET_SCHEDULER_cancel (b->task);
+ b->task = NULL;
+ }
+ mtbe = GNUNET_TIME_absolute_hton (b->monotonic_time);
+ b->sc =
+ GNUNET_PEERSTORE_store (peerstore,
+ "transport",
+ &b->pid,
+ GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
+ &mtbe,
+ sizeof(mtbe),
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+ &backtalker_monotime_store_cb,
+ b);
+}
/**
- * Communicator gave us a DV learn message. Process the request.
+ * Communicator gave us a DV box. Process the request.
*
* @param cls a `struct CommunicatorMessageContext` (must call
* #finish_cmc_handling() when done)
- * @param dvl the message that was received
+ * @param dvb the message that was received
*/
static void
-handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
+handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
{
struct CommunicatorMessageContext *cmc = cls;
- enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
- int bi_hop;
- uint16_t nhops;
- uint16_t bi_history;
- const struct DVPathEntryP *hops;
- int do_fwd;
- int did_initiator;
- struct GNUNET_TIME_Absolute in_time;
- struct Neighbour *n;
+ uint16_t size = ntohs (dvb->header.size) - sizeof(*dvb);
+ uint16_t num_hops = ntohs (dvb->num_hops);
+ const struct GNUNET_PeerIdentity *hops =
+ (const struct GNUNET_PeerIdentity *) &dvb[1];
+ const char *enc_payload = (const char *) &hops[num_hops];
+ uint16_t enc_payload_size =
+ size - (num_hops * sizeof(struct GNUNET_PeerIdentity));
+ struct DVKeyState key;
+ struct GNUNET_HashCode hmac;
+ const char *hdr;
+ size_t hdr_len;
- nhops = ntohs (dvl->bidirectional); /* 0 = sender is initiator */
- bi_history = ntohs (dvl->bidirectional);
- hops = (const struct DVPathEntryP *) &dvl[1];
- if (0 == nhops)
+ if (GNUNET_EXTRA_LOGGING > 0)
{
- /* sanity check */
- if (0 != GNUNET_memcmp (&dvl->initiator, &cmc->im.sender))
+ char *path;
+
+ path = GNUNET_strdup (GNUNET_i2s (&GST_my_identity));
+ for (unsigned int i = 0; i < num_hops; i++)
{
- GNUNET_break (0);
- finish_cmc_handling (cmc);
- return;
+ char *tmp;
+
+ GNUNET_asprintf (&tmp, "%s->%s", path, GNUNET_i2s (&hops[i]));
+ GNUNET_free (path);
+ path = tmp;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received DVBox with remainig path %s\n",
+ path);
+ GNUNET_free (path);
}
- else
+
+ if (num_hops > 0)
{
- /* sanity check */
- if (0 != GNUNET_memcmp (&hops[nhops - 1].hop, &cmc->im.sender))
+ /* We're trying from the end of the hops array, as we may be
+ able to find a shortcut unknown to the origin that way */
+ for (int i = num_hops - 1; i >= 0; i--)
{
- GNUNET_break (0);
+ struct Neighbour *n;
+
+ if (0 == GNUNET_memcmp (&hops[i], &GST_my_identity))
+ {
+ GNUNET_break_op (0);
+ finish_cmc_handling (cmc);
+ return;
+ }
+ n = lookup_neighbour (&hops[i]);
+ if (NULL == n)
+ continue;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Skipping %u/%u hops ahead while routing DV Box\n",
+ i,
+ num_hops);
+ forward_dv_box (n,
+ dvb,
+ ntohs (dvb->total_hops) + 1,
+ num_hops - i - 1, /* number of hops left */
+ &hops[i + 1], /* remaining hops */
+ enc_payload,
+ enc_payload_size);
+ GNUNET_STATISTICS_update (GST_stats,
+ "# DV hops skipped routing boxes",
+ i,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (GST_stats,
+ "# DV boxes routed (total)",
+ 1,
+ GNUNET_NO);
finish_cmc_handling (cmc);
return;
}
+ /* Woopsie, next hop not in neighbours, drop! */
+ GNUNET_STATISTICS_update (GST_stats,
+ "# DV Boxes dropped: next hop unknown",
+ 1,
+ GNUNET_NO);
+ finish_cmc_handling (cmc);
+ return;
}
+ /* We are the target. Unbox and handle message. */
+ GNUNET_STATISTICS_update (GST_stats,
+ "# DV boxes opened (ultimate target)",
+ 1,
+ GNUNET_NO);
+ cmc->total_hops = ntohs (dvb->total_hops);
- GNUNET_assert (CT_COMMUNICATOR == cmc->tc->type);
- cc = cmc->tc->details.communicator.cc;
- bi_hop = (GNUNET_TRANSPORT_CC_RELIABLE ==
- cc); // FIXME: add bi-directional flag to cc?
- in_time = GNUNET_TIME_absolute_get ();
-
- /* continue communicator here, everything else can happen asynchronous! */
- finish_cmc_handling (cmc);
-
- n = lookup_neighbour (&dvl->initiator);
- if (NULL != n)
+ dh_key_derive_eph_pub (&dvb->ephemeral_key, &dvb->iv, &key);
+ hdr = (const char *) &dvb[1];
+ hdr_len = ntohs (dvb->header.size) - sizeof(*dvb);
+ dv_hmac (&key, &hmac, hdr, hdr_len);
+ if (0 != GNUNET_memcmp (&hmac, &dvb->hmac))
{
- if ((n->dv_monotime_available == GNUNET_YES) &&
- (GNUNET_TIME_absolute_ntoh (dvl->monotonic_time).abs_value_us <
- n->last_dv_learn_monotime.abs_value_us))
+ /* HMAC missmatch, disard! */
+ GNUNET_break_op (0);
+ finish_cmc_handling (cmc);
+ return;
+ }
+ /* begin actual decryption */
+ {
+ struct Backtalker *b;
+ struct GNUNET_TIME_Absolute monotime;
+ struct TransportDVBoxPayloadP ppay;
+ char body[hdr_len - sizeof(ppay)] GNUNET_ALIGN;
+ const struct GNUNET_MessageHeader *mh =
+ (const struct GNUNET_MessageHeader *) body;
+
+ GNUNET_assert (hdr_len >=
+ sizeof(ppay) + sizeof(struct GNUNET_MessageHeader));
+ dv_decrypt (&key, &ppay, hdr, sizeof(ppay));
+ dv_decrypt (&key, &body, &hdr[sizeof(ppay)], hdr_len - sizeof(ppay));
+ dv_key_clean (&key);
+ if (ntohs (mh->size) != sizeof(body))
{
- GNUNET_STATISTICS_update (GST_stats,
- "# DV learn discarded due to time travel",
- 1,
- GNUNET_NO);
+ GNUNET_break_op (0);
+ finish_cmc_handling (cmc);
return;
}
- if (GNUNET_OK != validate_dv_initiator_signature (dvl->monotonic_time,
- &dvl->initiator,
- &dvl->challenge,
- &dvl->init_sig))
+ /* need to prevent box-in-a-box (and DV_LEARN) so check inbox type! */
+ switch (ntohs (mh->type))
{
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX:
GNUNET_break_op (0);
+ finish_cmc_handling (cmc);
return;
- }
- n->last_dv_learn_monotime = GNUNET_TIME_absolute_ntoh (dvl->monotonic_time);
- if (GNUNET_YES == n->dv_monotime_available)
- {
- if (NULL != n->sc)
- GNUNET_PEERSTORE_store_cancel (n->sc);
- n->sc =
- GNUNET_PEERSTORE_store (peerstore,
- "transport",
- &dvl->initiator,
- GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
- &dvl->monotonic_time,
- sizeof (dvl->monotonic_time),
- GNUNET_TIME_UNIT_FOREVER_ABS,
- GNUNET_PEERSTORE_STOREOPTION_REPLACE,
- &neighbour_store_dvmono_cb,
- n);
- }
- }
- /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
- If signature verification load too high, implement random drop strategy */
- for (unsigned int i = 0; i < nhops; i++)
- {
- struct DvHopPS dhp = {.purpose.purpose =
- htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
- .purpose.size = htonl (sizeof (dhp)),
- .pred = (0 == i) ? dvl->initiator : hops[i - 1].hop,
- .succ = (nhops - 1 == i) ? GST_my_identity
- : hops[i + 1].hop,
- .challenge = dvl->challenge};
- if (GNUNET_OK !=
- GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP,
- &dhp.purpose,
- &hops[i].hop_sig,
- &hops[i].hop.public_key))
- {
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN:
GNUNET_break_op (0);
+ finish_cmc_handling (cmc);
return;
- }
- }
-
- do_fwd = GNUNET_YES;
- if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
- {
- struct GNUNET_PeerIdentity path[nhops + 1];
- struct GNUNET_TIME_Relative host_latency_sum;
- struct GNUNET_TIME_Relative latency;
- struct GNUNET_TIME_Relative network_latency;
-
- /* We initiated this, learn the forward path! */
- path[0] = GST_my_identity;
- path[1] = hops[0].hop;
- host_latency_sum = GNUNET_TIME_relative_ntoh (dvl->non_network_delay);
-
- // Need also something to lookup initiation time
- // to compute RTT! -> add RTT argument here?
- latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
- // (based on dvl->challenge, we can identify time of origin!)
-
- network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum);
- /* assumption: latency on all links is the same */
- network_latency = GNUNET_TIME_relative_divide (network_latency, nhops);
- for (unsigned int i = 2; i <= nhops; i++)
+ default:
+ /* permitted, continue */
+ break;
+ }
+ monotime = GNUNET_TIME_absolute_ntoh (ppay.monotonic_time);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Decrypted backtalk from %s\n",
+ GNUNET_i2s (&ppay.sender));
+ b = GNUNET_CONTAINER_multipeermap_get (backtalkers, &ppay.sender);
+ if ((NULL != b) && (monotime.abs_value_us < b->monotonic_time.abs_value_us))
{
- struct GNUNET_TIME_Relative ilat;
-
- /* assumption: linear latency increase per hop */
- ilat = GNUNET_TIME_relative_multiply (network_latency, i);
- path[i] = hops[i - 1].hop;
- learn_dv_path (path,
- i,
- ilat,
- GNUNET_TIME_relative_to_absolute (
- ADDRESS_VALIDATION_LIFETIME));
+ GNUNET_STATISTICS_update (
+ GST_stats,
+ "# Backchannel messages dropped: monotonic time not increasing",
+ 1,
+ GNUNET_NO);
+ finish_cmc_handling (cmc);
+ return;
}
- /* as we initiated, do not forward again (would be circular!) */
- do_fwd = GNUNET_NO;
- return;
- }
- else if (bi_hop)
- {
- /* last hop was bi-directional, we could learn something here! */
- struct GNUNET_PeerIdentity path[nhops + 2];
-
- path[0] = GST_my_identity;
- path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
- for (unsigned int i = 0; i < nhops; i++)
+ if ((NULL == b) ||
+ (0 != GNUNET_memcmp (&b->last_ephemeral, &dvb->ephemeral_key)))
{
- int iret;
-
- if (0 == (bi_history & (1 << i)))
- break; /* i-th hop not bi-directional, stop learning! */
- if (i == nhops)
- {
- path[i + 2] = dvl->initiator;
- }
- else
- {
- path[i + 2] = hops[nhops - i - 2].hop;
- }
+ /* Check signature */
+ struct EphemeralConfirmationPS ec;
- iret = learn_dv_path (path,
- i + 2,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_TIME_UNIT_ZERO_ABS);
- if (GNUNET_SYSERR == iret)
- {
- /* path invalid or too long to be interesting for US, thus should also
- not be interesting to our neighbours, cut path when forwarding to
- 'i' hops, except of course for the one that goes back to the
- initiator */
- GNUNET_STATISTICS_update (GST_stats,
- "# DV learn not forwarded due invalidity of path",
- 1,
- GNUNET_NO);
- do_fwd = GNUNET_NO;
- break;
- }
- if ((GNUNET_NO == iret) && (nhops == i + 1))
+ ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
+ ec.purpose.size = htonl (sizeof(ec));
+ ec.target = GST_my_identity;
+ ec.ephemeral_key = dvb->ephemeral_key;
+ if (
+ GNUNET_OK !=
+ GNUNET_CRYPTO_eddsa_verify (
+ GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL,
+ &ec,
+ &ppay.sender_sig,
+ &ppay.sender.public_key))
{
- /* we have better paths, and this is the longest target,
- so there cannot be anything interesting later */
- GNUNET_STATISTICS_update (GST_stats,
- "# DV learn not forwarded, got better paths",
- 1,
- GNUNET_NO);
- do_fwd = GNUNET_NO;
- break;
- }
- }
- }
-
- if (MAX_DV_HOPS_ALLOWED == nhops)
- {
- /* At limit, we're out of here! */
- finish_cmc_handling (cmc);
- return;
- }
+ /* Signature invalid, disard! */
+ GNUNET_break_op (0);
+ finish_cmc_handling (cmc);
+ return;
+ }
+ }
+ /* Update sender, we now know the real origin! */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "DVBox received for me from %s via %s\n",
+ GNUNET_i2s2 (&ppay.sender),
+ GNUNET_i2s (&cmc->im.sender));
+ cmc->im.sender = ppay.sender;
- /* Forward to initiator, if path non-trivial and possible */
- bi_history = (bi_history << 1) | (bi_hop ? 1 : 0);
- did_initiator = GNUNET_NO;
- if ((1 < nhops) &&
- (GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_contains (neighbours, &dvl->initiator)))
- {
- /* send back to origin! */
- forward_dv_learn (&dvl->initiator, dvl, bi_history, nhops, hops, in_time);
- did_initiator = GNUNET_YES;
- }
- /* We forward under two conditions: either we still learned something
- ourselves (do_fwd), or the path was darn short and thus the initiator is
- likely to still be very interested in this (and we did NOT already
- send it back to the initiator) */
- if ((do_fwd) || ((nhops < MIN_DV_PATH_LENGTH_FOR_INITIATOR) &&
- (GNUNET_NO == did_initiator)))
- {
- /* Pick random neighbours that are not yet on the path */
- struct NeighbourSelectionContext nsc;
- unsigned int n_cnt;
+ if (NULL != b)
+ {
+ /* update key cache and mono time */
+ b->last_ephemeral = dvb->ephemeral_key;
+ b->monotonic_time = monotime;
+ update_backtalker_monotime (b);
+ b->timeout =
+ GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
- n_cnt = GNUNET_CONTAINER_multipeermap_size (neighbours);
- nsc.nhops = nhops;
- nsc.dvl = dvl;
- nsc.bi_history = bi_history;
- nsc.hops = hops;
- nsc.in_time = in_time;
- nsc.num_eligible = 0;
- GNUNET_CONTAINER_multipeermap_iterate (neighbours,
- &dv_neighbour_selection,
- &nsc);
- if (0 == nsc.num_eligible)
- return; /* done here, cannot forward to anyone else */
- nsc.num_selections = calculate_fork_degree (nhops, n_cnt, nsc.num_eligible);
- nsc.num_selections =
- GNUNET_MIN (MAX_DV_DISCOVERY_SELECTION, nsc.num_selections);
- for (unsigned int i = 0; i < nsc.num_selections; i++)
- nsc.selections[i] =
- (nsc.num_selections == n_cnt)
- ? i /* all were selected, avoid collisions by chance */
- : GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, n_cnt);
- nsc.num_eligible = 0;
- GNUNET_CONTAINER_multipeermap_iterate (neighbours,
- &dv_neighbour_transmission,
- &nsc);
- }
+ demultiplex_with_cmc (cmc, mh);
+ return;
+ }
+ /* setup data structure to cache signature AND check
+ monotonic time with PEERSTORE before forwarding backchannel payload */
+ b = GNUNET_malloc (sizeof(struct Backtalker) + sizeof(body));
+ b->pid = ppay.sender;
+ b->body_size = sizeof(body);
+ memcpy (&b[1], body, sizeof(body));
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ backtalkers,
+ &b->pid,
+ b,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ b->monotonic_time = monotime; /* NOTE: to be checked still! */
+ b->cmc = cmc;
+ b->timeout =
+ GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
+ b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
+ b->get =
+ GNUNET_PEERSTORE_iterate (peerstore,
+ "transport",
+ &b->pid,
+ GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
+ &backtalker_monotime_cb,
+ b);
+ } /* end actual decryption */
}
/**
- * Communicator gave us a DV box. Check the message.
+ * Client notified us about transmission from a peer. Process the request.
*
- * @param cls a `struct CommunicatorMessageContext`
- * @param dvb the send message that was sent
+ * @param cls a `struct TransportClient` which sent us the message
+ * @param obm the send message that was sent
* @return #GNUNET_YES if message is well-formed
*/
static int
-check_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
+check_incoming_msg (void *cls,
+ const struct GNUNET_TRANSPORT_IncomingMessage *im)
{
- uint16_t size = ntohs (dvb->header.size);
- uint16_t num_hops = ntohs (dvb->num_hops);
- const struct GNUNET_PeerIdentity *hops =
- (const struct GNUNET_PeerIdentity *) &dvb[1];
- const struct GNUNET_MessageHeader *inbox =
- (const struct GNUNET_MessageHeader *) &hops[num_hops];
- uint16_t isize;
- uint16_t itype;
+ struct TransportClient *tc = cls;
- (void) cls;
- if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) +
- sizeof (struct GNUNET_MessageHeader))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- isize = ntohs (inbox->size);
- if (size !=
- sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize)
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- itype = ntohs (inbox->type);
- if ((GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
- (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (0 == GNUNET_memcmp (&dvb->origin, &GST_my_identity))
+ if (CT_COMMUNICATOR != tc->type)
{
- GNUNET_break_op (0);
+ GNUNET_break (0);
return GNUNET_SYSERR;
}
- return GNUNET_YES;
+ GNUNET_MQ_check_boxed_message (im);
+ return GNUNET_OK;
}
/**
- * Create a DV Box message and queue it for transmission to
- * @ea next_hop.
+ * Closure for #check_known_address.
+ */
+struct CheckKnownAddressContext
+{
+ /**
+ * Set to the address we are looking for.
+ */
+ const char *address;
+
+ /**
+ * Set to a matching validation state, if one was found.
+ */
+ struct ValidationState *vs;
+};
+
+
+/**
+ * Test if the validation state in @a value matches the
+ * address from @a cls.
*
- * @param next_hop peer to receive the message next
- * @param total_hops how many hops did the message take so far
- * @param num_hops length of the @a hops array
- * @param origin origin of the message
- * @param hops next peer(s) to the destination, including destination
- * @param payload payload of the box
- * @param payload_size number of bytes in @a payload
+ * @param cls a `struct CheckKnownAddressContext`
+ * @param pid unused (must match though)
+ * @param value a `struct ValidationState`
+ * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
+ */
+static int
+check_known_address (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ struct CheckKnownAddressContext *ckac = cls;
+ struct ValidationState *vs = value;
+
+ (void) pid;
+ if (0 != strcmp (vs->address, ckac->address))
+ return GNUNET_OK;
+ ckac->vs = vs;
+ return GNUNET_NO;
+}
+
+
+/**
+ * Task run periodically to validate some address based on #validation_heap.
+ *
+ * @param cls NULL
*/
static void
-forward_dv_box (struct Neighbour *next_hop,
- uint16_t total_hops,
- uint16_t num_hops,
- const struct GNUNET_PeerIdentity *origin,
- const struct GNUNET_PeerIdentity *hops,
- const void *payload,
- uint16_t payload_size)
+validation_start_cb (void *cls);
+
+
+/**
+ * Set the time for next_challenge of @a vs to @a new_time.
+ * Updates the heap and if necessary reschedules the job.
+ *
+ * @param vs validation state to update
+ * @param new_time new time for revalidation
+ */
+static void
+update_next_challenge_time (struct ValidationState *vs,
+ struct GNUNET_TIME_Absolute new_time)
{
- struct TransportDVBoxMessage *dvb;
+ struct GNUNET_TIME_Relative delta;
- dvb = create_dv_box (total_hops,
- origin,
- &hops[num_hops - 1] /* == target */,
- num_hops - 1 /* do not count target twice */,
- hops,
- payload,
- payload_size);
- route_message (&next_hop->pid, &dvb->header, RMO_NONE);
- GNUNET_free (dvb);
+ if (new_time.abs_value_us == vs->next_challenge.abs_value_us)
+ return; /* be lazy */
+ vs->next_challenge = new_time;
+ if (NULL == vs->hn)
+ vs->hn =
+ GNUNET_CONTAINER_heap_insert (validation_heap, vs, new_time.abs_value_us);
+ else
+ GNUNET_CONTAINER_heap_update_cost (vs->hn, new_time.abs_value_us);
+ if ((vs != GNUNET_CONTAINER_heap_peek (validation_heap)) &&
+ (NULL != validation_task))
+ return;
+ if (NULL != validation_task)
+ GNUNET_SCHEDULER_cancel (validation_task);
+ /* randomize a bit */
+ delta.rel_value_us =
+ GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
+ new_time = GNUNET_TIME_absolute_add (new_time, delta);
+ validation_task =
+ GNUNET_SCHEDULER_add_at (new_time, &validation_start_cb, NULL);
}
/**
- * Communicator gave us a DV box. Process the request.
+ * Start address validation.
*
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
- * @param dvb the message that was received
+ * @param pid peer the @a address is for
+ * @param address an address to reach @a pid (presumably)
*/
static void
-handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
+start_address_validation (const struct GNUNET_PeerIdentity *pid,
+ const char *address)
{
- struct CommunicatorMessageContext *cmc = cls;
- uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
- uint16_t num_hops = ntohs (dvb->num_hops);
- const struct GNUNET_PeerIdentity *hops =
- (const struct GNUNET_PeerIdentity *) &dvb[1];
- const struct GNUNET_MessageHeader *inbox =
- (const struct GNUNET_MessageHeader *) &hops[num_hops];
+ struct GNUNET_TIME_Absolute now;
+ struct ValidationState *vs;
+ struct CheckKnownAddressContext ckac = { .address = address, .vs = NULL };
- if (num_hops > 0)
+ (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
+ pid,
+ &check_known_address,
+ &ckac);
+ if (NULL != (vs = ckac.vs))
{
- /* We're trying from the end of the hops array, as we may be
- able to find a shortcut unknown to the origin that way */
- for (int i = num_hops - 1; i >= 0; i--)
+ /* if 'vs' is not currently valid, we need to speed up retrying the
+ * validation */
+ if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
{
- struct Neighbour *n;
-
- if (0 == GNUNET_memcmp (&hops[i], &GST_my_identity))
- {
- GNUNET_break_op (0);
- finish_cmc_handling (cmc);
- return;
- }
- n = lookup_neighbour (&hops[i]);
- if (NULL == n)
- continue;
- forward_dv_box (n,
- ntohs (dvb->total_hops) + 1,
- num_hops - i - 1, /* number of hops left */
- &dvb->origin,
- &hops[i + 1], /* remaining hops */
- (const void *) &dvb[1],
- size);
- finish_cmc_handling (cmc);
- return;
+ /* reduce backoff as we got a fresh advertisement */
+ vs->challenge_backoff =
+ GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
+ GNUNET_TIME_relative_divide (
+ vs->challenge_backoff,
+ 2));
+ update_next_challenge_time (vs,
+ GNUNET_TIME_relative_to_absolute (
+ vs->challenge_backoff));
}
- /* Woopsie, next hop not in neighbours, drop! */
- GNUNET_STATISTICS_update (GST_stats,
- "# DV Boxes dropped: next hop unknown",
- 1,
- GNUNET_NO);
- finish_cmc_handling (cmc);
return;
}
- /* We are the target. Unbox and handle message. */
- cmc->im.sender = dvb->origin;
- cmc->total_hops = ntohs (dvb->total_hops);
- demultiplex_with_cmc (cmc, inbox);
+ now = GNUNET_TIME_absolute_get ();
+ vs = GNUNET_new (struct ValidationState);
+ vs->pid = *pid;
+ vs->valid_until =
+ GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME);
+ vs->first_challenge_use = now;
+ vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+ &vs->challenge,
+ sizeof(vs->challenge));
+ vs->address = GNUNET_strdup (address);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Starting address validation `%s' of peer %s using challenge %s\n",
+ address,
+ GNUNET_i2s (pid),
+ GNUNET_sh2s (&vs->challenge.value));
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ validation_map,
+ &vs->pid,
+ vs,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ update_next_challenge_time (vs, now);
}
/**
- * Client notified us about transmission from a peer. Process the request.
+ * Function called by PEERSTORE for each matching record.
*
- * @param cls a `struct TransportClient` which sent us the message
- * @param obm the send message that was sent
- * @return #GNUNET_YES if message is well-formed
+ * @param cls closure, a `struct IncomingRequest`
+ * @param record peerstore record information
+ * @param emsg error message, or NULL if no errors
*/
-static int
-check_incoming_msg (void *cls,
- const struct GNUNET_TRANSPORT_IncomingMessage *im)
+static void
+handle_hello_for_incoming (void *cls,
+ const struct GNUNET_PEERSTORE_Record *record,
+ const char *emsg)
{
- struct TransportClient *tc = cls;
+ struct IncomingRequest *ir = cls;
+ const char *val;
- if (CT_COMMUNICATOR != tc->type)
+ if (NULL != emsg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Got failure from PEERSTORE: %s\n",
+ emsg);
+ return;
+ }
+ val = record->value;
+ if ((0 == record->value_size) || ('\0' != val[record->value_size - 1]))
{
GNUNET_break (0);
- return GNUNET_SYSERR;
+ return;
}
- GNUNET_MQ_check_boxed_message (im);
- return GNUNET_OK;
+ start_address_validation (&ir->pid, (const char *) record->value);
}
const struct TransportValidationChallengeMessage *tvc)
{
struct CommunicatorMessageContext *cmc = cls;
- struct TransportValidationResponseMessage *tvr;
+ struct TransportValidationResponseMessage tvr;
+ struct VirtualLink *vl;
+ struct GNUNET_TIME_RelativeNBO validity_duration;
+ struct IncomingRequest *ir;
+ struct Neighbour *n;
+ struct GNUNET_PeerIdentity sender;
+ /* DV-routed messages are not allowed for validation challenges */
if (cmc->total_hops > 0)
{
- /* DV routing is not allowed for validation challenges! */
GNUNET_break_op (0);
finish_cmc_handling (cmc);
return;
}
- tvr = GNUNET_new (struct TransportValidationResponseMessage);
- tvr->header.type =
+ validity_duration = cmc->im.expected_address_validity;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received address validation challenge %s\n",
+ GNUNET_sh2s (&tvc->challenge.value));
+ /* If we have a virtual link, we use this mechanism to signal the
+ size of the flow control window, and to allow the sender
+ to ask for increases. If for us the virtual link is still down,
+ we will always give a window size of zero. */
+ tvr.header.type =
htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE);
- tvr->header.size = htons (sizeof (*tvr));
- tvr->challenge = tvc->challenge;
- tvr->origin_time = tvc->sender_time;
- tvr->validity_duration = cmc->im.expected_address_validity;
+ tvr.header.size = htons (sizeof(tvr));
+ tvr.reserved = htonl (0);
+ tvr.challenge = tvc->challenge;
+ tvr.origin_time = tvc->sender_time;
+ tvr.validity_duration = validity_duration;
{
/* create signature */
- struct TransportValidationPS tvp =
- {.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
- .purpose.size = htonl (sizeof (tvp)),
- .validity_duration = tvr->validity_duration,
- .challenge = tvc->challenge};
-
- GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
- &tvp.purpose,
- &tvr->signature));
- }
- route_message (&cmc->im.sender,
- &tvr->header,
- RMO_ANYTHING_GOES | RMO_REDUNDANT);
+ struct TransportValidationPS tvp = {
+ .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
+ .purpose.size = htonl (sizeof(tvp)),
+ .validity_duration = validity_duration,
+ .challenge = tvc->challenge
+ };
+
+ GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+ &tvp,
+ &tvr.signature);
+ }
+ route_control_message_without_fc (&cmc->im.sender,
+ &tvr.header,
+ RMO_ANYTHING_GOES | RMO_REDUNDANT);
+ sender = cmc->im.sender;
finish_cmc_handling (cmc);
+ vl = lookup_virtual_link (&sender);
+ if (NULL != vl)
+ return;
+
+ /* For us, the link is still down, but we need bi-directional
+ connections (for flow-control and for this to be useful for
+ CORE), so we must try to bring the link up! */
+
+ /* (1) Check existing queues, if any, we may be lucky! */
+ n = lookup_neighbour (&sender);
+ if (NULL != n)
+ for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
+ start_address_validation (&sender, q->address);
+ /* (2) Also try to see if we have addresses in PEERSTORE for this peer
+ we could use */
+ for (ir = ir_head; NULL != ir; ir = ir->next)
+ if (0 == GNUNET_memcmp (&ir->pid, &sender))
+ return;
+ /* we are already trying */
+ ir = GNUNET_new (struct IncomingRequest);
+ ir->pid = sender;
+ GNUNET_CONTAINER_DLL_insert (ir_head, ir_tail, ir);
+ ir->wc = GNUNET_PEERSTORE_watch (peerstore,
+ "transport",
+ &ir->pid,
+ GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
+ &handle_hello_for_incoming,
+ ir);
+ ir_total++;
+ /* Bound attempts we do in parallel here, might otherwise get excessive */
+ while (ir_total > MAX_INCOMING_REQUEST)
+ free_incoming_request (ir_head);
}
}
-/**
- * Task run periodically to validate some address based on #validation_heap.
- *
- * @param cls NULL
- */
-static void
-validation_start_cb (void *cls);
-
-
-/**
- * Set the time for next_challenge of @a vs to @a new_time.
- * Updates the heap and if necessary reschedules the job.
- *
- * @param vs validation state to update
- * @param new_time new time for revalidation
- */
-static void
-update_next_challenge_time (struct ValidationState *vs,
- struct GNUNET_TIME_Absolute new_time)
-{
- struct GNUNET_TIME_Relative delta;
-
- if (new_time.abs_value_us == vs->next_challenge.abs_value_us)
- return; /* be lazy */
- vs->next_challenge = new_time;
- if (NULL == vs->hn)
- vs->hn =
- GNUNET_CONTAINER_heap_insert (validation_heap, vs, new_time.abs_value_us);
- else
- GNUNET_CONTAINER_heap_update_cost (vs->hn, new_time.abs_value_us);
- if ((vs != GNUNET_CONTAINER_heap_peek (validation_heap)) &&
- (NULL != validation_task))
- return;
- if (NULL != validation_task)
- GNUNET_SCHEDULER_cancel (validation_task);
- /* randomize a bit */
- delta.rel_value_us =
- GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
- MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
- new_time = GNUNET_TIME_absolute_add (new_time, delta);
- validation_task =
- GNUNET_SCHEDULER_add_at (new_time, &validation_start_cb, NULL);
-}
-
-
/**
* Find the queue matching @a pid and @a address.
*
{
struct CommunicatorMessageContext *cmc = cls;
struct ValidationState *vs;
- struct CheckKnownChallengeContext ckac = {.challenge = &tvr->challenge,
- .vs = NULL};
+ struct CheckKnownChallengeContext ckac = { .challenge = &tvr->challenge,
+ .vs = NULL };
struct GNUNET_TIME_Absolute origin_time;
struct Queue *q;
struct Neighbour *n;
"# Validations dropped, challenge unknown",
1,
GNUNET_NO);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Validation response %s dropped, challenge unknown\n",
+ GNUNET_sh2s (&tvr->challenge.value));
finish_cmc_handling (cmc);
return;
}
{
/* check signature */
- struct TransportValidationPS tvp =
- {.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
- .purpose.size = htonl (sizeof (tvp)),
- .validity_duration = tvr->validity_duration,
- .challenge = tvr->challenge};
+ struct TransportValidationPS tvp = {
+ .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
+ .purpose.size = htonl (sizeof(tvp)),
+ .validity_duration = tvr->validity_duration,
+ .challenge = tvr->challenge
+ };
if (
GNUNET_OK !=
GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE,
- &tvp.purpose,
+ &tvp,
&tvr->signature,
&cmc->im.sender.public_key))
{
vs->challenge_backoff = GNUNET_TIME_UNIT_ZERO;
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
&vs->challenge,
- sizeof (vs->challenge));
+ sizeof(vs->challenge));
vs->first_challenge_use = GNUNET_TIME_absolute_subtract (
vs->validated_until,
GNUNET_TIME_relative_multiply (vs->validation_rtt,
vs->last_challenge_use =
GNUNET_TIME_UNIT_ZERO_ABS; /* challenge was not yet used */
update_next_challenge_time (vs, vs->first_challenge_use);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Validation response %s accepted, address valid until %s\n",
+ GNUNET_sh2s (&tvr->challenge.value),
+ GNUNET_STRINGS_absolute_time_to_string (vs->valid_until));
vs->sc = GNUNET_PEERSTORE_store (peerstore,
"transport",
&cmc->im.sender,
q->validated_until = vs->validated_until;
q->pd.aged_rtt = vs->validation_rtt;
n = q->neighbour;
- vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid);
+ vl = lookup_virtual_link (&vs->pid);
if (NULL != vl)
{
/* Link was already up, remember n is also now available and we are done */
- vl->n = n;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Virtual link to %s could now also direct neighbour!\n",
- GNUNET_i2s (&vs->pid));
+ if (NULL == vl->n)
+ {
+ vl->n = n;
+ n->vl = vl;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Virtual link to %s could now also direct neighbour!\n",
+ GNUNET_i2s (&vs->pid));
+ }
+ else
+ {
+ GNUNET_assert (n == vl->n);
+ }
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
vl = GNUNET_new (struct VirtualLink);
vl->target = n->pid;
vl->n = n;
+ n->vl = vl;
vl->core_recv_window = RECV_WINDOW_SIZE;
+ vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
vl->visibility_task =
GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
GNUNET_break (GNUNET_YES ==
&vl->target,
vl,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ consider_sending_fc (vl);
/* We lacked a confirmed connection to the target
before, so tell CORE about it (finally!) */
cores_send_connect_info (&n->pid);
cmc->tc = tc;
cmc->im = *im;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message via communicator from peer %s\n",
+ GNUNET_i2s (&im->sender));
demultiplex_with_cmc (cmc, (const struct GNUNET_MessageHeader *) &im[1]);
}
+/**
+ * Communicator gave us a transport address validation response. Process the
+ * request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call
+ * #finish_cmc_handling() when done)
+ * @param fc the message that was received
+ */
+static void
+handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+ struct VirtualLink *vl;
+ uint32_t seq;
+ struct GNUNET_TIME_Absolute st;
+ uint64_t os;
+ uint64_t wnd;
+
+ vl = lookup_virtual_link (&cmc->im.sender);
+ if (NULL == vl)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# FC dropped: virtual link unknown",
+ 1,
+ GNUNET_NO);
+ finish_cmc_handling (cmc);
+ return;
+ }
+ st = GNUNET_TIME_absolute_ntoh (fc->sender_time);
+ if (st.abs_value_us < vl->last_fc_timestamp.abs_value_us)
+ {
+ /* out of order, drop */
+ GNUNET_STATISTICS_update (GST_stats,
+ "# FC dropped: message out of order",
+ 1,
+ GNUNET_NO);
+ finish_cmc_handling (cmc);
+ return;
+ }
+ seq = ntohl (fc->seq);
+ if (seq < vl->last_fc_seq)
+ {
+ /* Wrap-around/reset of other peer; start all counters from zero */
+ vl->outbound_fc_window_size_used = 0;
+ }
+ vl->last_fc_seq = seq;
+ vl->last_fc_timestamp = st;
+ vl->outbound_fc_window_size = GNUNET_ntohll (fc->inbound_window_size);
+ os = GNUNET_ntohll (fc->outbound_sent);
+ vl->incoming_fc_window_size_loss =
+ (int64_t) (os - vl->incoming_fc_window_size_used);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received FC from %s, seq %u, new window %llu (loss at %lld)\n",
+ GNUNET_i2s (&vl->target),
+ (unsigned int) seq,
+ (unsigned long long) vl->outbound_fc_window_size,
+ (long long) vl->incoming_fc_window_size_loss);
+ wnd = GNUNET_ntohll (fc->outbound_window_size);
+ if ((wnd < vl->incoming_fc_window_size) ||
+ (vl->last_outbound_window_size_received != wnd) ||
+ (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX)
+ % FC_NO_CHANGE_REPLY_PROBABILITY))
+ {
+ /* Consider re-sending our FC message, as clearly the
+ other peer's idea of the window is not up-to-date */
+ consider_sending_fc (vl);
+ }
+ if ((wnd == vl->incoming_fc_window_size) &&
+ (vl->last_outbound_window_size_received == wnd) &&
+ (NULL != vl->fc_retransmit_task))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Stopping FC retransmission to %s: peer is current at window %llu\n",
+ GNUNET_i2s (&vl->target),
+ (unsigned long long) wnd);
+ GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
+ vl->fc_retransmit_task = NULL;
+ }
+ vl->last_outbound_window_size_received = wnd;
+ /* FC window likely increased, check transmission possibilities! */
+ check_vl_transmission (vl);
+ finish_cmc_handling (cmc);
+}
+
+
/**
* Given an inbound message @a msg from a communicator @a cmc,
* demultiplex it based on the type calling the right handler.
const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_MQ_MessageHandler handlers[] =
- {GNUNET_MQ_hd_var_size (fragment_box,
- GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
- struct TransportFragmentBoxMessage,
- &cmc),
- GNUNET_MQ_hd_var_size (reliability_box,
- GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
- struct TransportReliabilityBoxMessage,
- &cmc),
- GNUNET_MQ_hd_var_size (reliability_ack,
- GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
- struct TransportReliabilityAckMessage,
- &cmc),
- GNUNET_MQ_hd_var_size (backchannel_encapsulation,
- GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
- struct TransportBackchannelEncapsulationMessage,
- &cmc),
- GNUNET_MQ_hd_var_size (dv_learn,
- GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
- struct TransportDVLearnMessage,
- &cmc),
- GNUNET_MQ_hd_var_size (dv_box,
- GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
- struct TransportDVBoxMessage,
- &cmc),
- GNUNET_MQ_hd_fixed_size (
- validation_challenge,
- GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
- struct TransportValidationChallengeMessage,
- &cmc),
- GNUNET_MQ_hd_fixed_size (
- validation_response,
- GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,
- struct TransportValidationResponseMessage,
- &cmc),
- GNUNET_MQ_handler_end ()};
+ { GNUNET_MQ_hd_var_size (fragment_box,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
+ struct TransportFragmentBoxMessage,
+ &cmc),
+ GNUNET_MQ_hd_var_size (reliability_box,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
+ struct TransportReliabilityBoxMessage,
+ &cmc),
+ GNUNET_MQ_hd_var_size (reliability_ack,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
+ struct TransportReliabilityAckMessage,
+ &cmc),
+ GNUNET_MQ_hd_var_size (backchannel_encapsulation,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
+ struct TransportBackchannelEncapsulationMessage,
+ &cmc),
+ GNUNET_MQ_hd_var_size (dv_learn,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
+ struct TransportDVLearnMessage,
+ &cmc),
+ GNUNET_MQ_hd_var_size (dv_box,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
+ struct TransportDVBoxMessage,
+ &cmc),
+ GNUNET_MQ_hd_fixed_size (
+ validation_challenge,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
+ struct TransportValidationChallengeMessage,
+ &cmc),
+ GNUNET_MQ_hd_fixed_size (flow_control,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL,
+ struct TransportFlowControlMessage,
+ &cmc),
+ GNUNET_MQ_hd_fixed_size (
+ validation_response,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,
+ struct TransportValidationResponseMessage,
+ &cmc),
+ GNUNET_MQ_handler_end () };
int ret;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Handling message of type %u with %u bytes\n",
+ (unsigned int) ntohs (msg->type),
+ (unsigned int) ntohs (msg->size));
ret = GNUNET_MQ_handle_message (handlers, msg);
if (GNUNET_SYSERR == ret)
{
{
if (pm->msg_uuid_set)
return;
- pm->msg_uuid.uuid = pm->target->message_uuid_ctr++;
+ pm->msg_uuid.uuid = pm->vl->message_uuid_ctr++;
pm->msg_uuid_set = GNUNET_YES;
}
{
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
&pa->ack_uuid,
- sizeof (pa->ack_uuid));
- } while (GNUNET_YES != GNUNET_CONTAINER_multishortmap_put (
- pending_acks,
- &pa->ack_uuid.value,
- pa,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ sizeof(pa->ack_uuid));
+ }
+ while (GNUNET_YES != GNUNET_CONTAINER_multiuuidmap_put (
+ pending_acks,
+ &pa->ack_uuid.value,
+ pa,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
GNUNET_CONTAINER_MDLL_insert (queue, queue->pa_head, queue->pa_tail, pa);
GNUNET_CONTAINER_MDLL_insert (pm, pm->pa_head, pm->pa_tail, pa);
if (NULL != dvh)
GNUNET_CONTAINER_MDLL_insert (dvh, dvh->pa_head, dvh->pa_tail, pa);
pa->transmission_time = GNUNET_TIME_absolute_get ();
pa->message_size = pm->bytes_msg;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Waiting for ACKnowledgment `%s' for <%llu>\n",
+ GNUNET_uuid2s (&pa->ack_uuid.value),
+ pm->logging_uuid);
return pa;
}
struct PendingMessage *ff;
uint16_t mtu;
- pa = prepare_pending_acknowledgement (queue, dvh, pm);
mtu = (0 == queue->mtu)
- ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
- : queue->mtu;
+ ? UINT16_MAX - sizeof(struct GNUNET_TRANSPORT_SendMessageTo)
+ : queue->mtu;
set_pending_message_uuid (pm);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Fragmenting message %llu <%llu> to %s for MTU %u\n",
+ (unsigned long long) pm->msg_uuid.uuid,
+ pm->logging_uuid,
+ GNUNET_i2s (&pm->vl->target),
+ (unsigned int) mtu);
+ pa = prepare_pending_acknowledgement (queue, dvh, pm);
/* This invariant is established in #handle_add_queue_message() */
- GNUNET_assert (mtu > sizeof (struct TransportFragmentBoxMessage));
+ GNUNET_assert (mtu > sizeof(struct TransportFragmentBoxMessage));
/* select fragment for transmission, descending the tree if it has
been expanded until we are at a leaf or at a fragment that is small
while (((ff->bytes_msg > mtu) || (pm == ff)) &&
(ff->frag_off == ff->bytes_msg) && (NULL != ff->head_frag))
{
- ff = ff->head_frag; /* descent into fragmented fragments */
+ ff = ff->head_frag; /* descent into fragmented fragments */
}
if (((ff->bytes_msg > mtu) || (pm == ff)) && (pm->frag_off < pm->bytes_msg))
const struct TransportFragmentBoxMessage *tfbo;
tfbo = (const struct TransportFragmentBoxMessage *) orig;
- orig += sizeof (struct TransportFragmentBoxMessage);
- msize -= sizeof (struct TransportFragmentBoxMessage);
+ orig += sizeof(struct TransportFragmentBoxMessage);
+ msize -= sizeof(struct TransportFragmentBoxMessage);
xoff = ntohs (tfbo->frag_off);
}
- fragmax = mtu - sizeof (struct TransportFragmentBoxMessage);
+ fragmax = mtu - sizeof(struct TransportFragmentBoxMessage);
fragsize = GNUNET_MIN (msize - ff->frag_off, fragmax);
frag =
- GNUNET_malloc (sizeof (struct PendingMessage) +
- sizeof (struct TransportFragmentBoxMessage) + fragsize);
- frag->target = pm->target;
+ GNUNET_malloc (sizeof(struct PendingMessage)
+ + sizeof(struct TransportFragmentBoxMessage) + fragsize);
+ frag->logging_uuid = logging_uuid_gen++;
+ frag->vl = pm->vl;
frag->frag_parent = ff;
frag->timeout = pm->timeout;
- frag->bytes_msg = sizeof (struct TransportFragmentBoxMessage) + fragsize;
+ frag->bytes_msg = sizeof(struct TransportFragmentBoxMessage) + fragsize;
frag->pmt = PMT_FRAGMENT_BOX;
msg = (char *) &frag[1];
tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
tfb.header.size =
- htons (sizeof (struct TransportFragmentBoxMessage) + fragsize);
+ htons (sizeof(struct TransportFragmentBoxMessage) + fragsize);
tfb.ack_uuid = pa->ack_uuid;
tfb.msg_uuid = pm->msg_uuid;
tfb.frag_off = htons (ff->frag_off + xoff);
tfb.msg_size = htons (pm->bytes_msg);
- memcpy (msg, &tfb, sizeof (tfb));
- memcpy (&msg[sizeof (tfb)], &orig[ff->frag_off], fragsize);
+ memcpy (msg, &tfb, sizeof(tfb));
+ memcpy (&msg[sizeof(tfb)], &orig[ff->frag_off], fragsize);
GNUNET_CONTAINER_MDLL_insert (frag, ff->head_frag, ff->tail_frag, frag);
ff->frag_off += fragsize;
ff = frag;
if (NULL != pm->bpm)
return pm->bpm; /* already computed earlier: do nothing */
GNUNET_assert (NULL == pm->head_frag);
- if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
+ if (pm->bytes_msg + sizeof(rbox) > UINT16_MAX)
{
/* failed hard */
GNUNET_break (0);
client_send_response (pm);
return NULL;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Preparing reliability box for message <%llu> to %s on queue %s\n",
+ pm->logging_uuid,
+ GNUNET_i2s (&pm->vl->target),
+ queue->address);
pa = prepare_pending_acknowledgement (queue, dvh, pm);
- bpm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (rbox) +
- pm->bytes_msg);
- bpm->target = pm->target;
+ bpm = GNUNET_malloc (sizeof(struct PendingMessage) + sizeof(rbox)
+ + pm->bytes_msg);
+ bpm->logging_uuid = logging_uuid_gen++;
+ bpm->vl = pm->vl;
bpm->frag_parent = pm;
GNUNET_CONTAINER_MDLL_insert (frag, pm->head_frag, pm->tail_frag, bpm);
bpm->timeout = pm->timeout;
bpm->pmt = PMT_RELIABILITY_BOX;
- bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
+ bpm->bytes_msg = pm->bytes_msg + sizeof(rbox);
set_pending_message_uuid (bpm);
rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
- rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
- rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
+ rbox.header.size = htons (sizeof(rbox) + pm->bytes_msg);
+ rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
rbox.ack_uuid = pa->ack_uuid;
msg = (char *) &bpm[1];
- memcpy (msg, &rbox, sizeof (rbox));
- memcpy (&msg[sizeof (rbox)], &pm[1], pm->bytes_msg);
+ memcpy (msg, &rbox, sizeof(rbox));
+ memcpy (&msg[sizeof(rbox)], &pm[1], pm->bytes_msg);
pm->bpm = bpm;
return bpm;
}
* to @a next_attempt and re-order @a pm in the transmission
* list as required by the new timestmap.
*
- * @param pm a pending message to update
- * @param next_attempt timestamp to use
+ * @param pm a pending message to update
+ * @param next_attempt timestamp to use
+ */
+static void
+update_pm_next_attempt (struct PendingMessage *pm,
+ struct GNUNET_TIME_Absolute next_attempt)
+{
+ struct VirtualLink *vl = pm->vl;
+
+ pm->next_attempt = next_attempt;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Next attempt for message <%llu> set to %s\n",
+ pm->logging_uuid,
+ GNUNET_STRINGS_absolute_time_to_string (next_attempt));
+
+ if (NULL == pm->frag_parent)
+ {
+ struct PendingMessage *pos;
+
+ /* re-insert sort in neighbour list */
+ GNUNET_CONTAINER_MDLL_remove (vl,
+ vl->pending_msg_head,
+ vl->pending_msg_tail,
+ pm);
+ pos = vl->pending_msg_tail;
+ while ((NULL != pos) &&
+ (next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
+ pos = pos->prev_vl;
+ GNUNET_CONTAINER_MDLL_insert_after (vl,
+ vl->pending_msg_head,
+ vl->pending_msg_tail,
+ pos,
+ pm);
+ }
+ else
+ {
+ /* re-insert sort in fragment list */
+ struct PendingMessage *fp = pm->frag_parent;
+ struct PendingMessage *pos;
+
+ GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, pm);
+ pos = fp->tail_frag;
+ while ((NULL != pos) &&
+ (next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
+ pos = pos->prev_frag;
+ GNUNET_CONTAINER_MDLL_insert_after (frag,
+ fp->head_frag,
+ fp->tail_frag,
+ pos,
+ pm);
+ }
+}
+
+
+/**
+ * Context for #select_best_pending_from_link().
+ */
+struct PendingMessageScoreContext
+{
+ /**
+ * Set to the best message that was found, NULL for none.
+ */
+ struct PendingMessage *best;
+
+ /**
+ * DVH that @e best should take, or NULL for direct transmission.
+ */
+ struct DistanceVectorHop *dvh;
+
+ /**
+ * What is the estimated total overhead for this message?
+ */
+ size_t real_overhead;
+
+ /**
+ * Number of pending messages we seriously considered this time.
+ */
+ unsigned int consideration_counter;
+
+ /**
+ * Did we have to fragment?
+ */
+ int frag;
+
+ /**
+ * Did we have to reliability box?
+ */
+ int relb;
+};
+
+
+/**
+ * Select the best pending message from @a vl for transmission
+ * via @a queue.
+ *
+ * @param sc[in,out] best message so far (NULL for none), plus scoring data
+ * @param queue the queue that will be used for transmission
+ * @param vl the virtual link providing the messages
+ * @param dvh path we are currently considering, or NULL for none
+ * @param overhead number of bytes of overhead to be expected
+ * from DV encapsulation (0 for without DV)
+ */
+static void
+select_best_pending_from_link (struct PendingMessageScoreContext *sc,
+ struct Queue *queue,
+ struct VirtualLink *vl,
+ struct DistanceVectorHop *dvh,
+ size_t overhead)
+{
+ struct GNUNET_TIME_Absolute now;
+
+ now = GNUNET_TIME_absolute_get ();
+ for (struct PendingMessage *pos = vl->pending_msg_head; NULL != pos;
+ pos = pos->next_vl)
+ {
+ size_t real_overhead = overhead;
+ int frag;
+ int relb;
+
+ if ((NULL != dvh) && (PMT_DV_BOX == pos->pmt))
+ continue; /* DV messages must not be DV-routed to next hop! */
+ if (pos->next_attempt.abs_value_us > now.abs_value_us)
+ break; /* too early for all messages, they are sorted by next_attempt */
+ if (NULL != pos->qe)
+ continue; /* not eligible */
+ sc->consideration_counter++;
+ /* determine if we have to fragment, if so add fragmentation
+ overhead! */
+ frag = GNUNET_NO;
+ if (((0 != queue->mtu) &&
+ (pos->bytes_msg + real_overhead > queue->mtu)) ||
+ (pos->bytes_msg > UINT16_MAX - sizeof(struct
+ GNUNET_TRANSPORT_SendMessageTo))
+ ||
+ (NULL != pos->head_frag /* fragments already exist, should
+ respect that even if MTU is 0 for
+ this queue */))
+ {
+ frag = GNUNET_YES;
+ if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)
+ {
+ /* FIXME-FRAG-REL-UUID: we could use an optimized, shorter fragmentation
+ header without the ACK UUID when using a *reliable* channel! */
+ }
+ real_overhead = overhead + sizeof(struct TransportFragmentBoxMessage);
+ }
+ /* determine if we have to reliability-box, if so add reliability box
+ overhead */
+ relb = GNUNET_NO;
+ if ((GNUNET_NO == frag) &&
+ (0 == (pos->prefs & GNUNET_MQ_PREF_UNRELIABLE)) &&
+ (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc))
+ {
+ relb = GNUNET_YES;
+ real_overhead += sizeof(struct TransportReliabilityBoxMessage);
+ }
+
+ /* Finally, compare to existing 'best' in sc to see if this 'pos' pending
+ message would beat it! */
+ if (NULL != sc->best)
+ {
+ /* CHECK if pos fits queue BETTER (=smaller) than pm, if not: continue;
+ OPTIMIZE-ME: This is a heuristic, which so far has NOT been
+ experimentally validated. There may be some huge potential for
+ improvement here. Also, we right now only compare how well the
+ given message fits _this_ queue, and do not consider how well other
+ queues might suit the message. Taking other queues into consideration
+ may further improve the result, but could also be expensive
+ in terms of CPU time. */long long sc_score = sc->frag * 40 + sc->relb * 20 + sc->real_overhead;
+ long long pm_score = frag * 40 + relb * 20 + real_overhead;
+ long long time_delta =
+ (sc->best->next_attempt.abs_value_us - pos->next_attempt.abs_value_us)
+ / 1000LL;
+
+ /* "time_delta" considers which message has been 'ready' for transmission
+ for longer, if a message has a preference for low latency, increase
+ the weight of the time_delta by 10x if it is favorable for that message */
+ if ((0 != (pos->prefs & GNUNET_MQ_PREF_LOW_LATENCY)) &&
+ (0 != (sc->best->prefs & GNUNET_MQ_PREF_LOW_LATENCY)))
+ time_delta *= 10; /* increase weight (always, both are low latency) */
+ else if ((0 != (pos->prefs & GNUNET_MQ_PREF_LOW_LATENCY)) &&
+ (time_delta > 0))
+ time_delta *=
+ 10; /* increase weight, favors 'pos', which is low latency */
+ else if ((0 != (sc->best->prefs & GNUNET_MQ_PREF_LOW_LATENCY)) &&
+ (time_delta < 0))
+ time_delta *=
+ 10; /* increase weight, favors 'sc->best', which is low latency */
+ if (0 != queue->mtu)
+ {
+ /* Grant bonus if we are bellow MTU, larger bonus the closer we will
+ be to the MTU */
+ if (queue->mtu > sc->real_overhead + sc->best->bytes_msg)
+ sc_score -= queue->mtu - (sc->real_overhead + sc->best->bytes_msg);
+ if (queue->mtu > real_overhead + pos->bytes_msg)
+ pm_score -= queue->mtu - (real_overhead + pos->bytes_msg);
+ }
+ if (sc_score + time_delta > pm_score)
+ continue; /* sc_score larger, keep sc->best */
+ }
+ sc->best = pos;
+ sc->dvh = dvh;
+ sc->frag = frag;
+ sc->relb = relb;
+ }
+}
+
+
+/**
+ * Function to call to further operate on the now DV encapsulated
+ * message @a hdr, forwarding it via @a next_hop under respect of
+ * @a options.
+ *
+ * @param cls a `struct PendingMessageScoreContext`
+ * @param next_hop next hop of the DV path
+ * @param hdr encapsulated message, technically a `struct TransportDFBoxMessage`
+ * @param options options of the original message
*/
static void
-update_pm_next_attempt (struct PendingMessage *pm,
- struct GNUNET_TIME_Absolute next_attempt)
+extract_box_cb (void *cls,
+ struct Neighbour *next_hop,
+ const struct GNUNET_MessageHeader *hdr,
+ enum RouteMessageOptions options)
{
- struct Neighbour *neighbour = pm->target;
-
- pm->next_attempt = next_attempt;
- if (NULL == pm->frag_parent)
- {
- struct PendingMessage *pos;
-
- /* re-insert sort in neighbour list */
- GNUNET_CONTAINER_MDLL_remove (neighbour,
- neighbour->pending_msg_head,
- neighbour->pending_msg_tail,
- pm);
- pos = neighbour->pending_msg_tail;
- while ((NULL != pos) &&
- (next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
- pos = pos->prev_neighbour;
- GNUNET_CONTAINER_MDLL_insert_after (neighbour,
- neighbour->pending_msg_head,
- neighbour->pending_msg_tail,
- pos,
- pm);
- }
- else
- {
- /* re-insert sort in fragment list */
- struct PendingMessage *fp = pm->frag_parent;
- struct PendingMessage *pos;
+ struct PendingMessageScoreContext *sc = cls;
+ struct PendingMessage *pm = sc->best;
+ struct PendingMessage *bpm;
+ uint16_t bsize = ntohs (hdr->size);
- GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, pm);
- pos = fp->tail_frag;
- while ((NULL != pos) &&
- (next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
- pos = pos->prev_frag;
- GNUNET_CONTAINER_MDLL_insert_after (frag,
- fp->head_frag,
- fp->tail_frag,
- pos,
- pm);
- }
+ GNUNET_assert (NULL == pm->bpm);
+ bpm = GNUNET_malloc (sizeof(struct PendingMessage) + bsize);
+ bpm->logging_uuid = logging_uuid_gen++;
+ bpm->pmt = PMT_DV_BOX;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating DV Box %llu for original message %llu (next hop is %s)\n",
+ bpm->logging_uuid,
+ pm->logging_uuid,
+ GNUNET_i2s (&next_hop->pid));
+ memcpy (&bpm[1], hdr, bsize);
+ pm->bpm = bpm;
}
/**
- * We believe we are ready to transmit a message on a queue.
- * Gives the message to the
- * communicator for transmission (updating the tracker, and re-scheduling
- * itself if applicable).
+ * We believe we are ready to transmit a `struct PendingMessage` on a
+ * queue, the big question is which one! We need to see if there is
+ * one pending that is allowed by flow control and congestion control
+ * and (ideally) matches our queue's performance profile.
+ *
+ * If such a message is found, we give the message to the communicator
+ * for transmission (updating the tracker, and re-scheduling ourselves
+ * if applicable).
+ *
+ * If no such message is found, the queue's `idle` field must be set
+ * to #GNUNET_YES.
*
* @param cls the `struct Queue` to process transmissions for
*/
{
struct Queue *queue = cls;
struct Neighbour *n = queue->neighbour;
+ struct PendingMessageScoreContext sc;
struct PendingMessage *pm;
- struct PendingMessage *s;
- uint32_t overhead;
queue->transmit_task = NULL;
- if (NULL == (pm = n->pending_msg_head))
+ if (NULL == n->vl)
{
- /* no message pending, nothing to do here! */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Virtual link `%s' is down, cannot have PM for queue `%s'\n",
+ GNUNET_i2s (&n->pid),
+ queue->address);
+ queue->idle = GNUNET_YES;
return;
}
- if (NULL != pm->qe)
+ memset (&sc, 0, sizeof(sc));
+ select_best_pending_from_link (&sc, queue, n->vl, NULL, 0);
+ if (NULL == sc.best)
{
- /* message still pending with communciator!
- LOGGING-FIXME: Use stats? logging? Should this not be rare? */
- return;
- }
- schedule_transmit_on_queue (queue, GNUNET_YES);
- if (NULL != queue->transmit_task)
- return; /* do it later */
- overhead = 0;
- if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
- overhead += sizeof (struct TransportReliabilityBoxMessage);
- s = pm;
- if ( ( (0 != queue->mtu) &&
- (pm->bytes_msg + overhead > queue->mtu) ) ||
- (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
- (NULL != pm->head_frag /* fragments already exist, should
- respect that even if MTU is 0 for
- this queue */) )
- s = fragment_message (queue, pm->dvh, s);
- if (NULL == s)
- {
- /* Fragmentation failed, try next message... */
- schedule_transmit_on_queue (queue, GNUNET_NO);
- return;
+ /* Also look at DVH that have the n as first hop! */
+ for (struct DistanceVectorHop *dvh = n->dv_head; NULL != dvh;
+ dvh = dvh->next_neighbour)
+ {
+ select_best_pending_from_link (&sc,
+ queue,
+ dvh->dv->vl,
+ dvh,
+ sizeof(struct GNUNET_PeerIdentity)
+ * (1 + dvh->distance)
+ + sizeof(struct TransportDVBoxMessage)
+ + sizeof(struct TransportDVBoxPayloadP));
+ }
}
- if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
- // FIXME-OPTIMIZE: and if reliability was requested for 's' by core!
- s = reliability_box_message (queue, pm->dvh, s);
- if (NULL == s)
+ if (NULL == sc.best)
{
- /* Reliability boxing failed, try next message... */
- schedule_transmit_on_queue (queue, GNUNET_NO);
+ /* no message pending, nothing to do here! */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No pending messages, queue `%s' to %s now idle\n",
+ queue->address,
+ GNUNET_i2s (&n->pid));
+ queue->idle = GNUNET_YES;
return;
}
- /* Pass 's' for transission to the communicator */
- queue_send_msg (queue, s, &s[1], s->bytes_msg);
- // FIXME: do something similar to the logic below
- // in defragmentation / reliability ACK handling!
-
- /* Check if this transmission somehow conclusively finished handing 'pm'
- even without any explicit ACKs */
- if ((PMT_CORE == s->pmt) &&
- (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
+ /* Given selection in `sc`, do transmission */
+ pm = sc.best;
+ if (NULL != sc.dvh)
{
- /* Full message sent, and over reliabile channel */
- client_send_response (pm);
+ GNUNET_assert (PMT_DV_BOX != pm->pmt);
+ if (NULL != sc.best->bpm)
+ {
+ /* We did this boxing before, but possibly for a different path!
+ Discard old DV box! OPTIMIZE-ME: we might want to check if
+ it is the same and then not re-build the message... */
+ free_pending_message (sc.best->bpm);
+ sc.best->bpm = NULL;
+ }
+ encapsulate_for_dv (sc.dvh->dv,
+ 1,
+ &sc.dvh,
+ (const struct GNUNET_MessageHeader *) &sc.best[1],
+ &extract_box_cb,
+ &sc,
+ RMO_NONE);
+ GNUNET_assert (NULL != sc.best->bpm);
+ pm = sc.best->bpm;
+ }
+ if (GNUNET_YES == sc.frag)
+ {
+ pm = fragment_message (queue, sc.dvh, pm);
+ if (NULL == pm)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Fragmentation failed queue %s to %s for <%llu>, trying again\n",
+ queue->address,
+ GNUNET_i2s (&n->pid),
+ sc.best->logging_uuid);
+ schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ return;
+ }
}
- else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
- queue->tc->details.communicator.cc) &&
- (PMT_FRAGMENT_BOX == s->pmt))
+ else if (GNUNET_YES == sc.relb)
{
- struct PendingMessage *pos;
-
- /* Fragment sent over reliabile channel */
- free_fragment_tree (s);
- pos = s->frag_parent;
- GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
- GNUNET_free (s);
- /* check if subtree is done */
- while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
- (pos != pm))
+ pm = reliability_box_message (queue, sc.dvh, pm);
+ if (NULL == pm)
{
- s = pos;
- pos = s->frag_parent;
- GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
- GNUNET_free (s);
+ /* Reliability boxing failed, try next message... */
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_DEBUG,
+ "Reliability boxing failed queue %s to %s for <%llu>, trying again\n",
+ queue->address,
+ GNUNET_i2s (&n->pid),
+ sc.best->logging_uuid);
+ schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ return;
}
+ }
- /* Was this the last applicable fragmment? */
- if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
- client_send_response (pm);
+ /* Pass 'pm' for transission to the communicator */
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_DEBUG,
+ "Passing message <%llu> to queue %s for peer %s (considered %u others)\n",
+ pm->logging_uuid,
+ queue->address,
+ GNUNET_i2s (&n->pid),
+ sc.consideration_counter);
+
+ /* Flow control: increment amount of traffic sent; if we are routing
+ via DV (and thus the ultimate target of the pending message is for
+ a different virtual link than the one of the queue), then we need
+ to use up not only the window of the direct link but also the
+ flow control window for the DV link! */pm->vl->outbound_fc_window_size_used += pm->bytes_msg;
+
+ if (pm->vl != queue->neighbour->vl)
+ {
+ /* If the virtual link of the queue differs, this better be distance
+ vector routing! */
+ GNUNET_assert (NULL != sc.dvh);
+ /* If we do distance vector routing, we better not do this for a
+ message that was itself DV-routed */
+ GNUNET_assert (PMT_DV_BOX != sc.best->pmt);
+ /* We use the size of the unboxed message here, to avoid counting
+ the DV-Box header which is eaten up on the way by intermediaries */
+ queue->neighbour->vl->outbound_fc_window_size_used += sc.best->bytes_msg;
}
- else if (PMT_CORE != pm->pmt)
+ else
{
- /* This was an acknowledgement of some type, always free */
- free_pending_message (pm);
+ GNUNET_assert (NULL == sc.dvh);
+ }
+
+ queue_send_msg (queue, pm, &pm[1], pm->bytes_msg);
+
+ /* Check if this transmission somehow conclusively finished handing 'pm'
+ even without any explicit ACKs */
+ if ((PMT_CORE == pm->pmt) ||
+ (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
+ {
+ completed_pending_message (pm);
}
else
{
characteristics (i.e. RTT); it takes one RTT for the message to
arrive and the ACK to come back in the best case; but the other
side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
- retransmitting. Note that in the future this heuristic should
- likely be improved further (measure RTT stability, consider
- message urgency and size when delaying ACKs, etc.) */
- update_pm_next_attempt (s,
+ retransmitting.
+
+ OPTIMIZE: Note that in the future this heuristic should likely
+ be improved further (measure RTT stability, consider message
+ urgency and size when delaying ACKs, etc.) */update_pm_next_attempt (pm,
GNUNET_TIME_relative_to_absolute (
GNUNET_TIME_relative_multiply (queue->pd.aged_rtt,
4)));
}
-
/* finally, re-schedule queue transmission task itself */
- schedule_transmit_on_queue (queue, GNUNET_NO);
+ schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
}
if ((dqm->qid != queue->qid) ||
(0 != GNUNET_memcmp (&dqm->receiver, &neighbour->pid)))
continue;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Dropped queue %s to peer %s\n",
+ queue->address,
+ GNUNET_i2s (&neighbour->pid));
free_queue (queue);
GNUNET_SERVICE_client_continue (tc->client);
return;
qe);
qe->queue->queue_length--;
tc->details.communicator.total_queue_length--;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received ACK on queue %s to peer %s (new length: %u/%u)\n",
+ qe->queue->address,
+ GNUNET_i2s (&qe->queue->neighbour->pid),
+ qe->queue->queue_length,
+ tc->details.communicator.total_queue_length);
GNUNET_SERVICE_client_continue (tc->client);
/* if applicable, resume transmissions that waited on ACK */
for (struct Queue *queue = tc->details.communicator.queue_head;
NULL != queue;
queue = queue->next_client)
- schedule_transmit_on_queue (queue, GNUNET_NO);
+ schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
}
else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
{
"# Transmission throttled due to queue queue limit",
-1,
GNUNET_NO);
- schedule_transmit_on_queue (qe->queue, GNUNET_NO);
+ schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
}
if (NULL != (pm = qe->pm))
{
- struct Neighbour *n;
+ struct VirtualLink *vl;
GNUNET_assert (qe == pm->qe);
pm->qe = NULL;
/* If waiting for this communicator may have blocked transmission
of pm on other queues for this neighbour, force schedule
transmit on queue for queues of the neighbour */
- n = pm->target;
- if (n->pending_msg_head == pm)
- {
- for (struct Queue *queue = n->queue_head; NULL != queue;
- queue = queue->next_neighbour)
- schedule_transmit_on_queue (queue, GNUNET_NO);
- }
- if (GNUNET_OK != ntohl (sma->status))
- {
- GNUNET_log (
- GNUNET_ERROR_TYPE_INFO,
- "Queue failed in transmission, will try retransmission immediately\n");
- update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS);
- }
+ vl = pm->vl;
+ if (vl->pending_msg_head == pm)
+ check_vl_transmission (vl);
}
GNUNET_free (qe);
}
for (struct Queue *q = neighbour->queue_head; NULL != q;
q = q->next_neighbour)
{
- struct MonitorEvent me = {.rtt = q->pd.aged_rtt,
- .cs = q->cs,
- .num_msg_pending = q->num_msg_pending,
- .num_bytes_pending = q->num_bytes_pending};
+ struct MonitorEvent me = { .rtt = q->pd.aged_rtt,
+ .cs = q->cs,
+ .num_msg_pending = q->num_msg_pending,
+ .num_bytes_pending = q->num_bytes_pending };
notify_monitor (tc, pid, q->address, q->nt, &me);
}
prefix = GNUNET_HELLO_address_to_prefix (address);
if (NULL == prefix)
{
- GNUNET_break (0); /* We got an invalid address!? */
+ GNUNET_break (0); /* We got an invalid address!? */
return;
}
tc = lookup_communicator (prefix);
"# Suggestions ignored due to missing communicator",
1,
GNUNET_NO);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Cannot connect to %s at `%s', no matching communicator present\n",
+ GNUNET_i2s (pid),
+ address);
+ GNUNET_free (prefix);
return;
}
/* forward suggestion for queue creation to communicator */
(unsigned int) idgen,
prefix,
address);
+ GNUNET_free (prefix);
alen = strlen (address) + 1;
env =
GNUNET_MQ_msg_extra (cqm, alen, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
{
struct TransportValidationChallengeMessage tvc;
- vs->last_challenge_use = GNUNET_TIME_absolute_get ();
+ vs->last_challenge_use = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
tvc.header.type =
htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE);
- tvc.header.size = htons (sizeof (tvc));
+ tvc.header.size = htons (sizeof(tvc));
tvc.reserved = htonl (0);
tvc.challenge = vs->challenge;
tvc.sender_time = GNUNET_TIME_absolute_hton (vs->last_challenge_use);
- queue_send_msg (q, NULL, &tvc, sizeof (tvc));
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Sending address validation challenge %s to %s\n",
+ GNUNET_sh2s (&tvc.challenge.value),
+ GNUNET_i2s (&q->neighbour->pid));
+ queue_send_msg (q, NULL, &tvc, sizeof(tvc));
}
vs = GNUNET_CONTAINER_heap_peek (validation_heap);
}
if (NULL == vs)
- return; /* woopsie, no more addresses known, should only
- happen if we're really a lonely peer */
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Address validation task not scheduled anymore, nothing to do\n");
+ return; /* woopsie, no more addresses known, should only
+ happen if we're really a lonely peer */
+ }
q = find_queue (&vs->pid, vs->address);
if (NULL == q)
{
vs->challenge_backoff =
GNUNET_TIME_randomized_backoff (vs->challenge_backoff,
MAX_VALIDATION_CHALLENGE_FREQ);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Address validation task will run again in %s\n",
+ GNUNET_STRINGS_relative_time_to_string (vs->challenge_backoff,
+ GNUNET_YES));
update_next_challenge_time (vs,
GNUNET_TIME_relative_to_absolute (
vs->challenge_backoff));
ctx->num_queues++;
if (0 == ctx->k--)
ctx->q = q;
- /* OPTIMIZE-FIXME: in the future, add reliability / goodput
+ /* FIXME-CONQ-STATISTICS: in the future, add reliability / goodput
statistics and consider those as well here? */
if (q->pd.aged_rtt.rel_value_us < DV_QUALITY_RTT_THRESHOLD.rel_value_us)
do_inc = GNUNET_YES;
/* scale our retries by how far we are above the threshold */
factor = qqc.quality_count / DV_LEARN_QUALITY_THRESHOLD;
delay = GNUNET_TIME_relative_multiply (DV_LEARN_BASE_FREQUENCY, factor);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "At connection quality %u, will launch DV learn in %s\n",
+ qqc.quality_count,
+ GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
dvlearn_task = GNUNET_SCHEDULER_add_delayed (delay, &start_dv_learn, NULL);
return;
}
lle = GNUNET_new (struct LearnLaunchEntry);
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
&lle->challenge,
- sizeof (lle->challenge));
+ sizeof(lle->challenge));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Starting launch DV learn with challenge %s\n",
+ GNUNET_sh2s (&lle->challenge.value));
GNUNET_CONTAINER_DLL_insert (lle_head, lle_tail, lle);
GNUNET_break (GNUNET_YES ==
GNUNET_CONTAINER_multishortmap_put (
lle,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
dvl.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
- dvl.header.size = htons (sizeof (dvl));
+ dvl.header.size = htons (sizeof(dvl));
dvl.num_hops = htons (0);
dvl.bidirectional = htons (0);
dvl.non_network_delay = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
dvl.monotonic_time =
GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
{
- struct DvInitPS dvip = {.purpose.purpose = htonl (
- GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
- .purpose.size = htonl (sizeof (dvip)),
- .monotonic_time = dvl.monotonic_time,
- .challenge = lle->challenge};
+ struct DvInitPS dvip = {
+ .purpose.purpose = htonl (
+ GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
+ .purpose.size = htonl (sizeof(dvip)),
+ .monotonic_time = dvl.monotonic_time,
+ .challenge = lle->challenge
+ };
- GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
- &dvip.purpose,
- &dvl.init_sig));
+ GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+ &dvip,
+ &dvl.init_sig);
}
dvl.initiator = GST_my_identity;
dvl.challenge = lle->challenge;
/* Do this as close to transmission time as possible! */
lle->launch_time = GNUNET_TIME_absolute_get ();
- queue_send_msg (qqc.q, NULL, &dvl, sizeof (dvl));
+ queue_send_msg (qqc.q, NULL, &dvl, sizeof(dvl));
/* reschedule this job, randomizing the time it runs (but no
actual backoff!) */
dvlearn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_randomize (
n->dv_monotime_available = GNUNET_YES;
return;
}
- if (sizeof (*mtbe) != record->value_size)
+ if (sizeof(*mtbe) != record->value_size)
{
GNUNET_break (0);
return;
const char *addr;
uint16_t addr_len;
- if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBoxMessage))
+ if (ntohl (aqm->mtu) <= sizeof(struct TransportFragmentBoxMessage))
{
/* MTU so small as to be useless for transmissions,
required for #fragment_message()! */
if (NULL == neighbour)
{
neighbour = GNUNET_new (struct Neighbour);
- neighbour->message_uuid_ctr =
- GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
neighbour->pid = aqm->receiver;
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multipeermap_put (
&neighbour_dv_monotime_cb,
neighbour);
}
- addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
+ addr_len = ntohs (aqm->header.size) - sizeof(*aqm);
addr = (const char *) &aqm[1];
-
- queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "New queue %s to %s available with QID %llu\n",
+ addr,
+ GNUNET_i2s (&aqm->receiver),
+ (unsigned long long) aqm->qid);
+ queue = GNUNET_malloc (sizeof(struct Queue) + addr_len);
queue->tc = tc;
queue->address = (const char *) &queue[1];
queue->pd.aged_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
queue->neighbour = neighbour;
+ queue->idle = GNUNET_YES;
memcpy (&queue[1], addr, addr_len);
/* notify monitors about new queue */
{
- struct MonitorEvent me = {.rtt = queue->pd.aged_rtt, .cs = queue->cs};
+ struct MonitorEvent me = { .rtt = queue->pd.aged_rtt, .cs = queue->cs };
notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
}
queue);
/* check if valdiations are waiting for the queue */
(void)
- GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
- &aqm->receiver,
- &check_validation_request_pending,
- queue);
+ GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
+ &aqm->receiver,
+ &check_validation_request_pending,
+ queue);
+ /* look for traffic for this queue */
+ schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
/* might be our first queue, try launching DV learning */
if (NULL == dvlearn_task)
dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL);
}
-/**
- * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY
- * messages. We do nothing here, real verification is done later.
- *
- * @param cls a `struct TransportClient *`
- * @param msg message to verify
- * @return #GNUNET_OK
- */
-static int
-check_address_consider_verify (
- void *cls,
- const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
-{
- (void) cls;
- (void) hdr;
- return GNUNET_OK;
-}
-
-
-/**
- * Closure for #check_known_address.
- */
-struct CheckKnownAddressContext
-{
- /**
- * Set to the address we are looking for.
- */
- const char *address;
-
- /**
- * Set to a matching validation state, if one was found.
- */
- struct ValidationState *vs;
-};
-
-
-/**
- * Test if the validation state in @a value matches the
- * address from @a cls.
- *
- * @param cls a `struct CheckKnownAddressContext`
- * @param pid unused (must match though)
- * @param value a `struct ValidationState`
- * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
- */
-static int
-check_known_address (void *cls,
- const struct GNUNET_PeerIdentity *pid,
- void *value)
-{
- struct CheckKnownAddressContext *ckac = cls;
- struct ValidationState *vs = value;
-
- (void) pid;
- if (0 != strcmp (vs->address, ckac->address))
- return GNUNET_OK;
- ckac->vs = vs;
- return GNUNET_NO;
-}
-
-
-/**
- * Start address validation.
- *
- * @param pid peer the @a address is for
- * @param address an address to reach @a pid (presumably)
- */
-static void
-start_address_validation (const struct GNUNET_PeerIdentity *pid,
- const char *address)
-{
- struct GNUNET_TIME_Absolute now;
- struct ValidationState *vs;
- struct CheckKnownAddressContext ckac = {.address = address, .vs = NULL};
-
- (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
- pid,
- &check_known_address,
- &ckac);
- if (NULL != (vs = ckac.vs))
- {
- /* if 'vs' is not currently valid, we need to speed up retrying the
- * validation */
- if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
- {
- /* reduce backoff as we got a fresh advertisement */
- vs->challenge_backoff =
- GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
- GNUNET_TIME_relative_divide (vs->challenge_backoff,
- 2));
- update_next_challenge_time (vs,
- GNUNET_TIME_relative_to_absolute (
- vs->challenge_backoff));
- }
- return;
- }
- now = GNUNET_TIME_absolute_get ();
- vs = GNUNET_new (struct ValidationState);
- vs->pid = *pid;
- vs->valid_until =
- GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME);
- vs->first_challenge_use = now;
- vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
- GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
- &vs->challenge,
- sizeof (vs->challenge));
- vs->address = GNUNET_strdup (address);
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_put (
- validation_map,
- &vs->pid,
- vs,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- update_next_challenge_time (vs, now);
-}
-
-
/**
* Function called by PEERSTORE for each matching record.
*
- * @param cls closure
+ * @param cls closure, a `struct PeerRequest`
* @param record peerstore record information
* @param emsg error message, or NULL if no errors
*/
static void
-handle_hello (void *cls,
- const struct GNUNET_PEERSTORE_Record *record,
- const char *emsg)
+handle_hello_for_client (void *cls,
+ const struct GNUNET_PEERSTORE_Record *record,
+ const char *emsg)
{
struct PeerRequest *pr = cls;
const char *val;
pr->bw = msg->bw;
pr->pk = (enum GNUNET_MQ_PriorityPreferences) ntohl (msg->pk);
if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_put (
- tc->details.application.requests,
- &pr->pid,
- pr,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
+ tc->details.application.requests,
+ &pr->pid,
+ pr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
{
GNUNET_break (0);
GNUNET_free (pr);
"transport",
&pr->pid,
GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
- &handle_hello,
+ &handle_hello_for_client,
pr);
GNUNET_SERVICE_client_continue (tc->client);
}
-/**
- * Given another peers address, consider checking it for validity
- * and then adding it to the Peerstore.
- *
- * @param cls a `struct TransportClient`
- * @param hdr message containing the raw address data and
- * signature in the body, see #GNUNET_HELLO_extract_address()
- */
-static void
-handle_address_consider_verify (
- void *cls,
- const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
-{
- struct TransportClient *tc = cls;
- char *address;
- enum GNUNET_NetworkType nt;
- struct GNUNET_TIME_Absolute mono_time;
-
- (void) cls;
- // OPTIMIZE-FIXME: checking that we know this address already should
- // be done BEFORE checking the signature => HELLO API change!
- // OPTIMIZE-FIXME: pre-check: rate-limit signature verification /
- // validation?!
- address =
- GNUNET_HELLO_extract_address (&hdr[1],
- ntohs (hdr->header.size) - sizeof (*hdr),
- &hdr->peer,
- &nt,
- &mono_time);
- if (NULL == address)
- {
- GNUNET_break_op (0);
- return;
- }
- start_address_validation (&hdr->peer, address);
- GNUNET_free (address);
- GNUNET_SERVICE_client_continue (tc->client);
-}
-
-
/**
* Check #GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_HELLO_VALIDATION
* messages.
(void) cls;
(void) pid;
- GNUNET_break (0); // should this ever happen?
+ GNUNET_break (0); // should this ever happen?
free_neighbour (neighbour);
return GNUNET_OK;
}
-/**
- * Free ephemeral entry.
- *
- * @param cls NULL
- * @param pid unused
- * @param value a `struct EphemeralCacheEntry`
- * @return #GNUNET_OK (always)
- */
-static int
-free_ephemeral_cb (void *cls,
- const struct GNUNET_PeerIdentity *pid,
- void *value)
-{
- struct EphemeralCacheEntry *ece = value;
-
- (void) cls;
- (void) pid;
- free_ephemeral (ece);
- return GNUNET_OK;
-}
-
-
/**
* Free validation state.
*
* @return #GNUNET_OK (always)
*/
static int
-free_pending_ack_cb (void *cls,
- const struct GNUNET_ShortHashCode *key,
- void *value)
+free_pending_ack_cb (void *cls, const struct GNUNET_Uuid *key, void *value)
{
struct PendingAcknowledgement *pa = value;
do_shutdown (void *cls)
{
struct LearnLaunchEntry *lle;
+
(void) cls;
- if (NULL != ephemeral_task)
- {
- GNUNET_SCHEDULER_cancel (ephemeral_task);
- ephemeral_task = NULL;
- }
GNUNET_CONTAINER_multipeermap_iterate (neighbours, &free_neighbour_cb, NULL);
if (NULL != peerstore)
{
NULL);
GNUNET_CONTAINER_multipeermap_destroy (ack_cummulators);
ack_cummulators = NULL;
- GNUNET_CONTAINER_multishortmap_iterate (pending_acks,
- &free_pending_ack_cb,
- NULL);
- GNUNET_CONTAINER_multishortmap_destroy (pending_acks);
+ GNUNET_CONTAINER_multiuuidmap_iterate (pending_acks,
+ &free_pending_ack_cb,
+ NULL);
+ GNUNET_CONTAINER_multiuuidmap_destroy (pending_acks);
pending_acks = NULL;
GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (neighbours));
GNUNET_CONTAINER_multipeermap_destroy (neighbours);
NULL);
GNUNET_CONTAINER_multipeermap_destroy (validation_map);
validation_map = NULL;
+ while (NULL != ir_head)
+ free_incoming_request (ir_head);
+ GNUNET_assert (0 == ir_total);
while (NULL != (lle = lle_head))
{
GNUNET_CONTAINER_DLL_remove (lle_head, lle_tail, lle);
GNUNET_CONTAINER_multipeermap_iterate (dv_routes, &free_dv_routes_cb, NULL);
GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
dv_routes = NULL;
- GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
- &free_ephemeral_cb,
- NULL);
- GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
- ephemeral_map = NULL;
- GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
- ephemeral_heap = NULL;
}
hello_mono_time = GNUNET_TIME_absolute_get_monotonic (c);
GST_cfg = c;
backtalkers = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
- pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES);
+ pending_acks = GNUNET_CONTAINER_multiuuidmap_create (32768, GNUNET_YES);
ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES);
dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
- ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES);
- ephemeral_heap =
- GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
dvlearn_map = GNUNET_CONTAINER_multishortmap_create (2 * MAX_DV_LEARN_PENDING,
GNUNET_YES);
validation_map = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
struct GNUNET_TRANSPORT_AddQueueMessage,
NULL),
- GNUNET_MQ_hd_var_size (address_consider_verify,
- GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY,
- struct GNUNET_TRANSPORT_AddressToVerify,
- NULL),
GNUNET_MQ_hd_fixed_size (del_queue_message,
GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
struct GNUNET_TRANSPORT_DelQueueMessage,