2 This file is part of GNUnet.
3 Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
21 * @file transport/gnunet-service-tng.c
22 * @brief main for gnunet-service-tng
23 * @author Christian Grothoff
26 * - figure out how to transmit (selective) ACKs in case of uni-directional
27 * communicators (with/without core? DV-only?) When do we use ACKs?
28 * => communicators use selective ACKs for flow control
29 * => transport uses message-level ACKs for RTT, fragment confirmation
30 * => integrate DV into transport, use neither core nor communicators
31 * but rather give communicators transport-encapsulated messages
32 * (which could be core-data, background-channel traffic, or
33 * transport-to-transport traffic)
36 * - address validation: what is our plan here?
37 * #1 Peerstore only gets 'validated' addresses
38 * #2 transport needs another API to "trigger" validation!
39 * API may be used by core/application or communicators;
40 * => use yet another lib/MQ/connection?
41 * #3 transport should use validation to also establish
42 * effective flow control (for uni-directional transports!)
43 * #4 UDP broadcasting logic must be extended to use the new API
44 * #5 only validated addresses go to ATS for scheduling; that
45 * also ensures we know the RTT
46 * #6 to ensure flow control and RTT are OK, we always do the
47 * 'validation', even if address comes from PEERSTORE
49 * - ACK handling / retransmission
50 * - address verification
51 * - track RTT, distance, loss, etc.
52 * - DV data structures:
56 * - routing of messages (using DV data structures!)
57 * - handling of DV-boxed messages that need to be forwarded
58 * - backchannel message encryption & decryption
62 * - use ATS bandwidth allocation callback and schedule transmissions!
65 * - inform ATS about RTT, goodput/loss, overheads, etc. (GNUNET_ATS_session_update())
68 * - change transport-core API to provide proper flow control in both
69 * directions, allow multiple messages per peer simultaneously (tag
70 * confirmations with unique message ID), and replace quota-out with
71 * proper flow control;
72 * - if messages are below MTU, consider adding ACKs and other stuff
73 * (requires planning at receiver, and additional MST-style demultiplex
75 * - could avoid copying body of message into each fragment and keep
76 * fragments as just pointers into the original message and only
77 * fully build fragments just before transmission (optimization, should
78 * reduce CPU and memory use)
80 * Design realizations / discussion:
81 * - communicators do flow control by calling MQ "notify sent"
82 * when 'ready'. They determine flow implicitly (i.e. TCP blocking)
83 * or explicitly via background channel FC ACKs. As long as the
84 * channel is not full, they may 'notify sent' even if the other
85 * peer has not yet confirmed receipt. The other peer confirming
86 * is _only_ for FC, not for more reliable transmission; reliable
87 * transmission (i.e. of fragments) is left to _transport_.
88 * - ACKs sent back in uni-directional communicators are done via
89 * the background channel API; here transport _may_ initially
90 * broadcast (with bounded # hops) if no path is known;
91 * - transport should _integrate_ DV-routing and build a view of
92 * the network; then background channel traffic can be
93 * routed via DV as well as explicit "DV" traffic.
94 * - background channel is also used for ACKs and NAT traversal support
95 * - transport service is responsible for AEAD'ing the background
96 * channel, timestamps and monotonic time are used against replay
97 * of old messages -> peerstore needs to be supplied with
98 * "latest timestamps seen" data
99 * - if transport implements DV, we likely need a 3rd peermap
100 * in addition to ephemerals and (direct) neighbours
101 * => in this data structure, we should track ATS metrics (distance, RTT, etc.)
102 * as well as latest timestamps seen, goodput, fragments for transmission, etc.
103 * ==> check if stuff needs to be moved out of "Neighbour"
104 * - transport should encapsualte core-level messages and do its
105 * own ACKing for RTT/goodput/loss measurements _and_ fragment
108 #include "platform.h"
109 #include "gnunet_util_lib.h"
110 #include "gnunet_statistics_service.h"
111 #include "gnunet_transport_monitor_service.h"
112 #include "gnunet_peerstore_service.h"
113 #include "gnunet_hello_lib.h"
114 #include "gnunet_ats_transport_service.h"
115 #include "gnunet_signatures.h"
116 #include "transport.h"
120 * What is the size we assume for a read operation in the
121 * absence of an MTU for the purpose of flow control?
123 #define IN_PACKET_SIZE_WITHOUT_MTU 128
126 * If a queue delays the next message by more than this number
127 * of seconds we log a warning. Note: this is for testing,
128 * the value chosen here might be too aggressively low!
130 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
133 * How long are ephemeral keys valid?
135 #define EPHEMERAL_VALIDITY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
138 * How long do we keep partially reassembled messages around before giving up?
140 #define REASSEMBLY_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
143 * How many messages can we have pending for a given communicator
144 * process before we start to throttle that communicator?
146 * Used if a communicator might be CPU-bound and cannot handle the traffic.
148 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
151 * How many messages can we have pending for a given session (queue to
152 * a particular peer via a communicator) process before we start to
153 * throttle that queue?
155 * Used if ATS assigns more bandwidth to a particular transmission
156 * method than that transmission method can right now handle. (Yes,
157 * ATS should eventually notice utilization below allocation and
158 * adjust, but we don't want to queue up tons of messages in the
159 * meantime). Must be significantly below
160 * #COMMUNICATOR_TOTAL_QUEUE_LIMIT.
162 #define SESSION_QUEUE_LIMIT 32
165 GNUNET_NETWORK_STRUCT_BEGIN
168 * Outer layer of an encapsulated backchannel message.
170 struct TransportBackchannelEncapsulationMessage
173 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
175 struct GNUNET_MessageHeader header;
178 * Distance the backchannel message has traveled, to be updated at
179 * each hop. Used to bound the number of hops in case a backchannel
180 * message is broadcast and thus travels without routing
181 * information (during initial backchannel discovery).
186 * Target's peer identity (as backchannels may be transmitted
187 * indirectly, or even be broadcast).
189 struct GNUNET_PeerIdentity target;
192 * Ephemeral key setup by the sender for @e target, used
193 * to encrypt the payload.
195 struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
197 // FIXME: probably should add random IV here as well,
198 // especially if we re-use ephemeral keys!
201 * HMAC over the ciphertext of the encrypted, variable-size
202 * body that follows. Verified via DH of @e target and
205 struct GNUNET_HashCode hmac;
207 /* Followed by encrypted, variable-size payload */
212 * Body by which a peer confirms that it is using an ephemeral key.
214 struct EphemeralConfirmation
218 * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
220 struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
223 * How long is this signature over the ephemeral key valid?
224 * Note that the receiver MUST IGNORE the absolute time, and
225 * only interpret the value as a mononic time and reject
226 * "older" values than the last one observed. Even with this,
227 * there is no real guarantee against replay achieved here,
228 * as the latest timestamp is not persisted. This is
229 * necessary as we do not want to require synchronized
230 * clocks and may not have a bidirectional communication
231 * channel. Communicators must protect against replay
232 * attacks when using backchannel communication!
234 struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
237 * Target's peer identity.
239 struct GNUNET_PeerIdentity target;
242 * Ephemeral key setup by the sender for @e target, used
243 * to encrypt the payload.
245 struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
251 * Plaintext of the variable-size payload that is encrypted
252 * within a `struct TransportBackchannelEncapsulationMessage`
254 struct TransportBackchannelRequestPayload
258 * Sender's peer identity.
260 struct GNUNET_PeerIdentity sender;
263 * Signature of the sender over an
264 * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
266 struct GNUNET_CRYPTO_EddsaSignature sender_sig;
269 * How long is this signature over the ephemeral key
272 struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
275 * Current monotonic time of the sending transport service. Used to
276 * detect replayed messages. Note that the receiver should remember
277 * a list of the recently seen timestamps and only reject messages
278 * if the timestamp is in the list, or the list is "full" and the
279 * timestamp is smaller than the lowest in the list. This list of
280 * timestamps per peer should be persisted to guard against replays
283 struct GNUNET_TIME_AbsoluteNBO monotonic_time;
285 /* Followed by a `struct GNUNET_MessageHeader` with a message
286 for a communicator */
288 /* Followed by a 0-termianted string specifying the name of
289 the communicator which is to receive the message */
295 * Outer layer of an encapsulated unfragmented application message sent
296 * over an unreliable channel.
298 struct TransportReliabilityBox
301 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
303 struct GNUNET_MessageHeader header;
306 * Number of messages still to be sent before a commulative
307 * ACK is requested. Zero if an ACK is requested immediately.
308 * In NBO. Note that the receiver may send the ACK faster
309 * if it believes that is reasonable.
311 uint32_t ack_countdown GNUNET_PACKED;
314 * Unique ID of the message used for signalling receipt of
315 * messages sent over possibly unreliable channels. Should
318 struct GNUNET_ShortHashCode msg_uuid;
323 * Confirmation that the receiver got a
324 * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
325 * confirmation may be transmitted over a completely different queue,
326 * so ACKs are identified by a combination of PID of sender and
327 * message UUID, without the queue playing any role!
329 struct TransportReliabilityAckMessage
332 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
334 struct GNUNET_MessageHeader header;
339 uint32_t reserved GNUNET_PACKED;
342 * How long was the ACK delayed relative to the average time of
343 * receipt of the messages being acknowledged? Used to calculate
344 * the average RTT by taking the receipt time of the ack minus the
345 * average transmission time of the sender minus this value.
347 struct GNUNET_TIME_RelativeNBO avg_ack_delay;
349 /* followed by any number of `struct GNUNET_ShortHashCode`
350 messages providing ACKs */
355 * Outer layer of an encapsulated fragmented application message.
357 struct TransportFragmentBox
360 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
362 struct GNUNET_MessageHeader header;
365 * Unique ID of this fragment (and fragment transmission!). Will
366 * change even if a fragement is retransmitted to make each
367 * transmission attempt unique! Should be incremented by one for
368 * each fragment transmission. If a client receives a duplicate
369 * fragment (same @e frag_off), it must send
370 * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
372 uint32_t frag_uuid GNUNET_PACKED;
375 * Original message ID for of the message that all the1
376 * fragments belong to. Must be the same for all fragments.
378 struct GNUNET_ShortHashCode msg_uuid;
381 * Offset of this fragment in the overall message.
383 uint16_t frag_off GNUNET_PACKED;
386 * Total size of the message that is being fragmented.
388 uint16_t msg_size GNUNET_PACKED;
394 * Outer layer of an fragmented application message sent over a queue
395 * with finite MTU. When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
396 * received, the receiver has two RTTs or 64 further fragments with
397 * the same basic message time to send an acknowledgement, possibly
398 * acknowledging up to 65 fragments in one ACK. ACKs must also be
399 * sent immediately once all fragments were sent.
401 struct TransportFragmentAckMessage
404 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
406 struct GNUNET_MessageHeader header;
409 * Unique ID of the lowest fragment UUID being acknowledged.
411 uint32_t frag_uuid GNUNET_PACKED;
414 * Bitfield of up to 64 additional fragments following the
415 * @e msg_uuid being acknowledged by this message.
417 uint64_t extra_acks GNUNET_PACKED;
420 * Original message ID for of the message that all the
421 * fragments belong to.
423 struct GNUNET_ShortHashCode msg_uuid;
426 * How long was the ACK delayed relative to the average time of
427 * receipt of the fragments being acknowledged? Used to calculate
428 * the average RTT by taking the receipt time of the ack minus the
429 * average transmission time of the sender minus this value.
431 struct GNUNET_TIME_RelativeNBO avg_ack_delay;
434 * How long until the receiver will stop trying reassembly
437 struct GNUNET_TIME_RelativeNBO reassembly_timeout;
442 * Internal message used by transport for distance vector learning.
443 * If @e num_hops does not exceed the threshold, peers should append
444 * themselves to the peer list and flood the message (possibly only
445 * to a subset of their neighbours to limit discoverability of the
446 * network topology). To the extend that the @e bidirectional bits
447 * are set, peers may learn the inverse paths even if they did not
450 * Unless received on a bidirectional queue and @e num_hops just
451 * zero, peers that can forward to the initator should always try to
452 * forward to the initiator.
454 struct TransportDVLearn
457 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
459 struct GNUNET_MessageHeader header;
462 * Number of hops this messages has travelled, in NBO. Zero if
465 uint16_t num_hops GNUNET_PACKED;
468 * Bitmask of the last 16 hops indicating whether they are confirmed
469 * available (without DV) in both directions or not, in NBO. Used
470 * to possibly instantly learn a path in both directions. Each peer
471 * should shift this value by one to the left, and then set the
472 * lowest bit IF the current sender can be reached from it (without
475 uint16_t bidirectional GNUNET_PACKED;
478 * Peers receiving this message and delaying forwarding to other
479 * peers for any reason should increment this value such as to
480 * enable the origin to determine the actual network-only delay
481 * in addition to the real-time delay (assuming the message loops
482 * back to the origin).
484 struct GNUNET_TIME_Relative cummulative_non_network_delay;
487 * Identity of the peer that started this learning activity.
489 struct GNUNET_PeerIdentity initiator;
491 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values,
492 excluding the initiator of the DV trace; the last entry is the
493 current sender; the current peer must not be included. */
499 * Outer layer of an encapsulated message send over multiple hops.
500 * The path given only includes the identities of the subsequent
501 * peers, i.e. it will be empty if we are the receiver. Each
502 * forwarding peer should scan the list from the end, and if it can,
503 * forward to the respective peer. The list should then be shortened
504 * by all the entries up to and including that peer. Each hop should
505 * also increment @e total_hops to allow the receiver to get a precise
506 * estimate on the number of hops the message travelled. Senders must
507 * provide a learned path that thus should work, but intermediaries
508 * know of a shortcut, they are allowed to send the message via that
511 * If a peer finds itself still on the list, it must drop the message.
513 struct TransportDVBox
516 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
518 struct GNUNET_MessageHeader header;
521 * Number of total hops this messages travelled. In NBO.
522 * @e origin sets this to zero, to be incremented at
525 uint16_t total_hops GNUNET_PACKED;
528 * Number of hops this messages includes. In NBO.
530 uint16_t num_hops GNUNET_PACKED;
533 * Identity of the peer that originated the message.
535 struct GNUNET_PeerIdentity origin;
537 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
538 excluding the @e origin and the current peer, the last must be
539 the ultimate target; if @e num_hops is zero, the receiver of this
540 message is the ultimate target. */
542 /* Followed by the actual message, which itself may be
543 another box, but not a DV_LEARN or DV_BOX message! */
547 GNUNET_NETWORK_STRUCT_END
552 * What type of client is the `struct TransportClient` about?
557 * We do not know yet (client is fresh).
562 * Is the CORE service, we need to forward traffic to it.
567 * It is a monitor, forward monitor data.
572 * It is a communicator, use for communication.
579 * Entry in our cache of ephemeral keys we currently use.
580 * This way, we only sign an ephemeral once per @e target,
581 * and then can re-use it over multiple
582 * #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION
583 * messages (as signing is expensive).
585 struct EphemeralCacheEntry
589 * Target's peer identity (we don't re-use ephemerals
590 * to limit linkability of messages).
592 struct GNUNET_PeerIdentity target;
595 * Signature affirming @e ephemeral_key of type
596 * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
598 struct GNUNET_CRYPTO_EddsaSignature sender_sig;
601 * How long is @e sender_sig valid
603 struct GNUNET_TIME_Absolute ephemeral_validity;
608 struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
611 * Our private ephemeral key.
613 struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
616 * Node in the ephemeral cache for this entry.
617 * Used for expiration.
619 struct GNUNET_CONTAINER_HeapNode *hn;
624 * Client connected to the transport service.
626 struct TransportClient;
630 * A neighbour that at least one communicator is connected to.
636 * Entry in our #dv_routes table, representing a (set of) distance
637 * vector routes to a particular peer.
639 struct DistanceVector;
642 * One possible hop towards a DV target.
644 struct DistanceVectorHop
648 * Kept in a MDLL, sorted by @e timeout.
650 struct DistanceVectorHop *next_dv;
653 * Kept in a MDLL, sorted by @e timeout.
655 struct DistanceVectorHop *prev_dv;
660 struct DistanceVectorHop *next_neighbour;
665 struct DistanceVectorHop *prev_neighbour;
668 * What would be the next hop to @e target?
670 struct Neighbour *next_hop;
673 * Distance vector entry this hop belongs with.
675 struct DistanceVector *dv;
678 * Array of @e distance hops to the target, excluding @e next_hop.
679 * NULL if the entire path is us to @e next_hop to `target`. Allocated
680 * at the end of this struct.
682 const struct GNUNET_PeerIdentity *path;
685 * At what time do we forget about this path unless we see it again
688 struct GNUNET_TIME_Absolute timeout;
691 * How many hops in total to the `target` (excluding @e next_hop and `target` itself),
692 * thus 0 still means a distance of 2 hops (to @e next_hop and then to `target`)?
694 unsigned int distance;
699 * Entry in our #dv_routes table, representing a (set of) distance
700 * vector routes to a particular peer.
702 struct DistanceVector
706 * To which peer is this a route?
708 struct GNUNET_PeerIdentity target;
711 * Known paths to @e target.
713 struct DistanceVectorHop *dv_head;
716 * Known paths to @e target.
718 struct DistanceVectorHop *dv_tail;
721 * Task scheduled to purge expired paths from @e dv_head MDLL.
723 struct GNUNET_SCHEDULER_Task *timeout_task;
728 * Entry identifying transmission in one of our `struct
729 * GNUNET_ATS_Sessions` which still awaits an ACK. This is used to
730 * ensure we do not overwhelm a communicator and limit the number of
731 * messages outstanding per communicator (say in case communicator is
732 * CPU bound) and per queue (in case ATS bandwidth allocation exceeds
733 * what the communicator can actually provide towards a particular
742 struct QueueEntry *next;
747 struct QueueEntry *prev;
750 * ATS session this entry is queued with.
752 struct GNUNET_ATS_Session *session;
755 * Message ID used for this message with the queue used for transmission.
762 * An ATS session is a message queue provided by a communicator
763 * via which we can reach a particular neighbour.
765 struct GNUNET_ATS_Session
770 struct GNUNET_ATS_Session *next_neighbour;
775 struct GNUNET_ATS_Session *prev_neighbour;
780 struct GNUNET_ATS_Session *prev_client;
785 struct GNUNET_ATS_Session *next_client;
788 * Head of DLL of unacked transmission requests.
790 struct QueueEntry *queue_head;
793 * End of DLL of unacked transmission requests.
795 struct QueueEntry *queue_tail;
798 * Which neighbour is this ATS session for?
800 struct Neighbour *neighbour;
803 * Which communicator offers this ATS session?
805 struct TransportClient *tc;
808 * Address served by the ATS session.
813 * Handle by which we inform ATS about this queue.
815 struct GNUNET_ATS_SessionRecord *sr;
818 * Task scheduled for the time when this queue can (likely) transmit the
819 * next message. Still needs to check with the @e tracker_out to be sure.
821 struct GNUNET_SCHEDULER_Task *transmit_task;
824 * Our current RTT estimate for this ATS session.
826 struct GNUNET_TIME_Relative rtt;
829 * Message ID generator for transmissions on this queue.
834 * Unique identifier of this ATS session with the communicator.
839 * Maximum transmission unit supported by this ATS session.
844 * Distance to the target of this ATS session.
851 uint32_t num_msg_pending;
856 uint32_t num_bytes_pending;
859 * Length of the DLL starting at @e queue_head.
861 unsigned int queue_length;
864 * Network type offered by this ATS session.
866 enum GNUNET_NetworkType nt;
869 * Connection status for this ATS session.
871 enum GNUNET_TRANSPORT_ConnectionStatus cs;
874 * How much outbound bandwidth do we have available for this session?
876 struct GNUNET_BANDWIDTH_Tracker tracker_out;
879 * How much inbound bandwidth do we have available for this session?
881 struct GNUNET_BANDWIDTH_Tracker tracker_in;
886 * Information we keep for a message that we are reassembling.
888 struct ReassemblyContext
892 * Original message ID for of the message that all the
893 * fragments belong to.
895 struct GNUNET_ShortHashCode msg_uuid;
898 * Which neighbour is this context for?
900 struct Neighbour *neighbour;
903 * Entry in the reassembly heap (sorted by expiration).
905 struct GNUNET_CONTAINER_HeapNode *hn;
908 * Bitfield with @e msg_size bits representing the positions
909 * where we have received fragments. When we receive a fragment,
910 * we check the bits in @e bitfield before incrementing @e msg_missing.
912 * Allocated after the reassembled message.
917 * Task for sending ACK. We may send ACKs either because of hitting
918 * the @e extra_acks limit, or based on time and @e num_acks. This
919 * task is for the latter case.
921 struct GNUNET_SCHEDULER_Task *ack_task;
924 * At what time will we give up reassembly of this message?
926 struct GNUNET_TIME_Absolute reassembly_timeout;
929 * Average delay of all acks in @e extra_acks and @e frag_uuid.
930 * Should be reset to zero when @e num_acks is set to 0.
932 struct GNUNET_TIME_Relative avg_ack_delay;
935 * Time we received the last fragment. @e avg_ack_delay must be
936 * incremented by now - @e last_frag multiplied by @e num_acks.
938 struct GNUNET_TIME_Absolute last_frag;
941 * Bitfield of up to 64 additional fragments following @e frag_uuid
942 * to be acknowledged in the next cummulative ACK.
947 * Unique ID of the lowest fragment UUID to be acknowledged in the
948 * next cummulative ACK. Only valid if @e num_acks > 0.
953 * Number of ACKs we have accumulated so far. Reset to 0
954 * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
956 unsigned int num_acks;
959 * How big is the message we are reassembling in total?
964 * How many bytes of the message are still missing? Defragmentation
965 * is complete when @e msg_missing == 0.
967 uint16_t msg_missing;
969 /* Followed by @e msg_size bytes of the (partially) defragmented original message */
971 /* Followed by @e bitfield data */
976 * A neighbour that at least one communicator is connected to.
982 * Which peer is this about?
984 struct GNUNET_PeerIdentity pid;
987 * Map with `struct ReassemblyContext` structs for fragments under
988 * reassembly. May be NULL if we currently have no fragments from
989 * this @e pid (lazy initialization).
991 struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
994 * Heap with `struct ReassemblyContext` structs for fragments under
995 * reassembly. May be NULL if we currently have no fragments from
996 * this @e pid (lazy initialization).
998 struct GNUNET_CONTAINER_Heap *reassembly_heap;
1001 * Task to free old entries from the @e reassembly_heap and @e reassembly_map.
1003 struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
1006 * Head of list of messages pending for this neighbour.
1008 struct PendingMessage *pending_msg_head;
1011 * Tail of list of messages pending for this neighbour.
1013 struct PendingMessage *pending_msg_tail;
1016 * Head of MDLL of DV hops that have this neighbour as next hop. Must be
1017 * purged if this neighbour goes down.
1019 struct DistanceVectorHop *dv_head;
1022 * Tail of MDLL of DV hops that have this neighbour as next hop. Must be
1023 * purged if this neighbour goes down.
1025 struct DistanceVectorHop *dv_tail;
1028 * Head of DLL of ATS sessions to this peer.
1030 struct GNUNET_ATS_Session *session_head;
1033 * Tail of DLL of ATS sessions to this peer.
1035 struct GNUNET_ATS_Session *session_tail;
1038 * Task run to cleanup pending messages that have exceeded their timeout.
1040 struct GNUNET_SCHEDULER_Task *timeout_task;
1043 * Quota at which CORE is allowed to transmit to this peer
1046 * FIXME: not yet used, tricky to get right given multiple queues!
1047 * (=> Idea: let ATS set a quota per queue and we add them up here?)
1048 * FIXME: how do we set this value initially when we tell CORE?
1049 * Options: start at a minimum value or at literally zero (before ATS?)
1050 * (=> Current thought: clean would be zero!)
1052 struct GNUNET_BANDWIDTH_Value32NBO quota_out;
1055 * What is the earliest timeout of any message in @e pending_msg_tail?
1057 struct GNUNET_TIME_Absolute earliest_timeout;
1063 * Types of different pending messages.
1065 enum PendingMessageType
1069 * Ordinary message received from the CORE service.
1076 PMT_FRAGMENT_BOX = 1,
1081 PMT_RELIABILITY_BOX = 2,
1084 * Any type of acknowledgement.
1086 PMT_ACKNOWLEDGEMENT = 3
1093 * Transmission request that is awaiting delivery. The original
1094 * transmission requests from CORE may be too big for some queues.
1095 * In this case, a *tree* of fragments is created. At each
1096 * level of the tree, fragments are kept in a DLL ordered by which
1097 * fragment should be sent next (at the head). The tree is searched
1098 * top-down, with the original message at the root.
1100 * To select a node for transmission, first it is checked if the
1101 * current node's message fits with the MTU. If it does not, we
1102 * either calculate the next fragment (based on @e frag_off) from the
1103 * current node, or, if all fragments have already been created,
1104 * descend to the @e head_frag. Even though the node was already
1105 * fragmented, the fragment may be too big if the fragment was
1106 * generated for a queue with a larger MTU. In this case, the node
1107 * may be fragmented again, thus creating a tree.
1109 * When acknowledgements for fragments are received, the tree
1110 * must be pruned, removing those parts that were already
1111 * acknowledged. When fragments are sent over a reliable
1112 * channel, they can be immediately removed.
1114 * If a message is ever fragmented, then the original "full" message
1115 * is never again transmitted (even if it fits below the MTU), and
1116 * only (remaining) fragments are sent.
1118 struct PendingMessage
1121 * Kept in a MDLL of messages for this @a target.
1123 struct PendingMessage *next_neighbour;
1126 * Kept in a MDLL of messages for this @a target.
1128 struct PendingMessage *prev_neighbour;
1131 * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
1133 struct PendingMessage *next_client;
1136 * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
1138 struct PendingMessage *prev_client;
1141 * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx)
1143 struct PendingMessage *next_frag;
1146 * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX)
1148 struct PendingMessage *prev_frag;
1151 * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
1153 struct PendingMessage *bpm;
1156 * Target of the request.
1158 struct Neighbour *target;
1161 * Client that issued the transmission request, if @e pmt is #PMT_CORE.
1163 struct TransportClient *client;
1166 * Head of a MDLL of fragments created for this core message.
1168 struct PendingMessage *head_frag;
1171 * Tail of a MDLL of fragments created for this core message.
1173 struct PendingMessage *tail_frag;
1176 * Our parent in the fragmentation tree.
1178 struct PendingMessage *frag_parent;
1181 * At what time should we give up on the transmission (and no longer retry)?
1183 struct GNUNET_TIME_Absolute timeout;
1186 * What is the earliest time for us to retry transmission of this message?
1188 struct GNUNET_TIME_Absolute next_attempt;
1191 * UUID to use for this message (used for reassembly of fragments, only
1192 * initialized if @e msg_uuid_set is #GNUNET_YES).
1194 struct GNUNET_ShortHashCode msg_uuid;
1197 * Counter incremented per generated fragment.
1199 uint32_t frag_uuidgen;
1202 * Type of the pending message.
1204 enum PendingMessageType pmt;
1207 * Size of the original message.
1212 * Offset at which we should generate the next fragment.
1217 * #GNUNET_YES once @e msg_uuid was initialized
1219 int16_t msg_uuid_set;
1221 /* Followed by @e bytes_msg to transmit */
1226 * One of the addresses of this peer.
1228 struct AddressListEntry
1234 struct AddressListEntry *next;
1239 struct AddressListEntry *prev;
1242 * Which communicator provides this address?
1244 struct TransportClient *tc;
1247 * The actual address.
1249 const char *address;
1252 * Current context for storing this address in the peerstore.
1254 struct GNUNET_PEERSTORE_StoreContext *sc;
1257 * Task to periodically do @e st operation.
1259 struct GNUNET_SCHEDULER_Task *st;
1262 * What is a typical lifetime the communicator expects this
1263 * address to have? (Always from now.)
1265 struct GNUNET_TIME_Relative expiration;
1268 * Address identifier used by the communicator.
1273 * Network type offered by this address.
1275 enum GNUNET_NetworkType nt;
1281 * Client connected to the transport service.
1283 struct TransportClient
1289 struct TransportClient *next;
1294 struct TransportClient *prev;
1297 * Handle to the client.
1299 struct GNUNET_SERVICE_Client *client;
1302 * Message queue to the client.
1304 struct GNUNET_MQ_Handle *mq;
1307 * What type of client is this?
1309 enum ClientType type;
1315 * Information for @e type #CT_CORE.
1320 * Head of list of messages pending for this client, sorted by
1321 * transmission time ("next_attempt" + possibly internal prioritization).
1323 struct PendingMessage *pending_msg_head;
1326 * Tail of list of messages pending for this client.
1328 struct PendingMessage *pending_msg_tail;
1333 * Information for @e type #CT_MONITOR.
1338 * Peer identity to monitor the addresses of.
1339 * Zero to monitor all neighbours. Valid if
1340 * @e type is #CT_MONITOR.
1342 struct GNUNET_PeerIdentity peer;
1345 * Is this a one-shot monitor?
1353 * Information for @e type #CT_COMMUNICATOR.
1357 * If @e type is #CT_COMMUNICATOR, this communicator
1358 * supports communicating using these addresses.
1360 char *address_prefix;
1363 * Head of DLL of queues offered by this communicator.
1365 struct GNUNET_ATS_Session *session_head;
1368 * Tail of DLL of queues offered by this communicator.
1370 struct GNUNET_ATS_Session *session_tail;
1373 * Head of list of the addresses of this peer offered by this communicator.
1375 struct AddressListEntry *addr_head;
1378 * Tail of list of the addresses of this peer offered by this communicator.
1380 struct AddressListEntry *addr_tail;
1383 * Number of queue entries in all queues to this communicator. Used
1384 * throttle sending to a communicator if we see that the communicator
1385 * is globally unable to keep up.
1387 unsigned int total_queue_length;
1390 * Characteristics of this communicator.
1392 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1402 * Head of linked list of all clients to this service.
1404 static struct TransportClient *clients_head;
1407 * Tail of linked list of all clients to this service.
1409 static struct TransportClient *clients_tail;
1412 * Statistics handle.
1414 static struct GNUNET_STATISTICS_Handle *GST_stats;
1417 * Configuration handle.
1419 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
1424 static struct GNUNET_PeerIdentity GST_my_identity;
1429 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
1432 * Map from PIDs to `struct Neighbour` entries. A peer is
1433 * a neighbour if we have an MQ to it from some communicator.
1435 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
1438 * Map from PIDs to `struct DistanceVector` entries describing
1439 * known paths to the peer.
1441 static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
1444 * Database for peer's HELLOs.
1446 static struct GNUNET_PEERSTORE_Handle *peerstore;
1449 * Heap sorting `struct EphemeralCacheEntry` by their
1450 * key/signature validity.
1452 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
1455 * Hash map for looking up `struct EphemeralCacheEntry`s
1456 * by peer identity. (We may have ephemerals in our
1457 * cache for which we do not have a neighbour entry,
1458 * and similar many neighbours may not need ephemerals,
1459 * so we use a second map.)
1461 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
1464 * Task to free expired ephemerals.
1466 static struct GNUNET_SCHEDULER_Task *ephemeral_task;
1469 * Our connection to ATS for allocation and bootstrapping.
1471 static struct GNUNET_ATS_TransportHandle *ats;
1475 * Free cached ephemeral key.
1477 * @param ece cached signature to free
1480 free_ephemeral (struct EphemeralCacheEntry *ece)
1482 GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
1485 GNUNET_CONTAINER_heap_remove_node (ece->hn);
1491 * Lookup neighbour record for peer @a pid.
1493 * @param pid neighbour to look for
1494 * @return NULL if we do not have this peer as a neighbour
1496 static struct Neighbour *
1497 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1499 return GNUNET_CONTAINER_multipeermap_get (neighbours,
1505 * Details about what to notify monitors about.
1510 * @deprecated To be discussed if we keep these...
1512 struct GNUNET_TIME_Absolute last_validation;
1513 struct GNUNET_TIME_Absolute valid_until;
1514 struct GNUNET_TIME_Absolute next_validation;
1517 * Current round-trip time estimate.
1519 struct GNUNET_TIME_Relative rtt;
1522 * Connection status.
1524 enum GNUNET_TRANSPORT_ConnectionStatus cs;
1529 uint32_t num_msg_pending;
1534 uint32_t num_bytes_pending;
1541 * Free a @dvh, and if it is the last path to the `target`,also
1542 * free the associated DV entry in #dv_routes.
1544 * @param dvh hop to free
1547 free_distance_vector_hop (struct DistanceVectorHop *dvh)
1549 struct Neighbour *n = dvh->next_hop;
1550 struct DistanceVector *dv = dvh->dv;
1552 GNUNET_CONTAINER_MDLL_remove (neighbour,
1556 GNUNET_CONTAINER_MDLL_remove (dv,
1561 if (NULL == dv->dv_head)
1563 GNUNET_assert (GNUNET_YES ==
1564 GNUNET_CONTAINER_multipeermap_remove (dv_routes,
1567 if (NULL != dv->timeout_task)
1568 GNUNET_SCHEDULER_cancel (dv->timeout_task);
1575 * Free entry in #dv_routes. First frees all hops to the target, and
1576 * the last target will implicitly free @a dv as well.
1578 * @param dv route to free
1581 free_dv_route (struct DistanceVector *dv)
1583 struct DistanceVectorHop *dvh;
1585 while (NULL != (dvh = dv->dv_head))
1586 free_distance_vector_hop (dvh);
1591 * Notify monitor @a tc about an event. That @a tc
1592 * cares about the event has already been checked.
1594 * Send @a tc information in @a me about a @a peer's status with
1595 * respect to some @a address to all monitors that care.
1597 * @param tc monitor to inform
1598 * @param peer peer the information is about
1599 * @param address address the information is about
1600 * @param nt network type associated with @a address
1601 * @param me detailed information to transmit
1604 notify_monitor (struct TransportClient *tc,
1605 const struct GNUNET_PeerIdentity *peer,
1606 const char *address,
1607 enum GNUNET_NetworkType nt,
1608 const struct MonitorEvent *me)
1610 struct GNUNET_MQ_Envelope *env;
1611 struct GNUNET_TRANSPORT_MonitorData *md;
1612 size_t addr_len = strlen (address) + 1;
1614 env = GNUNET_MQ_msg_extra (md,
1616 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
1617 md->nt = htonl ((uint32_t) nt);
1619 md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
1620 md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
1621 md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
1622 md->rtt = GNUNET_TIME_relative_hton (me->rtt);
1623 md->cs = htonl ((uint32_t) me->cs);
1624 md->num_msg_pending = htonl (me->num_msg_pending);
1625 md->num_bytes_pending = htonl (me->num_bytes_pending);
1629 GNUNET_MQ_send (tc->mq,
1635 * Send information in @a me about a @a peer's status with respect
1636 * to some @a address to all monitors that care.
1638 * @param peer peer the information is about
1639 * @param address address the information is about
1640 * @param nt network type associated with @a address
1641 * @param me detailed information to transmit
1644 notify_monitors (const struct GNUNET_PeerIdentity *peer,
1645 const char *address,
1646 enum GNUNET_NetworkType nt,
1647 const struct MonitorEvent *me)
1649 static struct GNUNET_PeerIdentity zero;
1651 for (struct TransportClient *tc = clients_head;
1655 if (CT_MONITOR != tc->type)
1657 if (tc->details.monitor.one_shot)
1659 if ( (0 != memcmp (&tc->details.monitor.peer,
1662 (0 != memcmp (&tc->details.monitor.peer,
1676 * Called whenever a client connects. Allocates our
1677 * data structures associated with that client.
1679 * @param cls closure, NULL
1680 * @param client identification of the client
1681 * @param mq message queue for the client
1682 * @return our `struct TransportClient`
1685 client_connect_cb (void *cls,
1686 struct GNUNET_SERVICE_Client *client,
1687 struct GNUNET_MQ_Handle *mq)
1689 struct TransportClient *tc;
1691 tc = GNUNET_new (struct TransportClient);
1692 tc->client = client;
1694 GNUNET_CONTAINER_DLL_insert (clients_head,
1697 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1698 "Client %p connected\n",
1707 * @param rc data structure to free
1710 free_reassembly_context (struct ReassemblyContext *rc)
1712 struct Neighbour *n = rc->neighbour;
1714 GNUNET_assert (rc ==
1715 GNUNET_CONTAINER_heap_remove_node (rc->hn));
1716 GNUNET_assert (GNUNET_OK ==
1717 GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
1725 * Task run to clean up reassembly context of a neighbour that have expired.
1727 * @param cls a `struct Neighbour`
1730 reassembly_cleanup_task (void *cls)
1732 struct Neighbour *n = cls;
1733 struct ReassemblyContext *rc;
1735 n->reassembly_timeout_task = NULL;
1736 while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
1738 if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout).rel_value_us)
1740 free_reassembly_context (rc);
1743 GNUNET_assert (NULL == n->reassembly_timeout_task);
1744 n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
1745 &reassembly_cleanup_task,
1753 * function called to #free_reassembly_context().
1757 * @param value a `struct ReassemblyContext` to free
1758 * @return #GNUNET_OK (continue iteration)
1761 free_reassembly_cb (void *cls,
1762 const struct GNUNET_ShortHashCode *key,
1765 struct ReassemblyContext *rc = value;
1769 free_reassembly_context (rc);
1775 * Release memory used by @a neighbour.
1777 * @param neighbour neighbour entry to free
1780 free_neighbour (struct Neighbour *neighbour)
1782 struct DistanceVectorHop *dvh;
1784 GNUNET_assert (NULL == neighbour->session_head);
1785 GNUNET_assert (GNUNET_YES ==
1786 GNUNET_CONTAINER_multipeermap_remove (neighbours,
1789 if (NULL != neighbour->timeout_task)
1790 GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
1791 if (NULL != neighbour->reassembly_map)
1793 GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
1794 &free_reassembly_cb,
1796 GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
1797 neighbour->reassembly_map = NULL;
1798 GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
1799 neighbour->reassembly_heap = NULL;
1801 while (NULL != (dvh = neighbour->dv_head))
1802 free_distance_vector_hop (dvh);
1803 if (NULL != neighbour->reassembly_timeout_task)
1804 GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
1805 GNUNET_free (neighbour);
1810 * Send message to CORE clients that we lost a connection.
1812 * @param tc client to inform (must be CORE client)
1813 * @param pid peer the connection is for
1814 * @param quota_out current quota for the peer
1817 core_send_connect_info (struct TransportClient *tc,
1818 const struct GNUNET_PeerIdentity *pid,
1819 struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1821 struct GNUNET_MQ_Envelope *env;
1822 struct ConnectInfoMessage *cim;
1824 GNUNET_assert (CT_CORE == tc->type);
1825 env = GNUNET_MQ_msg (cim,
1826 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1827 cim->quota_out = quota_out;
1829 GNUNET_MQ_send (tc->mq,
1835 * Send message to CORE clients that we gained a connection
1837 * @param pid peer the queue was for
1838 * @param quota_out current quota for the peer
1841 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
1842 struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1844 for (struct TransportClient *tc = clients_head;
1848 if (CT_CORE != tc->type)
1850 core_send_connect_info (tc,
1858 * Send message to CORE clients that we lost a connection.
1860 * @param pid peer the connection was for
1863 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
1865 for (struct TransportClient *tc = clients_head;
1869 struct GNUNET_MQ_Envelope *env;
1870 struct DisconnectInfoMessage *dim;
1872 if (CT_CORE != tc->type)
1874 env = GNUNET_MQ_msg (dim,
1875 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1877 GNUNET_MQ_send (tc->mq,
1884 * We believe we are ready to transmit a message on a queue. Double-checks
1885 * with the queue's "tracker_out" and then gives the message to the
1886 * communicator for transmission (updating the tracker, and re-scheduling
1887 * itself if applicable).
1889 * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
1892 transmit_on_queue (void *cls);
1896 * Schedule next run of #transmit_on_queue(). Does NOTHING if
1897 * we should run immediately or if the message queue is empty.
1898 * Test for no task being added AND queue not being empty to
1899 * transmit immediately afterwards! This function must only
1900 * be called if the message queue is non-empty!
1902 * @param queue the queue to do scheduling for
1905 schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
1907 struct Neighbour *n = queue->neighbour;
1908 struct PendingMessage *pm = n->pending_msg_head;
1909 struct GNUNET_TIME_Relative out_delay;
1912 GNUNET_assert (NULL != pm);
1913 if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT)
1915 GNUNET_STATISTICS_update (GST_stats,
1916 "# Transmission throttled due to communicator queue limit",
1921 if (queue->queue_length >= SESSION_QUEUE_LIMIT)
1923 GNUNET_STATISTICS_update (GST_stats,
1924 "# Transmission throttled due to session queue limit",
1930 wsize = (0 == queue->mtu)
1931 ? pm->bytes_msg /* FIXME: add overheads? */
1933 out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
1935 out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
1937 if (0 == out_delay.rel_value_us)
1938 return; /* we should run immediately! */
1939 /* queue has changed since we were scheduled, reschedule again */
1940 queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
1943 if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
1944 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1945 "Next transmission on queue `%s' in %s (high delay)\n",
1947 GNUNET_STRINGS_relative_time_to_string (out_delay,
1950 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1951 "Next transmission on queue `%s' in %s\n",
1953 GNUNET_STRINGS_relative_time_to_string (out_delay,
1961 * @param session the session to free
1964 free_session (struct GNUNET_ATS_Session *session)
1966 struct Neighbour *neighbour = session->neighbour;
1967 struct TransportClient *tc = session->tc;
1968 struct MonitorEvent me = {
1969 .cs = GNUNET_TRANSPORT_CS_DOWN,
1970 .rtt = GNUNET_TIME_UNIT_FOREVER_REL
1972 struct QueueEntry *qe;
1975 if (NULL != session->transmit_task)
1977 GNUNET_SCHEDULER_cancel (session->transmit_task);
1978 session->transmit_task = NULL;
1980 GNUNET_CONTAINER_MDLL_remove (neighbour,
1981 neighbour->session_head,
1982 neighbour->session_tail,
1984 GNUNET_CONTAINER_MDLL_remove (client,
1985 tc->details.communicator.session_head,
1986 tc->details.communicator.session_tail,
1988 maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
1989 while (NULL != (qe = session->queue_head))
1991 GNUNET_CONTAINER_DLL_remove (session->queue_head,
1992 session->queue_tail,
1994 session->queue_length--;
1995 tc->details.communicator.total_queue_length--;
1998 GNUNET_assert (0 == session->queue_length);
2000 (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
2002 /* Communicator dropped below threshold, resume all queues */
2003 GNUNET_STATISTICS_update (GST_stats,
2004 "# Transmission throttled due to communicator queue limit",
2007 for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
2010 schedule_transmit_on_queue (s);
2012 notify_monitors (&neighbour->pid,
2016 GNUNET_ATS_session_del (session->sr);
2017 GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
2018 GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
2019 GNUNET_free (session);
2020 if (NULL == neighbour->session_head)
2022 cores_send_disconnect_info (&neighbour->pid);
2023 free_neighbour (neighbour);
2031 * @param ale address list entry to free
2034 free_address_list_entry (struct AddressListEntry *ale)
2036 struct TransportClient *tc = ale->tc;
2038 GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
2039 tc->details.communicator.addr_tail,
2041 if (NULL != ale->sc)
2043 GNUNET_PEERSTORE_store_cancel (ale->sc);
2046 if (NULL != ale->st)
2048 GNUNET_SCHEDULER_cancel (ale->st);
2056 * Called whenever a client is disconnected. Frees our
2057 * resources associated with that client.
2059 * @param cls closure, NULL
2060 * @param client identification of the client
2061 * @param app_ctx our `struct TransportClient`
2064 client_disconnect_cb (void *cls,
2065 struct GNUNET_SERVICE_Client *client,
2068 struct TransportClient *tc = app_ctx;
2070 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2071 "Client %p disconnected, cleaning up.\n",
2073 GNUNET_CONTAINER_DLL_remove (clients_head,
2082 struct PendingMessage *pm;
2084 while (NULL != (pm = tc->details.core.pending_msg_head))
2086 GNUNET_CONTAINER_MDLL_remove (client,
2087 tc->details.core.pending_msg_head,
2088 tc->details.core.pending_msg_tail,
2096 case CT_COMMUNICATOR:
2098 struct GNUNET_ATS_Session *q;
2099 struct AddressListEntry *ale;
2101 while (NULL != (q = tc->details.communicator.session_head))
2103 while (NULL != (ale = tc->details.communicator.addr_head))
2104 free_address_list_entry (ale);
2105 GNUNET_free (tc->details.communicator.address_prefix);
2114 * Iterator telling new CORE client about all existing
2115 * connections to peers.
2117 * @param cls the new `struct TransportClient`
2118 * @param pid a connected peer
2119 * @param value the `struct Neighbour` with more information
2120 * @return #GNUNET_OK (continue to iterate)
2123 notify_client_connect_info (void *cls,
2124 const struct GNUNET_PeerIdentity *pid,
2127 struct TransportClient *tc = cls;
2128 struct Neighbour *neighbour = value;
2130 core_send_connect_info (tc,
2132 neighbour->quota_out);
2138 * Initialize a "CORE" client. We got a start message from this
2139 * client, so add it to the list of clients for broadcasting of
2142 * @param cls the client
2143 * @param start the start message that was sent
2146 handle_client_start (void *cls,
2147 const struct StartMessage *start)
2149 struct TransportClient *tc = cls;
2152 options = ntohl (start->options);
2153 if ( (0 != (1 & options)) &&
2155 memcmp (&start->self,
2157 sizeof (struct GNUNET_PeerIdentity)) ) )
2159 /* client thinks this is a different peer, reject */
2161 GNUNET_SERVICE_client_drop (tc->client);
2164 if (CT_NONE != tc->type)
2167 GNUNET_SERVICE_client_drop (tc->client);
2171 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2172 ¬ify_client_connect_info,
2174 GNUNET_SERVICE_client_continue (tc->client);
2179 * Client asked for transmission to a peer. Process the request.
2181 * @param cls the client
2182 * @param obm the send message that was sent
2185 check_client_send (void *cls,
2186 const struct OutboundMessage *obm)
2188 struct TransportClient *tc = cls;
2190 const struct GNUNET_MessageHeader *obmm;
2192 if (CT_CORE != tc->type)
2195 return GNUNET_SYSERR;
2197 size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
2198 if (size < sizeof (struct GNUNET_MessageHeader))
2201 return GNUNET_SYSERR;
2203 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2204 if (size != ntohs (obmm->size))
2207 return GNUNET_SYSERR;
2214 * Free fragment tree below @e root, excluding @e root itself.
2216 * @param root root of the tree to free
2219 free_fragment_tree (struct PendingMessage *root)
2221 struct PendingMessage *frag;
2223 while (NULL != (frag = root->head_frag))
2225 free_fragment_tree (frag);
2226 GNUNET_CONTAINER_MDLL_remove (frag,
2236 * Release memory associated with @a pm and remove @a pm from associated
2237 * data structures. @a pm must be a top-level pending message and not
2238 * a fragment in the tree. The entire tree is freed (if applicable).
2240 * @param pm the pending message to free
2243 free_pending_message (struct PendingMessage *pm)
2245 struct TransportClient *tc = pm->client;
2246 struct Neighbour *target = pm->target;
2250 GNUNET_CONTAINER_MDLL_remove (client,
2251 tc->details.core.pending_msg_head,
2252 tc->details.core.pending_msg_tail,
2255 GNUNET_CONTAINER_MDLL_remove (neighbour,
2256 target->pending_msg_head,
2257 target->pending_msg_tail,
2259 free_fragment_tree (pm);
2260 GNUNET_free_non_null (pm->bpm);
2266 * Send a response to the @a pm that we have processed a
2267 * "send" request with status @a success. We
2268 * transmitted @a bytes_physical on the actual wire.
2269 * Sends a confirmation to the "core" client responsible
2270 * for the original request and free's @a pm.
2272 * @param pm handle to the original pending message
2273 * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
2274 * for transmission failure
2275 * @param bytes_physical amount of bandwidth consumed
2278 client_send_response (struct PendingMessage *pm,
2280 uint32_t bytes_physical)
2282 struct TransportClient *tc = pm->client;
2283 struct Neighbour *target = pm->target;
2284 struct GNUNET_MQ_Envelope *env;
2285 struct SendOkMessage *som;
2289 env = GNUNET_MQ_msg (som,
2290 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2291 som->success = htonl ((uint32_t) success);
2292 som->bytes_msg = htons (pm->bytes_msg);
2293 som->bytes_physical = htonl (bytes_physical);
2294 som->peer = target->pid;
2295 GNUNET_MQ_send (tc->mq,
2298 free_pending_message (pm);
2303 * Checks the message queue for a neighbour for messages that have timed
2304 * out and purges them.
2306 * @param cls a `struct Neighbour`
2309 check_queue_timeouts (void *cls)
2311 struct Neighbour *n = cls;
2312 struct PendingMessage *pm;
2313 struct GNUNET_TIME_Absolute now;
2314 struct GNUNET_TIME_Absolute earliest_timeout;
2316 n->timeout_task = NULL;
2317 earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2318 now = GNUNET_TIME_absolute_get ();
2319 for (struct PendingMessage *pos = n->pending_msg_head;
2323 pm = pos->next_neighbour;
2324 if (pos->timeout.abs_value_us <= now.abs_value_us)
2326 GNUNET_STATISTICS_update (GST_stats,
2327 "# messages dropped (timeout before confirmation)",
2330 client_send_response (pm,
2335 earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
2338 n->earliest_timeout = earliest_timeout;
2339 if (NULL != n->pending_msg_head)
2340 n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
2341 &check_queue_timeouts,
2347 * Client asked for transmission to a peer. Process the request.
2349 * @param cls the client
2350 * @param obm the send message that was sent
2353 handle_client_send (void *cls,
2354 const struct OutboundMessage *obm)
2356 struct TransportClient *tc = cls;
2357 struct PendingMessage *pm;
2358 const struct GNUNET_MessageHeader *obmm;
2359 struct Neighbour *target;
2362 GNUNET_assert (CT_CORE == tc->type);
2363 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2364 bytes_msg = ntohs (obmm->size);
2365 target = lookup_neighbour (&obm->peer);
2368 /* Failure: don't have this peer as a neighbour (anymore).
2369 Might have gone down asynchronously, so this is NOT
2370 a protocol violation by CORE. Still count the event,
2371 as this should be rare. */
2372 struct GNUNET_MQ_Envelope *env;
2373 struct SendOkMessage *som;
2375 env = GNUNET_MQ_msg (som,
2376 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2377 som->success = htonl (GNUNET_SYSERR);
2378 som->bytes_msg = htonl (bytes_msg);
2379 som->bytes_physical = htonl (0);
2380 som->peer = obm->peer;
2381 GNUNET_MQ_send (tc->mq,
2383 GNUNET_SERVICE_client_continue (tc->client);
2384 GNUNET_STATISTICS_update (GST_stats,
2385 "# messages dropped (neighbour unknown)",
2390 pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
2392 pm->target = target;
2393 pm->bytes_msg = bytes_msg;
2394 pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
2398 GNUNET_CONTAINER_MDLL_insert (neighbour,
2399 target->pending_msg_head,
2400 target->pending_msg_tail,
2402 GNUNET_CONTAINER_MDLL_insert (client,
2403 tc->details.core.pending_msg_head,
2404 tc->details.core.pending_msg_tail,
2406 if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
2408 target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
2409 if (NULL != target->timeout_task)
2410 GNUNET_SCHEDULER_cancel (target->timeout_task);
2411 target->timeout_task
2412 = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
2413 &check_queue_timeouts,
2420 * Communicator started. Test message is well-formed.
2422 * @param cls the client
2423 * @param cam the send message that was sent
2426 check_communicator_available (void *cls,
2427 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2429 struct TransportClient *tc = cls;
2432 if (CT_NONE != tc->type)
2435 return GNUNET_SYSERR;
2437 tc->type = CT_COMMUNICATOR;
2438 size = ntohs (cam->header.size) - sizeof (*cam);
2440 return GNUNET_OK; /* receive-only communicator */
2441 GNUNET_MQ_check_zero_termination (cam);
2447 * Communicator started. Process the request.
2449 * @param cls the client
2450 * @param cam the send message that was sent
2453 handle_communicator_available (void *cls,
2454 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2456 struct TransportClient *tc = cls;
2459 size = ntohs (cam->header.size) - sizeof (*cam);
2461 return; /* receive-only communicator */
2462 tc->details.communicator.address_prefix
2463 = GNUNET_strdup ((const char *) &cam[1]);
2464 tc->details.communicator.cc
2465 = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
2466 GNUNET_SERVICE_client_continue (tc->client);
2471 * Communicator requests backchannel transmission. Check the request.
2473 * @param cls the client
2474 * @param cb the send message that was sent
2475 * @return #GNUNET_OK if message is well-formed
2478 check_communicator_backchannel (void *cls,
2479 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2481 const struct GNUNET_MessageHeader *inbox;
2486 msize = ntohs (cb->header.size) - sizeof (*cb);
2487 if (UINT16_MAX - msize >
2488 sizeof (struct TransportBackchannelEncapsulationMessage) +
2489 sizeof (struct TransportBackchannelRequestPayload) )
2492 return GNUNET_SYSERR;
2494 inbox = (const struct GNUNET_MessageHeader *) &cb[1];
2495 isize = ntohs (inbox->size);
2499 return GNUNET_SYSERR;
2501 is = (const char *) inbox;
2504 GNUNET_assert (msize > 0);
2505 if ('\0' != is[msize-1])
2508 return GNUNET_SYSERR;
2515 * Remove memory used by expired ephemeral keys.
2520 expire_ephemerals (void *cls)
2522 struct EphemeralCacheEntry *ece;
2525 ephemeral_task = NULL;
2526 while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
2528 if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us)
2530 free_ephemeral (ece);
2533 ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
2542 * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
2543 * one, cache it and return it.
2545 * @param pid peer to look up ephemeral for
2546 * @param private_key[out] set to the private key
2547 * @param ephemeral_key[out] set to the key
2548 * @param ephemeral_sender_sig[out] set to the signature
2549 * @param ephemeral_validity[out] set to the validity expiration time
2552 lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
2553 struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
2554 struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
2555 struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
2556 struct GNUNET_TIME_Absolute *ephemeral_validity)
2558 struct EphemeralCacheEntry *ece;
2559 struct EphemeralConfirmation ec;
2561 ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map,
2563 if ( (NULL != ece) &&
2564 (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) )
2566 free_ephemeral (ece);
2571 ece = GNUNET_new (struct EphemeralCacheEntry);
2573 ece->ephemeral_validity = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
2574 EPHEMERAL_VALIDITY);
2575 GNUNET_assert (GNUNET_OK ==
2576 GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
2577 GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key,
2578 &ece->ephemeral_key);
2579 ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
2580 ec.purpose.size = htonl (sizeof (ec));
2582 ec.ephemeral_key = ece->ephemeral_key;
2583 GNUNET_assert (GNUNET_OK ==
2584 GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
2587 ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap,
2589 ece->ephemeral_validity.abs_value_us);
2590 GNUNET_assert (GNUNET_OK ==
2591 GNUNET_CONTAINER_multipeermap_put (ephemeral_map,
2594 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2595 if (NULL == ephemeral_task)
2596 ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
2600 *private_key = ece->private_key;
2601 *ephemeral_key = ece->ephemeral_key;
2602 *ephemeral_sender_sig = ece->sender_sig;
2603 *ephemeral_validity = ece->ephemeral_validity;
2608 * We need to transmit @a hdr to @a target. If necessary, this may
2609 * involve DV routing or even broadcasting and fragmentation.
2611 * @param target peer to receive @a hdr
2612 * @param hdr header of the message to route
2615 route_message (const struct GNUNET_PeerIdentity *target,
2616 struct GNUNET_MessageHeader *hdr)
2618 // FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting)
2624 * Communicator requests backchannel transmission. Process the request.
2626 * @param cls the client
2627 * @param cb the send message that was sent
2630 handle_communicator_backchannel (void *cls,
2631 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2633 struct TransportClient *tc = cls;
2634 struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
2635 struct GNUNET_TIME_Absolute ephemeral_validity;
2636 struct TransportBackchannelEncapsulationMessage *enc;
2637 struct TransportBackchannelRequestPayload ppay;
2641 /* encapsulate and encrypt message */
2642 msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct TransportBackchannelRequestPayload);
2643 enc = GNUNET_malloc (sizeof (*enc) + msize);
2644 enc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
2645 enc->header.size = htons (sizeof (*enc) + msize);
2646 enc->target = cb->pid;
2647 lookup_ephemeral (&cb->pid,
2649 &enc->ephemeral_key,
2651 &ephemeral_validity);
2652 // FIXME: setup 'iv'
2654 dh_key_derive (&private_key,
2659 ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
2660 ppay.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
2661 mpos = (char *) &enc[1];
2670 ntohs (cb->header.size) - sizeof (*cb));
2674 route_message (&cb->pid,
2676 GNUNET_SERVICE_client_continue (tc->client);
2681 * Address of our peer added. Test message is well-formed.
2683 * @param cls the client
2684 * @param aam the send message that was sent
2685 * @return #GNUNET_OK if message is well-formed
2688 check_add_address (void *cls,
2689 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2691 struct TransportClient *tc = cls;
2693 if (CT_COMMUNICATOR != tc->type)
2696 return GNUNET_SYSERR;
2698 GNUNET_MQ_check_zero_termination (aam);
2704 * Ask peerstore to store our address.
2706 * @param cls an `struct AddressListEntry *`
2709 store_pi (void *cls);
2713 * Function called when peerstore is done storing our address.
2716 peerstore_store_cb (void *cls,
2719 struct AddressListEntry *ale = cls;
2722 if (GNUNET_YES != success)
2723 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2724 "Failed to store our own address `%s' in peerstore!\n",
2726 /* refresh period is 1/4 of expiration time, that should be plenty
2727 without being excessive. */
2728 ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
2736 * Ask peerstore to store our address.
2738 * @param cls an `struct AddressListEntry *`
2741 store_pi (void *cls)
2743 struct AddressListEntry *ale = cls;
2746 struct GNUNET_TIME_Absolute expiration;
2749 expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
2750 GNUNET_HELLO_sign_address (ale->address,
2756 ale->sc = GNUNET_PEERSTORE_store (peerstore,
2759 GNUNET_HELLO_PEERSTORE_KEY,
2763 GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
2764 &peerstore_store_cb,
2767 if (NULL == ale->sc)
2769 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2770 "Failed to store our address `%s' with peerstore\n",
2772 ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
2780 * Address of our peer added. Process the request.
2782 * @param cls the client
2783 * @param aam the send message that was sent
2786 handle_add_address (void *cls,
2787 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2789 struct TransportClient *tc = cls;
2790 struct AddressListEntry *ale;
2793 slen = ntohs (aam->header.size) - sizeof (*aam);
2794 ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
2796 ale->address = (const char *) &ale[1];
2797 ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
2798 ale->aid = aam->aid;
2799 ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
2803 GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
2804 tc->details.communicator.addr_tail,
2806 ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
2808 GNUNET_SERVICE_client_continue (tc->client);
2813 * Address of our peer deleted. Process the request.
2815 * @param cls the client
2816 * @param dam the send message that was sent
2819 handle_del_address (void *cls,
2820 const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
2822 struct TransportClient *tc = cls;
2824 if (CT_COMMUNICATOR != tc->type)
2827 GNUNET_SERVICE_client_drop (tc->client);
2830 for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
2834 if (dam->aid != ale->aid)
2836 GNUNET_assert (ale->tc == tc);
2837 free_address_list_entry (ale);
2838 GNUNET_SERVICE_client_continue (tc->client);
2841 GNUNET_SERVICE_client_drop (tc->client);
2846 * Context from #handle_incoming_msg(). Closure for many
2847 * message handlers below.
2849 struct CommunicatorMessageContext
2852 * Which communicator provided us with the message.
2854 struct TransportClient *tc;
2857 * Additional information for flow control and about the sender.
2859 struct GNUNET_TRANSPORT_IncomingMessage im;
2862 * Number of hops the message has travelled (if DV-routed).
2863 * FIXME: make use of this in ACK handling!
2865 uint16_t total_hops;
2870 * Given an inbound message @a msg from a communicator @a cmc,
2871 * demultiplex it based on the type calling the right handler.
2873 * @param cmc context for demultiplexing
2874 * @param msg message to demultiplex
2877 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
2878 const struct GNUNET_MessageHeader *msg);
2882 * Send ACK to communicator (if requested) and free @a cmc.
2884 * @param cmc context for which we are done handling the message
2887 finish_cmc_handling (struct CommunicatorMessageContext *cmc)
2889 if (0 != ntohl (cmc->im.fc_on))
2891 /* send ACK when done to communicator for flow control! */
2892 struct GNUNET_MQ_Envelope *env;
2893 struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
2895 env = GNUNET_MQ_msg (ack,
2896 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
2897 ack->reserved = htonl (0);
2898 ack->fc_id = cmc->im.fc_id;
2899 ack->sender = cmc->im.sender;
2900 GNUNET_MQ_send (cmc->tc->mq,
2903 GNUNET_SERVICE_client_continue (cmc->tc->client);
2909 * Communicator gave us an unencapsulated message to pass as-is to
2910 * CORE. Process the request.
2912 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2913 * @param mh the message that was received
2916 handle_raw_message (void *cls,
2917 const struct GNUNET_MessageHeader *mh)
2919 struct CommunicatorMessageContext *cmc = cls;
2920 uint16_t size = ntohs (mh->size);
2922 if ( (size > UINT16_MAX - sizeof (struct InboundMessage)) ||
2923 (size < sizeof (struct GNUNET_MessageHeader)) )
2925 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
2928 finish_cmc_handling (cmc);
2929 GNUNET_SERVICE_client_drop (client);
2932 /* Forward to all CORE clients */
2933 for (struct TransportClient *tc = clients_head;
2937 struct GNUNET_MQ_Envelope *env;
2938 struct InboundMessage *im;
2940 if (CT_CORE != tc->type)
2942 env = GNUNET_MQ_msg_extra (im,
2944 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2945 im->peer = cmc->im.sender;
2949 GNUNET_MQ_send (tc->mq,
2952 /* FIXME: consider doing this _only_ once the message
2953 was drained from the CORE MQs to extend flow control to CORE!
2954 (basically, increment counter in cmc, decrement on MQ send continuation! */
2955 finish_cmc_handling (cmc);
2960 * Communicator gave us a fragment box. Check the message.
2962 * @param cls a `struct CommunicatorMessageContext`
2963 * @param fb the send message that was sent
2964 * @return #GNUNET_YES if message is well-formed
2967 check_fragment_box (void *cls,
2968 const struct TransportFragmentBox *fb)
2970 uint16_t size = ntohs (fb->header.size);
2971 uint16_t bsize = size - sizeof (*fb);
2975 GNUNET_break_op (0);
2976 return GNUNET_SYSERR;
2978 if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
2980 GNUNET_break_op (0);
2981 return GNUNET_SYSERR;
2983 if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
2985 GNUNET_break_op (0);
2986 return GNUNET_SYSERR;
2993 * Generate a fragment acknowledgement for an @a rc.
2995 * @param rc context to generate ACK for, @a rc ACK state is reset
2998 send_fragment_ack (struct ReassemblyContext *rc)
3000 struct TransportFragmentAckMessage *ack;
3002 ack = GNUNET_new (struct TransportFragmentAckMessage);
3003 ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
3004 ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
3005 ack->frag_uuid = htonl (rc->frag_uuid);
3006 ack->extra_acks = GNUNET_htonll (rc->extra_acks);
3007 ack->msg_uuid = rc->msg_uuid;
3008 ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
3009 if (0 == rc->msg_missing)
3010 ack->reassembly_timeout
3011 = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
3013 ack->reassembly_timeout
3014 = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
3015 route_message (&rc->neighbour->pid,
3017 rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
3019 rc->extra_acks = 0LLU;
3024 * Communicator gave us a fragment. Process the request.
3026 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3027 * @param fb the message that was received
3030 handle_fragment_box (void *cls,
3031 const struct TransportFragmentBox *fb)
3033 struct CommunicatorMessageContext *cmc = cls;
3034 struct Neighbour *n;
3035 struct ReassemblyContext *rc;
3036 const struct GNUNET_MessageHeader *msg;
3042 struct GNUNET_TIME_Relative cdelay;
3045 n = GNUNET_CONTAINER_multipeermap_get (neighbours,
3049 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3052 finish_cmc_handling (cmc);
3053 GNUNET_SERVICE_client_drop (client);
3056 if (NULL == n->reassembly_map)
3058 n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8,
3060 n->reassembly_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3061 n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION,
3062 &reassembly_cleanup_task,
3065 msize = ntohs (fb->msg_size);
3066 rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map,
3070 rc = GNUNET_malloc (sizeof (*rc) +
3071 msize + /* reassembly payload buffer */
3072 (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
3073 rc->msg_uuid = fb->msg_uuid;
3075 rc->msg_size = msize;
3076 rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION);
3077 rc->last_frag = GNUNET_TIME_absolute_get ();
3078 rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
3080 rc->reassembly_timeout.abs_value_us);
3081 GNUNET_assert (GNUNET_OK ==
3082 GNUNET_CONTAINER_multishortmap_put (n->reassembly_map,
3085 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3086 target = (char *) &rc[1];
3087 rc->bitfield = (uint8_t *) (target + rc->msg_size);
3088 rc->msg_missing = rc->msg_size;
3092 target = (char *) &rc[1];
3094 if (msize != rc->msg_size)
3097 finish_cmc_handling (cmc);
3102 fsize = ntohs (fb->header.size) - sizeof (*fb);
3103 frag_off = ntohs (fb->frag_off);
3104 memcpy (&target[frag_off],
3107 /* update bitfield and msg_missing */
3108 for (unsigned int i=frag_off;i<frag_off+fsize;i++)
3110 if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
3112 rc->bitfield[i / 8] |= (1 << (i % 8));
3117 /* Compute cummulative ACK */
3118 frag_uuid = ntohl (fb->frag_uuid);
3119 cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
3120 cdelay = GNUNET_TIME_relative_multiply (cdelay,
3122 rc->last_frag = GNUNET_TIME_absolute_get ();
3123 rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay,
3125 ack_now = GNUNET_NO;
3126 if (0 == rc->num_acks)
3128 /* case one: first ack */
3129 rc->frag_uuid = frag_uuid;
3130 rc->extra_acks = 0LLU;
3133 else if ( (frag_uuid >= rc->frag_uuid) &&
3134 (frag_uuid <= rc->frag_uuid + 64) )
3136 /* case two: ack fits after existing min UUID */
3137 if ( (frag_uuid == rc->frag_uuid) ||
3138 (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) )
3140 /* duplicate fragment, ack now! */
3141 ack_now = GNUNET_YES;
3145 rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
3149 else if ( (rc->frag_uuid > frag_uuid) &&
3150 ( ( (rc->frag_uuid == frag_uuid + 64) &&
3151 (0 == rc->extra_acks) ) ||
3152 ( (rc->frag_uuid < frag_uuid + 64) &&
3153 (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) )
3155 /* can fit ack by shifting extra acks and starting at
3156 frag_uid, test above esured that the bits we will
3157 shift 'extra_acks' by are all zero. */
3158 rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
3159 rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
3160 rc->frag_uuid = frag_uuid;
3163 if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
3164 ack_now = GNUNET_YES; /* maximum acks received */
3165 // FIXME: possibly also ACK based on RTT (but for that we'd need to
3166 // determine the session used for the ACK first!)
3168 /* is reassembly complete? */
3169 if (0 != rc->msg_missing)
3172 send_fragment_ack (rc);
3173 finish_cmc_handling (cmc);
3176 /* reassembly is complete, verify result */
3177 msg = (const struct GNUNET_MessageHeader *) &rc[1];
3178 if (ntohs (msg->size) != rc->msg_size)
3181 free_reassembly_context (rc);
3182 finish_cmc_handling (cmc);
3185 /* successful reassembly */
3186 send_fragment_ack (rc);
3187 demultiplex_with_cmc (cmc,
3189 /* FIXME: really free here? Might be bad if fragments are still
3190 en-route and we forget that we finished this reassembly immediately!
3191 -> keep around until timeout?
3192 -> shorten timeout based on ACK? */
3193 free_reassembly_context (rc);
3198 * Communicator gave us a fragment acknowledgement. Process the request.
3200 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3201 * @param fa the message that was received
3204 handle_fragment_ack (void *cls,
3205 const struct TransportFragmentAckMessage *fa)
3207 struct CommunicatorMessageContext *cmc = cls;
3209 // FIXME: do work: identify original message; then identify fragments being acked;
3210 // remove those from the tree to prevent retransmission;
3212 // if entire message is ACKed, handle that as well.
3213 finish_cmc_handling (cmc);
3218 * Communicator gave us a reliability box. Check the message.
3220 * @param cls a `struct CommunicatorMessageContext`
3221 * @param rb the send message that was sent
3222 * @return #GNUNET_YES if message is well-formed
3225 check_reliability_box (void *cls,
3226 const struct TransportReliabilityBox *rb)
3228 GNUNET_MQ_check_boxed_message (rb);
3234 * Communicator gave us a reliability box. Process the request.
3236 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3237 * @param rb the message that was received
3240 handle_reliability_box (void *cls,
3241 const struct TransportReliabilityBox *rb)
3243 struct CommunicatorMessageContext *cmc = cls;
3244 const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1];
3246 if (0 == ntohl (rb->ack_countdown))
3248 struct TransportReliabilityAckMessage *ack;
3250 /* FIXME: implement cummulative ACKs and ack_countdown,
3251 then setting the avg_ack_delay field below: */
3252 ack = GNUNET_malloc (sizeof (*ack) +
3253 sizeof (struct GNUNET_ShortHashCode));
3254 ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
3255 ack->header.size = htons (sizeof (*ack) +
3256 sizeof (struct GNUNET_ShortHashCode));
3259 sizeof (struct GNUNET_ShortHashCode));
3260 route_message (&cmc->im.sender,
3263 /* continue with inner message */
3264 demultiplex_with_cmc (cmc,
3270 * Communicator gave us a reliability ack. Process the request.
3272 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3273 * @param ra the message that was received
3276 handle_reliability_ack (void *cls,
3277 const struct TransportReliabilityAckMessage *ra)
3279 struct CommunicatorMessageContext *cmc = cls;
3281 // FIXME: do work: find message that was acknowledged, and
3282 // remove from transmission queue; update RTT.
3283 finish_cmc_handling (cmc);
3288 * Communicator gave us a backchannel encapsulation. Check the message.
3290 * @param cls a `struct CommunicatorMessageContext`
3291 * @param be the send message that was sent
3292 * @return #GNUNET_YES if message is well-formed
3295 check_backchannel_encapsulation (void *cls,
3296 const struct TransportBackchannelEncapsulationMessage *be)
3298 uint16_t size = ntohs (be->header.size);
3300 if (size - sizeof (*be) < sizeof (struct GNUNET_MessageHeader))
3302 GNUNET_break_op (0);
3303 return GNUNET_SYSERR;
3310 * Communicator gave us a backchannel encapsulation. Process the request.
3312 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3313 * @param be the message that was received
3316 handle_backchannel_encapsulation (void *cls,
3317 const struct TransportBackchannelEncapsulationMessage *be)
3319 struct CommunicatorMessageContext *cmc = cls;
3321 if (0 != memcmp (&be->target,
3323 sizeof (struct GNUNET_PeerIdentity)))
3325 /* not for me, try to route to target */
3326 route_message (&be->target,
3327 GNUNET_copy_message (&be->header));
3328 finish_cmc_handling (cmc);
3331 // FIXME: compute shared secret
3332 // FIXME: check HMAC
3333 // FIXME: decrypt payload
3334 // FIXME: forward to specified communicator!
3335 // (using GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING)
3336 finish_cmc_handling (cmc);
3341 * Communicator gave us a DV learn message. Check the message.
3343 * @param cls a `struct CommunicatorMessageContext`
3344 * @param dvl the send message that was sent
3345 * @return #GNUNET_YES if message is well-formed
3348 check_dv_learn (void *cls,
3349 const struct TransportDVLearn *dvl)
3351 uint16_t size = ntohs (dvl->header.size);
3352 uint16_t num_hops = ntohs (dvl->num_hops);
3353 const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvl[1];
3355 if (size != sizeof (*dvl) + num_hops * sizeof (struct GNUNET_PeerIdentity))
3357 GNUNET_break_op (0);
3358 return GNUNET_SYSERR;
3360 for (unsigned int i=0;i<num_hops;i++)
3362 if (0 == memcmp (&dvl->initiator,
3364 sizeof (struct GNUNET_PeerIdentity)))
3366 GNUNET_break_op (0);
3367 return GNUNET_SYSERR;
3369 if (0 == memcmp (&GST_my_identity,
3371 sizeof (struct GNUNET_PeerIdentity)))
3373 GNUNET_break_op (0);
3374 return GNUNET_SYSERR;
3382 * Communicator gave us a DV learn message. Process the request.
3384 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3385 * @param dvl the message that was received
3388 handle_dv_learn (void *cls,
3389 const struct TransportDVLearn *dvl)
3391 struct CommunicatorMessageContext *cmc = cls;
3393 // FIXME: learn path from DV message (if bi-directional flags are set)
3394 // FIXME: expand DV message, forward on (unless path is getting too long)
3395 finish_cmc_handling (cmc);
3400 * Communicator gave us a DV box. Check the message.
3402 * @param cls a `struct CommunicatorMessageContext`
3403 * @param dvb the send message that was sent
3404 * @return #GNUNET_YES if message is well-formed
3407 check_dv_box (void *cls,
3408 const struct TransportDVBox *dvb)
3410 uint16_t size = ntohs (dvb->header.size);
3411 uint16_t num_hops = ntohs (dvb->num_hops);
3412 const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
3413 const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
3417 if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + sizeof (struct GNUNET_MessageHeader))
3419 GNUNET_break_op (0);
3420 return GNUNET_SYSERR;
3422 isize = ntohs (inbox->size);
3423 if (size != sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize)
3425 GNUNET_break_op (0);
3426 return GNUNET_SYSERR;
3428 itype = ntohs (inbox->type);
3429 if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
3430 (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype) )
3432 GNUNET_break_op (0);
3433 return GNUNET_SYSERR;
3440 * Communicator gave us a DV box. Process the request.
3442 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3443 * @param dvb the message that was received
3446 handle_dv_box (void *cls,
3447 const struct TransportDVBox *dvb)
3449 struct CommunicatorMessageContext *cmc = cls;
3450 uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
3451 uint16_t num_hops = ntohs (dvb->num_hops);
3452 const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
3453 const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
3457 // FIXME: if we are not the target, shorten path and forward along.
3458 // Try from the _end_ of hops array if we know the given
3459 // neighbour (shortening the path!).
3460 // NOTE: increment total_hops!
3461 finish_cmc_handling (cmc);
3464 /* We are the target. Unbox and handle message. */
3465 cmc->im.sender = dvb->origin;
3466 cmc->total_hops = ntohs (dvb->total_hops);
3467 demultiplex_with_cmc (cmc,
3473 * Client notified us about transmission from a peer. Process the request.
3475 * @param cls a `struct TransportClient` which sent us the message
3476 * @param obm the send message that was sent
3477 * @return #GNUNET_YES if message is well-formed
3480 check_incoming_msg (void *cls,
3481 const struct GNUNET_TRANSPORT_IncomingMessage *im)
3483 struct TransportClient *tc = cls;
3485 if (CT_COMMUNICATOR != tc->type)
3488 return GNUNET_SYSERR;
3490 GNUNET_MQ_check_boxed_message (im);
3496 * Incoming meessage. Process the request.
3498 * @param im the send message that was received
3501 handle_incoming_msg (void *cls,
3502 const struct GNUNET_TRANSPORT_IncomingMessage *im)
3504 struct TransportClient *tc = cls;
3505 struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
3509 demultiplex_with_cmc (cmc,
3510 (const struct GNUNET_MessageHeader *) &im[1]);
3515 * Given an inbound message @a msg from a communicator @a cmc,
3516 * demultiplex it based on the type calling the right handler.
3518 * @param cmc context for demultiplexing
3519 * @param msg message to demultiplex
3522 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
3523 const struct GNUNET_MessageHeader *msg)
3525 struct GNUNET_MQ_MessageHandler handlers[] = {
3526 GNUNET_MQ_hd_var_size (fragment_box,
3527 GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
3528 struct TransportFragmentBox,
3530 GNUNET_MQ_hd_fixed_size (fragment_ack,
3531 GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
3532 struct TransportFragmentAckMessage,
3534 GNUNET_MQ_hd_var_size (reliability_box,
3535 GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
3536 struct TransportReliabilityBox,
3538 GNUNET_MQ_hd_fixed_size (reliability_ack,
3539 GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
3540 struct TransportReliabilityAckMessage,
3542 GNUNET_MQ_hd_var_size (backchannel_encapsulation,
3543 GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
3544 struct TransportBackchannelEncapsulationMessage,
3546 GNUNET_MQ_hd_var_size (dv_learn,
3547 GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
3548 struct TransportDVLearn,
3550 GNUNET_MQ_hd_var_size (dv_box,
3551 GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
3552 struct TransportDVBox,
3554 GNUNET_MQ_handler_end()
3558 ret = GNUNET_MQ_handle_message (handlers,
3560 if (GNUNET_SYSERR == ret)
3563 GNUNET_SERVICE_client_drop (cmc->tc->client);
3567 if (GNUNET_NO == ret)
3569 /* unencapsulated 'raw' message */
3570 handle_raw_message (&cmc,
3577 * New queue became available. Check message.
3579 * @param cls the client
3580 * @param aqm the send message that was sent
3583 check_add_queue_message (void *cls,
3584 const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
3586 struct TransportClient *tc = cls;
3588 if (CT_COMMUNICATOR != tc->type)
3591 return GNUNET_SYSERR;
3593 GNUNET_MQ_check_zero_termination (aqm);
3599 * Bandwidth tracker informs us that the delay until we should receive
3602 * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
3605 tracker_update_in_cb (void *cls)
3607 struct GNUNET_ATS_Session *queue = cls;
3608 struct GNUNET_TIME_Relative in_delay;
3611 rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
3612 in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
3614 // FIXME: how exactly do we do inbound flow control?
3619 * If necessary, generates the UUID for a @a pm
3621 * @param pm pending message to generate UUID for.
3624 set_pending_message_uuid (struct PendingMessage *pm)
3626 if (pm->msg_uuid_set)
3628 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
3630 sizeof (pm->msg_uuid));
3631 pm->msg_uuid_set = GNUNET_YES;
3636 * Fragment the given @a pm to the given @a mtu. Adds
3637 * additional fragments to the neighbour as well. If the
3638 * @a mtu is too small, generates and error for the @a pm
3641 * @param pm pending message to fragment for transmission
3642 * @param mtu MTU to apply
3643 * @return new message to transmit
3645 static struct PendingMessage *
3646 fragment_message (struct PendingMessage *pm,
3649 struct PendingMessage *ff;
3651 set_pending_message_uuid (pm);
3653 /* This invariant is established in #handle_add_queue_message() */
3654 GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
3656 /* select fragment for transmission, descending the tree if it has
3657 been expanded until we are at a leaf or at a fragment that is small enough */
3659 while ( ( (ff->bytes_msg > mtu) ||
3661 (ff->frag_off == ff->bytes_msg) &&
3662 (NULL != ff->head_frag) )
3664 ff = ff->head_frag; /* descent into fragmented fragments */
3667 if ( ( (ff->bytes_msg > mtu) ||
3669 (pm->frag_off < pm->bytes_msg) )
3671 /* Did not yet calculate all fragments, calculate next fragment */
3672 struct PendingMessage *frag;
3673 struct TransportFragmentBox tfb;
3681 orig = (const char *) &ff[1];
3682 msize = ff->bytes_msg;
3685 const struct TransportFragmentBox *tfbo;
3687 tfbo = (const struct TransportFragmentBox *) orig;
3688 orig += sizeof (struct TransportFragmentBox);
3689 msize -= sizeof (struct TransportFragmentBox);
3690 xoff = ntohs (tfbo->frag_off);
3692 fragmax = mtu - sizeof (struct TransportFragmentBox);
3693 fragsize = GNUNET_MIN (msize - ff->frag_off,
3695 frag = GNUNET_malloc (sizeof (struct PendingMessage) +
3696 sizeof (struct TransportFragmentBox) +
3698 frag->target = pm->target;
3699 frag->frag_parent = ff;
3700 frag->timeout = pm->timeout;
3701 frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
3702 frag->pmt = PMT_FRAGMENT_BOX;
3703 msg = (char *) &frag[1];
3704 tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
3705 tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
3707 tfb.frag_uuid = htonl (pm->frag_uuidgen++);
3708 tfb.msg_uuid = pm->msg_uuid;
3709 tfb.frag_off = htons (ff->frag_off + xoff);
3710 tfb.msg_size = htons (pm->bytes_msg);
3714 memcpy (&msg[sizeof (tfb)],
3715 &orig[ff->frag_off],
3717 GNUNET_CONTAINER_MDLL_insert (frag,
3721 ff->frag_off += fragsize;
3725 /* Move head to the tail and return it */
3726 GNUNET_CONTAINER_MDLL_remove (frag,
3727 ff->frag_parent->head_frag,
3728 ff->frag_parent->tail_frag,
3730 GNUNET_CONTAINER_MDLL_insert_tail (frag,
3731 ff->frag_parent->head_frag,
3732 ff->frag_parent->tail_frag,
3739 * Reliability-box the given @a pm. On error (can there be any), NULL
3740 * may be returned, otherwise the "replacement" for @a pm (which
3741 * should then be added to the respective neighbour's queue instead of
3742 * @a pm). If the @a pm is already fragmented or reliability boxed,
3743 * or itself an ACK, this function simply returns @a pm.
3745 * @param pm pending message to box for transmission over unreliabile queue
3746 * @return new message to transmit
3748 static struct PendingMessage *
3749 reliability_box_message (struct PendingMessage *pm)
3751 struct TransportReliabilityBox rbox;
3752 struct PendingMessage *bpm;
3755 if (PMT_CORE != pm->pmt)
3756 return pm; /* already fragmented or reliability boxed, or control message: do nothing */
3757 if (NULL != pm->bpm)
3758 return pm->bpm; /* already computed earlier: do nothing */
3759 GNUNET_assert (NULL == pm->head_frag);
3760 if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
3764 client_send_response (pm,
3769 bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
3772 bpm->target = pm->target;
3773 bpm->frag_parent = pm;
3774 GNUNET_CONTAINER_MDLL_insert (frag,
3778 bpm->timeout = pm->timeout;
3779 bpm->pmt = PMT_RELIABILITY_BOX;
3780 bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
3781 set_pending_message_uuid (bpm);
3782 rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
3783 rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
3784 rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
3785 rbox.msg_uuid = pm->msg_uuid;
3786 msg = (char *) &bpm[1];
3790 memcpy (&msg[sizeof (rbox)],
3799 * We believe we are ready to transmit a message on a queue. Double-checks
3800 * with the queue's "tracker_out" and then gives the message to the
3801 * communicator for transmission (updating the tracker, and re-scheduling
3802 * itself if applicable).
3804 * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
3807 transmit_on_queue (void *cls)
3809 struct GNUNET_ATS_Session *queue = cls;
3810 struct Neighbour *n = queue->neighbour;
3811 struct QueueEntry *qe;
3812 struct PendingMessage *pm;
3813 struct PendingMessage *s;
3815 struct GNUNET_TRANSPORT_SendMessageTo *smt;
3816 struct GNUNET_MQ_Envelope *env;
3818 queue->transmit_task = NULL;
3819 if (NULL == (pm = n->pending_msg_head))
3821 /* no message pending, nothing to do here! */
3824 schedule_transmit_on_queue (queue);
3825 if (NULL != queue->transmit_task)
3826 return; /* do it later */
3828 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3829 overhead += sizeof (struct TransportReliabilityBox);
3831 if ( ( (0 != queue->mtu) &&
3832 (pm->bytes_msg + overhead > queue->mtu) ) ||
3833 (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
3834 (NULL != pm->head_frag /* fragments already exist, should
3835 respect that even if MTU is 0 for
3837 s = fragment_message (s,
3839 ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
3843 /* Fragmentation failed, try next message... */
3844 schedule_transmit_on_queue (queue);
3847 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3848 s = reliability_box_message (s);
3851 /* Reliability boxing failed, try next message... */
3852 schedule_transmit_on_queue (queue);
3856 /* Pass 's' for transission to the communicator */
3857 qe = GNUNET_new (struct QueueEntry);
3858 qe->mid = queue->mid_gen++;
3859 qe->session = queue;
3860 // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
3861 GNUNET_CONTAINER_DLL_insert (queue->queue_head,
3864 env = GNUNET_MQ_msg_extra (smt,
3866 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
3867 smt->qid = queue->qid;
3869 smt->receiver = n->pid;
3873 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
3874 queue->queue_length++;
3875 queue->tc->details.communicator.total_queue_length++;
3876 GNUNET_MQ_send (queue->tc->mq,
3879 // FIXME: do something similar to the logic below
3880 // in defragmentation / reliability ACK handling!
3882 /* Check if this transmission somehow conclusively finished handing 'pm'
3883 even without any explicit ACKs */
3884 if ( (PMT_CORE == s->pmt) &&
3885 (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
3887 /* Full message sent, and over reliabile channel */
3888 client_send_response (pm,
3892 else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
3893 (PMT_FRAGMENT_BOX == s->pmt) )
3895 struct PendingMessage *pos;
3897 /* Fragment sent over reliabile channel */
3898 free_fragment_tree (s);
3899 pos = s->frag_parent;
3900 GNUNET_CONTAINER_MDLL_remove (frag,
3905 /* check if subtree is done */
3906 while ( (NULL == pos->head_frag) &&
3907 (pos->frag_off == pos->bytes_msg) &&
3911 pos = s->frag_parent;
3912 GNUNET_CONTAINER_MDLL_remove (frag,
3919 /* Was this the last applicable fragmment? */
3920 if ( (NULL == pm->head_frag) &&
3921 (pm->frag_off == pm->bytes_msg) )
3922 client_send_response (pm,
3924 pm->bytes_msg /* FIXME: calculate and add overheads! */);
3926 else if (PMT_CORE != pm->pmt)
3928 /* This was an acknowledgement of some type, always free */
3929 free_pending_message (pm);
3933 /* message not finished, waiting for acknowledgement */
3934 struct Neighbour *neighbour = pm->target;
3935 /* Update time by which we might retransmit 's' based on queue
3936 characteristics (i.e. RTT); it takes one RTT for the message to
3937 arrive and the ACK to come back in the best case; but the other
3938 side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
3939 retransmitting. Note that in the future this heuristic should
3940 likely be improved further (measure RTT stability, consider
3941 message urgency and size when delaying ACKs, etc.) */
3942 s->next_attempt = GNUNET_TIME_relative_to_absolute
3943 (GNUNET_TIME_relative_multiply (queue->rtt,
3947 struct PendingMessage *pos;
3949 /* re-insert sort in neighbour list */
3950 GNUNET_CONTAINER_MDLL_remove (neighbour,
3951 neighbour->pending_msg_head,
3952 neighbour->pending_msg_tail,
3954 pos = neighbour->pending_msg_tail;
3955 while ( (NULL != pos) &&
3956 (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
3957 pos = pos->prev_neighbour;
3958 GNUNET_CONTAINER_MDLL_insert_after (neighbour,
3959 neighbour->pending_msg_head,
3960 neighbour->pending_msg_tail,
3966 /* re-insert sort in fragment list */
3967 struct PendingMessage *fp = s->frag_parent;
3968 struct PendingMessage *pos;
3970 GNUNET_CONTAINER_MDLL_remove (frag,
3974 pos = fp->tail_frag;
3975 while ( (NULL != pos) &&
3976 (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
3977 pos = pos->prev_frag;
3978 GNUNET_CONTAINER_MDLL_insert_after (frag,
3986 /* finally, re-schedule queue transmission task itself */
3987 schedule_transmit_on_queue (queue);
3992 * Bandwidth tracker informs us that the delay until we
3993 * can transmit again changed.
3995 * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
3998 tracker_update_out_cb (void *cls)
4000 struct GNUNET_ATS_Session *queue = cls;
4001 struct Neighbour *n = queue->neighbour;
4003 if (NULL == n->pending_msg_head)
4005 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4006 "Bandwidth allocation updated for empty transmission queue `%s'\n",
4008 return; /* no message pending, nothing to do here! */
4010 GNUNET_SCHEDULER_cancel (queue->transmit_task);
4011 queue->transmit_task = NULL;
4012 schedule_transmit_on_queue (queue);
4017 * Bandwidth tracker informs us that excessive outbound bandwidth was
4018 * allocated which is not being used.
4020 * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
4023 tracker_excess_out_cb (void *cls)
4025 /* FIXME: trigger excess bandwidth report to core? Right now,
4026 this is done internally within transport_api2_core already,
4027 but we probably want to change the logic and trigger it
4028 from here via a message instead! */
4029 /* TODO: maybe inform ATS at this point? */
4030 GNUNET_STATISTICS_update (GST_stats,
4031 "# Excess outbound bandwidth reported",
4039 * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
4040 * which is not being used.
4042 * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
4045 tracker_excess_in_cb (void *cls)
4047 /* TODO: maybe inform ATS at this point? */
4048 GNUNET_STATISTICS_update (GST_stats,
4049 "# Excess inbound bandwidth reported",
4056 * New queue became available. Process the request.
4058 * @param cls the client
4059 * @param aqm the send message that was sent
4062 handle_add_queue_message (void *cls,
4063 const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
4065 struct TransportClient *tc = cls;
4066 struct GNUNET_ATS_Session *queue;
4067 struct Neighbour *neighbour;
4071 if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
4073 /* MTU so small as to be useless for transmissions,
4074 required for #fragment_message()! */
4075 GNUNET_break_op (0);
4076 GNUNET_SERVICE_client_drop (tc->client);
4079 neighbour = lookup_neighbour (&aqm->receiver);
4080 if (NULL == neighbour)
4082 neighbour = GNUNET_new (struct Neighbour);
4083 neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
4084 neighbour->pid = aqm->receiver;
4085 GNUNET_assert (GNUNET_OK ==
4086 GNUNET_CONTAINER_multipeermap_put (neighbours,
4089 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
4090 cores_send_connect_info (&neighbour->pid,
4091 GNUNET_BANDWIDTH_ZERO);
4093 addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
4094 addr = (const char *) &aqm[1];
4096 queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len);
4098 queue->address = (const char *) &queue[1];
4099 queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
4100 queue->qid = aqm->qid;
4101 queue->mtu = ntohl (aqm->mtu);
4102 queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
4103 queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
4104 queue->neighbour = neighbour;
4105 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
4106 &tracker_update_in_cb,
4108 GNUNET_BANDWIDTH_ZERO,
4109 GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
4110 &tracker_excess_in_cb,
4112 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
4113 &tracker_update_out_cb,
4115 GNUNET_BANDWIDTH_ZERO,
4116 GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
4117 &tracker_excess_out_cb,
4122 /* notify ATS about new queue */
4124 struct GNUNET_ATS_Properties prop = {
4125 .delay = GNUNET_TIME_UNIT_FOREVER_REL,
4128 .cc = tc->details.communicator.cc
4131 queue->sr = GNUNET_ATS_session_add (ats,
4136 if (NULL == queue->sr)
4138 /* This can only happen if the 'address' was way too long for ATS
4139 (approaching 64k in strlen()!). In this case, the communicator
4140 must be buggy and we drop it. */
4142 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
4143 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
4144 GNUNET_free (queue);
4145 if (NULL == neighbour->session_head)
4147 cores_send_disconnect_info (&neighbour->pid);
4148 free_neighbour (neighbour);
4150 GNUNET_SERVICE_client_drop (tc->client);
4154 /* notify monitors about new queue */
4156 struct MonitorEvent me = {
4161 notify_monitors (&neighbour->pid,
4166 GNUNET_CONTAINER_MDLL_insert (neighbour,
4167 neighbour->session_head,
4168 neighbour->session_tail,
4170 GNUNET_CONTAINER_MDLL_insert (client,
4171 tc->details.communicator.session_head,
4172 tc->details.communicator.session_tail,
4174 GNUNET_SERVICE_client_continue (tc->client);
4179 * Queue to a peer went down. Process the request.
4181 * @param cls the client
4182 * @param dqm the send message that was sent
4185 handle_del_queue_message (void *cls,
4186 const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
4188 struct TransportClient *tc = cls;
4190 if (CT_COMMUNICATOR != tc->type)
4193 GNUNET_SERVICE_client_drop (tc->client);
4196 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
4198 session = session->next_client)
4200 struct Neighbour *neighbour = session->neighbour;
4202 if ( (dqm->qid != session->qid) ||
4203 (0 != memcmp (&dqm->receiver,
4205 sizeof (struct GNUNET_PeerIdentity))) )
4207 free_session (session);
4208 GNUNET_SERVICE_client_continue (tc->client);
4212 GNUNET_SERVICE_client_drop (tc->client);
4217 * Message was transmitted. Process the request.
4219 * @param cls the client
4220 * @param sma the send message that was sent
4223 handle_send_message_ack (void *cls,
4224 const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
4226 struct TransportClient *tc = cls;
4227 struct QueueEntry *queue;
4229 if (CT_COMMUNICATOR != tc->type)
4232 GNUNET_SERVICE_client_drop (tc->client);
4236 /* find our queue entry matching the ACK */
4238 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
4240 session = session->next_client)
4242 if (0 != memcmp (&session->neighbour->pid,
4244 sizeof (struct GNUNET_PeerIdentity)))
4246 for (struct QueueEntry *qe = session->queue_head;
4250 if (qe->mid != sma->mid)
4259 /* this should never happen */
4261 GNUNET_SERVICE_client_drop (tc->client);
4264 GNUNET_CONTAINER_DLL_remove (queue->session->queue_head,
4265 queue->session->queue_tail,
4267 queue->session->queue_length--;
4268 tc->details.communicator.total_queue_length--;
4269 GNUNET_SERVICE_client_continue (tc->client);
4271 /* if applicable, resume transmissions that waited on ACK */
4272 if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
4274 /* Communicator dropped below threshold, resume all queues */
4275 GNUNET_STATISTICS_update (GST_stats,
4276 "# Transmission throttled due to communicator queue limit",
4279 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
4281 session = session->next_client)
4282 schedule_transmit_on_queue (session);
4284 else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
4286 /* queue dropped below threshold; only resume this one queue */
4287 GNUNET_STATISTICS_update (GST_stats,
4288 "# Transmission throttled due to session queue limit",
4291 schedule_transmit_on_queue (queue->session);
4294 /* TODO: we also should react on the status! */
4295 // FIXME: this probably requires queue->pm = s assignment!
4296 // FIXME: react to communicator status about transmission request. We got:
4297 sma->status; // OK success, SYSERR failure
4299 GNUNET_free (queue);
4304 * Iterator telling new MONITOR client about all existing
4307 * @param cls the new `struct TransportClient`
4308 * @param pid a connected peer
4309 * @param value the `struct Neighbour` with more information
4310 * @return #GNUNET_OK (continue to iterate)
4313 notify_client_queues (void *cls,
4314 const struct GNUNET_PeerIdentity *pid,
4317 struct TransportClient *tc = cls;
4318 struct Neighbour *neighbour = value;
4320 GNUNET_assert (CT_MONITOR == tc->type);
4321 for (struct GNUNET_ATS_Session *q = neighbour->session_head;
4323 q = q->next_neighbour)
4325 struct MonitorEvent me = {
4328 .num_msg_pending = q->num_msg_pending,
4329 .num_bytes_pending = q->num_bytes_pending
4343 * Initialize a monitor client.
4345 * @param cls the client
4346 * @param start the start message that was sent
4349 handle_monitor_start (void *cls,
4350 const struct GNUNET_TRANSPORT_MonitorStart *start)
4352 struct TransportClient *tc = cls;
4354 if (CT_NONE != tc->type)
4357 GNUNET_SERVICE_client_drop (tc->client);
4360 tc->type = CT_MONITOR;
4361 tc->details.monitor.peer = start->peer;
4362 tc->details.monitor.one_shot = ntohl (start->one_shot);
4363 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
4364 ¬ify_client_queues,
4366 GNUNET_SERVICE_client_mark_monitor (tc->client);
4367 GNUNET_SERVICE_client_continue (tc->client);
4372 * Signature of a function called by ATS with the current bandwidth
4373 * allocation to be used as determined by ATS.
4375 * @param cls closure, NULL
4376 * @param session session this is about
4377 * @param bandwidth_out assigned outbound bandwidth for the connection,
4378 * 0 to signal disconnect
4379 * @param bandwidth_in assigned inbound bandwidth for the connection,
4380 * 0 to signal disconnect
4383 ats_allocation_cb (void *cls,
4384 struct GNUNET_ATS_Session *session,
4385 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
4386 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
4389 GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out,
4391 GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in,
4397 * Find transport client providing communication service
4398 * for the protocol @a prefix.
4400 * @param prefix communicator name
4401 * @return NULL if no such transport client is available
4403 static struct TransportClient *
4404 lookup_communicator (const char *prefix)
4406 for (struct TransportClient *tc = clients_head;
4410 if (CT_COMMUNICATOR != tc->type)
4412 if (0 == strcmp (prefix,
4413 tc->details.communicator.address_prefix))
4416 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4417 "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
4424 * Signature of a function called by ATS suggesting transport to
4425 * try connecting with a particular address.
4427 * @param cls closure, NULL
4428 * @param pid target peer
4429 * @param address the address to try
4432 ats_suggestion_cb (void *cls,
4433 const struct GNUNET_PeerIdentity *pid,
4434 const char *address)
4436 static uint32_t idgen;
4437 struct TransportClient *tc;
4439 struct GNUNET_TRANSPORT_CreateQueue *cqm;
4440 struct GNUNET_MQ_Envelope *env;
4444 prefix = GNUNET_HELLO_address_to_prefix (address);
4447 GNUNET_break (0); /* ATS gave invalid address!? */
4450 tc = lookup_communicator (prefix);
4453 GNUNET_STATISTICS_update (GST_stats,
4454 "# ATS suggestions ignored due to missing communicator",
4459 /* forward suggestion for queue creation to communicator */
4460 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4461 "Request #%u for `%s' communicator to create queue to `%s'\n",
4462 (unsigned int) idgen,
4465 alen = strlen (address) + 1;
4466 env = GNUNET_MQ_msg_extra (cqm,
4468 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
4469 cqm->request_id = htonl (idgen++);
4470 cqm->receiver = *pid;
4474 GNUNET_MQ_send (tc->mq,
4480 * Communicator tells us that our request to create a queue "worked", that
4481 * is setting up the queue is now in process.
4483 * @param cls the `struct TransportClient`
4484 * @param cqr confirmation message
4487 handle_queue_create_ok (void *cls,
4488 const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
4490 struct TransportClient *tc = cls;
4492 if (CT_COMMUNICATOR != tc->type)
4495 GNUNET_SERVICE_client_drop (tc->client);
4498 GNUNET_STATISTICS_update (GST_stats,
4499 "# ATS suggestions succeeded at communicator",
4502 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4503 "Request #%u for communicator to create queue succeeded\n",
4504 (unsigned int) ntohs (cqr->request_id));
4505 GNUNET_SERVICE_client_continue (tc->client);
4510 * Communicator tells us that our request to create a queue failed. This usually
4511 * indicates that the provided address is simply invalid or that the communicator's
4512 * resources are exhausted.
4514 * @param cls the `struct TransportClient`
4515 * @param cqr failure message
4518 handle_queue_create_fail (void *cls,
4519 const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
4521 struct TransportClient *tc = cls;
4523 if (CT_COMMUNICATOR != tc->type)
4526 GNUNET_SERVICE_client_drop (tc->client);
4529 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4530 "Request #%u for communicator to create queue failed\n",
4531 (unsigned int) ntohs (cqr->request_id));
4532 GNUNET_STATISTICS_update (GST_stats,
4533 "# ATS suggestions failed in queue creation at communicator",
4536 GNUNET_SERVICE_client_continue (tc->client);
4541 * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY
4542 * messages. We do nothing here, real verification is done later.
4544 * @param cls a `struct TransportClient *`
4545 * @param msg message to verify
4546 * @return #GNUNET_OK
4549 check_address_consider_verify (void *cls,
4550 const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
4559 * Given another peers address, consider checking it for validity
4560 * and then adding it to the Peerstore.
4562 * @param cls a `struct TransportClient`
4563 * @param hdr message containing the raw address data and
4564 * signature in the body, see #GNUNET_HELLO_extract_address()
4567 handle_address_consider_verify (void *cls,
4568 const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
4571 enum GNUNET_NetworkType nt;
4572 struct GNUNET_TIME_Absolute expiration;
4575 // FIXME: pre-check: do we know this address already?
4576 // FIXME: pre-check: rate-limit signature verification / validation!
4577 address = GNUNET_HELLO_extract_address (&hdr[1],
4578 ntohs (hdr->header.size) - sizeof (*hdr),
4582 if (NULL == address)
4584 GNUNET_break_op (0);
4587 if (0 == GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us)
4588 return; /* expired */
4589 // FIXME: do begin actual verification here!
4590 GNUNET_free (address);
4595 * Free neighbour entry.
4599 * @param value a `struct Neighbour`
4600 * @return #GNUNET_OK (always)
4603 free_neighbour_cb (void *cls,
4604 const struct GNUNET_PeerIdentity *pid,
4607 struct Neighbour *neighbour = value;
4611 GNUNET_break (0); // should this ever happen?
4612 free_neighbour (neighbour);
4619 * Free DV route entry.
4623 * @param value a `struct DistanceVector`
4624 * @return #GNUNET_OK (always)
4627 free_dv_routes_cb (void *cls,
4628 const struct GNUNET_PeerIdentity *pid,
4631 struct DistanceVector *dv = value;
4642 * Free ephemeral entry.
4646 * @param value a `struct Neighbour`
4647 * @return #GNUNET_OK (always)
4650 free_ephemeral_cb (void *cls,
4651 const struct GNUNET_PeerIdentity *pid,
4654 struct EphemeralCacheEntry *ece = value;
4658 free_ephemeral (ece);
4664 * Function called when the service shuts down. Unloads our plugins
4665 * and cancels pending validations.
4667 * @param cls closure, unused
4670 do_shutdown (void *cls)
4674 if (NULL != ephemeral_task)
4676 GNUNET_SCHEDULER_cancel (ephemeral_task);
4677 ephemeral_task = NULL;
4679 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
4684 GNUNET_ATS_transport_done (ats);
4687 if (NULL != peerstore)
4689 GNUNET_PEERSTORE_disconnect (peerstore,
4693 if (NULL != GST_stats)
4695 GNUNET_STATISTICS_destroy (GST_stats,
4699 if (NULL != GST_my_private_key)
4701 GNUNET_free (GST_my_private_key);
4702 GST_my_private_key = NULL;
4704 GNUNET_CONTAINER_multipeermap_destroy (neighbours);
4706 GNUNET_CONTAINER_multipeermap_iterate (dv_routes,
4709 GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
4711 GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
4714 GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
4715 ephemeral_map = NULL;
4716 GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
4717 ephemeral_heap = NULL;
4722 * Initiate transport service.
4724 * @param cls closure
4725 * @param c configuration to use
4726 * @param service the initialized service
4730 const struct GNUNET_CONFIGURATION_Handle *c,
4731 struct GNUNET_SERVICE_Handle *service)
4736 neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
4738 dv_routes = GNUNET_CONTAINER_multipeermap_create (1024,
4740 ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
4742 ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4743 GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
4744 if (NULL == GST_my_private_key)
4746 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4747 _("Transport service is lacking key configuration settings. Exiting.\n"));
4748 GNUNET_SCHEDULER_shutdown ();
4751 GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
4752 &GST_my_identity.public_key);
4753 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
4754 "My identity is `%s'\n",
4755 GNUNET_i2s_full (&GST_my_identity));
4756 GST_stats = GNUNET_STATISTICS_create ("transport",
4758 GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
4760 peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
4761 if (NULL == peerstore)
4764 GNUNET_SCHEDULER_shutdown ();
4767 ats = GNUNET_ATS_transport_init (GST_cfg,
4775 GNUNET_SCHEDULER_shutdown ();
4782 * Define "main" method using service macro.
4786 GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
4789 &client_disconnect_cb,
4791 /* communication with core */
4792 GNUNET_MQ_hd_fixed_size (client_start,
4793 GNUNET_MESSAGE_TYPE_TRANSPORT_START,
4794 struct StartMessage,
4796 GNUNET_MQ_hd_var_size (client_send,
4797 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
4798 struct OutboundMessage,
4800 /* communication with communicators */
4801 GNUNET_MQ_hd_var_size (communicator_available,
4802 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
4803 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
4805 GNUNET_MQ_hd_var_size (communicator_backchannel,
4806 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
4807 struct GNUNET_TRANSPORT_CommunicatorBackchannel,
4809 GNUNET_MQ_hd_var_size (add_address,
4810 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
4811 struct GNUNET_TRANSPORT_AddAddressMessage,
4813 GNUNET_MQ_hd_fixed_size (del_address,
4814 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
4815 struct GNUNET_TRANSPORT_DelAddressMessage,
4817 GNUNET_MQ_hd_var_size (incoming_msg,
4818 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
4819 struct GNUNET_TRANSPORT_IncomingMessage,
4821 GNUNET_MQ_hd_fixed_size (queue_create_ok,
4822 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
4823 struct GNUNET_TRANSPORT_CreateQueueResponse,
4825 GNUNET_MQ_hd_fixed_size (queue_create_fail,
4826 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
4827 struct GNUNET_TRANSPORT_CreateQueueResponse,
4829 GNUNET_MQ_hd_var_size (add_queue_message,
4830 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
4831 struct GNUNET_TRANSPORT_AddQueueMessage,
4833 GNUNET_MQ_hd_var_size (address_consider_verify,
4834 GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY,
4835 struct GNUNET_TRANSPORT_AddressToVerify,
4837 GNUNET_MQ_hd_fixed_size (del_queue_message,
4838 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
4839 struct GNUNET_TRANSPORT_DelQueueMessage,
4841 GNUNET_MQ_hd_fixed_size (send_message_ack,
4842 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
4843 struct GNUNET_TRANSPORT_SendMessageToAck,
4845 /* communication with monitors */
4846 GNUNET_MQ_hd_fixed_size (monitor_start,
4847 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
4848 struct GNUNET_TRANSPORT_MonitorStart,
4850 GNUNET_MQ_handler_end ());
4853 /* end of file gnunet-service-transport.c */