*
* TODO:
* Implement next:
- * - track RTT, distance, loss, etc. => requires extra data structures!
- * - consider replacing random `struct GNUNET_ShortHashCode` message UUIDs
- * with (incrementing) 64-bit numbers (compacting both
- * `struct TransportReliabilityBox` and `struct
- * TransportReliabilityAckMessage`), and using *different* UUIDs for each
- * transmission (even of the same message!)
- * - proper use/initialization of timestamps in messages exchanged
- * during DV learning
- * - persistence of monotonic time from DVInit to prevent
- * replay attacks using DVInit messages
- * - persistence of monotonic time obtained from other peers
- * in PEERSTORE (by message type) -- done for backchannel, needed elsewhere?
- * - change transport-core API to provide proper flow control in both
- * directions, allow multiple messages per peer simultaneously (tag
- * confirmations with unique message ID), and replace quota-out with
- * proper flow control; specify transmission preferences (latency,
+ * - add (more) logging
+ * - change transport-core API to specify transmission preferences (latency,
* reliability, etc.) per message!
- * - add logging
- *
- * Later:
* - review retransmission logic, right now there is no smartness there!
- * => congestion control, flow control, etc (requires RTT, loss, etc.)
+ * => congestion control, flow control, etc [PERFORMANCE-BASICS]
*
* Optimizations:
- * - use shorthashmap on msg_uuid's when matching reliability/fragment ACKs
- * against our pending message queue (requires additional per neighbour
- * hash map to be maintained, avoids possible linear scan on pending msgs)
+ * - AcknowledgementUUIDPs are overkill with 256 bits (128 would do)
+ * => Need 128 bit hash map though! [BANDWIDTH, MEMORY]
* - queue_send_msg and route_message both by API design have to make copies
* of the payload, and route_message on top of that requires a malloc/free.
- * Change design to approximate "zero" copy better...
+ * Change design to approximate "zero" copy better... [CPU]
* - could avoid copying body of message into each fragment and keep
* fragments as just pointers into the original message and only
* fully build fragments just before transmission (optimization, should
- * reduce CPU and memory use)
+ * reduce CPU and memory use) [CPU, MEMORY]
* - if messages are below MTU, consider adding ACKs and other stuff
- * (requires planning at receiver, and additional MST-style demultiplex
- * at receiver!)
+ * to the same transmission to avoid tiny messages (requires planning at
+ * receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT]
* - When we passively learned DV (with unconfirmed freshness), we
* right now add the path to our list but with a zero path_valid_until
* time and only use it for unconfirmed routes. However, we could consider
* triggering an explicit validation mechansim ourselves, specifically routing
- * a challenge-response message over the path (OPTIMIZATION-FIXME).
+ * a challenge-response message over the path [ROUTING]
+ * - Track ACK losses based on ACK-counter [ROUTING]
*
* Design realizations / discussion:
* - communicators do flow control by calling MQ "notify sent"
#include "gnunet_signatures.h"
#include "transport.h"
+/**
+ * Maximum number of messages we acknowledge together in one
+ * cummulative ACK. Larger values may save a bit of bandwidth.
+ */
+#define MAX_CUMMULATIVE_ACKS 64
/**
* What is the size we assume for a read operation in the
*/
#define IN_PACKET_SIZE_WITHOUT_MTU 128
+/**
+ * Number of slots we keep of historic data for computation of
+ * goodput / message loss ratio.
+ */
+#define GOODPUT_AGING_SLOTS 4
+
+/**
+ * Maximum number of peers we select for forwarding DVInit
+ * messages at the same time (excluding initiator).
+ */
+#define MAX_DV_DISCOVERY_SELECTION 16
+
+/**
+ * Window size. How many messages to the same target do we pass
+ * to CORE without a RECV_OK in between? Small values limit
+ * thoughput, large values will increase latency.
+ *
+ * FIXME-OPTIMIZE: find out what good values are experimentally,
+ * maybe set adaptively (i.e. to observed available bandwidth).
+ */
+#define RECV_WINDOW_SIZE 4
+
/**
* Minimum number of hops we should forward DV learn messages
* even if they are NOT useful for us in hope of looping
#define MAX_VALIDATION_CHALLENGE_FREQ \
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
+/**
+ * How long until we forget about historic accumulators and thus
+ * reset the ACK counter? Should exceed the maximum time an
+ * active connection experiences without an ACK.
+ */
+#define ACK_CUMMULATOR_TIMEOUT \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
+
/**
* What is the non-randomized base frequency at which we
* would initiate DV learn messages?
/**
* Unique identifier we attach to a message.
*/
-struct MessageUUID
+struct MessageUUIDP
+{
+ /**
+ * Unique value, generated by incrementing the
+ * `message_uuid_ctr` of `struct Neighbour`.
+ */
+ uint64_t uuid GNUNET_PACKED;
+};
+
+
+/**
+ * Unique identifier to map an acknowledgement to a transmission.
+ */
+struct AcknowledgementUUIDP
{
/**
- * Unique value.
+ * The UUID value. Not actually a hash, but a random value.
*/
- struct GNUNET_ShortHashCode uuid;
+ struct GNUNET_ShortHashCode value;
};
/**
* Type of a nonce used for challenges.
*/
-struct ChallengeNonce
+struct ChallengeNonceP
{
/**
* The value of the nonce. Note that this is NOT a hash.
* communicators must protect against replay attacks when using backchannel
* communication!
*/
- struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
+ struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time;
/**
* Target's peer identity.
* Plaintext of the variable-size payload that is encrypted
* within a `struct TransportBackchannelEncapsulationMessage`
*/
-struct TransportBackchannelRequestPayload
+struct TransportBackchannelRequestPayloadP
{
/**
*/
struct GNUNET_CRYPTO_EddsaSignature sender_sig;
- /**
- * How long is this signature over the ephemeral key valid?
- *
- * Note that the receiver MUST IGNORE the absolute time, and only interpret
- * the value as a mononic time and reject "older" values than the last one
- * observed. This is necessary as we do not want to require synchronized
- * clocks and may not have a bidirectional communication channel.
- *
- * Even with this, there is no real guarantee against replay achieved here,
- * unless the latest timestamp is persisted. While persistence should be
- * provided via PEERSTORE, we do not consider the mechanism reliable! Thus,
- * communicators must protect against replay attacks when using backchannel
- * communication!
- */
- struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
-
/**
* Current monotonic time of the sending transport service. Used to
* detect replayed messages. Note that the receiver should remember
* Outer layer of an encapsulated unfragmented application message sent
* over an unreliable channel.
*/
-struct TransportReliabilityBox
+struct TransportReliabilityBoxMessage
{
/**
* Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
* messages sent over possibly unreliable channels. Should
* be a random.
*/
- struct MessageUUID msg_uuid;
+ struct AcknowledgementUUIDP ack_uuid;
+};
+
+
+/**
+ * Acknowledgement payload.
+ */
+struct TransportCummulativeAckPayloadP
+{
+ /**
+ * How long was the ACK delayed for generating cummulative ACKs?
+ * Used to calculate the correct network RTT by taking the receipt
+ * time of the ack minus the transmission time of the sender minus
+ * this value.
+ */
+ struct GNUNET_TIME_RelativeNBO ack_delay;
+
+ /**
+ * UUID of a message being acknowledged.
+ */
+ struct AcknowledgementUUIDP ack_uuid;
};
struct GNUNET_MessageHeader header;
/**
- * Reserved. Zero.
- */
- uint32_t reserved GNUNET_PACKED;
-
- /**
- * How long was the ACK delayed relative to the average time of
- * receipt of the messages being acknowledged? Used to calculate
- * the average RTT by taking the receipt time of the ack minus the
- * average transmission time of the sender minus this value.
+ * Counter of ACKs transmitted by the sender to us. Incremented
+ * by one for each ACK, used to detect how many ACKs were lost.
*/
- struct GNUNET_TIME_RelativeNBO avg_ack_delay;
+ uint32_t ack_counter GNUNET_PACKED;
- /* followed by any number of `struct MessageUUID`
+ /* followed by any number of `struct TransportCummulativeAckPayloadP`
messages providing ACKs */
};
/**
* Outer layer of an encapsulated fragmented application message.
*/
-struct TransportFragmentBox
+struct TransportFragmentBoxMessage
{
/**
* Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
/**
* Unique ID of this fragment (and fragment transmission!). Will
* change even if a fragement is retransmitted to make each
- * transmission attempt unique! Should be incremented by one for
- * each fragment transmission. If a client receives a duplicate
- * fragment (same @e frag_off), it must send
- * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
+ * transmission attempt unique! If a client receives a duplicate
+ * fragment (same @e frag_off for same @a msg_uuid, it must send
+ * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK immediately.
*/
- struct FragmentUUIDP frag_uuid;
+ struct AcknowledgementUUIDP ack_uuid;
/**
- * Original message ID for of the message that all the
- * fragments belong to. Must be the same for all fragments.
+ * Original message ID for of the message that all the fragments
+ * belong to. Must be the same for all fragments.
*/
- struct MessageUUID msg_uuid;
+ struct MessageUUIDP msg_uuid;
/**
* Offset of this fragment in the overall message.
};
-/**
- * Outer layer of an fragmented application message sent over a queue
- * with finite MTU. When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
- * received, the receiver has two RTTs or 64 further fragments with
- * the same basic message time to send an acknowledgement, possibly
- * acknowledging up to 65 fragments in one ACK. ACKs must also be
- * sent immediately once all fragments were sent.
- */
-struct TransportFragmentAckMessage
-{
- /**
- * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Unique ID of the lowest fragment UUID being acknowledged.
- */
- struct FragmentUUIDP frag_uuid;
-
- /**
- * Bitfield of up to 64 additional fragments following the
- * @e msg_uuid being acknowledged by this message.
- */
- uint64_t extra_acks GNUNET_PACKED;
-
- /**
- * Original message ID for of the message that all the
- * fragments belong to.
- */
- struct MessageUUID msg_uuid;
-
- /**
- * How long was the ACK delayed relative to the average time of
- * receipt of the fragments being acknowledged? Used to calculate
- * the average RTT by taking the receipt time of the ack minus the
- * average transmission time of the sender minus this value.
- */
- struct GNUNET_TIME_RelativeNBO avg_ack_delay;
-
- /**
- * How long until the receiver will stop trying reassembly
- * of this message?
- */
- struct GNUNET_TIME_RelativeNBO reassembly_timeout;
-};
-
-
/**
* Content signed by the initator during DV learning.
*
/**
* Challenge value used by the initiator to re-identify the path.
*/
- struct ChallengeNonce challenge;
+ struct ChallengeNonceP challenge;
};
/**
* Challenge value used by the initiator to re-identify the path.
*/
- struct ChallengeNonce challenge;
+ struct ChallengeNonceP challenge;
};
/**
* An entry describing a peer on a path in a
- * `struct TransportDVLearn` message.
+ * `struct TransportDVLearnMessage` message.
*/
struct DVPathEntryP
{
* zero, peers that can forward to the initator should always try to
* forward to the initiator.
*/
-struct TransportDVLearn
+struct TransportDVLearnMessage
{
/**
* Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
*/
struct GNUNET_TIME_RelativeNBO non_network_delay;
+ /**
+ * Time at the initiator when generating the signature.
+ *
+ * Note that the receiver MUST IGNORE the absolute time, and only interpret
+ * the value as a mononic time and reject "older" values than the last one
+ * observed. This is necessary as we do not want to require synchronized
+ * clocks and may not have a bidirectional communication channel.
+ *
+ * Even with this, there is no real guarantee against replay achieved here,
+ * unless the latest timestamp is persisted. Persistence should be
+ * provided via PEERSTORE if possible.
+ */
+ struct GNUNET_TIME_AbsoluteNBO monotonic_time;
+
/**
* Signature of this hop over the path, of purpose
* #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
/**
* Challenge value used by the initiator to re-identify the path.
*/
- struct ChallengeNonce challenge;
+ struct ChallengeNonceP challenge;
/* Followed by @e num_hops `struct DVPathEntryP` values,
excluding the initiator of the DV trace; the last entry is the
*
* If a peer finds itself still on the list, it must drop the message.
*/
-struct TransportDVBox
+struct TransportDVBoxMessage
{
/**
* Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
* Message send to another peer to validate that it can indeed
* receive messages at a particular address.
*/
-struct TransportValidationChallenge
+struct TransportValidationChallengeMessage
{
/**
/**
* Challenge to be signed by the receiving peer.
*/
- struct ChallengeNonce challenge;
+ struct ChallengeNonceP challenge;
/**
* Timestamp of the sender, to be copied into the reply
/**
* Challenge signed by the receiving peer.
*/
- struct ChallengeNonce challenge;
+ struct ChallengeNonceP challenge;
};
* Message send to a peer to respond to a
* #GNUNET_MESSAGE_TYPE_ADDRESS_VALIDATION_CHALLENGE
*/
-struct TransportValidationResponse
+struct TransportValidationResponseMessage
{
/**
/**
* The challenge that was signed by the receiving peer.
*/
- struct ChallengeNonce challenge;
+ struct ChallengeNonceP challenge;
/**
* Original timestamp of the sender (was @code{sender_time}),
};
+/**
+ * Which transmission options are allowable for transmission?
+ * Interpreted bit-wise!
+ */
+enum RouteMessageOptions
+{
+ /**
+ * Only confirmed, non-DV direct neighbours.
+ */
+ RMO_NONE = 0,
+
+ /**
+ * We are allowed to use DV routing for this @a hdr
+ */
+ RMO_DV_ALLOWED = 1,
+
+ /**
+ * We are allowed to use unconfirmed queues or DV routes for this message
+ */
+ RMO_UNCONFIRMED_ALLOWED = 2,
+
+ /**
+ * Reliable and unreliable, DV and non-DV are all acceptable.
+ */
+ RMO_ANYTHING_GOES = (RMO_DV_ALLOWED | RMO_UNCONFIRMED_ALLOWED),
+
+ /**
+ * If we have multiple choices, it is OK to send this message
+ * over multiple channels at the same time to improve loss tolerance.
+ * (We do at most 2 transmissions.)
+ */
+ RMO_REDUNDANT = 4
+};
+
+
/**
* When did we launch this DV learning activity?
*/
/**
* Challenge that uniquely identifies this activity.
*/
- struct ChallengeNonce challenge;
+ struct ChallengeNonceP challenge;
/**
* When did we transmit the DV learn message (used to calculate RTT) and
*/
struct GNUNET_TIME_Absolute ephemeral_validity;
+ /**
+ * What time was @e sender_sig created
+ */
+ struct GNUNET_TIME_Absolute monotime;
+
/**
* Our ephemeral key.
*/
};
+/**
+ * Information we keep per #GOODPUT_AGING_SLOTS about historic
+ * (or current) transmission performance.
+ */
+struct TransmissionHistoryEntry
+{
+ /**
+ * Number of bytes actually sent in the interval.
+ */
+ uint64_t bytes_sent;
+
+ /**
+ * Number of bytes received and acknowledged by the other peer in
+ * the interval.
+ */
+ uint64_t bytes_received;
+};
+
+
+/**
+ * Performance data for a transmission possibility.
+ */
+struct PerformanceData
+{
+ /**
+ * Weighted average for the RTT.
+ */
+ struct GNUNET_TIME_Relative aged_rtt;
+
+ /**
+ * Historic performance data, using a ring buffer of#GOODPUT_AGING_SLOTS
+ * entries.
+ */
+ struct TransmissionHistoryEntry the[GOODPUT_AGING_SLOTS];
+
+ /**
+ * What was the last age when we wrote to @e the? Used to clear
+ * old entries when the age advances.
+ */
+ unsigned int last_age;
+};
+
+
/**
* Client connected to the transport service.
*/
struct TransportClient;
-
/**
* A neighbour that at least one communicator is connected to.
*/
struct Neighbour;
-
/**
* Entry in our #dv_routes table, representing a (set of) distance
* vector routes to a particular peer.
*/
struct DistanceVector;
+/**
+ * A queue is a message queue provided by a communicator
+ * via which we can reach a particular neighbour.
+ */
+struct Queue;
+
+/**
+ * Message awaiting transmission. See detailed comments below.
+ */
+struct PendingMessage;
+
/**
* One possible hop towards a DV target.
*/
-struct DistanceVectorHop
+struct DistanceVectorHop;
+
+
+/**
+ * Context from #handle_incoming_msg(). Closure for many
+ * message handlers below.
+ */
+struct CommunicatorMessageContext
{
/**
- * Kept in a MDLL, sorted by @e timeout.
+ * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+ * flow control to unchoke.
*/
- struct DistanceVectorHop *next_dv;
+ struct CommunicatorMessageContext *next;
/**
- * Kept in a MDLL, sorted by @e timeout.
+ * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+ * flow control to unchoke.
*/
- struct DistanceVectorHop *prev_dv;
+ struct CommunicatorMessageContext *prev;
/**
- * Kept in a MDLL.
+ * Which communicator provided us with the message.
*/
- struct DistanceVectorHop *next_neighbour;
+ struct TransportClient *tc;
/**
- * Kept in a MDLL.
+ * Additional information for flow control and about the sender.
*/
- struct DistanceVectorHop *prev_neighbour;
+ struct GNUNET_TRANSPORT_IncomingMessage im;
/**
- * What would be the next hop to @e target?
+ * Number of hops the message has travelled (if DV-routed).
+ * FIXME: make use of this in ACK handling!
*/
- struct Neighbour *next_hop;
+ uint16_t total_hops;
+};
+
+/**
+ * A virtual link is another reachable peer that is known to CORE. It
+ * can be either a `struct Neighbour` with at least one confirmed
+ * `struct Queue`, or a `struct DistanceVector` with at least one
+ * confirmed `struct DistanceVectorHop`. With a virtual link we track
+ * data that is per neighbour that is not specific to how the
+ * connectivity is established.
+ */
+struct VirtualLink
+{
/**
- * Distance vector entry this hop belongs with.
+ * Identity of the peer at the other end of the link.
*/
- struct DistanceVector *dv;
+ struct GNUNET_PeerIdentity target;
/**
- * Array of @e distance hops to the target, excluding @e next_hop.
- * NULL if the entire path is us to @e next_hop to `target`. Allocated
- * at the end of this struct. Excludes the target itself!
+ * Communicators blocked for receiving on @e target as we are waiting
+ * on the @e core_recv_window to increase.
*/
- const struct GNUNET_PeerIdentity *path;
+ struct CommunicatorMessageContext *cmc_head;
/**
- * At what time do we forget about this path unless we see it again
- * while learning?
+ * Communicators blocked for receiving on @e target as we are waiting
+ * on the @e core_recv_window to increase.
*/
- struct GNUNET_TIME_Absolute timeout;
+ struct CommunicatorMessageContext *cmc_tail;
/**
- * For how long is the validation of this path considered
- * valid?
- * Set to ZERO if the path is learned by snooping on DV learn messages
- * initiated by other peers, and to the time at which we generated the
- * challenge for DV learn operations this peer initiated.
+ * Task scheduled to possibly notfiy core that this peer is no
+ * longer counting as confirmed. Runs the #core_visibility_check(),
+ * which checks that some DV-path or a queue exists that is still
+ * considered confirmed.
*/
- struct GNUNET_TIME_Absolute path_valid_until;
+ struct GNUNET_SCHEDULER_Task *visibility_task;
/**
- * Number of hops in total to the `target` (excluding @e next_hop and `target`
- * itself). Thus 0 still means a distance of 2 hops (to @e next_hop and then
- * to `target`).
+ * Neighbour used by this virtual link, NULL if @e dv is used.
*/
- unsigned int distance;
+ struct Neighbour *n;
+
+ /**
+ * Distance vector used by this virtual link, NULL if @e n is used.
+ */
+ struct DistanceVector *dv;
+
+ /**
+ * How many more messages can we send to core before we exhaust
+ * the receive window of CORE for this peer? If this hits zero,
+ * we must tell communicators to stop providing us more messages
+ * for this peer. In fact, the window can go negative as we
+ * have multiple communicators, so per communicator we can go
+ * down by one into the negative range.
+ */
+ int core_recv_window;
};
/**
- * Entry in our #dv_routes table, representing a (set of) distance
- * vector routes to a particular peer.
+ * Data structure kept when we are waiting for an acknowledgement.
*/
-struct DistanceVector
+struct PendingAcknowledgement
{
/**
- * To which peer is this a route?
+ * If @e pm is non-NULL, this is the DLL in which this acknowledgement
+ * is kept in relation to its pending message.
*/
- struct GNUNET_PeerIdentity target;
+ struct PendingAcknowledgement *next_pm;
/**
- * Known paths to @e target.
+ * If @e pm is non-NULL, this is the DLL in which this acknowledgement
+ * is kept in relation to its pending message.
*/
- struct DistanceVectorHop *dv_head;
+ struct PendingAcknowledgement *prev_pm;
/**
- * Known paths to @e target.
+ * If @e queue is non-NULL, this is the DLL in which this acknowledgement
+ * is kept in relation to the queue that was used to transmit the
+ * @a pm.
*/
- struct DistanceVectorHop *dv_tail;
+ struct PendingAcknowledgement *next_queue;
/**
- * Task scheduled to purge expired paths from @e dv_head MDLL.
+ * If @e queue is non-NULL, this is the DLL in which this acknowledgement
+ * is kept in relation to the queue that was used to transmit the
+ * @a pm.
*/
- struct GNUNET_SCHEDULER_Task *timeout_task;
+ struct PendingAcknowledgement *prev_queue;
/**
- * Task scheduled to possibly notfiy core that this queue is no longer
- * counting as confirmed. Runs the #core_queue_visibility_check().
+ * If @e dvh is non-NULL, this is the DLL in which this acknowledgement
+ * is kept in relation to the DVH that was used to transmit the
+ * @a pm.
*/
- struct GNUNET_SCHEDULER_Task *visibility_task;
+ struct PendingAcknowledgement *next_dvh;
/**
- * Quota at which CORE is allowed to transmit to this peer
- * (note that the value CORE should actually be told is this
- * value plus the respective value in `struct Neighbour`).
- * Should match the sum of the quotas of all of the paths.
- *
- * FIXME: not yet set, tricky to get right given multiple paths,
- * many of which may be inactive! (=> Idea: measure???)
- * FIXME: how do we set this value initially when we tell CORE?
- * Options: start at a minimum value or at literally zero?
- * (=> Current thought: clean would be zero!)
+ * If @e dvh is non-NULL, this is the DLL in which this acknowledgement
+ * is kept in relation to the DVH that was used to transmit the
+ * @a pm.
*/
- struct GNUNET_BANDWIDTH_Value32NBO quota_out;
+ struct PendingAcknowledgement *prev_dvh;
/**
- * Is one of the DV paths in this struct 'confirmed' and thus
- * the cause for CORE to see this peer as connected? (Note that
- * the same may apply to a `struct Neighbour` at the same time.)
+ * Pointers for the DLL of all pending acknowledgements.
+ * This list is sorted by @e transmission time. If the list gets too
+ * long, the oldest entries are discarded.
*/
- int core_visible;
-};
+ struct PendingAcknowledgement *next_pa;
+ /**
+ * Pointers for the DLL of all pending acknowledgements.
+ * This list is sorted by @e transmission time. If the list gets too
+ * long, the oldest entries are discarded.
+ */
+ struct PendingAcknowledgement *prev_pa;
-/**
- * A queue is a message queue provided by a communicator
- * via which we can reach a particular neighbour.
- */
-struct Queue;
+ /**
+ * Unique identifier for this transmission operation.
+ */
+ struct AcknowledgementUUIDP ack_uuid;
+
+ /**
+ * Message that was transmitted, may be NULL if the message was ACKed
+ * via another channel.
+ */
+ struct PendingMessage *pm;
+
+ /**
+ * Distance vector path chosen for this transmission, NULL if transmission
+ * was to a direct neighbour OR if the path was forgotten in the meantime.
+ */
+ struct DistanceVectorHop *dvh;
+
+ /**
+ * Queue used for transmission, NULL if the queue has been destroyed
+ * (which may happen before we get an acknowledgement).
+ */
+ struct Queue *queue;
+
+ /**
+ * Time of the transmission, for RTT calculation.
+ */
+ struct GNUNET_TIME_Absolute transmission_time;
+
+ /**
+ * Number of bytes of the original message (to calculate bandwidth).
+ */
+ uint16_t message_size;
+};
+
+
+/**
+ * One possible hop towards a DV target.
+ */
+struct DistanceVectorHop
+{
+
+ /**
+ * Kept in a MDLL, sorted by @e timeout.
+ */
+ struct DistanceVectorHop *next_dv;
+
+ /**
+ * Kept in a MDLL, sorted by @e timeout.
+ */
+ struct DistanceVectorHop *prev_dv;
+
+ /**
+ * Kept in a MDLL.
+ */
+ struct DistanceVectorHop *next_neighbour;
+
+ /**
+ * Kept in a MDLL.
+ */
+ struct DistanceVectorHop *prev_neighbour;
+
+ /**
+ * Head of MDLL of messages routed via this path.
+ */
+ struct PendingMessage *pending_msg_head;
+
+ /**
+ * Tail of MDLL of messages routed via this path.
+ */
+ struct PendingMessage *pending_msg_tail;
+
+ /**
+ * Head of DLL of PAs that used our @a path.
+ */
+ struct PendingAcknowledgement *pa_head;
+
+ /**
+ * Tail of DLL of PAs that used our @a path.
+ */
+ struct PendingAcknowledgement *pa_tail;
+
+ /**
+ * What would be the next hop to @e target?
+ */
+ struct Neighbour *next_hop;
+
+ /**
+ * Distance vector entry this hop belongs with.
+ */
+ struct DistanceVector *dv;
+
+ /**
+ * Array of @e distance hops to the target, excluding @e next_hop.
+ * NULL if the entire path is us to @e next_hop to `target`. Allocated
+ * at the end of this struct. Excludes the target itself!
+ */
+ const struct GNUNET_PeerIdentity *path;
+
+ /**
+ * At what time do we forget about this path unless we see it again
+ * while learning?
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
+ /**
+ * For how long is the validation of this path considered
+ * valid?
+ * Set to ZERO if the path is learned by snooping on DV learn messages
+ * initiated by other peers, and to the time at which we generated the
+ * challenge for DV learn operations this peer initiated.
+ */
+ struct GNUNET_TIME_Absolute path_valid_until;
+
+ /**
+ * Performance data for this transmission possibility.
+ */
+ struct PerformanceData pd;
+
+ /**
+ * Number of hops in total to the `target` (excluding @e next_hop and `target`
+ * itself). Thus 0 still means a distance of 2 hops (to @e next_hop and then
+ * to `target`).
+ */
+ unsigned int distance;
+};
+
+
+/**
+ * Entry in our #dv_routes table, representing a (set of) distance
+ * vector routes to a particular peer.
+ */
+struct DistanceVector
+{
+
+ /**
+ * To which peer is this a route?
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
+ * Known paths to @e target.
+ */
+ struct DistanceVectorHop *dv_head;
+
+ /**
+ * Known paths to @e target.
+ */
+ struct DistanceVectorHop *dv_tail;
+
+ /**
+ * Task scheduled to purge expired paths from @e dv_head MDLL.
+ */
+ struct GNUNET_SCHEDULER_Task *timeout_task;
+
+ /**
+ * Do we have a confirmed working queue and are thus visible to
+ * CORE? If so, this is the virtual link, otherwise NULL.
+ */
+ struct VirtualLink *link;
+};
-/**
- * Message awaiting transmission. See detailed comments below.
- */
-struct PendingMessage;
/**
* Entry identifying transmission in one of our `struct
*/
struct Queue *next_client;
+ /**
+ * Head of DLL of PAs that used this queue.
+ */
+ struct PendingAcknowledgement *pa_head;
+
+ /**
+ * Tail of DLL of PAs that used this queue.
+ */
+ struct PendingAcknowledgement *pa_tail;
+
/**
* Head of DLL of unacked transmission requests.
*/
/**
* Task scheduled for the time when this queue can (likely) transmit the
- * next message. Still needs to check with the @e tracker_out to be sure.
+ * next message.
*/
struct GNUNET_SCHEDULER_Task *transmit_task;
- /**
- * Task scheduled to possibly notfiy core that this queue is no longer
- * counting as confirmed. Runs the #core_queue_visibility_check().
- */
- struct GNUNET_SCHEDULER_Task *visibility_task;
-
- /**
- * Our current RTT estimate for this queue.
- */
- struct GNUNET_TIME_Relative rtt;
-
/**
* How long do *we* consider this @e address to be valid? In the past or
* zero if we have not yet validated it. Can be updated based on
struct GNUNET_TIME_Absolute validated_until;
/**
- * Message ID generator for transmissions on this queue.
+ * Performance data for this queue.
+ */
+ struct PerformanceData pd;
+
+ /**
+ * Message ID generator for transmissions on this queue to the
+ * communicator.
*/
uint64_t mid_gen;
* Connection status for this queue.
*/
enum GNUNET_TRANSPORT_ConnectionStatus cs;
-
- /**
- * How much outbound bandwidth do we have available for this queue?
- */
- struct GNUNET_BANDWIDTH_Tracker tracker_out;
-
- /**
- * How much inbound bandwidth do we have available for this queue?
- */
- struct GNUNET_BANDWIDTH_Tracker tracker_in;
};
{
/**
- * Original message ID for of the message that all the
- * fragments belong to.
+ * Original message ID for of the message that all the fragments
+ * belong to.
*/
- struct MessageUUID msg_uuid;
+ struct MessageUUIDP msg_uuid;
/**
* Which neighbour is this context for?
*/
struct GNUNET_TIME_Absolute reassembly_timeout;
- /**
- * Average delay of all acks in @e extra_acks and @e frag_uuid.
- * Should be reset to zero when @e num_acks is set to 0.
- */
- struct GNUNET_TIME_Relative avg_ack_delay;
-
/**
* Time we received the last fragment. @e avg_ack_delay must be
* incremented by now - @e last_frag multiplied by @e num_acks.
*/
struct GNUNET_TIME_Absolute last_frag;
- /**
- * Bitfield of up to 64 additional fragments following @e frag_uuid
- * to be acknowledged in the next cummulative ACK.
- */
- uint64_t extra_acks;
-
- /**
- * Unique ID of the lowest fragment UUID to be acknowledged in the
- * next cummulative ACK. Only valid if @e num_acks > 0.
- */
- uint32_t frag_uuid;
-
- /**
- * Number of ACKs we have accumulated so far. Reset to 0
- * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
- */
- unsigned int num_acks;
-
/**
* How big is the message we are reassembling in total?
*/
* reassembly. May be NULL if we currently have no fragments from
* this @e pid (lazy initialization).
*/
- struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
+ struct GNUNET_CONTAINER_MultiHashMap32 *reassembly_map;
/**
* Heap with `struct ReassemblyContext` structs for fragments under
struct Queue *queue_tail;
/**
- * Task run to cleanup pending messages that have exceeded their timeout.
+ * Handle for an operation to fetch @e last_dv_learn_monotime information from
+ * the PEERSTORE, or NULL.
*/
- struct GNUNET_SCHEDULER_Task *timeout_task;
+ struct GNUNET_PEERSTORE_IterateContext *get;
/**
- * Quota at which CORE is allowed to transmit to this peer
- * (note that the value CORE should actually be told is this
- * value plus the respective value in `struct DistanceVector`).
- * Should match the sum of the quotas of all of the queues.
- *
- * FIXME: not yet set, tricky to get right given multiple queues!
- * (=> Idea: measure???)
- * FIXME: how do we set this value initially when we tell CORE?
- * Options: start at a minimum value or at literally zero?
- * (=> Current thought: clean would be zero!)
+ * Handle to a PEERSTORE store operation to store this @e pid's @e
+ * @e last_dv_learn_monotime. NULL if no PEERSTORE operation is pending.
+ */
+ struct GNUNET_PEERSTORE_StoreContext *sc;
+
+ /**
+ * Do we have a confirmed working queue and are thus visible to
+ * CORE? If so, this is the virtual link, otherwise NULL.
*/
- struct GNUNET_BANDWIDTH_Value32NBO quota_out;
+ struct VirtualLink *link;
/**
- * What is the earliest timeout of any message in @e pending_msg_tail?
+ * Latest DVLearn monotonic time seen from this peer. Initialized only
+ * if @e dl_monotime_available is #GNUNET_YES.
*/
- struct GNUNET_TIME_Absolute earliest_timeout;
+ struct GNUNET_TIME_Absolute last_dv_learn_monotime;
/**
- * Do we have a confirmed working queue and are thus visible to
- * CORE?
+ * Do we have the lastest value for @e last_dv_learn_monotime from
+ * PEERSTORE yet, or are we still waiting for a reply of PEERSTORE?
*/
- int core_visible;
+ int dv_monotime_available;
};
*/
struct PendingMessage *prev_frag;
+ /**
+ * Kept in a MDLL of messages using this @a dvh (if @e dvh is
+ * non-NULL).
+ */
+ struct PendingMessage *next_dvh;
+
+ /**
+ * Kept in a MDLL of messages using this @a dvh (if @e dvh is
+ * non-NULL).
+ */
+ struct PendingMessage *prev_dvh;
+
+ /**
+ * Head of DLL of PAs for this pending message.
+ */
+ struct PendingAcknowledgement *pa_head;
+
+ /**
+ * Tail of DLL of PAs for this pending message.
+ */
+ struct PendingAcknowledgement *pa_tail;
+
/**
* This message, reliability boxed. Only possibly available if @e pmt is
* #PMT_CORE.
struct PendingMessage *bpm;
/**
- * Target of the request.
+ * Target of the request (for transmission, may not be ultimate
+ * destination!).
*/
struct Neighbour *target;
+ /**
+ * Distance vector path selected for this message, or
+ * NULL if transmitted directly.
+ */
+ struct DistanceVectorHop *dvh;
+
/**
* Set to non-NULL value if this message is currently being given to a
* communicator and we are awaiting that communicator's acknowledgement.
* UUID to use for this message (used for reassembly of fragments, only
* initialized if @e msg_uuid_set is #GNUNET_YES).
*/
- struct MessageUUID msg_uuid;
-
- /**
- * Counter incremented per generated fragment.
- */
- uint32_t frag_uuidgen;
+ struct MessageUUIDP msg_uuid;
/**
* Type of the pending message.
};
+/**
+ * Acknowledgement payload.
+ */
+struct TransportCummulativeAckPayload
+{
+ /**
+ * When did we receive the message we are ACKing? Used to calculate
+ * the delay we introduced by cummulating ACKs.
+ */
+ struct GNUNET_TIME_Absolute receive_time;
+
+ /**
+ * UUID of a message being acknowledged.
+ */
+ struct AcknowledgementUUIDP ack_uuid;
+};
+
+
+/**
+ * Data structure in which we track acknowledgements still to
+ * be sent to the
+ */
+struct AcknowledgementCummulator
+{
+ /**
+ * Target peer for which we are accumulating ACKs here.
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
+ * ACK data being accumulated. Only @e num_acks slots are valid.
+ */
+ struct TransportCummulativeAckPayload ack_uuids[MAX_CUMMULATIVE_ACKS];
+
+ /**
+ * Task scheduled either to transmit the cummulative ACK message,
+ * or to clean up this data structure after extended periods of
+ * inactivity (if @e num_acks is zero).
+ */
+ struct GNUNET_SCHEDULER_Task *task;
+
+ /**
+ * When is @e task run (only used if @e num_acks is non-zero)?
+ */
+ struct GNUNET_TIME_Absolute min_transmission_time;
+
+ /**
+ * Counter to produce the `ack_counter` in the `struct
+ * TransportReliabilityAckMessage`. Allows the receiver to detect
+ * lost ACK messages. Incremented by @e num_acks upon transmission.
+ */
+ uint32_t ack_counter;
+
+ /**
+ * Number of entries used in @e ack_uuids. Reset to 0 upon transmission.
+ */
+ unsigned int num_acks;
+};
+
+
/**
* One of the addresses of this peer.
*/
* (We must not rotate more often as otherwise we may discard valid answers
* due to packet losses, latency and reorderings on the network).
*/
- struct ChallengeNonce challenge;
+ struct ChallengeNonceP challenge;
/**
* Claimed address of the peer.
*/
static struct GNUNET_CONTAINER_MultiPeerMap *backtalkers;
+/**
+ * Map from PIDs to `struct AcknowledgementCummulator`s.
+ * Here we track the cummulative ACKs for transmission.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *ack_cummulators;
+
+/**
+ * Map of pending acknowledgements, mapping `struct AcknowledgementUUID` to
+ * a `struct PendingAcknowledgement`.
+ */
+static struct GNUNET_CONTAINER_MultiShortmap *pending_acks;
+
/**
* Map from PIDs to `struct DistanceVector` entries describing
* known paths to the peer.
*/
static struct GNUNET_CONTAINER_MultiPeerMap *validation_map;
+/**
+ * Map from PIDs to `struct VirtualLink` entries describing
+ * links CORE knows to exist.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *links;
+
/**
* Map from challenges to `struct LearnLaunchEntry` values.
*/
*/
static struct GNUNET_SCHEDULER_Task *validation_task;
+/**
+ * The most recent PA we have created, head of DLL.
+ * The length of the DLL is kept in #pa_count.
+ */
+static struct PendingAcknowledgement *pa_head;
/**
- * Free cached ephemeral key.
- *
- * @param ece cached signature to free
+ * The oldest PA we have created, tail of DLL.
+ * The length of the DLL is kept in #pa_count.
*/
-static void
-free_ephemeral (struct EphemeralCacheEntry *ece)
-{
- GNUNET_CONTAINER_multipeermap_remove (ephemeral_map, &ece->target, ece);
- GNUNET_CONTAINER_heap_remove_node (ece->hn);
+static struct PendingAcknowledgement *pa_tail;
+
+/**
+ * Number of entries in the #pa_head/#pa_tail DLL. Used to
+ * limit the size of the data structure.
+ */
+static unsigned int pa_count;
+
+
+/**
+ * Get an offset into the transmission history buffer for `struct
+ * PerformanceData`. Note that the caller must perform the required
+ * modulo #GOODPUT_AGING_SLOTS operation before indexing into the
+ * array!
+ *
+ * An 'age' lasts 15 minute slots.
+ *
+ * @return current age of the world
+ */
+static unsigned int
+get_age ()
+{
+ struct GNUNET_TIME_Absolute now;
+
+ now = GNUNET_TIME_absolute_get ();
+ return now.abs_value_us / GNUNET_TIME_UNIT_MINUTES.rel_value_us / 15;
+}
+
+
+/**
+ * Release @a pa data structure.
+ *
+ * @param pa data structure to release
+ */
+static void
+free_pending_acknowledgement (struct PendingAcknowledgement *pa)
+{
+ struct Queue *q = pa->queue;
+ struct PendingMessage *pm = pa->pm;
+ struct DistanceVectorHop *dvh = pa->dvh;
+
+ GNUNET_CONTAINER_MDLL_remove (pa, pa_head, pa_tail, pa);
+ pa_count--;
+ if (NULL != q)
+ {
+ GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
+ pa->queue = NULL;
+ }
+ if (NULL != pm)
+ {
+ GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
+ pa->pm = NULL;
+ }
+ if (NULL != dvh)
+ {
+ GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
+ pa->queue = NULL;
+ }
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multishortmap_remove (pending_acks,
+ &pa->ack_uuid.value,
+ pa));
+ GNUNET_free (pa);
+}
+
+
+/**
+ * Free cached ephemeral key.
+ *
+ * @param ece cached signature to free
+ */
+static void
+free_ephemeral (struct EphemeralCacheEntry *ece)
+{
+ GNUNET_CONTAINER_multipeermap_remove (ephemeral_map, &ece->target, ece);
+ GNUNET_CONTAINER_heap_remove_node (ece->hn);
GNUNET_free (ece);
}
+/**
+ * Free virtual link.
+ *
+ * @param vl link data to free
+ */
+static void
+free_virtual_link (struct VirtualLink *vl)
+{
+ GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl);
+ if (NULL != vl->visibility_task)
+ {
+ GNUNET_SCHEDULER_cancel (vl->visibility_task);
+ vl->visibility_task = NULL;
+ }
+ GNUNET_break (NULL == vl->n);
+ GNUNET_break (NULL == vl->dv);
+ GNUNET_free (vl);
+}
+
+
/**
* Free validation state.
*
{
struct Neighbour *n = dvh->next_hop;
struct DistanceVector *dv = dvh->dv;
+ struct PendingAcknowledgement *pa;
+ struct PendingMessage *pm;
+ while (NULL != (pm = dvh->pending_msg_head))
+ {
+ GNUNET_CONTAINER_MDLL_remove (dvh,
+ dvh->pending_msg_head,
+ dvh->pending_msg_tail,
+ pm);
+ pm->dvh = NULL;
+ }
+ while (NULL != (pa = dvh->pa_head))
+ {
+ GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
+ pa->dvh = NULL;
+ }
GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, dvh);
GNUNET_free (dvh);
GNUNET_assert (
GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
- if (NULL != dv->visibility_task)
- GNUNET_SCHEDULER_cancel (dv->visibility_task);
if (NULL != dv->timeout_task)
GNUNET_SCHEDULER_cancel (dv->timeout_task);
GNUNET_free (dv);
GNUNET_assert (rc == GNUNET_CONTAINER_heap_remove_node (rc->hn));
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
- &rc->msg_uuid.uuid,
- rc));
+ GNUNET_CONTAINER_multihashmap32_remove (n->reassembly_map,
+ rc->msg_uuid.uuid,
+ rc));
GNUNET_free (rc);
}
* @return #GNUNET_OK (continue iteration)
*/
static int
-free_reassembly_cb (void *cls,
- const struct GNUNET_ShortHashCode *key,
- void *value)
+free_reassembly_cb (void *cls, uint32_t key, void *value)
{
struct ReassemblyContext *rc = value;
GNUNET_CONTAINER_multipeermap_remove (neighbours,
&neighbour->pid,
neighbour));
- if (NULL != neighbour->timeout_task)
- GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
if (NULL != neighbour->reassembly_map)
{
- GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
- &free_reassembly_cb,
- NULL);
- GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
+ GNUNET_CONTAINER_multihashmap32_iterate (neighbour->reassembly_map,
+ &free_reassembly_cb,
+ NULL);
+ GNUNET_CONTAINER_multihashmap32_destroy (neighbour->reassembly_map);
neighbour->reassembly_map = NULL;
GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
neighbour->reassembly_heap = NULL;
free_dv_route (dv);
}
if (NULL != neighbour->reassembly_timeout_task)
+ {
GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
+ neighbour->reassembly_timeout_task = NULL;
+ }
+ if (NULL != neighbour->get)
+ {
+ GNUNET_PEERSTORE_iterate_cancel (neighbour->get);
+ neighbour->get = NULL;
+ }
+ if (NULL != neighbour->sc)
+ {
+ GNUNET_PEERSTORE_store_cancel (neighbour->sc);
+ neighbour->sc = NULL;
+ }
GNUNET_free (neighbour);
}
*
* @param tc client to inform (must be CORE client)
* @param pid peer the connection is for
- * @param quota_out current quota for the peer
*/
static void
core_send_connect_info (struct TransportClient *tc,
- const struct GNUNET_PeerIdentity *pid,
- struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+ const struct GNUNET_PeerIdentity *pid)
{
struct GNUNET_MQ_Envelope *env;
struct ConnectInfoMessage *cim;
GNUNET_assert (CT_CORE == tc->type);
env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
- cim->quota_out = quota_out;
cim->id = *pid;
GNUNET_MQ_send (tc->mq, env);
}
* Send message to CORE clients that we gained a connection
*
* @param pid peer the queue was for
- * @param quota_out current quota for the peer
*/
static void
-cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
- struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+cores_send_connect_info (const struct GNUNET_PeerIdentity *pid)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Informing CORE clients about connection to %s\n",
+ GNUNET_i2s (pid));
for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
{
if (CT_CORE != tc->type)
continue;
- core_send_connect_info (tc, pid, quota_out);
+ core_send_connect_info (tc, pid);
}
}
static void
cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Informing CORE clients about disconnect from %s\n",
+ GNUNET_i2s (pid));
for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
{
struct GNUNET_MQ_Envelope *env;
/**
- * We believe we are ready to transmit a message on a queue. Double-checks
- * with the queue's "tracker_out" and then gives the message to the
- * communicator for transmission (updating the tracker, and re-scheduling
- * itself if applicable).
+ * We believe we are ready to transmit a message on a queue. Gives the
+ * message to the communicator for transmission (updating the tracker,
+ * and re-scheduling itself if applicable).
*
* @param cls the `struct Queue` to process transmissions for
*/
struct Neighbour *n = queue->neighbour;
struct PendingMessage *pm = n->pending_msg_head;
struct GNUNET_TIME_Relative out_delay;
- unsigned int wsize;
GNUNET_assert (NULL != pm);
if (queue->tc->details.communicator.total_queue_length >=
return;
}
- wsize = (0 == queue->mtu) ? pm->bytes_msg /* FIXME: add overheads? */
- : queue->mtu;
- out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, wsize);
- out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (
- pm->next_attempt),
- out_delay);
+ out_delay = GNUNET_TIME_absolute_get_remaining (pm->next_attempt);
if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us))
+ {
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_DEBUG,
+ "Schedule transmission on queue %llu of %s decides to run immediately\n",
+ (unsigned long long) queue->qid,
+ GNUNET_i2s (&n->pid));
return; /* we should run immediately! */
+ }
/* queue has changed since we were scheduled, reschedule again */
queue->transmit_task =
GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue);
/**
- * Check whether the CORE visibility of @a n changed. If so,
- * check whether we need to notify CORE.
+ * Task run to check whether the hops of the @a cls still
+ * are validated, or if we need to core about disconnection.
*
- * @param n neighbour to perform the check for
+ * @param cls a `struct VirtualLink`
*/
static void
-update_neighbour_core_visibility (struct Neighbour *n);
+check_link_down (void *cls)
+{
+ struct VirtualLink *vl = cls;
+ struct DistanceVector *dv = vl->dv;
+ struct Neighbour *n = vl->n;
+ struct GNUNET_TIME_Absolute dvh_timeout;
+ struct GNUNET_TIME_Absolute q_timeout;
+
+ vl->visibility_task = NULL;
+ dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+ for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+ pos = pos->next_dv)
+ dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
+ if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+ vl->dv = NULL;
+ q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+ for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
+ q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
+ if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+ vl->n = NULL;
+ if ((NULL == vl->n) && (NULL == vl->dv))
+ {
+ cores_send_disconnect_info (&dv->target);
+ free_virtual_link (vl);
+ return;
+ }
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
+ &check_link_down,
+ vl);
+}
/**
.rtt = GNUNET_TIME_UNIT_FOREVER_REL};
struct QueueEntry *qe;
int maxxed;
+ struct PendingAcknowledgement *pa;
+ struct VirtualLink *vl;
if (NULL != queue->transmit_task)
{
GNUNET_SCHEDULER_cancel (queue->transmit_task);
queue->transmit_task = NULL;
}
- if (NULL != queue->visibility_task)
+ while (NULL != (pa = queue->pa_head))
{
- GNUNET_SCHEDULER_cancel (queue->visibility_task);
- queue->visibility_task = NULL;
+ GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
+ pa->queue = NULL;
}
+
GNUNET_CONTAINER_MDLL_remove (neighbour,
neighbour->queue_head,
neighbour->queue_tail,
schedule_transmit_on_queue (s, GNUNET_NO);
}
notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
- GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
- GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
GNUNET_free (queue);
- update_neighbour_core_visibility (neighbour);
- cores_send_disconnect_info (&neighbour->pid);
-
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid);
+ if ((NULL != vl) && (neighbour == vl->n))
+ {
+ GNUNET_SCHEDULER_cancel (vl->visibility_task);
+ check_link_down (vl);
+ }
if (NULL == neighbour->queue_head)
{
free_neighbour (neighbour);
struct TransportClient *tc = app_ctx;
(void) cls;
+ (void) client;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Client %p disconnected, cleaning up.\n",
tc);
void *value)
{
struct TransportClient *tc = cls;
- struct Neighbour *neighbour = value;
- core_send_connect_info (tc, pid, neighbour->quota_out);
+ (void) value;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Telling new CORE client about existing connection to %s\n",
+ GNUNET_i2s (pid));
+ core_send_connect_info (tc, pid);
return GNUNET_OK;
}
return;
}
tc->type = CT_CORE;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "New CORE client with PID %s registered\n",
+ GNUNET_i2s (&start->self));
GNUNET_CONTAINER_multipeermap_iterate (neighbours,
¬ify_client_connect_info,
tc);
/**
* Free fragment tree below @e root, excluding @e root itself.
+ * FIXME: this does NOT seem to have the intended semantics
+ * based on how this is called. Seems we generally DO expect
+ * @a root to be free'ed itself as well!
*
* @param root root of the tree to free
*/
while (NULL != (frag = root->head_frag))
{
+ struct PendingAcknowledgement *pa;
+
free_fragment_tree (frag);
+ while (NULL != (pa = frag->pa_head))
+ {
+ GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
+ pa->pm = NULL;
+ }
GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
GNUNET_free (frag);
}
{
struct TransportClient *tc = pm->client;
struct Neighbour *target = pm->target;
+ struct DistanceVectorHop *dvh = pm->dvh;
+ struct PendingAcknowledgement *pa;
if (NULL != tc)
{
tc->details.core.pending_msg_tail,
pm);
}
+ if (NULL != dvh)
+ {
+ GNUNET_CONTAINER_MDLL_remove (dvh,
+ dvh->pending_msg_head,
+ dvh->pending_msg_tail,
+ pm);
+ }
GNUNET_CONTAINER_MDLL_remove (neighbour,
target->pending_msg_head,
target->pending_msg_tail,
pm);
+ while (NULL != (pa = pm->pa_head))
+ {
+ GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
+ pa->pm = NULL;
+ }
+
free_fragment_tree (pm);
if (NULL != pm->qe)
{
/**
- * Send a response to the @a pm that we have processed a
- * "send" request with status @a success. We
- * transmitted @a bytes_physical on the actual wire.
- * Sends a confirmation to the "core" client responsible
- * for the original request and free's @a pm.
+ * Send a response to the @a pm that we have processed a "send"
+ * request. Sends a confirmation to the "core" client responsible for
+ * the original request and free's @a pm.
*
* @param pm handle to the original pending message
- * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
- * for transmission failure
- * @param bytes_physical amount of bandwidth consumed
*/
static void
-client_send_response (struct PendingMessage *pm,
- int success,
- uint32_t bytes_physical)
+client_send_response (struct PendingMessage *pm)
{
struct TransportClient *tc = pm->client;
struct Neighbour *target = pm->target;
if (NULL != tc)
{
env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
- som->success = htonl ((uint32_t) success);
- som->bytes_msg = htons (pm->bytes_msg);
- som->bytes_physical = htonl (bytes_physical);
som->peer = target->pid;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Confirming transmission to %s\n",
+ GNUNET_i2s (&pm->target->pid));
GNUNET_MQ_send (tc->mq, env);
}
free_pending_message (pm);
/**
- * Checks the message queue for a neighbour for messages that have timed
- * out and purges them.
+ * Create a DV Box message.
*
- * @param cls a `struct Neighbour`
+ * @param total_hops how many hops did the message take so far
+ * @param num_hops length of the @a hops array
+ * @param origin origin of the message
+ * @param hops next peer(s) to the destination, including destination
+ * @param payload payload of the box
+ * @param payload_size number of bytes in @a payload
+ * @return boxed message (caller must #GNUNET_free() it).
+ */
+static struct TransportDVBoxMessage *
+create_dv_box (uint16_t total_hops,
+ const struct GNUNET_PeerIdentity *origin,
+ const struct GNUNET_PeerIdentity *target,
+ uint16_t num_hops,
+ const struct GNUNET_PeerIdentity *hops,
+ const void *payload,
+ uint16_t payload_size)
+{
+ struct TransportDVBoxMessage *dvb;
+ struct GNUNET_PeerIdentity *dhops;
+
+ GNUNET_assert (UINT16_MAX <
+ sizeof (struct TransportDVBoxMessage) +
+ sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) +
+ payload_size);
+ dvb = GNUNET_malloc (sizeof (struct TransportDVBoxMessage) +
+ sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) +
+ payload_size);
+ dvb->header.size =
+ htons (sizeof (struct TransportDVBoxMessage) +
+ sizeof (struct GNUNET_PeerIdentity) * (num_hops + 1) + payload_size);
+ dvb->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
+ dvb->total_hops = htons (total_hops);
+ dvb->num_hops = htons (num_hops + 1);
+ dvb->origin = *origin;
+ dhops = (struct GNUNET_PeerIdentity *) &dvb[1];
+ memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity));
+ dhops[num_hops] = *target;
+ memcpy (&dhops[num_hops + 1], payload, payload_size);
+
+ if (GNUNET_EXTRA_LOGGING > 0)
+ {
+ char *path;
+
+ path = GNUNET_strdup (GNUNET_i2s (&dvb->origin));
+ for (unsigned int i = 0; i <= num_hops; i++)
+ {
+ char *tmp;
+
+ GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[i]));
+ GNUNET_free (path);
+ path = tmp;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating DVBox for %u bytes of payload via %s\n",
+ (unsigned int) payload_size,
+ path);
+ GNUNET_free (path);
+ }
+
+ return dvb;
+}
+
+
+/**
+ * Pick @a hops_array_length random DV paths satisfying @a options
+ *
+ * @param dv data structure to pick paths from
+ * @param options constraints to satisfy
+ * @param hops_array[out] set to the result
+ * @param hops_array_length length of the @a hops_array
+ * @return number of entries set in @a hops_array
*/
-static void
-check_queue_timeouts (void *cls)
+static unsigned int
+pick_random_dv_hops (const struct DistanceVector *dv,
+ enum RouteMessageOptions options,
+ struct DistanceVectorHop **hops_array,
+ unsigned int hops_array_length)
{
- struct Neighbour *n = cls;
- struct PendingMessage *pm;
- struct GNUNET_TIME_Absolute now;
- struct GNUNET_TIME_Absolute earliest_timeout;
+ uint64_t choices[hops_array_length];
+ uint64_t num_dv;
+ unsigned int dv_count;
- n->timeout_task = NULL;
- earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
- now = GNUNET_TIME_absolute_get ();
- for (struct PendingMessage *pos = n->pending_msg_head; NULL != pos; pos = pm)
+ /* Pick random vectors, but weighted by distance, giving more weight
+ to shorter vectors */
+ num_dv = 0;
+ dv_count = 0;
+ for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+ pos = pos->next_dv)
+ {
+ if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
+ (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
+ .rel_value_us == 0))
+ continue; /* pos unconfirmed and confirmed required */
+ num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
+ dv_count++;
+ }
+ if (0 == dv_count)
+ return 0;
+ if (dv_count <= hops_array_length)
+ {
+ dv_count = 0;
+ for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+ pos = pos->next_dv)
+ hops_array[dv_count++] = pos;
+ return dv_count;
+ }
+ for (unsigned int i = 0; i < hops_array_length; i++)
{
- pm = pos->next_neighbour;
- if (pos->timeout.abs_value_us <= now.abs_value_us)
+ int ok = GNUNET_NO;
+ while (GNUNET_NO == ok)
{
- GNUNET_STATISTICS_update (GST_stats,
- "# messages dropped (timeout before confirmation)",
- 1,
- GNUNET_NO);
- client_send_response (pm, GNUNET_NO, 0);
- continue;
+ choices[i] =
+ GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, num_dv);
+ ok = GNUNET_YES;
+ for (unsigned int j = 0; j < i; j++)
+ if (choices[i] == choices[j])
+ {
+ ok = GNUNET_NO;
+ break;
+ }
}
- earliest_timeout =
- GNUNET_TIME_absolute_min (earliest_timeout, pos->timeout);
}
- n->earliest_timeout = earliest_timeout;
- if (NULL != n->pending_msg_head)
- n->timeout_task =
- GNUNET_SCHEDULER_add_at (earliest_timeout, &check_queue_timeouts, n);
+ dv_count = 0;
+ num_dv = 0;
+ for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+ pos = pos->next_dv)
+ {
+ uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
+
+ if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
+ (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
+ .rel_value_us == 0))
+ continue; /* pos unconfirmed and confirmed required */
+ for (unsigned int i = 0; i < hops_array_length; i++)
+ if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
+ hops_array[dv_count++] = pos;
+ num_dv += delta;
+ }
+ return dv_count;
}
struct PendingMessage *pm;
const struct GNUNET_MessageHeader *obmm;
struct Neighbour *target;
+ struct DistanceVector *dv;
+ struct DistanceVectorHop *dvh;
uint32_t bytes_msg;
int was_empty;
+ const void *payload;
+ size_t payload_size;
+ struct TransportDVBoxMessage *dvb;
+ struct VirtualLink *vl;
GNUNET_assert (CT_CORE == tc->type);
obmm = (const struct GNUNET_MessageHeader *) &obm[1];
bytes_msg = ntohs (obmm->size);
- target = lookup_neighbour (&obm->peer);
- if (NULL == target)
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer);
+ if (NULL == vl)
{
/* Failure: don't have this peer as a neighbour (anymore).
Might have gone down asynchronously, so this is NOT
a protocol violation by CORE. Still count the event,
as this should be rare. */
- struct GNUNET_MQ_Envelope *env;
- struct SendOkMessage *som;
-
- env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
- som->success = htonl (GNUNET_SYSERR);
- som->bytes_msg = htonl (bytes_msg);
- som->bytes_physical = htonl (0);
- som->peer = obm->peer;
- GNUNET_MQ_send (tc->mq, env);
GNUNET_SERVICE_client_continue (tc->client);
GNUNET_STATISTICS_update (GST_stats,
"# messages dropped (neighbour unknown)",
GNUNET_NO);
return;
}
+ target = lookup_neighbour (&obm->peer);
+ if (NULL == target)
+ dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer);
+ else
+ dv = NULL;
+ GNUNET_assert ((NULL != target) || (NULL != dv));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending %u bytes to %s using %s\n",
+ bytes_msg,
+ GNUNET_i2s (&obm->peer),
+ (NULL == target) ? "distance vector path" : "direct queue");
+ if (NULL == target)
+ {
+ unsigned int res;
+ struct DistanceVectorHop *dvh;
+
+ res = pick_random_dv_hops (dv, RMO_NONE, &dvh, 1);
+ GNUNET_assert (1 == res);
+ target = dvh->next_hop;
+ dvb = create_dv_box (0,
+ &GST_my_identity,
+ &obm->peer,
+ dvh->distance,
+ dvh->path,
+ &obm[1],
+ bytes_msg);
+ payload = dvb;
+ payload_size = ntohs (dvb->header.size);
+ }
+ else
+ {
+ dvh = NULL;
+ dvb = NULL;
+ payload = &obm[1];
+ payload_size = bytes_msg;
+ }
+
was_empty = (NULL == target->pending_msg_head);
- pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) + payload_size);
pm->client = tc;
pm->target = target;
- pm->bytes_msg = bytes_msg;
- pm->timeout =
- GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
- memcpy (&pm[1], &obm[1], bytes_msg);
+ pm->bytes_msg = payload_size;
+ memcpy (&pm[1], payload, payload_size);
+ GNUNET_free_non_null (dvb);
+ dvb = NULL;
+ pm->dvh = dvh;
+ if (NULL != dvh)
+ {
+ GNUNET_CONTAINER_MDLL_insert (dvh,
+ dvh->pending_msg_head,
+ dvh->pending_msg_tail,
+ pm);
+ }
GNUNET_CONTAINER_MDLL_insert (neighbour,
target->pending_msg_head,
target->pending_msg_tail,
tc->details.core.pending_msg_head,
tc->details.core.pending_msg_tail,
pm);
- if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
- {
- target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
- if (NULL != target->timeout_task)
- GNUNET_SCHEDULER_cancel (target->timeout_task);
- target->timeout_task = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
- &check_queue_timeouts,
- target);
- }
if (! was_empty)
return; /* all queues must already be busy */
for (struct Queue *queue = target->queue_head; NULL != queue;
{
/* try transmission on any queue that is idle */
if (NULL == queue->transmit_task)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Queue %llu to %s is idle, triggering transmission\n",
+ (unsigned long long) queue->qid,
+ GNUNET_i2s (&queue->neighbour->pid));
queue->transmit_task =
GNUNET_SCHEDULER_add_now (&transmit_on_queue, queue);
+ }
}
}
}
+/**
+ * Send ACK to communicator (if requested) and free @a cmc.
+ *
+ * @param cmc context for which we are done handling the message
+ */
+static void
+finish_cmc_handling (struct CommunicatorMessageContext *cmc)
+{
+ if (0 != ntohl (cmc->im.fc_on))
+ {
+ /* send ACK when done to communicator for flow control! */
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
+
+ env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
+ ack->reserved = htonl (0);
+ ack->fc_id = cmc->im.fc_id;
+ ack->sender = cmc->im.sender;
+ GNUNET_MQ_send (cmc->tc->mq, env);
+ }
+ GNUNET_SERVICE_client_continue (cmc->tc->client);
+ GNUNET_free (cmc);
+}
+
+
+/**
+ * Client confirms that it is done handling message(s) to a particular
+ * peer. We may now provide more messages to CORE for this peer.
+ *
+ * Notifies the respective queues that more messages can now be received.
+ *
+ * @param cls the client
+ * @param rom the message that was sent
+ */
+static void
+handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
+{
+ struct TransportClient *tc = cls;
+ struct VirtualLink *vl;
+ uint32_t delta;
+ struct CommunicatorMessageContext *cmc;
+
+ if (CT_CORE != tc->type)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer);
+ if (NULL == vl)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# RECV_OK dropped: virtual link unknown",
+ 1,
+ GNUNET_NO);
+ GNUNET_SERVICE_client_continue (tc->client);
+ return;
+ }
+ delta = ntohl (rom->increase_window_delta);
+ vl->core_recv_window += delta;
+ if (vl->core_recv_window <= 0)
+ return;
+ /* resume communicators */
+ while (NULL != (cmc = vl->cmc_tail))
+ {
+ GNUNET_CONTAINER_DLL_remove (vl->cmc_head, vl->cmc_tail, cmc);
+ finish_cmc_handling (cmc);
+ }
+}
+
+
/**
* Communicator started. Process the request.
*
size = ntohs (cam->header.size) - sizeof (*cam);
if (0 == size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Receive-only communicator connected\n");
return; /* receive-only communicator */
+ }
tc->details.communicator.address_prefix =
GNUNET_strdup ((const char *) &cam[1]);
tc->details.communicator.cc =
(enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Communicator with prefix `%s' connected\n",
+ tc->details.communicator.address_prefix);
GNUNET_SERVICE_client_continue (tc->client);
}
(void) cls;
msize = ntohs (cb->header.size) - sizeof (*cb);
- if (UINT16_MAX - msize >
+ if (((size_t) (UINT16_MAX - msize)) >
sizeof (struct TransportBackchannelEncapsulationMessage) +
- sizeof (struct TransportBackchannelRequestPayload))
+ sizeof (struct TransportBackchannelRequestPayloadP))
{
GNUNET_break (0);
return GNUNET_SYSERR;
/**
- * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
- * one, cache it and return it.
+ * Lookup ephemeral key in our #ephemeral_map. If no valid one exists,
+ * generate one, cache it and return it.
*
* @param pid peer to look up ephemeral for
* @param private_key[out] set to the private key
* @param ephemeral_key[out] set to the key
* @param ephemeral_sender_sig[out] set to the signature
- * @param ephemeral_validity[out] set to the validity expiration time
+ * @param monotime[out] set to the monotime used for the signature
*/
static void
lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
- struct GNUNET_TIME_Absolute *ephemeral_validity)
+ struct GNUNET_TIME_Absolute *monotime)
{
struct EphemeralCacheEntry *ece;
struct EphemeralConfirmationPS ec;
{
ece = GNUNET_new (struct EphemeralCacheEntry);
ece->target = *pid;
+ ece->monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
ece->ephemeral_validity =
- GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
- EPHEMERAL_VALIDITY);
+ GNUNET_TIME_absolute_add (ece->monotime, EPHEMERAL_VALIDITY);
GNUNET_assert (GNUNET_OK ==
GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key, &ece->ephemeral_key);
*private_key = ece->private_key;
*ephemeral_key = ece->ephemeral_key;
*ephemeral_sender_sig = ece->sender_sig;
- *ephemeral_validity = ece->ephemeral_validity;
+ *monotime = ece->monotime;
}
struct GNUNET_TRANSPORT_SendMessageTo *smt;
struct GNUNET_MQ_Envelope *env;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Queueing %u bytes of payload for transmission on queue %llu to %s\n",
+ (unsigned int) payload_size,
+ (unsigned long long) queue->qid,
+ GNUNET_i2s (&queue->neighbour->pid));
env = GNUNET_MQ_msg_extra (smt,
payload_size,
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
}
}
-
-/**
- * Which transmission options are allowable for transmission?
- * Interpreted bit-wise!
- */
-enum RouteMessageOptions
-{
- /**
- * Only confirmed, non-DV direct neighbours.
- */
- RMO_NONE = 0,
-
- /**
- * We are allowed to use DV routing for this @a hdr
- */
- RMO_DV_ALLOWED = 1,
-
- /**
- * We are allowed to use unconfirmed queues or DV routes for this message
- */
- RMO_UNCONFIRMED_ALLOWED = 2,
-
- /**
- * Reliable and unreliable, DV and non-DV are all acceptable.
- */
- RMO_ANYTHING_GOES = (RMO_DV_ALLOWED | RMO_UNCONFIRMED_ALLOWED),
-
- /**
- * If we have multiple choices, it is OK to send this message
- * over multiple channels at the same time to improve loss tolerance.
- * (We do at most 2 transmissions.)
- */
- RMO_REDUNDANT = 4
-};
-
+
+// FIXME: improve logging after this point!
/**
* Pick a queue of @a n under constraints @a options and schedule
for (struct Queue *pos = n->queue_head; NULL != pos;
pos = pos->next_neighbour)
{
- /* Count the queue with the visibility task in all cases, as
- otherwise we may end up with no queues just because the
- time for the visibility task just expired but the scheduler
- just ran this task first */
if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
- (pos->validated_until.abs_value_us > now.abs_value_us) ||
- (NULL != pos->visibility_task))
+ (pos->validated_until.abs_value_us > now.abs_value_us))
candidates++;
}
if (0 == candidates)
{
- /* Given that we above check for pos->visibility task,
- this should be strictly impossible. */
- GNUNET_break (0);
+ /* This can happen rarely if the last confirmed queue timed
+ out just as we were beginning to process this message. */
+ GNUNET_STATISTICS_update (GST_stats,
+ "# route selection failed (all no valid queue)",
+ 1,
+ GNUNET_NO);
return;
}
sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates);
for (struct Queue *pos = n->queue_head; NULL != pos;
pos = pos->next_neighbour)
{
- /* Count the queue with the visibility task in all cases, as
- otherwise we may end up with no queues just because the
- time for the visibility task just expired but the scheduler
- just ran this task first */
- if ((pos->validated_until.abs_value_us > now.abs_value_us) ||
- (NULL != pos->visibility_task))
+ if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
+ (pos->validated_until.abs_value_us > now.abs_value_us))
{
if ((sel1 == candidates) || (sel2 == candidates))
queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
const struct GNUNET_MessageHeader *payload,
enum RouteMessageOptions options)
{
- uint16_t mlen = ntohs (payload->size);
- char boxram[sizeof (struct TransportDVBox) +
- (dvh->distance + 1) * sizeof (struct GNUNET_PeerIdentity) +
- mlen] GNUNET_ALIGN;
- struct TransportDVBox *box = (struct TransportDVBox *) boxram;
- struct GNUNET_PeerIdentity *path = (struct GNUNET_PeerIdentity *) &box[1];
+ struct TransportDVBoxMessage *dvb;
- box->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
- box->header.size = htons (sizeof (boxram));
- box->total_hops = htons (0);
- box->num_hops = htons (dvh->distance + 1);
- box->origin = GST_my_identity;
- memcpy (path, dvh->path, dvh->distance * sizeof (struct GNUNET_PeerIdentity));
- path[dvh->distance] = dvh->dv->target;
- memcpy (&path[dvh->distance + 1], payload, mlen);
- route_via_neighbour (dvh->next_hop, &box->header, options);
+ dvb = create_dv_box (0,
+ &GST_my_identity,
+ &dvh->dv->target,
+ dvh->distance,
+ dvh->path,
+ payload,
+ ntohs (payload->size));
+ route_via_neighbour (dvh->next_hop, &dvb->header, options);
+ GNUNET_free (dvb);
}
const struct GNUNET_MessageHeader *hdr,
enum RouteMessageOptions options)
{
- struct DistanceVectorHop *h1;
- struct DistanceVectorHop *h2;
- uint64_t num_dv;
- uint64_t choice1;
- uint64_t choice2;
+ struct DistanceVectorHop *hops[2];
+ unsigned int res;
- /* Pick random vectors, but weighted by distance, giving more weight
- to shorter vectors */
- num_dv = 0;
- for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
- pos = pos->next_dv)
- {
- if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
- (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
- .rel_value_us == 0))
- continue; /* pos unconfirmed and confirmed required */
- num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
- }
- if (0 == num_dv)
- {
- GNUNET_break (0);
- return;
- }
- choice1 = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, num_dv);
- choice2 = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, num_dv);
- num_dv = 0;
- h1 = NULL;
- h2 = NULL;
- for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
- pos = pos->next_dv)
- {
- uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
-
- if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
- (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
- .rel_value_us == 0))
- continue; /* pos unconfirmed and confirmed required */
- if ((num_dv <= choice1) && (num_dv + delta > choice1))
- h1 = pos;
- if ((num_dv <= choice2) && (num_dv + delta > choice2))
- h2 = pos;
- num_dv += delta;
- }
- forward_via_dvh (h1, hdr, options & (~RMO_REDUNDANT));
- if (0 == (options & RMO_REDUNDANT))
- forward_via_dvh (h2, hdr, options & (~RMO_REDUNDANT));
+ res = pick_random_dv_hops (dv,
+ options,
+ hops,
+ (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
+ for (unsigned int i = 0; i < res; i++)
+ forward_via_dvh (hops[i], hdr, options & (~RMO_REDUNDANT));
}
struct GNUNET_MessageHeader *hdr,
enum RouteMessageOptions options)
{
+ struct VirtualLink *vl;
struct Neighbour *n;
struct DistanceVector *dv;
- n = GNUNET_CONTAINER_multipeermap_get (neighbours, target);
- dv = (0 != (options & RMO_DV_ALLOWED))
- ? GNUNET_CONTAINER_multipeermap_get (dv_routes, target)
- : NULL;
+ vl = GNUNET_CONTAINER_multipeermap_get (links, target);
+ n = vl->n;
+ dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
{
/* if confirmed is required, and we do not have anything
confirmed, drop respective options */
- if ((NULL != n) && (GNUNET_NO == n->core_visible))
- n = NULL;
- if ((NULL != dv) && (GNUNET_NO == dv->core_visible))
- dv = NULL;
+ if (NULL == n)
+ n = lookup_neighbour (target);
+ if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
+ dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target);
}
if ((NULL == n) && (NULL == dv))
{
/**
- * FIXME: comment!
+ * Given the key material in @a km and the initialization vector
+ * @a iv, setup the key material for the backchannel in @a key.
+ *
+ * @param km raw master secret
+ * @param iv initialization vector
+ * @param key[out] symmetric cipher and HMAC state to generate
*/
static void
bc_setup_key_state_from_km (const struct GNUNET_HashCode *km,
{
struct TransportClient *tc = cls;
struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
- struct GNUNET_TIME_Absolute ephemeral_validity;
+ struct GNUNET_TIME_Absolute monotime;
struct TransportBackchannelEncapsulationMessage *enc;
- struct TransportBackchannelRequestPayload ppay;
+ struct TransportBackchannelRequestPayloadP ppay;
struct BackchannelKeyState key;
char *mpos;
uint16_t msize;
/* encapsulate and encrypt message */
msize = ntohs (cb->header.size) - sizeof (*cb) +
- sizeof (struct TransportBackchannelRequestPayload);
+ sizeof (struct TransportBackchannelRequestPayloadP);
enc = GNUNET_malloc (sizeof (*enc) + msize);
enc->header.type =
htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
&private_key,
&enc->ephemeral_key,
&ppay.sender_sig,
- &ephemeral_validity);
+ &monotime);
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
&enc->iv,
sizeof (enc->iv));
dh_key_derive_eph_pid (&private_key, &cb->pid, &enc->iv, &key);
- ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
- ppay.monotonic_time =
- GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
+ ppay.monotonic_time = GNUNET_TIME_absolute_hton (monotime);
mpos = (char *) &enc[1];
bc_encrypt (&key, &ppay, mpos, sizeof (ppay));
bc_encrypt (&key,
}
-/**
- * Context from #handle_incoming_msg(). Closure for many
- * message handlers below.
- */
-struct CommunicatorMessageContext
-{
- /**
- * Which communicator provided us with the message.
- */
- struct TransportClient *tc;
-
- /**
- * Additional information for flow control and about the sender.
- */
- struct GNUNET_TRANSPORT_IncomingMessage im;
-
- /**
- * Number of hops the message has travelled (if DV-routed).
- * FIXME: make use of this in ACK handling!
- */
- uint16_t total_hops;
-};
-
-
/**
* Given an inbound message @a msg from a communicator @a cmc,
* demultiplex it based on the type calling the right handler.
const struct GNUNET_MessageHeader *msg);
-/**
- * Send ACK to communicator (if requested) and free @a cmc.
- *
- * @param cmc context for which we are done handling the message
- */
-static void
-finish_cmc_handling (struct CommunicatorMessageContext *cmc)
-{
- if (0 != ntohl (cmc->im.fc_on))
- {
- /* send ACK when done to communicator for flow control! */
- struct GNUNET_MQ_Envelope *env;
- struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
-
- env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
- ack->reserved = htonl (0);
- ack->fc_id = cmc->im.fc_id;
- ack->sender = cmc->im.sender;
- GNUNET_MQ_send (cmc->tc->mq, env);
- }
- GNUNET_SERVICE_client_continue (cmc->tc->client);
- GNUNET_free (cmc);
-}
-
-
/**
* Communicator gave us an unencapsulated message to pass as-is to
* CORE. Process the request.
handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
{
struct CommunicatorMessageContext *cmc = cls;
+ struct VirtualLink *vl;
uint16_t size = ntohs (mh->size);
if ((size > UINT16_MAX - sizeof (struct InboundMessage)) ||
GNUNET_SERVICE_client_drop (client);
return;
}
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &cmc->im.sender);
+ if (NULL == vl)
+ {
+ /* FIXME: sender is giving us messages for CORE but we don't have
+ the link up yet! I *suspect* this can happen right now (i.e.
+ sender has verified us, but we didn't verify sender), but if
+ we pass this on, CORE would be confused (link down, messages
+ arrive). We should investigate more if this happens often,
+ or in a persistent manner, and possibly do "something" about
+ it. Thus logging as error for now. */
+ GNUNET_break_op (0);
+ GNUNET_STATISTICS_update (GST_stats,
+ "# CORE messages droped (virtual link still down)",
+ 1,
+ GNUNET_NO);
+
+ finish_cmc_handling (cmc);
+ return;
+ }
/* Forward to all CORE clients */
for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
{
memcpy (&im[1], mh, size);
GNUNET_MQ_send (tc->mq, env);
}
- /* FIXME: consider doing this _only_ once the message
- was drained from the CORE MQs to extend flow control to CORE!
- (basically, increment counter in cmc, decrement on MQ send continuation! */
- finish_cmc_handling (cmc);
+ vl->core_recv_window--;
+ if (vl->core_recv_window > 0)
+ {
+ finish_cmc_handling (cmc);
+ return;
+ }
+ /* Wait with calling #finish_cmc_handling(cmc) until the message
+ was processed by CORE MQs (for CORE flow control)! */
+ GNUNET_CONTAINER_DLL_insert (vl->cmc_head, vl->cmc_tail, cmc);
}
* @return #GNUNET_YES if message is well-formed
*/
static int
-check_fragment_box (void *cls, const struct TransportFragmentBox *fb)
+check_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
{
uint16_t size = ntohs (fb->header.size);
uint16_t bsize = size - sizeof (*fb);
+ (void) cls;
if (0 == bsize)
{
GNUNET_break_op (0);
/**
- * Generate a fragment acknowledgement for an @a rc.
+ * Clean up an idle cummulative acknowledgement data structure.
*
- * @param rc context to generate ACK for, @a rc ACK state is reset
+ * @param cls a `struct AcknowledgementCummulator *`
*/
static void
-send_fragment_ack (struct ReassemblyContext *rc)
+destroy_ack_cummulator (void *cls)
{
- struct TransportFragmentAckMessage *ack;
+ struct AcknowledgementCummulator *ac = cls;
- ack = GNUNET_new (struct TransportFragmentAckMessage);
- ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
- ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
- ack->frag_uuid.uuid = htonl (rc->frag_uuid);
- ack->extra_acks = GNUNET_htonll (rc->extra_acks);
- ack->msg_uuid = rc->msg_uuid;
- ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
- if (0 == rc->msg_missing)
- ack->reassembly_timeout = GNUNET_TIME_relative_hton (
- GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
+ ac->task = NULL;
+ GNUNET_assert (0 == ac->num_acks);
+ GNUNET_assert (
+ GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (ack_cummulators, &ac->target, ac));
+ GNUNET_free (ac);
+}
+
+
+/**
+ * Do the transmission of a cummulative acknowledgement now.
+ *
+ * @param cls a `struct AcknowledgementCummulator *`
+ */
+static void
+transmit_cummulative_ack_cb (void *cls)
+{
+ struct AcknowledgementCummulator *ac = cls;
+ struct TransportReliabilityAckMessage *ack;
+ struct TransportCummulativeAckPayloadP *ap;
+
+ ac->task = NULL;
+ GNUNET_assert (0 < ac->ack_counter);
+ ack = GNUNET_malloc (sizeof (*ack) +
+ ac->ack_counter *
+ sizeof (struct TransportCummulativeAckPayloadP));
+ ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
+ ack->header.size =
+ htons (sizeof (*ack) +
+ ac->ack_counter * sizeof (struct TransportCummulativeAckPayloadP));
+ ack->ack_counter = htonl (ac->ack_counter++);
+ ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
+ for (unsigned int i = 0; i < ac->ack_counter; i++)
+ {
+ ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
+ ap[i].ack_delay = GNUNET_TIME_relative_hton (
+ GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
+ }
+ route_message (&ac->target, &ack->header, RMO_DV_ALLOWED);
+ ac->num_acks = 0;
+ ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT,
+ &destroy_ack_cummulator,
+ ac);
+}
+
+
+/**
+ * Transmit an acknowledgement for @a ack_uuid to @a pid delaying
+ * transmission by at most @a ack_delay.
+ *
+ * @param pid target peer
+ * @param ack_uuid UUID to ack
+ * @param max_delay how long can the ACK wait
+ */
+static void
+cummulative_ack (const struct GNUNET_PeerIdentity *pid,
+ const struct AcknowledgementUUIDP *ack_uuid,
+ struct GNUNET_TIME_Absolute max_delay)
+{
+ struct AcknowledgementCummulator *ac;
+
+ ac = GNUNET_CONTAINER_multipeermap_get (ack_cummulators, pid);
+ if (NULL == ac)
+ {
+ ac = GNUNET_new (struct AcknowledgementCummulator);
+ ac->target = *pid;
+ ac->min_transmission_time = max_delay;
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ ack_cummulators,
+ &ac->target,
+ ac,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ }
else
- ack->reassembly_timeout = GNUNET_TIME_relative_hton (
- GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
- route_message (&rc->neighbour->pid, &ack->header, RMO_DV_ALLOWED);
- rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
- rc->num_acks = 0;
- rc->extra_acks = 0LLU;
+ {
+ if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
+ {
+ /* must run immediately, ack buffer full! */
+ GNUNET_SCHEDULER_cancel (ac->task);
+ transmit_cummulative_ack_cb (ac);
+ }
+ GNUNET_SCHEDULER_cancel (ac->task);
+ ac->min_transmission_time =
+ GNUNET_TIME_absolute_min (ac->min_transmission_time, max_delay);
+ }
+ GNUNET_assert (ac->num_acks < MAX_CUMMULATIVE_ACKS);
+ ac->ack_uuids[ac->num_acks].receive_time = GNUNET_TIME_absolute_get ();
+ ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
+ ac->num_acks++;
+ ac->task = GNUNET_SCHEDULER_add_at (ac->min_transmission_time,
+ &transmit_cummulative_ack_cb,
+ ac);
+}
+
+
+/**
+ * Closure for #find_by_message_uuid.
+ */
+struct FindByMessageUuidContext
+{
+ /**
+ * UUID to look for.
+ */
+ struct MessageUUIDP message_uuid;
+
+ /**
+ * Set to the reassembly context if found.
+ */
+ struct ReassemblyContext *rc;
+};
+
+
+/**
+ * Iterator called to find a reassembly context by the message UUID in the
+ * multihashmap32.
+ *
+ * @param cls a `struct FindByMessageUuidContext`
+ * @param key a key (unused)
+ * @param value a `struct ReassemblyContext`
+ * @return #GNUNET_YES if not found, #GNUNET_NO if found
+ */
+static int
+find_by_message_uuid (void *cls, uint32_t key, void *value)
+{
+ struct FindByMessageUuidContext *fc = cls;
+ struct ReassemblyContext *rc = value;
+
+ (void) key;
+ if (0 == GNUNET_memcmp (&fc->message_uuid, &rc->msg_uuid))
+ {
+ fc->rc = rc;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
}
* @param fb the message that was received
*/
static void
-handle_fragment_box (void *cls, const struct TransportFragmentBox *fb)
+handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
{
struct CommunicatorMessageContext *cmc = cls;
struct Neighbour *n;
uint16_t msize;
uint16_t fsize;
uint16_t frag_off;
- uint32_t frag_uuid;
char *target;
struct GNUNET_TIME_Relative cdelay;
- int ack_now;
+ struct FindByMessageUuidContext fc;
- n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
+ n = lookup_neighbour (&cmc->im.sender);
if (NULL == n)
{
struct GNUNET_SERVICE_Client *client = cmc->tc->client;
}
if (NULL == n->reassembly_map)
{
- n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8, GNUNET_YES);
+ n->reassembly_map = GNUNET_CONTAINER_multihashmap32_create (8);
n->reassembly_heap =
GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
n->reassembly_timeout_task =
n);
}
msize = ntohs (fb->msg_size);
- rc =
- GNUNET_CONTAINER_multishortmap_get (n->reassembly_map, &fb->msg_uuid.uuid);
- if (NULL == rc)
+ fc.message_uuid = fb->msg_uuid;
+ fc.rc = NULL;
+ GNUNET_CONTAINER_multihashmap32_get_multiple (n->reassembly_map,
+ fb->msg_uuid.uuid,
+ &find_by_message_uuid,
+ &fc);
+ if (NULL == (rc = fc.rc))
{
rc = GNUNET_malloc (sizeof (*rc) + msize + /* reassembly payload buffer */
(msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
rc,
rc->reassembly_timeout.abs_value_us);
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multishortmap_put (
+ GNUNET_CONTAINER_multihashmap32_put (
n->reassembly_map,
- &rc->msg_uuid.uuid,
+ rc->msg_uuid.uuid,
rc,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
target = (char *) &rc[1];
rc->bitfield = (uint8_t *) (target + rc->msg_size);
rc->msg_missing = rc->msg_size;
/* reassemble */
fsize = ntohs (fb->header.size) - sizeof (*fb);
+ if (0 == fsize)
+ {
+ GNUNET_break (0);
+ finish_cmc_handling (cmc);
+ return;
+ }
frag_off = ntohs (fb->frag_off);
memcpy (&target[frag_off], &fb[1], fsize);
/* update bitfield and msg_missing */
}
/* Compute cummulative ACK */
- frag_uuid = ntohl (fb->frag_uuid.uuid);
cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
- cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->num_acks);
+ cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
+ if (0 == rc->msg_missing)
+ cdelay = GNUNET_TIME_UNIT_ZERO;
+ cummulative_ack (&cmc->im.sender,
+ &fb->ack_uuid,
+ GNUNET_TIME_relative_to_absolute (cdelay));
rc->last_frag = GNUNET_TIME_absolute_get ();
- rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay, cdelay);
- ack_now = GNUNET_NO;
- if (0 == rc->num_acks)
- {
- /* case one: first ack */
- rc->frag_uuid = frag_uuid;
- rc->extra_acks = 0LLU;
- rc->num_acks = 1;
- }
- else if ((frag_uuid >= rc->frag_uuid) && (frag_uuid <= rc->frag_uuid + 64))
- {
- /* case two: ack fits after existing min UUID */
- if ((frag_uuid == rc->frag_uuid) ||
- (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))))
- {
- /* duplicate fragment, ack now! */
- ack_now = GNUNET_YES;
- }
- else
- {
- rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
- rc->num_acks++;
- }
- }
- else if ((rc->frag_uuid > frag_uuid) &&
- (((rc->frag_uuid == frag_uuid + 64) && (0 == rc->extra_acks)) ||
- ((rc->frag_uuid < frag_uuid + 64) &&
- (rc->extra_acks ==
- (rc->extra_acks &
- ~((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))))))
- {
- /* can fit ack by shifting extra acks and starting at
- frag_uid, test above esured that the bits we will
- shift 'extra_acks' by are all zero. */
- rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
- rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
- rc->frag_uuid = frag_uuid;
- rc->num_acks++;
- }
- if (65 == rc->num_acks) /* OPTIMIZE-FIXME: maybe use smaller threshold? This
- is very aggressive. */
- ack_now = GNUNET_YES; /* maximum acks received */
- // FIXME: possibly also ACK based on RTT (but for that we'd need to
- // determine the queue used for the ACK first!)
-
/* is reassembly complete? */
if (0 != rc->msg_missing)
{
- if (ack_now)
- send_fragment_ack (rc);
finish_cmc_handling (cmc);
return;
}
return;
}
/* successful reassembly */
- send_fragment_ack (rc);
demultiplex_with_cmc (cmc, msg);
/* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
en-route and we forget that we finished this reassembly immediately!
/**
- * Check the @a fa against the fragments associated with @a pm.
- * If it matches, remove the matching fragments from the transmission
- * list.
+ * Communicator gave us a reliability box. Check the message.
*
- * @param pm pending message to check against the ack
- * @param fa the ack that was received
- * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param rb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
*/
static int
-check_ack_against_pm (struct PendingMessage *pm,
- const struct TransportFragmentAckMessage *fa)
-{
- int match;
- struct PendingMessage *nxt;
- uint32_t fs = ntohl (fa->frag_uuid.uuid);
- uint64_t xtra = GNUNET_ntohll (fa->extra_acks);
-
- match = GNUNET_NO;
- for (struct PendingMessage *frag = pm->head_frag; NULL != frag; frag = nxt)
- {
- const struct TransportFragmentBox *tfb =
- (const struct TransportFragmentBox *) &pm[1];
- uint32_t fu = ntohl (tfb->frag_uuid.uuid);
-
- GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt);
- nxt = frag->next_frag;
- /* Check for exact match or match in the 'xtra' bitmask */
- if ((fu == fs) ||
- ((fu > fs) && (fu <= fs + 64) && (0 != (1LLU << (fu - fs - 1) & xtra))))
- {
- match = GNUNET_YES;
- free_fragment_tree (frag);
- }
- }
- return match;
+check_reliability_box (void *cls,
+ const struct TransportReliabilityBoxMessage *rb)
+{
+ (void) cls;
+ GNUNET_MQ_check_boxed_message (rb);
+ return GNUNET_YES;
}
/**
- * Communicator gave us a fragment acknowledgement. Process the request.
+ * Communicator gave us a reliability box. Process the request.
*
* @param cls a `struct CommunicatorMessageContext` (must call
* #finish_cmc_handling() when done)
- * @param fa the message that was received
+ * @param rb the message that was received
*/
static void
-handle_fragment_ack (void *cls, const struct TransportFragmentAckMessage *fa)
+handle_reliability_box (void *cls,
+ const struct TransportReliabilityBoxMessage *rb)
{
struct CommunicatorMessageContext *cmc = cls;
- struct Neighbour *n;
- int matched;
+ const struct GNUNET_MessageHeader *inbox =
+ (const struct GNUNET_MessageHeader *) &rb[1];
- n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
- if (NULL == n)
- {
- struct GNUNET_SERVICE_Client *client = cmc->tc->client;
+ // FIXME: call cummulative_ack(), have ack_countdown influence max_delay!
+ (void) (0 == ntohl (rb->ack_countdown));
+ /* continue with inner message */
+ demultiplex_with_cmc (cmc, inbox);
+}
- GNUNET_break (0);
- finish_cmc_handling (cmc);
- GNUNET_SERVICE_client_drop (client);
- return;
- }
- /* FIXME-OPTIMIZE: maybe use another hash map here? */
- matched = GNUNET_NO;
- for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm;
- pm = pm->prev_neighbour)
- {
- if (0 != GNUNET_memcmp (&fa->msg_uuid, &pm->msg_uuid))
- continue;
- matched = GNUNET_YES;
- if (GNUNET_YES == check_ack_against_pm (pm, fa))
- {
- struct GNUNET_TIME_Relative avg_ack_delay =
- GNUNET_TIME_relative_ntoh (fa->avg_ack_delay);
- // FIXME: update RTT and other reliability data!
- // ISSUE: we don't know which of n's queues the message(s)
- // took (and in fact the different messages might have gone
- // over different queues and possibly over multiple).
- // => track queues with PendingMessages, and update RTT only if
- // the queue used is unique?
- // -> how can we get loss rates?
- // -> or, add extra state to Box and ACK to identify queue?
- // IDEA: generate MULTIPLE frag-uuids per fragment and track
- // the queue with the fragment! (-> this logic must
- // be moved into check_ack_against_pm!)
- (void) avg_ack_delay;
- }
- else
- {
- GNUNET_STATISTICS_update (GST_stats,
- "# FRAGMENT_ACKS dropped, no matching fragment",
- 1,
- GNUNET_NO);
- }
- if (NULL == pm->head_frag)
- {
- // if entire message is ACKed, handle that as well.
- // => clean up PM, any post actions?
- free_pending_message (pm);
- }
- else
- {
- struct GNUNET_TIME_Relative reassembly_timeout =
- GNUNET_TIME_relative_ntoh (fa->reassembly_timeout);
- // OPTIMIZE-FIXME: adjust retransmission strategy based on
- // reassembly_timeout!
- (void) reassembly_timeout;
- }
- break;
- }
- if (GNUNET_NO == matched)
+
+/**
+ * Check if we have advanced to another age since the last time. If
+ * so, purge ancient statistics (more than GOODPUT_AGING_SLOTS before
+ * the current age)
+ *
+ * @param pd[in,out] data to update
+ * @param age current age
+ */
+static void
+update_pd_age (struct PerformanceData *pd, unsigned int age)
+{
+ unsigned int sage;
+
+ if (age == pd->last_age)
+ return; /* nothing to do */
+ sage = GNUNET_MAX (pd->last_age, age - 2 * GOODPUT_AGING_SLOTS);
+ for (unsigned int i = sage; i <= age - GOODPUT_AGING_SLOTS; i++)
{
- GNUNET_STATISTICS_update (GST_stats,
- "# FRAGMENT_ACKS dropped, no matching pending message",
- 1,
- GNUNET_NO);
+ struct TransmissionHistoryEntry *the = &pd->the[i % GOODPUT_AGING_SLOTS];
+
+ the->bytes_sent = 0;
+ the->bytes_received = 0;
}
- finish_cmc_handling (cmc);
+ pd->last_age = age;
}
/**
- * Communicator gave us a reliability box. Check the message.
+ * Update @a pd based on the latest @a rtt and the number of bytes
+ * that were confirmed to be successfully transmitted.
*
- * @param cls a `struct CommunicatorMessageContext`
- * @param rb the send message that was sent
- * @return #GNUNET_YES if message is well-formed
+ * @param pd[in,out] data to update
+ * @param rtt latest round-trip time
+ * @param bytes_transmitted_ok number of bytes receiver confirmed as received
*/
-static int
-check_reliability_box (void *cls, const struct TransportReliabilityBox *rb)
+static void
+update_performance_data (struct PerformanceData *pd,
+ struct GNUNET_TIME_Relative rtt,
+ uint16_t bytes_transmitted_ok)
{
- GNUNET_MQ_check_boxed_message (rb);
- return GNUNET_YES;
+ uint64_t nval = rtt.rel_value_us;
+ uint64_t oval = pd->aged_rtt.rel_value_us;
+ unsigned int age = get_age ();
+ struct TransmissionHistoryEntry *the = &pd->the[age % GOODPUT_AGING_SLOTS];
+
+ if (oval == GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
+ pd->aged_rtt = rtt;
+ else
+ pd->aged_rtt.rel_value_us = (nval + 7 * oval) / 8;
+ update_pd_age (pd, age);
+ the->bytes_received += bytes_transmitted_ok;
}
/**
- * Communicator gave us a reliability box. Process the request.
+ * We have successfully transmitted data via @a q, update metrics.
*
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
- * @param rb the message that was received
+ * @param q queue to update
+ * @param rtt round trip time observed
+ * @param bytes_transmitted_ok number of bytes successfully transmitted
*/
static void
-handle_reliability_box (void *cls, const struct TransportReliabilityBox *rb)
+update_queue_performance (struct Queue *q,
+ struct GNUNET_TIME_Relative rtt,
+ uint16_t bytes_transmitted_ok)
{
- struct CommunicatorMessageContext *cmc = cls;
- const struct GNUNET_MessageHeader *inbox =
- (const struct GNUNET_MessageHeader *) &rb[1];
+ update_performance_data (&q->pd, rtt, bytes_transmitted_ok);
+}
- if (0 == ntohl (rb->ack_countdown))
- {
- struct TransportReliabilityAckMessage *ack;
- /* FIXME-OPTIMIZE: implement cummulative ACKs and ack_countdown,
- then setting the avg_ack_delay field below: */
- ack = GNUNET_malloc (sizeof (*ack) + sizeof (struct MessageUUID));
- ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
- ack->header.size = htons (sizeof (*ack) + sizeof (struct MessageUUID));
- memcpy (&ack[1], &rb->msg_uuid, sizeof (struct MessageUUID));
- route_message (&cmc->im.sender, &ack->header, RMO_DV_ALLOWED);
+/**
+ * We have successfully transmitted data via @a dvh, update metrics.
+ *
+ * @param dvh distance vector path data to update
+ * @param rtt round trip time observed
+ * @param bytes_transmitted_ok number of bytes successfully transmitted
+ */
+static void
+update_dvh_performance (struct DistanceVectorHop *dvh,
+ struct GNUNET_TIME_Relative rtt,
+ uint16_t bytes_transmitted_ok)
+{
+ update_performance_data (&dvh->pd, rtt, bytes_transmitted_ok);
+}
+
+
+/**
+ * The @a pa was acknowledged, process the acknowledgement.
+ *
+ * @param pa the pending acknowledgement that was satisfied
+ * @param ack_delay artificial delay from cummulative acks created by the
+ * other peer
+ */
+static void
+handle_acknowledged (struct PendingAcknowledgement *pa,
+ struct GNUNET_TIME_Relative ack_delay)
+{
+ struct PendingMessage *pm = pa->pm;
+ struct GNUNET_TIME_Relative delay;
+
+ delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time);
+ if (delay.rel_value_us > ack_delay.rel_value_us)
+ delay = GNUNET_TIME_UNIT_ZERO;
+ else
+ delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
+ if (NULL != pa->queue)
+ update_queue_performance (pa->queue, delay, pa->message_size);
+ if (NULL != pa->dvh)
+ update_dvh_performance (pa->dvh, delay, pa->message_size);
+ if (NULL != pm)
+ {
+ if (NULL != pm->frag_parent)
+ {
+ pm = pm->frag_parent;
+ free_fragment_tree (pa->pm);
+ }
+ while ((NULL != pm->frag_parent) && (NULL == pm->head_frag))
+ {
+ struct PendingMessage *parent = pm->frag_parent;
+
+ free_fragment_tree (pm);
+ pm = parent;
+ }
+ if (NULL != pm->head_frag)
+ pm = NULL; /* we are done, otherwise free 'pm' below */
}
- /* continue with inner message */
- demultiplex_with_cmc (cmc, inbox);
+ if (NULL != pm)
+ free_pending_message (pm);
+ free_pending_acknowledgement (pa);
+}
+
+
+/**
+ * Communicator gave us a reliability ack. Check it is well-formed.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (unused)
+ * @param ra the message that was received
+ * @return #GNUNET_Ok if @a ra is well-formed
+ */
+static int
+check_reliability_ack (void *cls,
+ const struct TransportReliabilityAckMessage *ra)
+{
+ unsigned int n_acks;
+
+ (void) cls;
+ n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
+ sizeof (struct TransportCummulativeAckPayloadP);
+ if (0 == n_acks)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if ((ntohs (ra->header.size) - sizeof (*ra)) !=
+ n_acks * sizeof (struct TransportCummulativeAckPayloadP))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
}
const struct TransportReliabilityAckMessage *ra)
{
struct CommunicatorMessageContext *cmc = cls;
- struct Neighbour *n;
+ const struct TransportCummulativeAckPayloadP *ack;
+ struct PendingAcknowledgement *pa;
unsigned int n_acks;
- const struct MessageUUID *msg_uuids;
- struct PendingMessage *nxt;
- int matched;
+ uint32_t ack_counter;
- n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
- if (NULL == n)
+ n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
+ sizeof (struct TransportCummulativeAckPayloadP);
+ ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
+ for (unsigned int i = 0; i < n_acks; i++)
{
- struct GNUNET_SERVICE_Client *client = cmc->tc->client;
-
- GNUNET_break (0);
- finish_cmc_handling (cmc);
- GNUNET_SERVICE_client_drop (client);
- return;
- }
- n_acks =
- (ntohs (ra->header.size) - sizeof (*ra)) / sizeof (struct MessageUUID);
- msg_uuids = (const struct MessageUUID *) &ra[1];
-
- /* FIXME-OPTIMIZE: maybe use another hash map here? */
- matched = GNUNET_NO;
- for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm; pm = nxt)
- {
- int in_list;
-
- nxt = pm->next_neighbour;
- in_list = GNUNET_NO;
- for (unsigned int i = 0; i < n_acks; i++)
+ pa =
+ GNUNET_CONTAINER_multishortmap_get (pending_acks, &ack[i].ack_uuid.value);
+ if (NULL == pa)
{
- if (0 != GNUNET_memcmp (&msg_uuids[i], &pm->msg_uuid))
- continue;
- in_list = GNUNET_YES;
- break;
- }
- if (GNUNET_NO == in_list)
+ GNUNET_STATISTICS_update (
+ GST_stats,
+ "# FRAGMENT_ACKS dropped, no matching pending message",
+ 1,
+ GNUNET_NO);
continue;
-
- /* this pm was acked! */
- matched = GNUNET_YES;
- free_pending_message (pm);
-
- {
- struct GNUNET_TIME_Relative avg_ack_delay =
- GNUNET_TIME_relative_ntoh (ra->avg_ack_delay);
- // FIXME: update RTT and other reliability data!
- // ISSUE: we don't know which of n's queues the message(s)
- // took (and in fact the different messages might have gone
- // over different queues and possibly over multiple).
- // => track queues with PendingMessages, and update RTT only if
- // the queue used is unique?
- // -> how can we get loss rates?
- // -> or, add extra state to MSG and ACKs to identify queue?
- // -> if we do this, might just do the same for the avg_ack_delay!
- (void) avg_ack_delay;
}
+ handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
}
- if (GNUNET_NO == matched)
- {
- GNUNET_STATISTICS_update (GST_stats,
- "# FRAGMENT_ACKS dropped, no matching pending message",
- 1,
- GNUNET_NO);
- }
+
+ ack_counter = htonl (ra->ack_counter);
+ (void) ack_counter; /* silence compiler warning for now */
+ // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
+ // (DV and/or Neighbour?)
finish_cmc_handling (cmc);
}
(void) cls;
if ((size - sizeof (*be)) <
- (sizeof (struct TransportBackchannelRequestPayload) +
+ (sizeof (struct TransportBackchannelRequestPayloadP) +
sizeof (struct GNUNET_MessageHeader)))
{
GNUNET_break_op (0);
1,
GNUNET_NO);
b->monotonic_time = mt;
- /* Setting body_size to 0 prevents call to #forward_backchannel_payload() */
+ /* Setting body_size to 0 prevents call to #forward_backchannel_payload()
+ */
b->body_size = 0;
return;
}
{
struct Backtalker *b;
struct GNUNET_TIME_Absolute monotime;
- struct TransportBackchannelRequestPayload ppay;
+ struct TransportBackchannelRequestPayloadP ppay;
char body[hdr_len - sizeof (ppay)];
GNUNET_assert (hdr_len >=
GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv);
}
-/**
- * Task run to check whether the hops of the @a cls still
- * are validated, or if we need to core about disconnection.
- *
- * @param cls a `struct DistanceVector` (with core_visible set!)
- */
-static void
-check_dv_path_down (void *cls)
-{
- struct DistanceVector *dv = cls;
- struct Neighbour *n;
-
- dv->visibility_task = NULL;
- GNUNET_assert (GNUNET_YES == dv->core_visible);
- for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
- pos = pos->next_dv)
- {
- if (0 <
- GNUNET_TIME_absolute_get_remaining (pos->path_valid_until).rel_value_us)
- {
- dv->visibility_task = GNUNET_SCHEDULER_add_at (pos->path_valid_until,
- &check_dv_path_down,
- dv);
- return;
- }
- }
- /* all paths invalid, make dv core-invisible */
- dv->core_visible = GNUNET_NO;
- n = GNUNET_CONTAINER_multipeermap_get (neighbours, &dv->target);
- if ((NULL != n) && (GNUNET_YES == n->core_visible))
- return; /* no need to tell core, connection still up! */
- cores_send_disconnect_info (&dv->target);
-}
-
/**
* The @a hop is a validated path to the respective target
activate_core_visible_dv_path (struct DistanceVectorHop *hop)
{
struct DistanceVector *dv = hop->dv;
- struct Neighbour *n;
+ struct VirtualLink *vl;
- GNUNET_assert (GNUNET_NO == dv->core_visible);
- GNUNET_assert (NULL == dv->visibility_task);
-
- dv->core_visible = GNUNET_YES;
- dv->visibility_task =
- GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_dv_path_down, dv);
- n = GNUNET_CONTAINER_multipeermap_get (neighbours, &dv->target);
- if ((NULL != n) && (GNUNET_YES == n->core_visible))
- return; /* no need to tell core, connection already up! */
- cores_send_connect_info (&dv->target,
- (NULL != n)
- ? GNUNET_BANDWIDTH_value_sum (n->quota_out,
- dv->quota_out)
- : dv->quota_out);
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target);
+ if (NULL != vl)
+ {
+ /* Link was already up, remember dv is also now available and we are done */
+ vl->dv = dv;
+ return;
+ }
+ vl = GNUNET_new (struct VirtualLink);
+ vl->target = dv->target;
+ vl->dv = dv;
+ vl->core_recv_window = RECV_WINDOW_SIZE;
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ links,
+ &vl->target,
+ vl,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ /* We lacked a confirmed connection to the target
+ before, so tell CORE about it (finally!) */
+ cores_send_connect_info (&dv->target);
}
* non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
*
* @param path the path we learned, path[0] should be us,
- * and then path contains a valid path from us to `path[path_len-1]`
- * path[1] should be a direct neighbour (we should check!)
+ * and then path contains a valid path from us to
+ * `path[path_len-1]` path[1] should be a direct neighbour (we should check!)
* @param path_len number of entries on the @a path, at least three!
* @param network_latency how long does the message take from us to
* `path[path_len-1]`? set to "forever" if unknown
- * @param path_valid_until how long is this path considered validated? Maybe be
- * zero.
+ * @param path_valid_until how long is this path considered validated? Maybe
+ * be zero.
* @return #GNUNET_YES on success,
* #GNUNET_NO if we have better path(s) to the target
* #GNUNET_SYSERR if the path is useless and/or invalid
return GNUNET_SYSERR;
}
GNUNET_assert (0 == GNUNET_memcmp (&GST_my_identity, &path[0]));
- next_hop = GNUNET_CONTAINER_multipeermap_get (neighbours, &path[1]);
+ next_hop = lookup_neighbour (&path[1]);
if (NULL == next_hop)
{
/* next hop must be a neighbour, otherwise this whole thing is useless! */
return GNUNET_SYSERR;
}
for (unsigned int i = 2; i < path_len; i++)
- if (NULL != GNUNET_CONTAINER_multipeermap_get (neighbours, &path[i]))
+ if (NULL != lookup_neighbour (&path[i]))
{
/* Useless path, we have a direct connection to some hop
in the middle of the path, so this one doesn't even
GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
- if ((GNUNET_NO == dv->core_visible) &&
- (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until)
- .rel_value_us))
+ if (0 <
+ GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
activate_core_visible_dv_path (pos);
if (last_timeout.rel_value_us <
GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
hop->path_valid_until = path_valid_until;
hop->distance = path_len - 2;
+ hop->pd.aged_rtt = network_latency;
GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop);
GNUNET_CONTAINER_MDLL_insert (neighbour,
next_hop->dv_head,
next_hop->dv_tail,
hop);
- if ((GNUNET_NO == dv->core_visible) &&
- (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us))
+ if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
activate_core_visible_dv_path (hop);
return GNUNET_YES;
}
* @return #GNUNET_YES if message is well-formed
*/
static int
-check_dv_learn (void *cls, const struct TransportDVLearn *dvl)
+check_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
{
uint16_t size = ntohs (dvl->header.size);
uint16_t num_hops = ntohs (dvl->num_hops);
*/
static void
forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
- const struct TransportDVLearn *msg,
+ const struct TransportDVLearnMessage *msg,
uint16_t bi_history,
uint16_t nhops,
const struct DVPathEntryP *hops,
struct GNUNET_TIME_Absolute in_time)
{
struct DVPathEntryP *dhops;
- struct TransportDVLearn *fwd;
+ struct TransportDVLearnMessage *fwd;
struct GNUNET_TIME_Relative nnd;
/* compute message for forwarding */
GNUNET_assert (nhops < MAX_DV_HOPS_ALLOWED);
- fwd = GNUNET_malloc (sizeof (struct TransportDVLearn) +
+ fwd = GNUNET_malloc (sizeof (struct TransportDVLearnMessage) +
(nhops + 1) * sizeof (struct DVPathEntryP));
fwd->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
- fwd->header.size = htons (sizeof (struct TransportDVLearn) +
+ fwd->header.size = htons (sizeof (struct TransportDVLearnMessage) +
(nhops + 1) * sizeof (struct DVPathEntryP));
fwd->num_hops = htons (nhops + 1);
fwd->bidirectional = htons (bi_history);
.succ = *next_hop,
.challenge = msg->challenge};
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
- &dhp.purpose,
- &dhops[nhops].hop_sig));
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+ &dhp.purpose,
+ &dhops[nhops].hop_sig));
+ }
+ route_message (next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED);
+}
+
+
+/**
+ * Check signature of type #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
+ *
+ * @param sender_monotonic_time monotonic time of the initiator
+ * @param init the signer
+ * @param challenge the challenge that was signed
+ * @param init_sig signature presumably by @a init
+ * @return #GNUNET_OK if the signature is valid
+ */
+static int
+validate_dv_initiator_signature (
+ struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time,
+ const struct GNUNET_PeerIdentity *init,
+ const struct ChallengeNonceP *challenge,
+ const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
+{
+ struct DvInitPS ip = {.purpose.purpose = htonl (
+ GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
+ .purpose.size = htonl (sizeof (ip)),
+ .monotonic_time = sender_monotonic_time,
+ .challenge = *challenge};
+
+ if (
+ GNUNET_OK !=
+ GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
+ &ip.purpose,
+ init_sig,
+ &init->public_key))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Closure for #dv_neighbour_selection and #dv_neighbour_transmission.
+ */
+struct NeighbourSelectionContext
+{
+ /**
+ * Original message we received.
+ */
+ const struct TransportDVLearnMessage *dvl;
+
+ /**
+ * The hops taken.
+ */
+ const struct DVPathEntryP *hops;
+
+ /**
+ * Time we received the message.
+ */
+ struct GNUNET_TIME_Absolute in_time;
+
+ /**
+ * Offsets of the selected peers.
+ */
+ uint32_t selections[MAX_DV_DISCOVERY_SELECTION];
+
+ /**
+ * Number of peers eligible for selection.
+ */
+ unsigned int num_eligible;
+
+ /**
+ * Number of peers that were selected for forwarding.
+ */
+ unsigned int num_selections;
+
+ /**
+ * Number of hops in @e hops
+ */
+ uint16_t nhops;
+
+ /**
+ * Bitmap of bidirectional connections encountered.
+ */
+ uint16_t bi_history;
+};
+
+
+/**
+ * Function called for each neighbour during #handle_dv_learn.
+ *
+ * @param cls a `struct NeighbourSelectionContext *`
+ * @param pid identity of the peer
+ * @param value a `struct Neighbour`
+ * @return #GNUNET_YES (always)
+ */
+static int
+dv_neighbour_selection (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ struct NeighbourSelectionContext *nsc = cls;
+
+ (void) value;
+ if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
+ return GNUNET_YES; /* skip initiator */
+ for (unsigned int i = 0; i < nsc->nhops; i++)
+ if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
+ return GNUNET_YES; /* skip peers on path */
+ nsc->num_eligible++;
+ return GNUNET_YES;
+}
+
+
+/**
+ * Function called for each neighbour during #handle_dv_learn.
+ * We call #forward_dv_learn() on the neighbour(s) selected
+ * during #dv_neighbour_selection().
+ *
+ * @param cls a `struct NeighbourSelectionContext *`
+ * @param pid identity of the peer
+ * @param value a `struct Neighbour`
+ * @return #GNUNET_YES (always)
+ */
+static int
+dv_neighbour_transmission (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ struct NeighbourSelectionContext *nsc = cls;
+
+ (void) value;
+ if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
+ return GNUNET_YES; /* skip initiator */
+ for (unsigned int i = 0; i < nsc->nhops; i++)
+ if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
+ return GNUNET_YES; /* skip peers on path */
+ for (unsigned int i = 0; i < nsc->num_selections; i++)
+ {
+ if (nsc->selections[i] == nsc->num_eligible)
+ {
+ forward_dv_learn (pid,
+ nsc->dvl,
+ nsc->bi_history,
+ nsc->nhops,
+ nsc->hops,
+ nsc->in_time);
+ break;
+ }
}
- route_message (next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED);
+ nsc->num_eligible++;
+ return GNUNET_YES;
}
/**
- * Check signature of type #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
+ * Computes the number of neighbours we should forward a DVInit
+ * message to given that it has so far taken @a hops_taken hops
+ * though the network and that the number of neighbours we have
+ * in total is @a neighbour_count, out of which @a eligible_count
+ * are not yet on the path.
*
- * @param init the signer
- * @param challenge the challenge that was signed
- * @param init_sig signature presumably by @a init
- * @return #GNUNET_OK if the signature is valid
+ * NOTE: technically we might want to include NSE in the formula to
+ * get a better grip on the overall network size. However, for now
+ * using NSE here would create a dependency issue in the build system.
+ * => Left for later, hardcoded to 50 for now.
+ *
+ * The goal of the fomula is that we want to reach a total of LOG(NSE)
+ * peers via DV (`target_total`). We want the reach to be spread out
+ * over various distances to the origin, with a bias towards shorter
+ * distances.
+ *
+ * We make the strong assumption that the network topology looks
+ * "similar" at other hops, in particular the @a neighbour_count
+ * should be comparable at other hops.
+ *
+ * If the local neighbourhood is densely connected, we expect that @a
+ * eligible_count is close to @a neighbour_count minus @a hops_taken
+ * as a lot of the path is already known. In that case, we should
+ * forward to few(er) peers to try to find a path out of the
+ * neighbourhood. OTOH, if @a eligible_count is close to @a
+ * neighbour_count, we should forward to many peers as we are either
+ * still close to the origin (i.e. @a hops_taken is small) or because
+ * we managed to get beyond a local cluster. We express this as
+ * the `boost_factor` using the square of the fraction of eligible
+ * neighbours (so if only 50% are eligible, we boost by 1/4, but if
+ * 99% are eligible, the 'boost' will be almost 1).
+ *
+ * Second, the more hops we have taken, the larger the problem of an
+ * exponential traffic explosion gets. So we take the `target_total`,
+ * and compute our degree such that at each distance d 2^{-d} peers
+ * are selected (corrected by the `boost_factor`).
+ *
+ * @param hops_taken number of hops DVInit has travelled so far
+ * @param neighbour_count number of neighbours we have in total
+ * @param eligible_count number of neighbours we could in
+ * theory forward to
*/
-static int
-validate_dv_initiator_signature (
- const struct GNUNET_PeerIdentity *init,
- const struct ChallengeNonce *challenge,
- const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
+static unsigned int
+calculate_fork_degree (unsigned int hops_taken,
+ unsigned int neighbour_count,
+ unsigned int eligible_count)
{
- struct DvInitPS ip = {.purpose.purpose = htonl (
- GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
- .purpose.size = htonl (sizeof (ip)),
- .challenge = *challenge};
+ double target_total = 50.0; /* FIXME: use LOG(NSE)? */
+ double eligible_ratio =
+ ((double) eligible_count) / ((double) neighbour_count);
+ double boost_factor = eligible_ratio * eligible_ratio;
+ unsigned int rnd;
+ double left;
- if (
- GNUNET_OK !=
- GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
- &ip.purpose,
- init_sig,
- &init->public_key))
+ if (hops_taken >= 64)
+ return 0; /* precaution given bitshift below */
+ for (unsigned int i = 1; i < hops_taken; i++)
{
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
+ /* For each hop, subtract the expected number of targets
+ reached at distance d (so what remains divided by 2^d) */
+ target_total -= (target_total * boost_factor / (1LLU << i));
}
- return GNUNET_OK;
+ rnd =
+ (unsigned int) floor (target_total * boost_factor / (1LLU << hops_taken));
+ /* round up or down probabilistically depending on how close we were
+ when floor()ing to rnd */
+ left = target_total - (double) rnd;
+ if (UINT32_MAX * left >
+ GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX))
+ rnd++; /* round up */
+ return rnd;
+}
+
+
+/**
+ * Function called when peerstore is done storing a DV monotonic time.
+ *
+ * @param cls a `struct Neighbour`
+ * @param success #GNUNET_YES if peerstore was successful
+ */
+static void
+neighbour_store_dvmono_cb (void *cls, int success)
+{
+ struct Neighbour *n = cls;
+
+ n->sc = NULL;
+ if (GNUNET_YES != success)
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to store other peer's monotonic time in peerstore!\n");
}
* @param dvl the message that was received
*/
static void
-handle_dv_learn (void *cls, const struct TransportDVLearn *dvl)
+handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
{
struct CommunicatorMessageContext *cmc = cls;
enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
int do_fwd;
int did_initiator;
struct GNUNET_TIME_Absolute in_time;
+ struct Neighbour *n;
nhops = ntohs (dvl->bidirectional); /* 0 = sender is initiator */
bi_history = ntohs (dvl->bidirectional);
/* continue communicator here, everything else can happen asynchronous! */
finish_cmc_handling (cmc);
- /* OPTIMIZE-FIXME: Technically, we only need to bother checking
- the initiator signature if we send the message back to the initiator... */
- if (GNUNET_OK != validate_dv_initiator_signature (&dvl->initiator,
- &dvl->challenge,
- &dvl->init_sig))
+ n = lookup_neighbour (&dvl->initiator);
+ if (NULL != n)
{
- GNUNET_break_op (0);
- return;
+ if ((n->dv_monotime_available == GNUNET_YES) &&
+ (GNUNET_TIME_absolute_ntoh (dvl->monotonic_time).abs_value_us <
+ n->last_dv_learn_monotime.abs_value_us))
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# DV learn discarded due to time travel",
+ 1,
+ GNUNET_NO);
+ return;
+ }
+ if (GNUNET_OK != validate_dv_initiator_signature (dvl->monotonic_time,
+ &dvl->initiator,
+ &dvl->challenge,
+ &dvl->init_sig))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ n->last_dv_learn_monotime = GNUNET_TIME_absolute_ntoh (dvl->monotonic_time);
+ if (GNUNET_YES == n->dv_monotime_available)
+ {
+ if (NULL != n->sc)
+ GNUNET_PEERSTORE_store_cancel (n->sc);
+ n->sc =
+ GNUNET_PEERSTORE_store (peerstore,
+ "transport",
+ &dvl->initiator,
+ GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
+ &dvl->monotonic_time,
+ sizeof (dvl->monotonic_time),
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+ &neighbour_store_dvmono_cb,
+ n);
+ }
+ }
+ /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
+ If signature verification load too high, implement random drop strategy */
+ for (unsigned int i = 0; i < nhops; i++)
+ {
+ struct DvHopPS dhp = {.purpose.purpose =
+ htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
+ .purpose.size = htonl (sizeof (dhp)),
+ .pred = (0 == i) ? dvl->initiator : hops[i - 1].hop,
+ .succ = (nhops - 1 == i) ? GST_my_identity
+ : hops[i + 1].hop,
+ .challenge = dvl->challenge};
+
+ if (GNUNET_OK !=
+ GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP,
+ &dhp.purpose,
+ &hops[i].hop_sig,
+ &hops[i].hop.public_key))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
}
- // FIXME: asynchronously (!) verify hop-by-hop signatures!
- // => if signature verification load too high, implement random drop
- // strategy!?
do_fwd = GNUNET_YES;
if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
if ((do_fwd) || ((nhops < MIN_DV_PATH_LENGTH_FOR_INITIATOR) &&
(GNUNET_NO == did_initiator)))
{
- /* FIXME: loop over all neighbours, pick those with low
- queues AND that are not yet on the path; possibly
- adapt threshold to nhops! */
-#if FIXME
- forward_dv_learn (NULL, // fill in peer from iterator here!
- dvl,
- bi_history,
- nhops,
- hops,
- in_time);
-#endif
+ /* Pick random neighbours that are not yet on the path */
+ struct NeighbourSelectionContext nsc;
+ unsigned int n_cnt;
+
+ n_cnt = GNUNET_CONTAINER_multipeermap_size (neighbours);
+ nsc.nhops = nhops;
+ nsc.dvl = dvl;
+ nsc.bi_history = bi_history;
+ nsc.hops = hops;
+ nsc.in_time = in_time;
+ nsc.num_eligible = 0;
+ GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+ &dv_neighbour_selection,
+ &nsc);
+ if (0 == nsc.num_eligible)
+ return; /* done here, cannot forward to anyone else */
+ nsc.num_selections = calculate_fork_degree (nhops, n_cnt, nsc.num_eligible);
+ nsc.num_selections =
+ GNUNET_MIN (MAX_DV_DISCOVERY_SELECTION, nsc.num_selections);
+ for (unsigned int i = 0; i < nsc.num_selections; i++)
+ nsc.selections[i] =
+ (nsc.num_selections == n_cnt)
+ ? i /* all were selected, avoid collisions by chance */
+ : GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, n_cnt);
+ nsc.num_eligible = 0;
+ GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+ &dv_neighbour_transmission,
+ &nsc);
}
}
* @return #GNUNET_YES if message is well-formed
*/
static int
-check_dv_box (void *cls, const struct TransportDVBox *dvb)
+check_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
{
uint16_t size = ntohs (dvb->header.size);
uint16_t num_hops = ntohs (dvb->num_hops);
const void *payload,
uint16_t payload_size)
{
- struct TransportDVBox *dvb;
- struct GNUNET_PeerIdentity *dhops;
+ struct TransportDVBoxMessage *dvb;
- GNUNET_assert (UINT16_MAX < sizeof (struct TransportDVBox) +
- sizeof (struct GNUNET_PeerIdentity) * num_hops +
- payload_size);
- dvb = GNUNET_malloc (sizeof (struct TransportDVBox) +
- sizeof (struct GNUNET_PeerIdentity) * num_hops +
+ dvb = create_dv_box (total_hops,
+ origin,
+ &hops[num_hops - 1] /* == target */,
+ num_hops - 1 /* do not count target twice */,
+ hops,
+ payload,
payload_size);
- dvb->header.size =
- htons (sizeof (struct TransportDVBox) +
- sizeof (struct GNUNET_PeerIdentity) * num_hops + payload_size);
- dvb->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
- dvb->total_hops = htons (total_hops);
- dvb->num_hops = htons (num_hops);
- dvb->origin = *origin;
- dhops = (struct GNUNET_PeerIdentity *) &dvb[1];
- memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity));
- memcpy (&dhops[num_hops], payload, payload_size);
route_message (&next_hop->pid, &dvb->header, RMO_NONE);
+ GNUNET_free (dvb);
}
* @param dvb the message that was received
*/
static void
-handle_dv_box (void *cls, const struct TransportDVBox *dvb)
+handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
{
struct CommunicatorMessageContext *cmc = cls;
uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
finish_cmc_handling (cmc);
return;
}
- n = GNUNET_CONTAINER_multipeermap_get (neighbours, &hops[i]);
+ n = lookup_neighbour (&hops[i]);
if (NULL == n)
continue;
forward_dv_box (n,
* @param tvc the message that was received
*/
static void
-handle_validation_challenge (void *cls,
- const struct TransportValidationChallenge *tvc)
+handle_validation_challenge (
+ void *cls,
+ const struct TransportValidationChallengeMessage *tvc)
{
struct CommunicatorMessageContext *cmc = cls;
- struct TransportValidationResponse *tvr;
+ struct TransportValidationResponseMessage *tvr;
if (cmc->total_hops > 0)
{
finish_cmc_handling (cmc);
return;
}
- tvr = GNUNET_new (struct TransportValidationResponse);
+ tvr = GNUNET_new (struct TransportValidationResponseMessage);
tvr->header.type =
htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE);
tvr->header.size = htons (sizeof (*tvr));
/**
* Set to the challenge we are looking for.
*/
- const struct ChallengeNonce *challenge;
+ const struct ChallengeNonceP *challenge;
/**
* Set to a matching validation state, if one was found.
{
struct Neighbour *n;
- n = GNUNET_CONTAINER_multipeermap_get (neighbours, pid);
+ n = lookup_neighbour (pid);
if (NULL == n)
return NULL;
for (struct Queue *pos = n->queue_head; NULL != pos;
}
-/**
- * Task run periodically to check whether the validity of the given queue has
- * run its course. If so, finds either another queue to take over, or clears
- * the neighbour's `core_visible` flag. In the latter case, gives DV routes a
- * chance to take over, and if that fails, notifies CORE about the disconnect.
- *
- * @param cls a `struct Queue`
- */
-static void
-core_queue_visibility_check (void *cls)
-{
- struct Queue *q = cls;
-
- q->visibility_task = NULL;
- if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
- {
- q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until,
- &core_queue_visibility_check,
- q);
- return;
- }
- update_neighbour_core_visibility (q->neighbour);
-}
-
-
-/**
- * Check whether the CORE visibility of @a n should change. Finds either a
- * queue to preserve the visibility, or clears the neighbour's `core_visible`
- * flag. In the latter case, gives DV routes a chance to take over, and if
- * that fails, notifies CORE about the disconnect. If so, check whether we
- * need to notify CORE.
- *
- * @param n neighbour to perform the check for
- */
-static void
-update_neighbour_core_visibility (struct Neighbour *n)
-{
- struct DistanceVector *dv;
-
- GNUNET_assert (GNUNET_YES == n->core_visible);
- /* Check if _any_ queue of this neighbour is still valid, if so, schedule
- the #core_queue_visibility_check() task for that queue */
- for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
- {
- if (0 !=
- GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
- {
- /* found a valid queue, use this one */
- q->visibility_task =
- GNUNET_SCHEDULER_add_at (q->validated_until,
- &core_queue_visibility_check,
- q);
- return;
- }
- }
- n->core_visible = GNUNET_NO;
-
- /* Check if _any_ DV route to this neighbour is currently
- valid, if so, do NOT tell core about the loss of direct
- connectivity (DV still counts!) */
- dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid);
- if (GNUNET_YES == dv->core_visible)
- return;
- /* Nothing works anymore, need to tell CORE about the loss of
- connectivity! */
- cores_send_disconnect_info (&n->pid);
-}
-
-
/**
* Communicator gave us a transport address validation response. Process the
* request.
* @param tvr the message that was received
*/
static void
-handle_validation_response (void *cls,
- const struct TransportValidationResponse *tvr)
+handle_validation_response (
+ void *cls,
+ const struct TransportValidationResponseMessage *tvr)
{
struct CommunicatorMessageContext *cmc = cls;
struct ValidationState *vs;
.vs = NULL};
struct GNUNET_TIME_Absolute origin_time;
struct Queue *q;
- struct DistanceVector *dv;
struct Neighbour *n;
+ struct VirtualLink *vl;
/* check this is one of our challenges */
(void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
return;
}
q->validated_until = vs->validated_until;
- q->rtt = vs->validation_rtt;
+ q->pd.aged_rtt = vs->validation_rtt;
n = q->neighbour;
- if (GNUNET_NO != n->core_visible)
- return; /* nothing changed, we are done here */
- n->core_visible = GNUNET_YES;
- q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until,
- &core_queue_visibility_check,
- q);
- /* Check if _any_ DV route to this neighbour is
- currently valid, if so, do NOT tell core anything! */
- dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid);
- if ((NULL != dv) && (GNUNET_YES == dv->core_visible))
- return; /* nothing changed, done */
- /* We lacked a confirmed connection to the neighbour
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid);
+ if (NULL != vl)
+ {
+ /* Link was already up, remember n is also now available and we are done */
+ vl->n = n;
+ return;
+ }
+ vl = GNUNET_new (struct VirtualLink);
+ vl->target = n->pid;
+ vl->n = n;
+ vl->core_recv_window = RECV_WINDOW_SIZE;
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ links,
+ &vl->target,
+ vl,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ /* We lacked a confirmed connection to the target
before, so tell CORE about it (finally!) */
- cores_send_connect_info (&n->pid,
- (NULL != dv)
- ? GNUNET_BANDWIDTH_value_sum (dv->quota_out,
- n->quota_out)
- : n->quota_out);
+ cores_send_connect_info (&n->pid);
}
struct GNUNET_MQ_MessageHandler handlers[] =
{GNUNET_MQ_hd_var_size (fragment_box,
GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
- struct TransportFragmentBox,
+ struct TransportFragmentBoxMessage,
&cmc),
- GNUNET_MQ_hd_fixed_size (fragment_ack,
- GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
- struct TransportFragmentAckMessage,
- &cmc),
GNUNET_MQ_hd_var_size (reliability_box,
GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
- struct TransportReliabilityBox,
+ struct TransportReliabilityBoxMessage,
+ &cmc),
+ GNUNET_MQ_hd_var_size (reliability_ack,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
+ struct TransportReliabilityAckMessage,
&cmc),
- GNUNET_MQ_hd_fixed_size (reliability_ack,
- GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
- struct TransportReliabilityAckMessage,
- &cmc),
GNUNET_MQ_hd_var_size (backchannel_encapsulation,
GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
struct TransportBackchannelEncapsulationMessage,
&cmc),
GNUNET_MQ_hd_var_size (dv_learn,
GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
- struct TransportDVLearn,
+ struct TransportDVLearnMessage,
&cmc),
GNUNET_MQ_hd_var_size (dv_box,
GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
- struct TransportDVBox,
+ struct TransportDVBoxMessage,
&cmc),
GNUNET_MQ_hd_fixed_size (
validation_challenge,
GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
- struct TransportValidationChallenge,
+ struct TransportValidationChallengeMessage,
&cmc),
GNUNET_MQ_hd_fixed_size (
validation_response,
GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,
- struct TransportValidationResponse,
+ struct TransportValidationResponseMessage,
&cmc),
GNUNET_MQ_handler_end ()};
int ret;
}
-/**
- * Bandwidth tracker informs us that the delay until we should receive
- * more has changed.
- *
- * @param cls a `struct Queue` for which the delay changed
- */
-static void
-tracker_update_in_cb (void *cls)
-{
- struct Queue *queue = cls;
- struct GNUNET_TIME_Relative in_delay;
- unsigned int rsize;
-
- rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
- in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in, rsize);
- // FIXME: how exactly do we do inbound flow control?
-}
-
-
/**
* If necessary, generates the UUID for a @a pm
*
}
+/**
+ * Setup data structure waiting for acknowledgements.
+ *
+ * @param queue queue the @a pm will be sent over
+ * @param dvh path the message will take, may be NULL
+ * @param pm the pending message for transmission
+ * @return corresponding fresh pending acknowledgement
+ */
+static struct PendingAcknowledgement *
+prepare_pending_acknowledgement (struct Queue *queue,
+ struct DistanceVectorHop *dvh,
+ struct PendingMessage *pm)
+{
+ struct PendingAcknowledgement *pa;
+
+ pa = GNUNET_new (struct PendingAcknowledgement);
+ pa->queue = queue;
+ pa->dvh = dvh;
+ pa->pm = pm;
+ do
+ {
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+ &pa->ack_uuid,
+ sizeof (pa->ack_uuid));
+ } while (GNUNET_YES != GNUNET_CONTAINER_multishortmap_put (
+ pending_acks,
+ &pa->ack_uuid.value,
+ pa,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ GNUNET_CONTAINER_MDLL_insert (queue, queue->pa_head, queue->pa_tail, pa);
+ GNUNET_CONTAINER_MDLL_insert (pm, pm->pa_head, pm->pa_tail, pa);
+ if (NULL != dvh)
+ GNUNET_CONTAINER_MDLL_insert (dvh, dvh->pa_head, dvh->pa_tail, pa);
+ pa->transmission_time = GNUNET_TIME_absolute_get ();
+ pa->message_size = pm->bytes_msg;
+ return pa;
+}
+
+
/**
* Fragment the given @a pm to the given @a mtu. Adds
* additional fragments to the neighbour as well. If the
* @a mtu is too small, generates and error for the @a pm
* and returns NULL.
*
+ * @param queue which queue to fragment for
+ * @param dvh path the message will take, or NULL
* @param pm pending message to fragment for transmission
- * @param mtu MTU to apply
* @return new message to transmit
*/
static struct PendingMessage *
-fragment_message (struct PendingMessage *pm, uint16_t mtu)
+fragment_message (struct Queue *queue,
+ struct DistanceVectorHop *dvh,
+ struct PendingMessage *pm)
{
+ struct PendingAcknowledgement *pa;
struct PendingMessage *ff;
+ uint16_t mtu;
+ pa = prepare_pending_acknowledgement (queue, dvh, pm);
+ mtu = (0 == queue->mtu)
+ ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
+ : queue->mtu;
set_pending_message_uuid (pm);
/* This invariant is established in #handle_add_queue_message() */
- GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
+ GNUNET_assert (mtu > sizeof (struct TransportFragmentBoxMessage));
/* select fragment for transmission, descending the tree if it has
- been expanded until we are at a leaf or at a fragment that is small enough
+ been expanded until we are at a leaf or at a fragment that is small
+ enough
*/
ff = pm;
while (((ff->bytes_msg > mtu) || (pm == ff)) &&
{
/* Did not yet calculate all fragments, calculate next fragment */
struct PendingMessage *frag;
- struct TransportFragmentBox tfb;
+ struct TransportFragmentBoxMessage tfb;
const char *orig;
char *msg;
uint16_t fragmax;
msize = ff->bytes_msg;
if (pm != ff)
{
- const struct TransportFragmentBox *tfbo;
+ const struct TransportFragmentBoxMessage *tfbo;
- tfbo = (const struct TransportFragmentBox *) orig;
- orig += sizeof (struct TransportFragmentBox);
- msize -= sizeof (struct TransportFragmentBox);
+ tfbo = (const struct TransportFragmentBoxMessage *) orig;
+ orig += sizeof (struct TransportFragmentBoxMessage);
+ msize -= sizeof (struct TransportFragmentBoxMessage);
xoff = ntohs (tfbo->frag_off);
}
- fragmax = mtu - sizeof (struct TransportFragmentBox);
+ fragmax = mtu - sizeof (struct TransportFragmentBoxMessage);
fragsize = GNUNET_MIN (msize - ff->frag_off, fragmax);
- frag = GNUNET_malloc (sizeof (struct PendingMessage) +
- sizeof (struct TransportFragmentBox) + fragsize);
+ frag =
+ GNUNET_malloc (sizeof (struct PendingMessage) +
+ sizeof (struct TransportFragmentBoxMessage) + fragsize);
frag->target = pm->target;
frag->frag_parent = ff;
frag->timeout = pm->timeout;
- frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
+ frag->bytes_msg = sizeof (struct TransportFragmentBoxMessage) + fragsize;
frag->pmt = PMT_FRAGMENT_BOX;
msg = (char *) &frag[1];
tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
- tfb.header.size = htons (sizeof (struct TransportFragmentBox) + fragsize);
- tfb.frag_uuid.uuid = htonl (pm->frag_uuidgen++);
+ tfb.header.size =
+ htons (sizeof (struct TransportFragmentBoxMessage) + fragsize);
+ tfb.ack_uuid = pa->ack_uuid;
tfb.msg_uuid = pm->msg_uuid;
tfb.frag_off = htons (ff->frag_off + xoff);
tfb.msg_size = htons (pm->bytes_msg);
* @a pm). If the @a pm is already fragmented or reliability boxed,
* or itself an ACK, this function simply returns @a pm.
*
+ * @param queue which queue to prepare transmission for
+ * @param dvh path the message will take, or NULL
* @param pm pending message to box for transmission over unreliabile queue
* @return new message to transmit
*/
static struct PendingMessage *
-reliability_box_message (struct PendingMessage *pm)
+reliability_box_message (struct Queue *queue,
+ struct DistanceVectorHop *dvh,
+ struct PendingMessage *pm)
{
- struct TransportReliabilityBox rbox;
+ struct TransportReliabilityBoxMessage rbox;
+ struct PendingAcknowledgement *pa;
struct PendingMessage *bpm;
char *msg;
{
/* failed hard */
GNUNET_break (0);
- client_send_response (pm, GNUNET_NO, 0);
+ client_send_response (pm);
return NULL;
}
+ pa = prepare_pending_acknowledgement (queue, dvh, pm);
+
bpm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (rbox) +
pm->bytes_msg);
bpm->target = pm->target;
rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
- rbox.msg_uuid = pm->msg_uuid;
+
+ rbox.ack_uuid = pa->ack_uuid;
msg = (char *) &bpm[1];
memcpy (msg, &rbox, sizeof (rbox));
memcpy (&msg[sizeof (rbox)], &pm[1], pm->bytes_msg);
/**
- * We believe we are ready to transmit a message on a queue. Double-checks
- * with the queue's "tracker_out" and then gives the message to the
+ * We believe we are ready to transmit a message on a queue.
+ * Gives the message to the
* communicator for transmission (updating the tracker, and re-scheduling
* itself if applicable).
*
return; /* do it later */
overhead = 0;
if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
- overhead += sizeof (struct TransportReliabilityBox);
+ overhead += sizeof (struct TransportReliabilityBoxMessage);
s = pm;
if ( ( (0 != queue->mtu) &&
(pm->bytes_msg + overhead > queue->mtu) ) ||
(NULL != pm->head_frag /* fragments already exist, should
respect that even if MTU is 0 for
this queue */) )
- s = fragment_message (s,
- (0 == queue->mtu)
- ? UINT16_MAX -
- sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
- : queue->mtu);
+ s = fragment_message (queue, pm->dvh, s);
if (NULL == s)
{
/* Fragmentation failed, try next message... */
return;
}
if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
- s = reliability_box_message (s);
+ // FIXME-OPTIMIZE: and if reliability was requested for 's' by core!
+ s = reliability_box_message (queue, pm->dvh, s);
if (NULL == s)
{
/* Reliability boxing failed, try next message... */
(GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
{
/* Full message sent, and over reliabile channel */
- client_send_response (pm, GNUNET_YES, pm->bytes_msg);
+ client_send_response (pm);
}
else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
queue->tc->details.communicator.cc) &&
/* Was this the last applicable fragmment? */
if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
- client_send_response (
- pm,
- GNUNET_YES,
- pm->bytes_msg /* FIXME: calculate and add overheads! */);
+ client_send_response (pm);
}
else if (PMT_CORE != pm->pmt)
{
message urgency and size when delaying ACKs, etc.) */
update_pm_next_attempt (s,
GNUNET_TIME_relative_to_absolute (
- GNUNET_TIME_relative_multiply (queue->rtt, 4)));
+ GNUNET_TIME_relative_multiply (queue->pd.aged_rtt,
+ 4)));
}
/* finally, re-schedule queue transmission task itself */
}
-/**
- * Bandwidth tracker informs us that the delay until we
- * can transmit again changed.
- *
- * @param cls a `struct Queue` for which the delay changed
- */
-static void
-tracker_update_out_cb (void *cls)
-{
- struct Queue *queue = cls;
- struct Neighbour *n = queue->neighbour;
-
- if (NULL == n->pending_msg_head)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Bandwidth allocation updated for empty transmission queue `%s'\n",
- queue->address);
- return; /* no message pending, nothing to do here! */
- }
- GNUNET_SCHEDULER_cancel (queue->transmit_task);
- queue->transmit_task = NULL;
- schedule_transmit_on_queue (queue, GNUNET_NO);
-}
-
-
-/**
- * Bandwidth tracker informs us that excessive outbound bandwidth was
- * allocated which is not being used.
- *
- * @param cls a `struct Queue` for which the excess was noted
- */
-static void
-tracker_excess_out_cb (void *cls)
-{
- (void) cls;
-
- /* FIXME: trigger excess bandwidth report to core? Right now,
- this is done internally within transport_api2_core already,
- but we probably want to change the logic and trigger it
- from here via a message instead! */
- /* TODO: maybe inform someone at this point? */
- GNUNET_STATISTICS_update (GST_stats,
- "# Excess outbound bandwidth reported",
- 1,
- GNUNET_NO);
-}
-
-
-/**
- * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
- * which is not being used.
- *
- * @param cls a `struct Queue` for which the excess was noted
- */
-static void
-tracker_excess_in_cb (void *cls)
-{
- (void) cls;
-
- /* TODO: maybe inform somone at this point? */
- GNUNET_STATISTICS_update (GST_stats,
- "# Excess inbound bandwidth reported",
- 1,
- GNUNET_NO);
-}
-
-
/**
* Queue to a peer went down. Process the request.
*
for (struct Queue *q = neighbour->queue_head; NULL != q;
q = q->next_neighbour)
{
- struct MonitorEvent me = {.rtt = q->rtt,
+ struct MonitorEvent me = {.rtt = q->pd.aged_rtt,
.cs = q->cs,
.num_msg_pending = q->num_msg_pending,
.num_bytes_pending = q->num_bytes_pending};
static void
validation_transmit_on_queue (struct Queue *q, struct ValidationState *vs)
{
- struct TransportValidationChallenge tvc;
+ struct TransportValidationChallengeMessage tvc;
vs->last_challenge_use = GNUNET_TIME_absolute_get ();
tvc.header.type =
ctx->q = q;
/* OPTIMIZE-FIXME: in the future, add reliability / goodput
statistics and consider those as well here? */
- if (q->rtt.rel_value_us < DV_QUALITY_RTT_THRESHOLD.rel_value_us)
+ if (q->pd.aged_rtt.rel_value_us < DV_QUALITY_RTT_THRESHOLD.rel_value_us)
do_inc = GNUNET_YES;
}
if (GNUNET_YES == do_inc)
{
struct LearnLaunchEntry *lle;
struct QueueQualityContext qqc;
- struct TransportDVLearn dvl;
+ struct TransportDVLearnMessage dvl;
(void) cls;
dvlearn_task = NULL;
dvl.num_hops = htons (0);
dvl.bidirectional = htons (0);
dvl.non_network_delay = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
+ dvl.monotonic_time =
+ GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
{
struct DvInitPS dvip = {.purpose.purpose = htonl (
GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
.purpose.size = htonl (sizeof (dvip)),
+ .monotonic_time = dvl.monotonic_time,
.challenge = lle->challenge};
GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
}
+/**
+ * Function called with the monotonic time of a DV initiator
+ * by PEERSTORE. Updates the time.
+ *
+ * @param cls a `struct Neighbour`
+ * @param record the information found, NULL for the last call
+ * @param emsg error message
+ */
+static void
+neighbour_dv_monotime_cb (void *cls,
+ const struct GNUNET_PEERSTORE_Record *record,
+ const char *emsg)
+{
+ struct Neighbour *n = cls;
+ struct GNUNET_TIME_AbsoluteNBO *mtbe;
+
+ (void) emsg;
+ if (NULL == record)
+ {
+ /* we're done with #neighbour_dv_monotime_cb() invocations,
+ continue normal processing */
+ n->get = NULL;
+ n->dv_monotime_available = GNUNET_YES;
+ return;
+ }
+ if (sizeof (*mtbe) != record->value_size)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ mtbe = record->value;
+ n->last_dv_learn_monotime =
+ GNUNET_TIME_absolute_max (n->last_dv_learn_monotime,
+ GNUNET_TIME_absolute_ntoh (*mtbe));
+}
+
+
/**
* New queue became available. Process the request.
*
const char *addr;
uint16_t addr_len;
- if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
+ if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBoxMessage))
{
/* MTU so small as to be useless for transmissions,
required for #fragment_message()! */
if (NULL == neighbour)
{
neighbour = GNUNET_new (struct Neighbour);
- neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
neighbour->pid = aqm->receiver;
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multipeermap_put (
&neighbour->pid,
neighbour,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ neighbour->get =
+ GNUNET_PEERSTORE_iterate (peerstore,
+ "transport",
+ &neighbour->pid,
+ GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
+ &neighbour_dv_monotime_cb,
+ neighbour);
}
addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
addr = (const char *) &aqm[1];
queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
queue->tc = tc;
queue->address = (const char *) &queue[1];
- queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
+ queue->pd.aged_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
queue->qid = aqm->qid;
queue->mtu = ntohl (aqm->mtu);
queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
queue->neighbour = neighbour;
- GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
- &tracker_update_in_cb,
- queue,
- GNUNET_BANDWIDTH_ZERO,
- GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
- &tracker_excess_in_cb,
- queue);
- GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
- &tracker_update_out_cb,
- queue,
- GNUNET_BANDWIDTH_ZERO,
- GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
- &tracker_excess_out_cb,
- queue);
memcpy (&queue[1], addr, addr_len);
/* notify monitors about new queue */
{
- struct MonitorEvent me = {.rtt = queue->rtt, .cs = queue->cs};
+ struct MonitorEvent me = {.rtt = queue->pd.aged_rtt, .cs = queue->cs};
notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
}
/**
- * Communicator tells us that our request to create a queue failed. This usually
- * indicates that the provided address is simply invalid or that the
+ * Communicator tells us that our request to create a queue failed. This
+ * usually indicates that the provided address is simply invalid or that the
* communicator's resources are exhausted.
*
* @param cls the `struct TransportClient`
(void) cls;
// OPTIMIZE-FIXME: checking that we know this address already should
// be done BEFORE checking the signature => HELLO API change!
- // OPTIMIZE-FIXME: pre-check: rate-limit signature verification / validation?!
+ // OPTIMIZE-FIXME: pre-check: rate-limit signature verification /
+ // validation?!
address =
GNUNET_HELLO_extract_address (&hdr[1],
ntohs (hdr->header.size) - sizeof (*hdr),
}
+/**
+ * Free pending acknowledgement.
+ *
+ * @param cls NULL
+ * @param key unused
+ * @param value a `struct PendingAcknowledgement`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_pending_ack_cb (void *cls,
+ const struct GNUNET_ShortHashCode *key,
+ void *value)
+{
+ struct PendingAcknowledgement *pa = value;
+
+ (void) cls;
+ (void) key;
+ free_pending_acknowledgement (pa);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Free acknowledgement cummulator.
+ *
+ * @param cls NULL
+ * @param pid unused
+ * @param value a `struct AcknowledgementCummulator`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_ack_cummulator_cb (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ struct AcknowledgementCummulator *ac = value;
+
+ (void) cls;
+ (void) pid;
+ GNUNET_free (ac);
+ return GNUNET_OK;
+}
+
+
/**
* Function called when the service shuts down. Unloads our plugins
* and cancels pending validations.
GNUNET_free (GST_my_private_key);
GST_my_private_key = NULL;
}
+ GNUNET_CONTAINER_multipeermap_iterate (ack_cummulators,
+ &free_ack_cummulator_cb,
+ NULL);
+ GNUNET_CONTAINER_multipeermap_destroy (ack_cummulators);
+ ack_cummulators = NULL;
+ GNUNET_CONTAINER_multishortmap_iterate (pending_acks,
+ &free_pending_ack_cb,
+ NULL);
+ GNUNET_CONTAINER_multishortmap_destroy (pending_acks);
+ pending_acks = NULL;
+ GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (neighbours));
GNUNET_CONTAINER_multipeermap_destroy (neighbours);
neighbours = NULL;
+ GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (links));
+ GNUNET_CONTAINER_multipeermap_destroy (links);
+ links = NULL;
GNUNET_CONTAINER_multipeermap_iterate (backtalkers,
&free_backtalker_cb,
NULL);
/* setup globals */
GST_cfg = c;
backtalkers = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
+ pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES);
+ ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
+ links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES);
dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES);
ephemeral_heap =
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
struct OutboundMessage,
NULL),
+ GNUNET_MQ_hd_fixed_size (client_recv_ok,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK,
+ struct RecvOkMessage,
+ NULL),
/* communication with communicators */
GNUNET_MQ_hd_var_size (communicator_available,
GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,