2 This file is part of GNUnet.
3 Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
21 * @file transport/gnunet-service-tng.c
22 * @brief main for gnunet-service-tng
23 * @author Christian Grothoff
26 * - figure out how to transmit (selective) ACKs in case of uni-directional
27 * communicators (with/without core? DV-only?) When do we use ACKs?
28 * => communicators use selective ACKs for flow control
29 * => transport uses message-level ACKs for RTT, fragment confirmation
30 * => integrate DV into transport, use neither core nor communicators
31 * but rather give communicators transport-encapsulated messages
32 * (which could be core-data, background-channel traffic, or
33 * transport-to-transport traffic)
36 * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc.
39 * - use ATS bandwidth allocation callback and schedule transmissions!
42 * - inform ATS about RTT, goodput/loss, overheads, etc. (GNUNET_ATS_session_update())
45 * - change transport-core API to provide proper flow control in both
46 * directions, allow multiple messages per peer simultaneously (tag
47 * confirmations with unique message ID), and replace quota-out with
48 * proper flow control;
50 * Design realizations / discussion:
51 * - communicators do flow control by calling MQ "notify sent"
52 * when 'ready'. They determine flow implicitly (i.e. TCP blocking)
53 * or explicitly via background channel FC ACKs. As long as the
54 * channel is not full, they may 'notify sent' even if the other
55 * peer has not yet confirmed receipt. The other peer confirming
56 * is _only_ for FC, not for more reliable transmission; reliable
57 * transmission (i.e. of fragments) is left to _transport_.
58 * - ACKs sent back in uni-directional communicators are done via
59 * the background channel API; here transport _may_ initially
60 * broadcast (with bounded # hops) if no path is known;
61 * - transport should _integrate_ DV-routing and build a view of
62 * the network; then background channel traffic can be
63 * routed via DV as well as explicit "DV" traffic.
64 * - background channel is also used for ACKs and NAT traversal support
65 * - transport service is responsible for AEAD'ing the background
66 * channel, timestamps and monotonic time are used against replay
67 * of old messages -> peerstore needs to be supplied with
68 * "latest timestamps seen" data
69 * - if transport implements DV, we likely need a 3rd peermap
70 * in addition to ephemerals and (direct) neighbours
71 * => in this data structure, we should track ATS metrics (distance, RTT, etc.)
72 * as well as latest timestamps seen, goodput, fragments for transmission, etc.
73 * ==> check if stuff needs to be moved out of "Neighbour"
74 * - transport should encapsualte core-level messages and do its
75 * own ACKing for RTT/goodput/loss measurements _and_ fragment
79 #include "gnunet_util_lib.h"
80 #include "gnunet_statistics_service.h"
81 #include "gnunet_transport_monitor_service.h"
82 #include "gnunet_peerstore_service.h"
83 #include "gnunet_hello_lib.h"
84 #include "gnunet_ats_transport_service.h"
85 #include "transport.h"
89 * What is the size we assume for a read operation in the
90 * absence of an MTU for the purpose of flow control?
92 #define IN_PACKET_SIZE_WITHOUT_MTU 128
95 * If a queue delays the next message by more than this number
96 * of seconds we log a warning. Note: this is for testing,
97 * the value chosen here might be too aggressively low!
99 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
102 * How many messages can we have pending for a given communicator
103 * process before we start to throttle that communicator?
105 * Used if a communicator might be CPU-bound and cannot handle the traffic.
107 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
110 * How many messages can we have pending for a given session (queue to
111 * a particular peer via a communicator) process before we start to
112 * throttle that queue?
114 * Used if ATS assigns more bandwidth to a particular transmission
115 * method than that transmission method can right now handle. (Yes,
116 * ATS should eventually notice utilization below allocation and
117 * adjust, but we don't want to queue up tons of messages in the
118 * meantime). Must be significantly below
119 * #COMMUNICATOR_TOTAL_QUEUE_LIMIT.
121 #define SESSION_QUEUE_LIMIT 32
124 GNUNET_NETWORK_STRUCT_BEGIN
127 * Outer layer of an encapsulated backchannel message.
129 struct TransportBackchannelEncapsulationMessage
132 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
134 struct GNUNET_MessageHeader header;
137 * Distance the backchannel message has traveled, to be updated at
138 * each hop. Used to bound the number of hops in case a backchannel
139 * message is broadcast and thus travels without routing
140 * information (during initial backchannel discovery).
145 * Target's peer identity (as backchannels may be transmitted
146 * indirectly, or even be broadcast).
148 struct GNUNET_PeerIdentity target;
151 * Ephemeral key setup by the sender for @e target, used
152 * to encrypt the payload.
154 struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
157 * HMAC over the ciphertext of the encrypted, variable-size
158 * body that follows. Verified via DH of @e target and
161 struct GNUNET_HashCode hmac;
163 /* Followed by encrypted, variable-size payload */
168 * Message by which a peer confirms that it is using an
171 struct EphemeralConfirmation
175 * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
177 struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
180 * How long is this signature over the ephemeral key
183 struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
186 * Ephemeral key setup by the sender for @e target, used
187 * to encrypt the payload.
189 struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
195 * Plaintext of the variable-size payload that is encrypted
196 * within a `struct TransportBackchannelEncapsulationMessage`
198 struct TransportBackchannelRequestPayload
202 * Sender's peer identity.
204 struct GNUNET_PeerIdentity sender;
207 * Signature of the sender over an
208 * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
210 struct GNUNET_CRYPTO_EddsaSignature sender_sig;
213 * How long is this signature over the ephemeral key
216 struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
219 * Current monotonic time of the sending transport service. Used to
220 * detect replayed messages. Note that the receiver should remember
221 * a list of the recently seen timestamps and only reject messages
222 * if the timestamp is in the list, or the list is "full" and the
223 * timestamp is smaller than the lowest in the list. This list of
224 * timestamps per peer should be persisted to guard against replays
227 struct GNUNET_TIME_AbsoluteNBO monotonic_time;
229 /* Followed by a `struct GNUNET_MessageHeader` with a message
230 for a communicator */
232 /* Followed by a 0-termianted string specifying the name of
233 the communicator which is to receive the message */
239 * Outer layer of an encapsulated unfragmented application message sent
240 * over an unreliable channel.
242 struct TransportReliabilityBox
245 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
247 struct GNUNET_MessageHeader header;
250 * Number of messages still to be sent before a commulative
251 * ACK is requested. Zero if an ACK is requested immediately.
252 * In NBO. Note that the receiver may send the ACK faster
253 * if it believes that is reasonable.
255 uint32_t ack_countdown GNUNET_PACKED;
258 * Unique ID of the message used for signalling receipt of
259 * messages sent over possibly unreliable channels. Should
262 struct GNUNET_ShortHashCode msg_uuid;
267 * Confirmation that the receiver got a
268 * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
269 * confirmation may be transmitted over a completely different queue,
270 * so ACKs are identified by a combination of PID of sender and
271 * message UUID, without the queue playing any role!
273 struct TransportReliabilityAckMessage
276 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
278 struct GNUNET_MessageHeader header;
283 uint32_t reserved GNUNET_PACKED;
286 * How long was the ACK delayed relative to the average time of
287 * receipt of the messages being acknowledged? Used to calculate
288 * the average RTT by taking the receipt time of the ack minus the
289 * average transmission time of the sender minus this value.
291 struct GNUNET_TIME_RelativeNBO avg_ack_delay;
293 /* followed by any number of `struct GNUNET_ShortHashCode`
294 messages providing ACKs */
299 * Outer layer of an encapsulated fragmented application message.
301 struct TransportFragmentBox
304 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
306 struct GNUNET_MessageHeader header;
309 * Unique ID of this fragment (and fragment transmission!). Will
310 * change even if a fragement is retransmitted to make each
311 * transmission attempt unique! Should be incremented by one for
312 * each fragment transmission. If a client receives a duplicate
313 * fragment (same @e frag_off), it must send
314 * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
316 uint32_t frag_uuid GNUNET_PACKED;
319 * Original message ID for of the message that all the1
320 * fragments belong to. Must be the same for all fragments.
322 struct GNUNET_ShortHashCode msg_uuid;
325 * Offset of this fragment in the overall message.
327 uint16_t frag_off GNUNET_PACKED;
330 * Total size of the message that is being fragmented.
332 uint16_t msg_size GNUNET_PACKED;
338 * Outer layer of an fragmented application message sent over a queue
339 * with finite MTU. When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
340 * received, the receiver has two RTTs or 64 further fragments with
341 * the same basic message time to send an acknowledgement, possibly
342 * acknowledging up to 65 fragments in one ACK. ACKs must also be
343 * sent immediately once all fragments were sent.
345 struct TransportFragmentAckMessage
348 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
350 struct GNUNET_MessageHeader header;
353 * Unique ID of the lowest fragment UUID being acknowledged.
355 uint32_t frag_uuid GNUNET_PACKED;
358 * Bitfield of up to 64 additional fragments following the
359 * @e msg_uuid being acknowledged by this message.
361 uint64_t extra_acks GNUNET_PACKED;
364 * Original message ID for of the message that all the
365 * fragments belong to.
367 struct GNUNET_ShortHashCode msg_uuid;
370 * How long was the ACK delayed relative to the average time of
371 * receipt of the fragments being acknowledged? Used to calculate
372 * the average RTT by taking the receipt time of the ack minus the
373 * average transmission time of the sender minus this value.
375 struct GNUNET_TIME_RelativeNBO avg_ack_delay;
380 * Internal message used by transport for distance vector learning.
381 * If @e num_hops does not exceed the threshold, peers should append
382 * themselves to the peer list and flood the message (possibly only
383 * to a subset of their neighbours to limit discoverability of the
384 * network topology). To the extend that the @e bidirectional bits
385 * are set, peers may learn the inverse paths even if they did not
388 * Unless received on a bidirectional queue and @e num_hops just
389 * zero, peers that can forward to the initator should always try to
390 * forward to the initiator.
392 struct TransportDVLearn
395 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
397 struct GNUNET_MessageHeader header;
400 * Number of hops this messages has travelled, in NBO. Zero if
403 uint16_t num_hops GNUNET_PACKED;
406 * Bitmask of the last 16 hops indicating whether they are confirmed
407 * available (without DV) in both directions or not, in NBO. Used
408 * to possibly instantly learn a path in both directions. Each peer
409 * should shift this value by one to the left, and then set the
410 * lowest bit IF the current sender can be reached from it (without
413 uint16_t bidirectional GNUNET_PACKED;
416 * Peers receiving this message and delaying forwarding to other
417 * peers for any reason should increment this value such as to
418 * enable the origin to determine the actual network-only delay
419 * in addition to the real-time delay (assuming the message loops
420 * back to the origin).
422 struct GNUNET_TIME_Relative cummulative_non_network_delay;
425 * Identity of the peer that started this learning activity.
427 struct GNUNET_PeerIdentity initiator;
429 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values,
430 excluding the initiator of the DV trace; the last entry is the
431 current sender; the current peer must not be included except if
438 * Outer layer of an encapsulated message send over multiple hops.
439 * The path given only includes the identities of the subsequent
440 * peers, i.e. it will be empty if we are the receiver. Each
441 * forwarding peer should scan the list from the end, and if it can,
442 * forward to the respective peer. The list should then be shortened
443 * by all the entries up to and including that peer. Each hop should
444 * also increment @e total_hops to allow the receiver to get a precise
445 * estimate on the number of hops the message travelled. Senders must
446 * provide a learned path that thus should work, but intermediaries
447 * know of a shortcut, they are allowed to send the message via that
450 * If a peer finds itself still on the list, it must drop the message.
452 struct TransportDVBox
455 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
457 struct GNUNET_MessageHeader header;
460 * Number of total hops this messages travelled. In NBO.
461 * @e origin sets this to zero, to be incremented at
464 uint16_t total_hops GNUNET_PACKED;
467 * Number of hops this messages includes. In NBO.
469 uint16_t num_hops GNUNET_PACKED;
472 * Identity of the peer that originated the message.
474 struct GNUNET_PeerIdentity origin;
476 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
477 excluding the @e origin and the current peer, the last must be
478 the ultimate target; if @e num_hops is zero, the receiver of this
479 message is the ultimate target. */
481 /* Followed by the actual message, which itself may be
482 another box, but not a DV_LEARN or DV_BOX message! */
486 GNUNET_NETWORK_STRUCT_END
491 * What type of client is the `struct TransportClient` about?
496 * We do not know yet (client is fresh).
501 * Is the CORE service, we need to forward traffic to it.
506 * It is a monitor, forward monitor data.
511 * It is a communicator, use for communication.
518 * Entry in our cache of ephemeral keys we currently use.
520 struct EphemeralCacheEntry
524 * Target's peer identity (we don't re-use ephemerals
525 * to limit linkability of messages).
527 struct GNUNET_PeerIdentity target;
530 * Signature affirming @e ephemeral_key of type
531 * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
533 struct GNUNET_CRYPTO_EddsaSignature sender_sig;
536 * How long is @e sender_sig valid
538 struct GNUNET_TIME_Absolute ephemeral_validity;
543 struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
546 * Node in the ephemeral cache for this entry.
547 * Used for expiration.
549 struct GNUNET_CONTAINER_HeapNode *hn;
554 * Client connected to the transport service.
556 struct TransportClient;
560 * A neighbour that at least one communicator is connected to.
566 * Entry identifying transmission in one of our `struct
567 * GNUNET_ATS_Sessions` which still awaits an ACK. This is used to
568 * ensure we do not overwhelm a communicator and limit the number of
569 * messages outstanding per communicator (say in case communicator is
570 * CPU bound) and per queue (in case ATS bandwidth allocation exceeds
571 * what the communicator can actually provide towards a particular
580 struct QueueEntry *next;
585 struct QueueEntry *prev;
588 * ATS session this entry is queued with.
590 struct GNUNET_ATS_Session *session;
593 * Message ID used for this message with the queue used for transmission.
600 * An ATS session is a message queue provided by a communicator
601 * via which we can reach a particular neighbour.
603 struct GNUNET_ATS_Session
608 struct GNUNET_ATS_Session *next_neighbour;
613 struct GNUNET_ATS_Session *prev_neighbour;
618 struct GNUNET_ATS_Session *prev_client;
623 struct GNUNET_ATS_Session *next_client;
626 * Head of DLL of unacked transmission requests.
628 struct QueueEntry *queue_head;
631 * End of DLL of unacked transmission requests.
633 struct QueueEntry *queue_tail;
636 * Which neighbour is this ATS session for?
638 struct Neighbour *neighbour;
641 * Which communicator offers this ATS session?
643 struct TransportClient *tc;
646 * Address served by the ATS session.
651 * Handle by which we inform ATS about this queue.
653 struct GNUNET_ATS_SessionRecord *sr;
656 * Task scheduled for the time when this queue can (likely) transmit the
657 * next message. Still needs to check with the @e tracker_out to be sure.
659 struct GNUNET_SCHEDULER_Task *transmit_task;
662 * Our current RTT estimate for this ATS session.
664 struct GNUNET_TIME_Relative rtt;
667 * Message ID generator for transmissions on this queue.
672 * Unique identifier of this ATS session with the communicator.
677 * Maximum transmission unit supported by this ATS session.
682 * Distance to the target of this ATS session.
689 uint32_t num_msg_pending;
694 uint32_t num_bytes_pending;
697 * Length of the DLL starting at @e queue_head.
699 unsigned int queue_length;
702 * Network type offered by this ATS session.
704 enum GNUNET_NetworkType nt;
707 * Connection status for this ATS session.
709 enum GNUNET_TRANSPORT_ConnectionStatus cs;
712 * How much outbound bandwidth do we have available for this session?
714 struct GNUNET_BANDWIDTH_Tracker tracker_out;
717 * How much inbound bandwidth do we have available for this session?
719 struct GNUNET_BANDWIDTH_Tracker tracker_in;
724 * A neighbour that at least one communicator is connected to.
730 * Which peer is this about?
732 struct GNUNET_PeerIdentity pid;
735 * Head of list of messages pending for this neighbour.
737 struct PendingMessage *pending_msg_head;
740 * Tail of list of messages pending for this neighbour.
742 struct PendingMessage *pending_msg_tail;
745 * Head of DLL of ATS sessions to this peer.
747 struct GNUNET_ATS_Session *session_head;
750 * Tail of DLL of ATS sessions to this peer.
752 struct GNUNET_ATS_Session *session_tail;
755 * Task run to cleanup pending messages that have exceeded their timeout.
757 struct GNUNET_SCHEDULER_Task *timeout_task;
760 * Quota at which CORE is allowed to transmit to this peer
763 * FIXME: not yet used, tricky to get right given multiple queues!
764 * (=> Idea: let ATS set a quota per queue and we add them up here?)
765 * FIXME: how do we set this value initially when we tell CORE?
766 * Options: start at a minimum value or at literally zero (before ATS?)
767 * (=> Current thought: clean would be zero!)
769 struct GNUNET_BANDWIDTH_Value32NBO quota_out;
772 * What is the earliest timeout of any message in @e pending_msg_tail?
774 struct GNUNET_TIME_Absolute earliest_timeout;
780 * Types of different pending messages.
782 enum PendingMessageType
786 * Ordinary message received from the CORE service.
793 PMT_FRAGMENT_BOX = 1,
798 PMT_RELIABILITY_BOX = 2,
801 * Any type of acknowledgement.
803 PMT_ACKNOWLEDGEMENT = 3
810 * Transmission request that is awaiting delivery. The original
811 * transmission requests from CORE may be too big for some queues.
812 * In this case, a *tree* of fragments is created. At each
813 * level of the tree, fragments are kept in a DLL ordered by which
814 * fragment should be sent next (at the head). The tree is searched
815 * top-down, with the original message at the root.
817 * To select a node for transmission, first it is checked if the
818 * current node's message fits with the MTU. If it does not, we
819 * either calculate the next fragment (based on @e frag_off) from the
820 * current node, or, if all fragments have already been created,
821 * descend to the @e head_frag. Even though the node was already
822 * fragmented, the fragment may be too big if the fragment was
823 * generated for a queue with a larger MTU. In this case, the node
824 * may be fragmented again, thus creating a tree.
826 * When acknowledgements for fragments are received, the tree
827 * must be pruned, removing those parts that were already
828 * acknowledged. When fragments are sent over a reliable
829 * channel, they can be immediately removed.
831 * If a message is ever fragmented, then the original "full" message
832 * is never again transmitted (even if it fits below the MTU), and
833 * only (remaining) fragments are sent.
835 struct PendingMessage
838 * Kept in a MDLL of messages for this @a target.
840 struct PendingMessage *next_neighbour;
843 * Kept in a MDLL of messages for this @a target.
845 struct PendingMessage *prev_neighbour;
848 * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
850 struct PendingMessage *next_client;
853 * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
855 struct PendingMessage *prev_client;
858 * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx)
860 struct PendingMessage *next_frag;
863 * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX)
865 struct PendingMessage *prev_frag;
868 * Target of the request.
870 struct Neighbour *target;
873 * Client that issued the transmission request, if @e pmt is #PMT_CORE.
875 struct TransportClient *client;
878 * Head of a MDLL of fragments created for this core message.
880 struct PendingMessage *head_frag;
883 * Tail of a MDLL of fragments created for this core message.
885 struct PendingMessage *tail_frag;
888 * Our parent in the fragmentation tree.
890 struct PendingMessage *frag_parent;
893 * At what time should we give up on the transmission (and no longer retry)?
895 struct GNUNET_TIME_Absolute timeout;
898 * What is the earliest time for us to retry transmission of this message?
900 struct GNUNET_TIME_Absolute next_attempt;
903 * UUID to use for this message (used for reassembly of fragments, only
904 * initialized if @e msg_uuid_set is #GNUNET_YES).
906 struct GNUNET_ShortHashCode msg_uuid;
909 * Counter incremented per generated fragment.
911 uint32_t frag_uuidgen;
914 * Type of the pending message.
916 enum PendingMessageType pmt;
919 * Size of the original message.
924 * Offset at which we should generate the next fragment.
929 * #GNUNET_YES once @e msg_uuid was initialized
931 int16_t msg_uuid_set;
933 /* Followed by @e bytes_msg to transmit */
938 * One of the addresses of this peer.
940 struct AddressListEntry
946 struct AddressListEntry *next;
951 struct AddressListEntry *prev;
954 * Which communicator provides this address?
956 struct TransportClient *tc;
959 * The actual address.
964 * Current context for storing this address in the peerstore.
966 struct GNUNET_PEERSTORE_StoreContext *sc;
969 * Task to periodically do @e st operation.
971 struct GNUNET_SCHEDULER_Task *st;
974 * What is a typical lifetime the communicator expects this
975 * address to have? (Always from now.)
977 struct GNUNET_TIME_Relative expiration;
980 * Address identifier used by the communicator.
985 * Network type offered by this address.
987 enum GNUNET_NetworkType nt;
993 * Client connected to the transport service.
995 struct TransportClient
1001 struct TransportClient *next;
1006 struct TransportClient *prev;
1009 * Handle to the client.
1011 struct GNUNET_SERVICE_Client *client;
1014 * Message queue to the client.
1016 struct GNUNET_MQ_Handle *mq;
1019 * What type of client is this?
1021 enum ClientType type;
1027 * Information for @e type #CT_CORE.
1032 * Head of list of messages pending for this client, sorted by
1033 * transmission time ("next_attempt" + possibly internal prioritization).
1035 struct PendingMessage *pending_msg_head;
1038 * Tail of list of messages pending for this client.
1040 struct PendingMessage *pending_msg_tail;
1045 * Information for @e type #CT_MONITOR.
1050 * Peer identity to monitor the addresses of.
1051 * Zero to monitor all neighbours. Valid if
1052 * @e type is #CT_MONITOR.
1054 struct GNUNET_PeerIdentity peer;
1057 * Is this a one-shot monitor?
1065 * Information for @e type #CT_COMMUNICATOR.
1069 * If @e type is #CT_COMMUNICATOR, this communicator
1070 * supports communicating using these addresses.
1072 char *address_prefix;
1075 * Head of DLL of queues offered by this communicator.
1077 struct GNUNET_ATS_Session *session_head;
1080 * Tail of DLL of queues offered by this communicator.
1082 struct GNUNET_ATS_Session *session_tail;
1085 * Head of list of the addresses of this peer offered by this communicator.
1087 struct AddressListEntry *addr_head;
1090 * Tail of list of the addresses of this peer offered by this communicator.
1092 struct AddressListEntry *addr_tail;
1095 * Number of queue entries in all queues to this communicator. Used
1096 * throttle sending to a communicator if we see that the communicator
1097 * is globally unable to keep up.
1099 unsigned int total_queue_length;
1102 * Characteristics of this communicator.
1104 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1114 * Head of linked list of all clients to this service.
1116 static struct TransportClient *clients_head;
1119 * Tail of linked list of all clients to this service.
1121 static struct TransportClient *clients_tail;
1124 * Statistics handle.
1126 static struct GNUNET_STATISTICS_Handle *GST_stats;
1129 * Configuration handle.
1131 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
1136 static struct GNUNET_PeerIdentity GST_my_identity;
1141 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
1144 * Map from PIDs to `struct Neighbour` entries. A peer is
1145 * a neighbour if we have an MQ to it from some communicator.
1147 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
1150 * Database for peer's HELLOs.
1152 static struct GNUNET_PEERSTORE_Handle *peerstore;
1155 * Heap sorting `struct EphemeralCacheEntry` by their
1156 * key/signature validity.
1158 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
1161 * Hash map for looking up `struct EphemeralCacheEntry`s
1162 * by peer identity. (We may have ephemerals in our
1163 * cache for which we do not have a neighbour entry,
1164 * and similar many neighbours may not need ephemerals,
1165 * so we use a second map.)
1167 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
1170 * Our connection to ATS for allocation and bootstrapping.
1172 static struct GNUNET_ATS_TransportHandle *ats;
1176 * Free cached ephemeral key.
1178 * @param ece cached signature to free
1181 free_ephemeral (struct EphemeralCacheEntry *ece)
1183 GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
1186 GNUNET_CONTAINER_heap_remove_node (ece->hn);
1192 * Lookup neighbour record for peer @a pid.
1194 * @param pid neighbour to look for
1195 * @return NULL if we do not have this peer as a neighbour
1197 static struct Neighbour *
1198 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1200 return GNUNET_CONTAINER_multipeermap_get (neighbours,
1206 * Details about what to notify monitors about.
1211 * @deprecated To be discussed if we keep these...
1213 struct GNUNET_TIME_Absolute last_validation;
1214 struct GNUNET_TIME_Absolute valid_until;
1215 struct GNUNET_TIME_Absolute next_validation;
1218 * Current round-trip time estimate.
1220 struct GNUNET_TIME_Relative rtt;
1223 * Connection status.
1225 enum GNUNET_TRANSPORT_ConnectionStatus cs;
1230 uint32_t num_msg_pending;
1235 uint32_t num_bytes_pending;
1242 * Notify monitor @a tc about an event. That @a tc
1243 * cares about the event has already been checked.
1245 * Send @a tc information in @a me about a @a peer's status with
1246 * respect to some @a address to all monitors that care.
1248 * @param tc monitor to inform
1249 * @param peer peer the information is about
1250 * @param address address the information is about
1251 * @param nt network type associated with @a address
1252 * @param me detailed information to transmit
1255 notify_monitor (struct TransportClient *tc,
1256 const struct GNUNET_PeerIdentity *peer,
1257 const char *address,
1258 enum GNUNET_NetworkType nt,
1259 const struct MonitorEvent *me)
1261 struct GNUNET_MQ_Envelope *env;
1262 struct GNUNET_TRANSPORT_MonitorData *md;
1263 size_t addr_len = strlen (address) + 1;
1265 env = GNUNET_MQ_msg_extra (md,
1267 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
1268 md->nt = htonl ((uint32_t) nt);
1270 md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
1271 md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
1272 md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
1273 md->rtt = GNUNET_TIME_relative_hton (me->rtt);
1274 md->cs = htonl ((uint32_t) me->cs);
1275 md->num_msg_pending = htonl (me->num_msg_pending);
1276 md->num_bytes_pending = htonl (me->num_bytes_pending);
1280 GNUNET_MQ_send (tc->mq,
1286 * Send information in @a me about a @a peer's status with respect
1287 * to some @a address to all monitors that care.
1289 * @param peer peer the information is about
1290 * @param address address the information is about
1291 * @param nt network type associated with @a address
1292 * @param me detailed information to transmit
1295 notify_monitors (const struct GNUNET_PeerIdentity *peer,
1296 const char *address,
1297 enum GNUNET_NetworkType nt,
1298 const struct MonitorEvent *me)
1300 static struct GNUNET_PeerIdentity zero;
1302 for (struct TransportClient *tc = clients_head;
1306 if (CT_MONITOR != tc->type)
1308 if (tc->details.monitor.one_shot)
1310 if ( (0 != memcmp (&tc->details.monitor.peer,
1313 (0 != memcmp (&tc->details.monitor.peer,
1327 * Called whenever a client connects. Allocates our
1328 * data structures associated with that client.
1330 * @param cls closure, NULL
1331 * @param client identification of the client
1332 * @param mq message queue for the client
1333 * @return our `struct TransportClient`
1336 client_connect_cb (void *cls,
1337 struct GNUNET_SERVICE_Client *client,
1338 struct GNUNET_MQ_Handle *mq)
1340 struct TransportClient *tc;
1342 tc = GNUNET_new (struct TransportClient);
1343 tc->client = client;
1345 GNUNET_CONTAINER_DLL_insert (clients_head,
1348 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1349 "Client %p connected\n",
1356 * Release memory used by @a neighbour.
1358 * @param neighbour neighbour entry to free
1361 free_neighbour (struct Neighbour *neighbour)
1363 GNUNET_assert (NULL == neighbour->session_head);
1364 GNUNET_assert (GNUNET_YES ==
1365 GNUNET_CONTAINER_multipeermap_remove (neighbours,
1368 if (NULL != neighbour->timeout_task)
1369 GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
1370 GNUNET_free (neighbour);
1375 * Send message to CORE clients that we lost a connection.
1377 * @param tc client to inform (must be CORE client)
1378 * @param pid peer the connection is for
1379 * @param quota_out current quota for the peer
1382 core_send_connect_info (struct TransportClient *tc,
1383 const struct GNUNET_PeerIdentity *pid,
1384 struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1386 struct GNUNET_MQ_Envelope *env;
1387 struct ConnectInfoMessage *cim;
1389 GNUNET_assert (CT_CORE == tc->type);
1390 env = GNUNET_MQ_msg (cim,
1391 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1392 cim->quota_out = quota_out;
1394 GNUNET_MQ_send (tc->mq,
1400 * Send message to CORE clients that we gained a connection
1402 * @param pid peer the queue was for
1403 * @param quota_out current quota for the peer
1406 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
1407 struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1409 for (struct TransportClient *tc = clients_head;
1413 if (CT_CORE != tc->type)
1415 core_send_connect_info (tc,
1423 * Send message to CORE clients that we lost a connection.
1425 * @param pid peer the connection was for
1428 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
1430 for (struct TransportClient *tc = clients_head;
1434 struct GNUNET_MQ_Envelope *env;
1435 struct DisconnectInfoMessage *dim;
1437 if (CT_CORE != tc->type)
1439 env = GNUNET_MQ_msg (dim,
1440 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1442 GNUNET_MQ_send (tc->mq,
1449 * We believe we are ready to transmit a message on a queue. Double-checks
1450 * with the queue's "tracker_out" and then gives the message to the
1451 * communicator for transmission (updating the tracker, and re-scheduling
1452 * itself if applicable).
1454 * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
1457 transmit_on_queue (void *cls);
1461 * Schedule next run of #transmit_on_queue(). Does NOTHING if
1462 * we should run immediately or if the message queue is empty.
1463 * Test for no task being added AND queue not being empty to
1464 * transmit immediately afterwards! This function must only
1465 * be called if the message queue is non-empty!
1467 * @param queue the queue to do scheduling for
1470 schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
1472 struct Neighbour *n = queue->neighbour;
1473 struct PendingMessage *pm = n->pending_msg_head;
1474 struct GNUNET_TIME_Relative out_delay;
1477 GNUNET_assert (NULL != pm);
1478 if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT)
1480 GNUNET_STATISTICS_update (GST_stats,
1481 "# Transmission throttled due to communicator queue limit",
1486 if (queue->queue_length >= SESSION_QUEUE_LIMIT)
1488 GNUNET_STATISTICS_update (GST_stats,
1489 "# Transmission throttled due to session queue limit",
1495 wsize = (0 == queue->mtu)
1496 ? pm->bytes_msg /* FIXME: add overheads? */
1498 out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
1500 out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
1502 if (0 == out_delay.rel_value_us)
1503 return; /* we should run immediately! */
1504 /* queue has changed since we were scheduled, reschedule again */
1505 queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
1508 if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
1509 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1510 "Next transmission on queue `%s' in %s (high delay)\n",
1512 GNUNET_STRINGS_relative_time_to_string (out_delay,
1515 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1516 "Next transmission on queue `%s' in %s\n",
1518 GNUNET_STRINGS_relative_time_to_string (out_delay,
1526 * @param session the session to free
1529 free_session (struct GNUNET_ATS_Session *session)
1531 struct Neighbour *neighbour = session->neighbour;
1532 struct TransportClient *tc = session->tc;
1533 struct MonitorEvent me = {
1534 .cs = GNUNET_TRANSPORT_CS_DOWN,
1535 .rtt = GNUNET_TIME_UNIT_FOREVER_REL
1537 struct QueueEntry *qe;
1540 if (NULL != session->transmit_task)
1542 GNUNET_SCHEDULER_cancel (session->transmit_task);
1543 session->transmit_task = NULL;
1545 GNUNET_CONTAINER_MDLL_remove (neighbour,
1546 neighbour->session_head,
1547 neighbour->session_tail,
1549 GNUNET_CONTAINER_MDLL_remove (client,
1550 tc->details.communicator.session_head,
1551 tc->details.communicator.session_tail,
1553 maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
1554 while (NULL != (qe = session->queue_head))
1556 GNUNET_CONTAINER_DLL_remove (session->queue_head,
1557 session->queue_tail,
1559 session->queue_length--;
1560 tc->details.communicator.total_queue_length--;
1563 GNUNET_assert (0 == session->queue_length);
1565 (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
1567 /* Communicator dropped below threshold, resume all queues */
1568 GNUNET_STATISTICS_update (GST_stats,
1569 "# Transmission throttled due to communicator queue limit",
1572 for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
1575 schedule_transmit_on_queue (s);
1577 notify_monitors (&neighbour->pid,
1581 GNUNET_ATS_session_del (session->sr);
1582 GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
1583 GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
1584 GNUNET_free (session);
1585 if (NULL == neighbour->session_head)
1587 cores_send_disconnect_info (&neighbour->pid);
1588 free_neighbour (neighbour);
1596 * @param ale address list entry to free
1599 free_address_list_entry (struct AddressListEntry *ale)
1601 struct TransportClient *tc = ale->tc;
1603 GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
1604 tc->details.communicator.addr_tail,
1606 if (NULL != ale->sc)
1608 GNUNET_PEERSTORE_store_cancel (ale->sc);
1611 if (NULL != ale->st)
1613 GNUNET_SCHEDULER_cancel (ale->st);
1621 * Called whenever a client is disconnected. Frees our
1622 * resources associated with that client.
1624 * @param cls closure, NULL
1625 * @param client identification of the client
1626 * @param app_ctx our `struct TransportClient`
1629 client_disconnect_cb (void *cls,
1630 struct GNUNET_SERVICE_Client *client,
1633 struct TransportClient *tc = app_ctx;
1635 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636 "Client %p disconnected, cleaning up.\n",
1638 GNUNET_CONTAINER_DLL_remove (clients_head,
1647 struct PendingMessage *pm;
1649 while (NULL != (pm = tc->details.core.pending_msg_head))
1651 GNUNET_CONTAINER_MDLL_remove (client,
1652 tc->details.core.pending_msg_head,
1653 tc->details.core.pending_msg_tail,
1661 case CT_COMMUNICATOR:
1663 struct GNUNET_ATS_Session *q;
1664 struct AddressListEntry *ale;
1666 while (NULL != (q = tc->details.communicator.session_head))
1668 while (NULL != (ale = tc->details.communicator.addr_head))
1669 free_address_list_entry (ale);
1670 GNUNET_free (tc->details.communicator.address_prefix);
1679 * Iterator telling new CORE client about all existing
1680 * connections to peers.
1682 * @param cls the new `struct TransportClient`
1683 * @param pid a connected peer
1684 * @param value the `struct Neighbour` with more information
1685 * @return #GNUNET_OK (continue to iterate)
1688 notify_client_connect_info (void *cls,
1689 const struct GNUNET_PeerIdentity *pid,
1692 struct TransportClient *tc = cls;
1693 struct Neighbour *neighbour = value;
1695 core_send_connect_info (tc,
1697 neighbour->quota_out);
1703 * Initialize a "CORE" client. We got a start message from this
1704 * client, so add it to the list of clients for broadcasting of
1707 * @param cls the client
1708 * @param start the start message that was sent
1711 handle_client_start (void *cls,
1712 const struct StartMessage *start)
1714 struct TransportClient *tc = cls;
1717 options = ntohl (start->options);
1718 if ( (0 != (1 & options)) &&
1720 memcmp (&start->self,
1722 sizeof (struct GNUNET_PeerIdentity)) ) )
1724 /* client thinks this is a different peer, reject */
1726 GNUNET_SERVICE_client_drop (tc->client);
1729 if (CT_NONE != tc->type)
1732 GNUNET_SERVICE_client_drop (tc->client);
1736 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
1737 ¬ify_client_connect_info,
1739 GNUNET_SERVICE_client_continue (tc->client);
1744 * Client asked for transmission to a peer. Process the request.
1746 * @param cls the client
1747 * @param obm the send message that was sent
1750 check_client_send (void *cls,
1751 const struct OutboundMessage *obm)
1753 struct TransportClient *tc = cls;
1755 const struct GNUNET_MessageHeader *obmm;
1757 if (CT_CORE != tc->type)
1760 return GNUNET_SYSERR;
1762 size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
1763 if (size < sizeof (struct GNUNET_MessageHeader))
1766 return GNUNET_SYSERR;
1768 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
1769 if (size != ntohs (obmm->size))
1772 return GNUNET_SYSERR;
1779 * Free fragment tree below @e root, excluding @e root itself.
1781 * @param root root of the tree to free
1784 free_fragment_tree (struct PendingMessage *root)
1786 struct PendingMessage *frag;
1788 while (NULL != (frag = root->head_frag))
1790 free_fragment_tree (frag);
1791 GNUNET_CONTAINER_MDLL_remove (frag,
1801 * Send a response to the @a pm that we have processed a
1802 * "send" request with status @a success. We
1803 * transmitted @a bytes_physical on the actual wire.
1804 * Sends a confirmation to the "core" client responsible
1805 * for the original request and free's @a pm.
1807 * @param pm handle to the original pending message
1808 * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
1809 * for transmission failure
1810 * @param bytes_physical amount of bandwidth consumed
1813 client_send_response (struct PendingMessage *pm,
1815 uint32_t bytes_physical)
1817 struct TransportClient *tc = pm->client;
1818 struct Neighbour *target = pm->target;
1819 struct GNUNET_MQ_Envelope *env;
1820 struct SendOkMessage *som;
1824 env = GNUNET_MQ_msg (som,
1825 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1826 som->success = htonl ((uint32_t) success);
1827 som->bytes_msg = htons (pm->bytes_msg);
1828 som->bytes_physical = htonl (bytes_physical);
1829 som->peer = target->pid;
1830 GNUNET_MQ_send (tc->mq,
1832 GNUNET_CONTAINER_MDLL_remove (client,
1833 tc->details.core.pending_msg_head,
1834 tc->details.core.pending_msg_tail,
1837 GNUNET_CONTAINER_MDLL_remove (neighbour,
1838 target->pending_msg_head,
1839 target->pending_msg_tail,
1841 free_fragment_tree (pm);
1847 * Checks the message queue for a neighbour for messages that have timed
1848 * out and purges them.
1850 * @param cls a `struct Neighbour`
1853 check_queue_timeouts (void *cls)
1855 struct Neighbour *n = cls;
1856 struct PendingMessage *pm;
1857 struct GNUNET_TIME_Absolute now;
1858 struct GNUNET_TIME_Absolute earliest_timeout;
1860 n->timeout_task = NULL;
1861 earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1862 now = GNUNET_TIME_absolute_get ();
1863 for (struct PendingMessage *pos = n->pending_msg_head;
1867 pm = pos->next_neighbour;
1868 if (pos->timeout.abs_value_us <= now.abs_value_us)
1870 GNUNET_STATISTICS_update (GST_stats,
1871 "# messages dropped (timeout before confirmation)",
1874 client_send_response (pm,
1879 earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
1882 n->earliest_timeout = earliest_timeout;
1883 if (NULL != n->pending_msg_head)
1884 n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
1885 &check_queue_timeouts,
1891 * Client asked for transmission to a peer. Process the request.
1893 * @param cls the client
1894 * @param obm the send message that was sent
1897 handle_client_send (void *cls,
1898 const struct OutboundMessage *obm)
1900 struct TransportClient *tc = cls;
1901 struct PendingMessage *pm;
1902 const struct GNUNET_MessageHeader *obmm;
1903 struct Neighbour *target;
1906 GNUNET_assert (CT_CORE == tc->type);
1907 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
1908 bytes_msg = ntohs (obmm->size);
1909 target = lookup_neighbour (&obm->peer);
1912 /* Failure: don't have this peer as a neighbour (anymore).
1913 Might have gone down asynchronously, so this is NOT
1914 a protocol violation by CORE. Still count the event,
1915 as this should be rare. */
1916 struct GNUNET_MQ_Envelope *env;
1917 struct SendOkMessage *som;
1919 env = GNUNET_MQ_msg (som,
1920 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1921 som->success = htonl (GNUNET_SYSERR);
1922 som->bytes_msg = htonl (bytes_msg);
1923 som->bytes_physical = htonl (0);
1924 som->peer = obm->peer;
1925 GNUNET_MQ_send (tc->mq,
1927 GNUNET_SERVICE_client_continue (tc->client);
1928 GNUNET_STATISTICS_update (GST_stats,
1929 "# messages dropped (neighbour unknown)",
1934 pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
1936 pm->target = target;
1937 pm->bytes_msg = bytes_msg;
1938 pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
1942 GNUNET_CONTAINER_MDLL_insert (neighbour,
1943 target->pending_msg_head,
1944 target->pending_msg_tail,
1946 GNUNET_CONTAINER_MDLL_insert (client,
1947 tc->details.core.pending_msg_head,
1948 tc->details.core.pending_msg_tail,
1950 if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
1952 target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
1953 if (NULL != target->timeout_task)
1954 GNUNET_SCHEDULER_cancel (target->timeout_task);
1955 target->timeout_task
1956 = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
1957 &check_queue_timeouts,
1964 * Communicator started. Test message is well-formed.
1966 * @param cls the client
1967 * @param cam the send message that was sent
1970 check_communicator_available (void *cls,
1971 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
1973 struct TransportClient *tc = cls;
1976 if (CT_NONE != tc->type)
1979 return GNUNET_SYSERR;
1981 tc->type = CT_COMMUNICATOR;
1982 size = ntohs (cam->header.size) - sizeof (*cam);
1984 return GNUNET_OK; /* receive-only communicator */
1985 GNUNET_MQ_check_zero_termination (cam);
1991 * Communicator started. Process the request.
1993 * @param cls the client
1994 * @param cam the send message that was sent
1997 handle_communicator_available (void *cls,
1998 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2000 struct TransportClient *tc = cls;
2003 size = ntohs (cam->header.size) - sizeof (*cam);
2005 return; /* receive-only communicator */
2006 tc->details.communicator.address_prefix
2007 = GNUNET_strdup ((const char *) &cam[1]);
2008 tc->details.communicator.cc
2009 = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
2010 GNUNET_SERVICE_client_continue (tc->client);
2015 * Address of our peer added. Test message is well-formed.
2017 * @param cls the client
2018 * @param aam the send message that was sent
2021 check_add_address (void *cls,
2022 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2024 struct TransportClient *tc = cls;
2026 if (CT_COMMUNICATOR != tc->type)
2029 return GNUNET_SYSERR;
2031 GNUNET_MQ_check_zero_termination (aam);
2037 * Ask peerstore to store our address.
2039 * @param cls an `struct AddressListEntry *`
2042 store_pi (void *cls);
2046 * Function called when peerstore is done storing our address.
2049 peerstore_store_cb (void *cls,
2052 struct AddressListEntry *ale = cls;
2055 if (GNUNET_YES != success)
2056 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2057 "Failed to store our own address `%s' in peerstore!\n",
2059 /* refresh period is 1/4 of expiration time, that should be plenty
2060 without being excessive. */
2061 ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
2069 * Ask peerstore to store our address.
2071 * @param cls an `struct AddressListEntry *`
2074 store_pi (void *cls)
2076 struct AddressListEntry *ale = cls;
2079 struct GNUNET_TIME_Absolute expiration;
2082 expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
2083 GNUNET_HELLO_sign_address (ale->address,
2089 ale->sc = GNUNET_PEERSTORE_store (peerstore,
2092 GNUNET_HELLO_PEERSTORE_KEY,
2096 GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
2097 &peerstore_store_cb,
2100 if (NULL == ale->sc)
2102 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2103 "Failed to store our address `%s' with peerstore\n",
2105 ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
2113 * Address of our peer added. Process the request.
2115 * @param cls the client
2116 * @param aam the send message that was sent
2119 handle_add_address (void *cls,
2120 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2122 struct TransportClient *tc = cls;
2123 struct AddressListEntry *ale;
2126 slen = ntohs (aam->header.size) - sizeof (*aam);
2127 ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
2129 ale->address = (const char *) &ale[1];
2130 ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
2131 ale->aid = aam->aid;
2132 ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
2136 GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
2137 tc->details.communicator.addr_tail,
2139 ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
2141 GNUNET_SERVICE_client_continue (tc->client);
2146 * Address of our peer deleted. Process the request.
2148 * @param cls the client
2149 * @param dam the send message that was sent
2152 handle_del_address (void *cls,
2153 const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
2155 struct TransportClient *tc = cls;
2157 if (CT_COMMUNICATOR != tc->type)
2160 GNUNET_SERVICE_client_drop (tc->client);
2163 for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
2167 if (dam->aid != ale->aid)
2169 GNUNET_assert (ale->tc == tc);
2170 free_address_list_entry (ale);
2171 GNUNET_SERVICE_client_continue (tc->client);
2174 GNUNET_SERVICE_client_drop (tc->client);
2179 * Client notified us about transmission from a peer. Process the request.
2181 * @param cls the client
2182 * @param obm the send message that was sent
2185 check_incoming_msg (void *cls,
2186 const struct GNUNET_TRANSPORT_IncomingMessage *im)
2188 struct TransportClient *tc = cls;
2190 const struct GNUNET_MessageHeader *obmm;
2192 if (CT_COMMUNICATOR != tc->type)
2195 return GNUNET_SYSERR;
2197 size = ntohs (im->header.size) - sizeof (*im);
2198 if (size < sizeof (struct GNUNET_MessageHeader))
2201 return GNUNET_SYSERR;
2203 obmm = (const struct GNUNET_MessageHeader *) &im[1];
2204 if (size != ntohs (obmm->size))
2207 return GNUNET_SYSERR;
2214 * Incoming meessage. Process the request.
2216 * @param cls the client
2217 * @param im the send message that was received
2220 handle_incoming_msg (void *cls,
2221 const struct GNUNET_TRANSPORT_IncomingMessage *im)
2223 struct TransportClient *tc = cls;
2225 GNUNET_SERVICE_client_continue (tc->client);
2230 * New queue became available. Check message.
2232 * @param cls the client
2233 * @param aqm the send message that was sent
2236 check_add_queue_message (void *cls,
2237 const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
2239 struct TransportClient *tc = cls;
2241 if (CT_COMMUNICATOR != tc->type)
2244 return GNUNET_SYSERR;
2246 GNUNET_MQ_check_zero_termination (aqm);
2252 * Bandwidth tracker informs us that the delay until we should receive
2255 * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
2258 tracker_update_in_cb (void *cls)
2260 struct GNUNET_ATS_Session *queue = cls;
2261 struct GNUNET_TIME_Relative in_delay;
2264 rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
2265 in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
2267 // FIXME: how exactly do we do inbound flow control?
2272 * Fragment the given @a pm to the given @a mtu. Adds
2273 * additional fragments to the neighbour as well. If the
2274 * @a mtu is too small, generates and error for the @a pm
2277 * @param pm pending message to fragment for transmission
2278 * @param mtu MTU to apply
2279 * @return new message to transmit
2281 static struct PendingMessage *
2282 fragment_message (struct PendingMessage *pm,
2285 struct PendingMessage *ff;
2287 if (GNUNET_NO == pm->msg_uuid_set)
2289 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
2291 sizeof (pm->msg_uuid));
2292 pm->msg_uuid_set = GNUNET_YES;
2295 /* This invariant is established in #handle_add_queue_message() */
2296 GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
2298 /* select fragment for transmission, descending the tree if it has
2299 been expanded until we are at a leaf or at a fragment that is small enough */
2301 while ( ( (ff->bytes_msg > mtu) ||
2303 (ff->frag_off == ff->bytes_msg) &&
2304 (NULL != ff->head_frag) )
2306 ff = ff->head_frag; /* descent into fragmented fragments */
2309 if ( ( (ff->bytes_msg > mtu) ||
2311 (pm->frag_off < pm->bytes_msg) )
2313 /* Did not yet calculate all fragments, calculate next fragment */
2314 struct PendingMessage *frag;
2315 struct TransportFragmentBox tfb;
2323 orig = (const char *) &ff[1];
2324 msize = ff->bytes_msg;
2327 const struct TransportFragmentBox *tfbo;
2329 tfbo = (const struct TransportFragmentBox *) orig;
2330 orig += sizeof (struct TransportFragmentBox);
2331 msize -= sizeof (struct TransportFragmentBox);
2332 xoff = ntohs (tfbo->frag_off);
2334 fragmax = mtu - sizeof (struct TransportFragmentBox);
2335 fragsize = GNUNET_MIN (msize - ff->frag_off,
2337 frag = GNUNET_malloc (sizeof (struct PendingMessage) +
2338 sizeof (struct TransportFragmentBox) +
2340 frag->target = pm->target;
2341 frag->frag_parent = ff;
2342 frag->timeout = pm->timeout;
2343 frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
2344 frag->pmt = PMT_FRAGMENT_BOX;
2345 msg = (char *) &frag[1];
2346 tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
2347 tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
2349 tfb.frag_uuid = htonl (pm->frag_uuidgen++);
2350 tfb.msg_uuid = pm->msg_uuid;
2351 tfb.frag_off = htons (ff->frag_off + xoff);
2352 tfb.msg_size = htons (pm->bytes_msg);
2356 memcpy (&msg[sizeof (tfb)],
2357 &orig[ff->frag_off],
2359 GNUNET_CONTAINER_MDLL_insert (frag,
2363 ff->frag_off += fragsize;
2367 /* Move head to the tail and return it */
2368 GNUNET_CONTAINER_MDLL_remove (frag,
2369 ff->frag_parent->head_frag,
2370 ff->frag_parent->tail_frag,
2372 GNUNET_CONTAINER_MDLL_insert_tail (frag,
2373 ff->frag_parent->head_frag,
2374 ff->frag_parent->tail_frag,
2381 * Reliability-box the given @a pm. On error (can there be any), NULL
2382 * may be returned, otherwise the "replacement" for @a pm (which
2383 * should then be added to the respective neighbour's queue instead of
2384 * @a pm). If the @a pm is already fragmented or reliability boxed,
2385 * or itself an ACK, this function simply returns @a pm.
2387 * @param pm pending message to box for transmission over unreliabile queue
2388 * @return new message to transmit
2390 static struct PendingMessage *
2391 reliability_box_message (struct PendingMessage *pm)
2393 if (PMT_CORE != pm->pmt)
2395 /* already fragmented or reliability boxed, or control message: do nothing */
2403 client_send_response (pm,
2409 /* FIXME: return boxed PM here! */
2415 * We believe we are ready to transmit a message on a queue. Double-checks
2416 * with the queue's "tracker_out" and then gives the message to the
2417 * communicator for transmission (updating the tracker, and re-scheduling
2418 * itself if applicable).
2420 * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
2423 transmit_on_queue (void *cls)
2425 struct GNUNET_ATS_Session *queue = cls;
2426 struct Neighbour *n = queue->neighbour;
2427 struct QueueEntry *qe;
2428 struct PendingMessage *pm;
2429 struct PendingMessage *s;
2431 struct GNUNET_TRANSPORT_SendMessageTo *smt;
2432 struct GNUNET_MQ_Envelope *env;
2434 queue->transmit_task = NULL;
2435 if (NULL == (pm = n->pending_msg_head))
2437 /* no message pending, nothing to do here! */
2440 schedule_transmit_on_queue (queue);
2441 if (NULL != queue->transmit_task)
2442 return; /* do it later */
2444 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
2445 overhead += sizeof (struct TransportReliabilityBox);
2447 if ( ( (0 != queue->mtu) &&
2448 (pm->bytes_msg + overhead > queue->mtu) ) ||
2449 (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
2450 (NULL != pm->head_frag /* fragments already exist, should
2451 respect that even if MTU is 0 for
2453 s = fragment_message (s,
2455 ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
2459 /* Fragmentation failed, try next message... */
2460 schedule_transmit_on_queue (queue);
2463 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
2464 s = reliability_box_message (s);
2467 /* Reliability boxing failed, try next message... */
2468 schedule_transmit_on_queue (queue);
2472 /* Pass 's' for transission to the communicator */
2473 qe = GNUNET_new (struct QueueEntry);
2474 qe->mid = queue->mid_gen++;
2475 qe->session = queue;
2476 // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
2477 GNUNET_CONTAINER_DLL_insert (queue->queue_head,
2480 env = GNUNET_MQ_msg_extra (smt,
2482 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
2483 smt->qid = queue->qid;
2485 smt->receiver = n->pid;
2489 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
2490 queue->queue_length++;
2491 queue->tc->details.communicator.total_queue_length++;
2492 GNUNET_MQ_send (queue->tc->mq,
2495 // FIXME: do something similar to the logic below
2496 // in defragmentation / reliability ACK handling!
2498 /* Check if this transmission somehow conclusively finished handing 'pm'
2499 even without any explicit ACKs */
2500 if ( (PMT_CORE == s->pmt) &&
2501 (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
2503 /* Full message sent, and over reliabile channel */
2504 client_send_response (pm,
2508 else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
2509 (PMT_FRAGMENT_BOX == s->pmt) )
2511 struct PendingMessage *pos;
2513 /* Fragment sent over reliabile channel */
2514 free_fragment_tree (s);
2515 pos = s->frag_parent;
2516 GNUNET_CONTAINER_MDLL_remove (frag,
2521 /* check if subtree is done */
2522 while ( (NULL == pos->head_frag) &&
2523 (pos->frag_off == pos->bytes_msg) &&
2527 pos = s->frag_parent;
2528 GNUNET_CONTAINER_MDLL_remove (frag,
2535 /* Was this the last applicable fragmment? */
2536 if ( (NULL == pm->head_frag) &&
2537 (pm->frag_off == pm->bytes_msg) )
2538 client_send_response (pm,
2540 pm->bytes_msg /* FIXME: calculate and add overheads! */);
2542 else if (PMT_CORE != pm->pmt)
2544 /* This was an acknowledgement of some type, always free */
2546 struct Neighbour *neighbour = pm->target;
2547 GNUNET_CONTAINER_MDLL_remove (neighbour,
2548 neighbour->pending_msg_head,
2549 neighbour->pending_msg_tail,
2555 /* message not finished, waiting for acknowledgement */
2556 // FIXME: update time by which we might retransmit 's' based on
2557 // queue characteristics (i.e. RTT)
2559 // FIXME: move 'pm' back in the transmission queue (simplistic: to
2560 // the end, better: with position depending on type, timeout,
2564 /* finally, re-schedule self */
2565 schedule_transmit_on_queue (queue);
2570 * Bandwidth tracker informs us that the delay until we
2571 * can transmit again changed.
2573 * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
2576 tracker_update_out_cb (void *cls)
2578 struct GNUNET_ATS_Session *queue = cls;
2579 struct Neighbour *n = queue->neighbour;
2581 if (NULL == n->pending_msg_head)
2583 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2584 "Bandwidth allocation updated for empty transmission queue `%s'\n",
2586 return; /* no message pending, nothing to do here! */
2588 GNUNET_SCHEDULER_cancel (queue->transmit_task);
2589 queue->transmit_task = NULL;
2590 schedule_transmit_on_queue (queue);
2595 * Bandwidth tracker informs us that excessive outbound bandwidth was
2596 * allocated which is not being used.
2598 * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
2601 tracker_excess_out_cb (void *cls)
2603 /* FIXME: trigger excess bandwidth report to core? Right now,
2604 this is done internally within transport_api2_core already,
2605 but we probably want to change the logic and trigger it
2606 from here via a message instead! */
2607 /* TODO: maybe inform ATS at this point? */
2608 GNUNET_STATISTICS_update (GST_stats,
2609 "# Excess outbound bandwidth reported",
2617 * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
2618 * which is not being used.
2620 * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
2623 tracker_excess_in_cb (void *cls)
2625 /* TODO: maybe inform ATS at this point? */
2626 GNUNET_STATISTICS_update (GST_stats,
2627 "# Excess inbound bandwidth reported",
2634 * New queue became available. Process the request.
2636 * @param cls the client
2637 * @param aqm the send message that was sent
2640 handle_add_queue_message (void *cls,
2641 const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
2643 struct TransportClient *tc = cls;
2644 struct GNUNET_ATS_Session *queue;
2645 struct Neighbour *neighbour;
2649 if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
2651 /* MTU so small as to be useless for transmissions,
2652 required for #fragment_message()! */
2653 GNUNET_break_op (0);
2654 GNUNET_SERVICE_client_drop (tc->client);
2657 neighbour = lookup_neighbour (&aqm->receiver);
2658 if (NULL == neighbour)
2660 neighbour = GNUNET_new (struct Neighbour);
2661 neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2662 neighbour->pid = aqm->receiver;
2663 GNUNET_assert (GNUNET_OK ==
2664 GNUNET_CONTAINER_multipeermap_put (neighbours,
2667 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2668 cores_send_connect_info (&neighbour->pid,
2669 GNUNET_BANDWIDTH_ZERO);
2671 addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
2672 addr = (const char *) &aqm[1];
2674 queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len);
2676 queue->address = (const char *) &queue[1];
2677 queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
2678 queue->qid = aqm->qid;
2679 queue->mtu = ntohl (aqm->mtu);
2680 queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
2681 queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
2682 queue->neighbour = neighbour;
2683 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
2684 &tracker_update_in_cb,
2686 GNUNET_BANDWIDTH_ZERO,
2687 GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
2688 &tracker_excess_in_cb,
2690 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
2691 &tracker_update_out_cb,
2693 GNUNET_BANDWIDTH_ZERO,
2694 GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
2695 &tracker_excess_out_cb,
2700 /* notify ATS about new queue */
2702 struct GNUNET_ATS_Properties prop = {
2703 .delay = GNUNET_TIME_UNIT_FOREVER_REL,
2706 .cc = tc->details.communicator.cc
2709 queue->sr = GNUNET_ATS_session_add (ats,
2714 if (NULL == queue->sr)
2716 /* This can only happen if the 'address' was way too long for ATS
2717 (approaching 64k in strlen()!). In this case, the communicator
2718 must be buggy and we drop it. */
2720 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
2721 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
2722 GNUNET_free (queue);
2723 if (NULL == neighbour->session_head)
2725 cores_send_disconnect_info (&neighbour->pid);
2726 free_neighbour (neighbour);
2728 GNUNET_SERVICE_client_drop (tc->client);
2732 /* notify monitors about new queue */
2734 struct MonitorEvent me = {
2739 notify_monitors (&neighbour->pid,
2744 GNUNET_CONTAINER_MDLL_insert (neighbour,
2745 neighbour->session_head,
2746 neighbour->session_tail,
2748 GNUNET_CONTAINER_MDLL_insert (client,
2749 tc->details.communicator.session_head,
2750 tc->details.communicator.session_tail,
2752 GNUNET_SERVICE_client_continue (tc->client);
2757 * Queue to a peer went down. Process the request.
2759 * @param cls the client
2760 * @param dqm the send message that was sent
2763 handle_del_queue_message (void *cls,
2764 const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
2766 struct TransportClient *tc = cls;
2768 if (CT_COMMUNICATOR != tc->type)
2771 GNUNET_SERVICE_client_drop (tc->client);
2774 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
2776 session = session->next_client)
2778 struct Neighbour *neighbour = session->neighbour;
2780 if ( (dqm->qid != session->qid) ||
2781 (0 != memcmp (&dqm->receiver,
2783 sizeof (struct GNUNET_PeerIdentity))) )
2785 free_session (session);
2786 GNUNET_SERVICE_client_continue (tc->client);
2790 GNUNET_SERVICE_client_drop (tc->client);
2795 * Message was transmitted. Process the request.
2797 * @param cls the client
2798 * @param sma the send message that was sent
2801 handle_send_message_ack (void *cls,
2802 const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
2804 struct TransportClient *tc = cls;
2805 struct QueueEntry *queue;
2807 if (CT_COMMUNICATOR != tc->type)
2810 GNUNET_SERVICE_client_drop (tc->client);
2814 /* find our queue entry matching the ACK */
2816 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
2818 session = session->next_client)
2820 if (0 != memcmp (&session->neighbour->pid,
2822 sizeof (struct GNUNET_PeerIdentity)))
2824 for (struct QueueEntry *qe = session->queue_head;
2828 if (qe->mid != sma->mid)
2837 /* this should never happen */
2839 GNUNET_SERVICE_client_drop (tc->client);
2842 GNUNET_CONTAINER_DLL_remove (queue->session->queue_head,
2843 queue->session->queue_tail,
2845 queue->session->queue_length--;
2846 tc->details.communicator.total_queue_length--;
2847 GNUNET_SERVICE_client_continue (tc->client);
2849 /* if applicable, resume transmissions that waited on ACK */
2850 if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
2852 /* Communicator dropped below threshold, resume all queues */
2853 GNUNET_STATISTICS_update (GST_stats,
2854 "# Transmission throttled due to communicator queue limit",
2857 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
2859 session = session->next_client)
2860 schedule_transmit_on_queue (session);
2862 else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
2864 /* queue dropped below threshold; only resume this one queue */
2865 GNUNET_STATISTICS_update (GST_stats,
2866 "# Transmission throttled due to session queue limit",
2869 schedule_transmit_on_queue (queue->session);
2872 /* TODO: we also should react on the status! */
2873 // FIXME: this probably requires queue->pm = s assignment!
2874 // FIXME: react to communicator status about transmission request. We got:
2875 sma->status; // OK success, SYSERR failure
2877 GNUNET_free (queue);
2882 * Iterator telling new MONITOR client about all existing
2885 * @param cls the new `struct TransportClient`
2886 * @param pid a connected peer
2887 * @param value the `struct Neighbour` with more information
2888 * @return #GNUNET_OK (continue to iterate)
2891 notify_client_queues (void *cls,
2892 const struct GNUNET_PeerIdentity *pid,
2895 struct TransportClient *tc = cls;
2896 struct Neighbour *neighbour = value;
2898 GNUNET_assert (CT_MONITOR == tc->type);
2899 for (struct GNUNET_ATS_Session *q = neighbour->session_head;
2901 q = q->next_neighbour)
2903 struct MonitorEvent me = {
2906 .num_msg_pending = q->num_msg_pending,
2907 .num_bytes_pending = q->num_bytes_pending
2921 * Initialize a monitor client.
2923 * @param cls the client
2924 * @param start the start message that was sent
2927 handle_monitor_start (void *cls,
2928 const struct GNUNET_TRANSPORT_MonitorStart *start)
2930 struct TransportClient *tc = cls;
2932 if (CT_NONE != tc->type)
2935 GNUNET_SERVICE_client_drop (tc->client);
2938 tc->type = CT_MONITOR;
2939 tc->details.monitor.peer = start->peer;
2940 tc->details.monitor.one_shot = ntohl (start->one_shot);
2941 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2942 ¬ify_client_queues,
2944 GNUNET_SERVICE_client_mark_monitor (tc->client);
2945 GNUNET_SERVICE_client_continue (tc->client);
2950 * Signature of a function called by ATS with the current bandwidth
2951 * allocation to be used as determined by ATS.
2953 * @param cls closure, NULL
2954 * @param session session this is about
2955 * @param bandwidth_out assigned outbound bandwidth for the connection,
2956 * 0 to signal disconnect
2957 * @param bandwidth_in assigned inbound bandwidth for the connection,
2958 * 0 to signal disconnect
2961 ats_allocation_cb (void *cls,
2962 struct GNUNET_ATS_Session *session,
2963 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
2964 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
2967 GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out,
2969 GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in,
2975 * Find transport client providing communication service
2976 * for the protocol @a prefix.
2978 * @param prefix communicator name
2979 * @return NULL if no such transport client is available
2981 static struct TransportClient *
2982 lookup_communicator (const char *prefix)
2984 for (struct TransportClient *tc = clients_head;
2988 if (CT_COMMUNICATOR != tc->type)
2990 if (0 == strcmp (prefix,
2991 tc->details.communicator.address_prefix))
2994 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2995 "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
3002 * Signature of a function called by ATS suggesting transport to
3003 * try connecting with a particular address.
3005 * @param cls closure, NULL
3006 * @param pid target peer
3007 * @param address the address to try
3010 ats_suggestion_cb (void *cls,
3011 const struct GNUNET_PeerIdentity *pid,
3012 const char *address)
3014 static uint32_t idgen;
3015 struct TransportClient *tc;
3017 struct GNUNET_TRANSPORT_CreateQueue *cqm;
3018 struct GNUNET_MQ_Envelope *env;
3022 prefix = GNUNET_HELLO_address_to_prefix (address);
3025 GNUNET_break (0); /* ATS gave invalid address!? */
3028 tc = lookup_communicator (prefix);
3031 GNUNET_STATISTICS_update (GST_stats,
3032 "# ATS suggestions ignored due to missing communicator",
3037 /* forward suggestion for queue creation to communicator */
3038 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3039 "Request #%u for `%s' communicator to create queue to `%s'\n",
3040 (unsigned int) idgen,
3043 alen = strlen (address) + 1;
3044 env = GNUNET_MQ_msg_extra (cqm,
3046 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
3047 cqm->request_id = htonl (idgen++);
3048 cqm->receiver = *pid;
3052 GNUNET_MQ_send (tc->mq,
3058 * Communicator tells us that our request to create a queue "worked", that
3059 * is setting up the queue is now in process.
3061 * @param cls the `struct TransportClient`
3062 * @param cqr confirmation message
3065 handle_queue_create_ok (void *cls,
3066 const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
3068 struct TransportClient *tc = cls;
3070 if (CT_COMMUNICATOR != tc->type)
3073 GNUNET_SERVICE_client_drop (tc->client);
3076 GNUNET_STATISTICS_update (GST_stats,
3077 "# ATS suggestions succeeded at communicator",
3080 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3081 "Request #%u for communicator to create queue succeeded\n",
3082 (unsigned int) ntohs (cqr->request_id));
3083 GNUNET_SERVICE_client_continue (tc->client);
3088 * Communicator tells us that our request to create a queue failed. This usually
3089 * indicates that the provided address is simply invalid or that the communicator's
3090 * resources are exhausted.
3092 * @param cls the `struct TransportClient`
3093 * @param cqr failure message
3096 handle_queue_create_fail (void *cls,
3097 const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
3099 struct TransportClient *tc = cls;
3101 if (CT_COMMUNICATOR != tc->type)
3104 GNUNET_SERVICE_client_drop (tc->client);
3107 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3108 "Request #%u for communicator to create queue failed\n",
3109 (unsigned int) ntohs (cqr->request_id));
3110 GNUNET_STATISTICS_update (GST_stats,
3111 "# ATS suggestions failed in queue creation at communicator",
3114 GNUNET_SERVICE_client_continue (tc->client);
3119 * Free neighbour entry.
3123 * @param value a `struct Neighbour`
3124 * @return #GNUNET_OK (always)
3127 free_neighbour_cb (void *cls,
3128 const struct GNUNET_PeerIdentity *pid,
3131 struct Neighbour *neighbour = value;
3135 GNUNET_break (0); // should this ever happen?
3136 free_neighbour (neighbour);
3143 * Free ephemeral entry.
3147 * @param value a `struct Neighbour`
3148 * @return #GNUNET_OK (always)
3151 free_ephemeral_cb (void *cls,
3152 const struct GNUNET_PeerIdentity *pid,
3155 struct EphemeralCacheEntry *ece = value;
3159 free_ephemeral (ece);
3165 * Function called when the service shuts down. Unloads our plugins
3166 * and cancels pending validations.
3168 * @param cls closure, unused
3171 do_shutdown (void *cls)
3175 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
3180 GNUNET_ATS_transport_done (ats);
3183 if (NULL != peerstore)
3185 GNUNET_PEERSTORE_disconnect (peerstore,
3189 if (NULL != GST_stats)
3191 GNUNET_STATISTICS_destroy (GST_stats,
3195 if (NULL != GST_my_private_key)
3197 GNUNET_free (GST_my_private_key);
3198 GST_my_private_key = NULL;
3200 GNUNET_CONTAINER_multipeermap_destroy (neighbours);
3202 GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
3205 GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
3206 ephemeral_map = NULL;
3207 GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
3208 ephemeral_heap = NULL;
3213 * Initiate transport service.
3215 * @param cls closure
3216 * @param c configuration to use
3217 * @param service the initialized service
3221 const struct GNUNET_CONFIGURATION_Handle *c,
3222 struct GNUNET_SERVICE_Handle *service)
3227 neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
3229 ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
3231 ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3232 GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
3233 if (NULL == GST_my_private_key)
3235 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3236 _("Transport service is lacking key configuration settings. Exiting.\n"));
3237 GNUNET_SCHEDULER_shutdown ();
3240 GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
3241 &GST_my_identity.public_key);
3242 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
3243 "My identity is `%s'\n",
3244 GNUNET_i2s_full (&GST_my_identity));
3245 GST_stats = GNUNET_STATISTICS_create ("transport",
3247 GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
3249 peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
3250 if (NULL == peerstore)
3253 GNUNET_SCHEDULER_shutdown ();
3256 ats = GNUNET_ATS_transport_init (GST_cfg,
3264 GNUNET_SCHEDULER_shutdown ();
3271 * Define "main" method using service macro.
3275 GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
3278 &client_disconnect_cb,
3280 /* communication with core */
3281 GNUNET_MQ_hd_fixed_size (client_start,
3282 GNUNET_MESSAGE_TYPE_TRANSPORT_START,
3283 struct StartMessage,
3285 GNUNET_MQ_hd_var_size (client_send,
3286 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
3287 struct OutboundMessage,
3289 /* communication with communicators */
3290 GNUNET_MQ_hd_var_size (communicator_available,
3291 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
3292 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
3294 GNUNET_MQ_hd_var_size (add_address,
3295 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
3296 struct GNUNET_TRANSPORT_AddAddressMessage,
3298 GNUNET_MQ_hd_fixed_size (del_address,
3299 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
3300 struct GNUNET_TRANSPORT_DelAddressMessage,
3302 GNUNET_MQ_hd_var_size (incoming_msg,
3303 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
3304 struct GNUNET_TRANSPORT_IncomingMessage,
3306 GNUNET_MQ_hd_fixed_size (queue_create_ok,
3307 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
3308 struct GNUNET_TRANSPORT_CreateQueueResponse,
3310 GNUNET_MQ_hd_fixed_size (queue_create_fail,
3311 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
3312 struct GNUNET_TRANSPORT_CreateQueueResponse,
3314 GNUNET_MQ_hd_var_size (add_queue_message,
3315 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
3316 struct GNUNET_TRANSPORT_AddQueueMessage,
3318 GNUNET_MQ_hd_fixed_size (del_queue_message,
3319 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
3320 struct GNUNET_TRANSPORT_DelQueueMessage,
3322 GNUNET_MQ_hd_fixed_size (send_message_ack,
3323 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
3324 struct GNUNET_TRANSPORT_SendMessageToAck,
3326 /* communication with monitors */
3327 GNUNET_MQ_hd_fixed_size (monitor_start,
3328 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
3329 struct GNUNET_TRANSPORT_MonitorStart,
3331 GNUNET_MQ_handler_end ());
3334 /* end of file gnunet-service-transport.c */